airbyte_cdk.sources.declarative.retrievers.file_uploader

 1from .connector_builder_file_uploader import ConnectorBuilderFileUploader
 2from .default_file_uploader import DefaultFileUploader
 3from .file_uploader import FileUploader
 4from .file_writer import FileWriter
 5from .local_file_system_file_writer import LocalFileSystemFileWriter
 6from .noop_file_writer import NoopFileWriter
 7
 8__all__ = [
 9    "DefaultFileUploader",
10    "LocalFileSystemFileWriter",
11    "NoopFileWriter",
12    "ConnectorBuilderFileUploader",
13    "FileUploader",
14    "FileWriter",
15]
@dataclass
class DefaultFileUploader(airbyte_cdk.sources.declarative.retrievers.file_uploader.FileUploader):
@dataclass
class DefaultFileUploader(FileUploader):
    """
    Default file uploader.

    For a given record, ``upload`` extracts the download target from the record data, requests
    the file through ``requester``, derives the destination path under the files directory and
    delegates the write to ``file_writer``. Any ``FileWriter`` implementation can be injected to
    change how the content is written.
    """

    requester: Requester
    download_target_extractor: RecordExtractor
    config: Config
    file_writer: FileWriter
    parameters: InitVar[Mapping[str, Any]]

    # Optional file name template (or plain string), evaluated against the config and the record.
    # When it is not set, a random UUID is used as the file name.
    filename_extractor: Optional[Union[InterpolatedString, str]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Turn a configured ``filename_extractor`` into an ``InterpolatedString``.

        :param parameters: parameters forwarded to ``InterpolatedString.create``
        """
        if self.filename_extractor:
            self.filename_extractor = InterpolatedString.create(
                self.filename_extractor,
                parameters=parameters,
            )

    def upload(self, record: Record) -> None:
        """
        Fetch the file referenced by ``record`` and attach a file reference to the record.

        The record data is serialized to JSON and handed to ``download_target_extractor``; the
        first extracted value is used as the download target and passed to the requester through
        the stream slice's ``extra_fields``. The response content is handed to ``file_writer``
        with the path ``<files directory>/<stream name>/<file name>``, and
        ``record.file_reference`` is set to describe that file.

        :param record: record to upload the file for; its ``file_reference`` is overwritten
        :raises ValueError: if no download target is extracted, if the download target is not a
            string, or if the evaluated file name contains a ``..`` path component
        """
        # The extractor consumes responses, so the record data is wrapped in a synthetic one.
        mocked_response = SafeResponse()
        mocked_response.content = json.dumps(record.data).encode()
        download_targets = list(self.download_target_extractor.extract_records(mocked_response))
        if not download_targets:
            raise ValueError("No download targets found")

        download_target = download_targets[0]  # we just expect one download target
        if not isinstance(download_target, str):
            raise ValueError(
                f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
            )

        response = self.requester.send_request(
            stream_slice=StreamSlice(
                partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
            ),
        )

        files_directory = Path(get_files_directory())

        file_name = (
            self.filename_extractor.eval(self.config, record=record)
            if self.filename_extractor
            else str(uuid.uuid4())
        )
        # Joining an absolute path would discard the stream directory, so leading slashes go.
        file_name = file_name.lstrip("/")
        # The file name is derived from record data: refuse parent-directory components so the
        # file cannot end up outside of the stream's directory.
        if ".." in Path(file_name).parts:
            raise ValueError(
                f"file name is not allowed to contain '..' path components but was: {file_name}"
            )
        file_relative_path = Path(record.stream_name) / Path(file_name)

        full_path = files_directory / file_relative_path
        full_path.parent.mkdir(parents=True, exist_ok=True)

        file_size_bytes = self.file_writer.write(full_path, content=response.content)

        logger.info("File uploaded successfully")
        logger.info(f"File url: {str(full_path)}")
        logger.info(f"File size: {file_size_bytes / 1024} KB")
        logger.info(f"File relative path: {str(file_relative_path)}")

        record.file_reference = AirbyteRecordMessageFileReference(
            staging_file_url=str(full_path),
            source_file_relative_path=str(file_relative_path),
            file_size_bytes=file_size_bytes,
        )

File uploader class. Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write(). Different FileWriter implementations can be injected to handle different file writing strategies.

DefaultFileUploader( requester: airbyte_cdk.Requester, download_target_extractor: airbyte_cdk.RecordExtractor, config: Mapping[str, Any], file_writer: FileWriter, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], filename_extractor: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None)
download_target_extractor: airbyte_cdk.RecordExtractor
config: Mapping[str, Any]
file_writer: FileWriter
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
filename_extractor: Union[airbyte_cdk.InterpolatedString, str, NoneType] = None
def upload(self, record: airbyte_cdk.Record) -> None:
55    def upload(self, record: Record) -> None:
56        mocked_response = SafeResponse()
57        mocked_response.content = json.dumps(record.data).encode()
58        download_targets = list(self.download_target_extractor.extract_records(mocked_response))
59        if not download_targets:
60            raise ValueError("No download targets found")
61
62        download_target = download_targets[0]  # we just expect one download target
63        if not isinstance(download_target, str):
64            raise ValueError(
65                f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
66            )
67
68        response = self.requester.send_request(
69            stream_slice=StreamSlice(
70                partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
71            ),
72        )
73
74        files_directory = Path(get_files_directory())
75
76        file_name = (
77            self.filename_extractor.eval(self.config, record=record)
78            if self.filename_extractor
79            else str(uuid.uuid4())
80        )
81        file_name = file_name.lstrip("/")
82        file_relative_path = Path(record.stream_name) / Path(file_name)
83
84        full_path = files_directory / file_relative_path
85        full_path.parent.mkdir(parents=True, exist_ok=True)
86
87        file_size_bytes = self.file_writer.write(full_path, content=response.content)
88
89        logger.info("File uploaded successfully")
90        logger.info(f"File url: {str(full_path)}")
91        logger.info(f"File size: {file_size_bytes / 1024} KB")
92        logger.info(f"File relative path: {str(file_relative_path)}")
93
94        record.file_reference = AirbyteRecordMessageFileReference(
95            staging_file_url=str(full_path),
96            source_file_relative_path=str(file_relative_path),
97            file_size_bytes=file_size_bytes,
98        )

Uploads the file to the specified location

class LocalFileSystemFileWriter(airbyte_cdk.sources.declarative.retrievers.file_uploader.FileWriter):
class LocalFileSystemFileWriter(FileWriter):
    def write(self, file_path: Path, content: bytes) -> int:
        """
        Store ``content`` at ``file_path`` on the local file system.

        :param file_path: destination of the file; it is opened in binary write mode
        :param content: raw bytes to store
        :return: size in bytes of the file as reported by ``stat`` once the write is done
        """
        with file_path.open(mode="wb") as destination:
            destination.write(content)

        written_size = file_path.stat().st_size
        return written_size

Base File writer class

def write(self, file_path: pathlib.Path, content: bytes) -> int:
12    def write(self, file_path: Path, content: bytes) -> int:
13        """
14        Writes the file to the specified location
15        """
16        with open(str(file_path), "wb") as f:
17            f.write(content)
18
19        return file_path.stat().st_size

Writes the file to the specified location

class NoopFileWriter(FileWriter):
    # Value returned by every call to ``write``, which does not write anything.
    NOOP_FILE_SIZE = -1

    def write(self, file_path: Path, content: bytes) -> int:
        """
        Leave ``file_path`` and ``content`` untouched and report ``NOOP_FILE_SIZE``.

        :param file_path: ignored
        :param content: ignored
        :return: ``NOOP_FILE_SIZE``
        """
        noop_size = self.NOOP_FILE_SIZE
        return noop_size

Base File writer class

NOOP_FILE_SIZE = -1
def write(self, file_path: pathlib.Path, content: bytes) -> int:
14    def write(self, file_path: Path, content: bytes) -> int:
15        """
16        Noop file writer
17        """
18        return self.NOOP_FILE_SIZE

Noop file writer

@dataclass
class ConnectorBuilderFileUploader(airbyte_cdk.sources.declarative.retrievers.file_uploader.FileUploader):
@dataclass
class ConnectorBuilderFileUploader(FileUploader):
    """
    File uploader used by the connector builder.

    Wraps another uploader: the upload itself is delegated to ``file_uploader``, after which
    the public attributes of ``record.file_reference`` are copied into ``record.data``.
    """

    # Uploader that performs the actual upload and sets ``record.file_reference``.
    file_uploader: DefaultFileUploader

    def upload(self, record: Record) -> None:
        """
        Delegate the upload, then mirror the file reference attributes into ``record.data``.

        :param record: record to upload the file for; its ``data`` is updated in place
        """
        self.file_uploader.upload(record=record)

        reference_attributes = record.file_reference.__dict__
        for attribute_name, attribute_value in reference_attributes.items():
            # Attributes prefixed with an underscore are not copied.
            if attribute_name.startswith("_"):
                continue
            record.data[attribute_name] = attribute_value  # type: ignore

Connector builder file uploader. Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into record.data.

ConnectorBuilderFileUploader( file_uploader: DefaultFileUploader)
file_uploader: DefaultFileUploader
def upload(self, record: airbyte_cdk.Record) -> None:
23    def upload(self, record: Record) -> None:
24        self.file_uploader.upload(record=record)
25        for file_reference_key, file_reference_value in record.file_reference.__dict__.items():
26            if not file_reference_key.startswith("_"):
27                record.data[file_reference_key] = file_reference_value  # type: ignore

Uploads the file to the specified location

@dataclass
class FileUploader(abc.ABC):
@dataclass
class FileUploader(ABC):
    """
    Abstract interface shared by the file uploaders.
    """

    @abstractmethod
    def upload(self, record: Record) -> None:
        """
        Upload the file associated with ``record``.

        Concrete uploaders have to override this method.

        :param record: record to upload the file for
        """

Base class for file uploader

@abstractmethod
def upload(self, record: airbyte_cdk.Record) -> None:
18    @abstractmethod
19    def upload(self, record: Record) -> None:
20        """
21        Uploads the file to the specified location
22        """
23        ...

Uploads the file to the specified location

class FileWriter(abc.ABC):
class FileWriter(ABC):
    """
    Abstract interface shared by the file writers.
    """

    @abstractmethod
    def write(self, file_path: Path, content: bytes) -> int:
        """
        Write ``content`` to ``file_path``.

        Concrete writers have to override this method.

        :param file_path: destination of the file
        :param content: raw bytes of the file
        :return: an integer that callers treat as the size of the file in bytes
        """

Base File writer class

@abstractmethod
def write(self, file_path: pathlib.Path, content: bytes) -> int:
15    @abstractmethod
16    def write(self, file_path: Path, content: bytes) -> int:
17        """
18        Writes the file to the specified location
19        """
20        ...

Writes the file to the specified location