airbyte_cdk.sources.declarative.retrievers.file_uploader
"""File uploader and file writer components for declarative retrievers."""

from .connector_builder_file_uploader import ConnectorBuilderFileUploader
from .default_file_uploader import DefaultFileUploader
from .file_uploader import FileUploader
from .file_writer import FileWriter
from .local_file_system_file_writer import LocalFileSystemFileWriter
from .noop_file_writer import NoopFileWriter

# Public API of this package, kept in alphabetical order.
__all__ = [
    "ConnectorBuilderFileUploader",
    "DefaultFileUploader",
    "FileUploader",
    "FileWriter",
    "LocalFileSystemFileWriter",
    "NoopFileWriter",
]
@dataclass
class DefaultFileUploader(FileUploader):
    """
    File uploader class.

    Handles the upload logic: extracting the download target from the record,
    making the request via ``requester``, determining the file path, and calling
    ``self.file_writer.write()``. Different ``FileWriter`` implementations can be
    injected to handle different file writing strategies.
    """

    requester: Requester
    download_target_extractor: RecordExtractor
    config: Config
    file_writer: FileWriter
    parameters: InitVar[Mapping[str, Any]]

    filename_extractor: Optional[Union[InterpolatedString, str]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Normalize a plain string template into an InterpolatedString so that
        # ``upload`` can always call ``.eval`` on it.
        if self.filename_extractor:
            self.filename_extractor = InterpolatedString.create(
                self.filename_extractor,
                parameters=parameters,
            )

    def upload(self, record: Record) -> None:
        """
        Download the file referenced by ``record`` and attach a file reference to it.

        The download target is extracted from the record data, fetched through
        ``self.requester``, written under the files directory by ``self.file_writer``,
        and recorded on ``record.file_reference``.

        :raises ValueError: if no download target is found, if the first target is not
            a string, or if the resolved file path falls outside the files directory.
        """
        # The extractor operates on responses, so wrap the record data in one.
        mocked_response = SafeResponse()
        mocked_response.content = json.dumps(record.data).encode()
        download_targets = list(self.download_target_extractor.extract_records(mocked_response))
        if not download_targets:
            raise ValueError("No download targets found")

        download_target = download_targets[0]  # we just expect one download target
        if not isinstance(download_target, str):
            raise ValueError(
                f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
            )

        response = self.requester.send_request(
            stream_slice=StreamSlice(
                partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
            ),
        )

        files_directory = Path(get_files_directory())

        file_name = ""
        if self.filename_extractor:
            evaluated_name = self.filename_extractor.eval(self.config, record=record)
            if evaluated_name is not None:
                file_name = str(evaluated_name).lstrip("/")
        # An empty name would point the write at the stream directory itself.
        if not file_name:
            file_name = str(uuid.uuid4())
        file_relative_path = Path(record.stream_name) / Path(file_name)

        full_path = files_directory / file_relative_path
        # The file name can be derived from record data; refuse anything (e.g. "../")
        # that would place the file outside the files directory.
        try:
            full_path.resolve().relative_to(files_directory.resolve())
        except ValueError:
            raise ValueError(
                f"Resolved file path {full_path} is outside of the files directory {files_directory}"
            ) from None
        full_path.parent.mkdir(parents=True, exist_ok=True)

        file_size_bytes = self.file_writer.write(full_path, content=response.content)

        logger.info("File uploaded successfully")
        logger.info(f"File url: {str(full_path)}")
        logger.info(f"File size: {file_size_bytes / 1024} KB")
        logger.info(f"File relative path: {str(file_relative_path)}")

        record.file_reference = AirbyteRecordMessageFileReference(
            staging_file_url=str(full_path),
            source_file_relative_path=str(file_relative_path),
            file_size_bytes=file_size_bytes,
        )
File uploader class. Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling `self.file_writer.write()`. Different types of `FileWriter` can be injected to handle different file writing strategies.
DefaultFileUploader( requester: airbyte_cdk.Requester, download_target_extractor: airbyte_cdk.RecordExtractor, config: Mapping[str, Any], file_writer: FileWriter, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], filename_extractor: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None)
requester: airbyte_cdk.Requester
download_target_extractor: airbyte_cdk.RecordExtractor
file_writer: FileWriter
def upload(self, record: Record) -> None:
    """
    Download the file referenced by ``record`` and attach a file reference to it.

    The download target is extracted from the record data, fetched through
    ``self.requester``, written under the files directory by ``self.file_writer``,
    and recorded on ``record.file_reference``.

    :raises ValueError: if no download target is found, if the first target is not
        a string, or if the resolved file path falls outside the files directory.
    """
    # The extractor operates on responses, so wrap the record data in one.
    mocked_response = SafeResponse()
    mocked_response.content = json.dumps(record.data).encode()
    download_targets = list(self.download_target_extractor.extract_records(mocked_response))
    if not download_targets:
        raise ValueError("No download targets found")

    download_target = download_targets[0]  # we just expect one download target
    if not isinstance(download_target, str):
        raise ValueError(
            f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
        )

    response = self.requester.send_request(
        stream_slice=StreamSlice(
            partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
        ),
    )

    files_directory = Path(get_files_directory())

    file_name = ""
    if self.filename_extractor:
        evaluated_name = self.filename_extractor.eval(self.config, record=record)
        if evaluated_name is not None:
            file_name = str(evaluated_name).lstrip("/")
    # An empty name would point the write at the stream directory itself.
    if not file_name:
        file_name = str(uuid.uuid4())
    file_relative_path = Path(record.stream_name) / Path(file_name)

    full_path = files_directory / file_relative_path
    # The file name can be derived from record data; refuse anything (e.g. "../")
    # that would place the file outside the files directory.
    try:
        full_path.resolve().relative_to(files_directory.resolve())
    except ValueError:
        raise ValueError(
            f"Resolved file path {full_path} is outside of the files directory {files_directory}"
        ) from None
    full_path.parent.mkdir(parents=True, exist_ok=True)

    file_size_bytes = self.file_writer.write(full_path, content=response.content)

    logger.info("File uploaded successfully")
    logger.info(f"File url: {str(full_path)}")
    logger.info(f"File size: {file_size_bytes / 1024} KB")
    logger.info(f"File relative path: {str(file_relative_path)}")

    record.file_reference = AirbyteRecordMessageFileReference(
        staging_file_url=str(full_path),
        source_file_relative_path=str(file_relative_path),
        file_size_bytes=file_size_bytes,
    )
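Downloads the file referenced by the record, writes it with the configured file writer, and attaches a file reference (staging URL, relative path, size) to the record.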
class LocalFileSystemFileWriter(FileWriter):
    """File writer that stores content on the local file system."""

    def write(self, file_path: Path, content: bytes) -> int:
        """
        Write ``content`` to ``file_path``, replacing any existing file.

        :return: the size in bytes of the file on disk after writing.
        """
        with file_path.open("wb") as handle:
            handle.write(content)
        return file_path.stat().st_size
File writer that writes content to the local file system and returns the written file's size in bytes.
def write(self, file_path: Path, content: bytes) -> int:
    """
    Write ``content`` to ``file_path``, replacing any existing file.

    :return: the size in bytes of the file on disk after writing.
    """
    with file_path.open("wb") as handle:
        handle.write(content)
    return file_path.stat().st_size
Writes the file to the specified location
class NoopFileWriter(FileWriter):
    """File writer that discards the content without touching the file system."""

    # Sentinel returned instead of a real byte count, since nothing is written.
    NOOP_FILE_SIZE = -1

    def write(self, file_path: Path, content: bytes) -> int:
        """Ignore ``file_path`` and ``content`` and report ``NOOP_FILE_SIZE``."""
        noop_size = self.NOOP_FILE_SIZE
        return noop_size
File writer that discards the content and returns the sentinel size `NOOP_FILE_SIZE` (-1).
@dataclass
class ConnectorBuilderFileUploader(FileUploader):
    """
    Connector builder file uploader.

    Wraps another uploader: it delegates the actual upload, then copies the public
    (non-underscore) attributes of ``record.file_reference`` into ``record.data``.
    """

    file_uploader: DefaultFileUploader

    def upload(self, record: Record) -> None:
        """Upload via the wrapped uploader, then copy the file reference into the record data."""
        self.file_uploader.upload(record=record)
        public_attributes = {
            attribute: value
            for attribute, value in record.file_reference.__dict__.items()
            if not attribute.startswith("_")
        }
        for attribute, value in public_attributes.items():
            record.data[attribute] = value  # type: ignore
Connector builder file uploader Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into the record.data.
ConnectorBuilderFileUploader( file_uploader: DefaultFileUploader)
file_uploader: DefaultFileUploader
def upload(self, record: Record) -> None:
    """Upload via the wrapped uploader, then copy the file reference into the record data."""
    self.file_uploader.upload(record=record)
    public_attributes = {
        attribute: value
        for attribute, value in record.file_reference.__dict__.items()
        if not attribute.startswith("_")
    }
    for attribute, value in public_attributes.items():
        record.data[attribute] = value  # type: ignore
Uploads the file via the wrapped uploader, then copies the public attributes of `record.file_reference` into `record.data`.
@dataclass
class FileUploader(ABC):
    """
    Abstract base class for file uploaders.

    Subclasses implement ``upload`` to handle the file associated with a record.
    """

    @abstractmethod
    def upload(self, record: Record) -> None:
        """
        Uploads the file to the specified location
        """
Base class for file uploader
class FileWriter(ABC):
    """
    Abstract base class for file writers.

    Implementations persist ``content`` for a given ``file_path`` and report an
    integer size.
    """

    @abstractmethod
    def write(self, file_path: Path, content: bytes) -> int:
        """
        Write ``content`` to the location given by ``file_path`` and return a size.
        """
Base File writer class