airbyte_cdk.sources.file_based

 1from .config.abstract_file_based_spec import AbstractFileBasedSpec
 2from .config.csv_format import CsvFormat
 3from .config.file_based_stream_config import FileBasedStreamConfig
 4from .config.jsonl_format import JsonlFormat
 5from .exceptions import CustomFileBasedException, ErrorListingFiles, FileBasedSourceError
 6from .file_based_source import DEFAULT_CONCURRENCY, FileBasedSource
 7from .file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 8from .remote_file import RemoteFile
 9from .stream.cursor import DefaultFileBasedCursor
10
11__all__ = [
12    "AbstractFileBasedSpec",
13    "AbstractFileBasedStreamReader",
14    "CsvFormat",
15    "CustomFileBasedException",
16    "DefaultFileBasedCursor",
17    "ErrorListingFiles",
18    "FileBasedSource",
19    "FileBasedSourceError",
20    "FileBasedStreamConfig",
21    "FileReadMode",
22    "JsonlFormat",
23    "RemoteFile",
24]
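The package re-exports its most commonly used building blocks, so connector code can import them directly from airbyte_cdk.sources.file_based. A minimal sketch of the typical imports:

from airbyte_cdk.sources.file_based import (
    AbstractFileBasedSpec,
    AbstractFileBasedStreamReader,
    CsvFormat,
    FileBasedSource,
    RemoteFile,
)

# A concrete connector typically subclasses AbstractFileBasedSpec and
# AbstractFileBasedStreamReader, then wires them into FileBasedSource.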
class AbstractFileBasedSpec(pydantic.v1.main.BaseModel):
 48class AbstractFileBasedSpec(BaseModel):
 49    """
 50    Used during spec; allows the developer to configure the cloud provider specific options
 51    that are needed when users configure a file-based source.
 52    """
 53
 54    start_date: Optional[str] = Field(
 55        title="Start Date",
 56        description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
 57        examples=["2021-01-01T00:00:00.000000Z"],
 58        format="date-time",
 59        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$",
 60        pattern_descriptor="YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
 61        order=1,
 62    )
 63
 64    streams: List[FileBasedStreamConfig] = Field(
 65        title="The list of streams to sync",
 66        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
 67        order=10,
 68    )
 69
 70    delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
 71        title="Delivery Method",
 72        discriminator="delivery_type",
 73        type="object",
 74        order=7,
 75        display_type="radio",
 76        group="advanced",
 77        default="use_records_transfer",
 78        airbyte_hidden=True,
 79    )
 80
 81    @classmethod
 82    @abstractmethod
 83    def documentation_url(cls) -> AnyUrl:
 84        """
 85        :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
 86        """
 87
 88    @classmethod
 89    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
 90        """
 91        Generates the mapping comprised of the config fields
 92        """
 93        schema = super().schema(*args, **kwargs)
 94        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
 95        schema_helpers.expand_refs(transformed_schema)
 96        cls.replace_enum_allOf_and_anyOf(transformed_schema)
 97        cls.remove_discriminator(transformed_schema)
 98
 99        return transformed_schema
100
101    @staticmethod
102    def remove_discriminator(schema: Dict[str, Any]) -> None:
103        """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
104        dpath.delete(schema, "properties/**/discriminator")
105
106    @staticmethod
107    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
108        """
109        allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
110        Unpacks the enums under allOf and moves them up a level under the enum key
111        anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
112        additional validation that an incoming config only matches exactly one of a field's types.
113        """
114        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
115        objects_to_check["type"] = "object"
116        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
117        for format in objects_to_check["oneOf"]:
118            for key in format["properties"]:
119                object_property = format["properties"][key]
120                AbstractFileBasedSpec.move_enum_to_root(object_property)
121
122        properties_to_change = ["validation_policy"]
123        for property_to_change in properties_to_change:
124            property_object = schema["properties"]["streams"]["items"]["properties"][
125                property_to_change
126            ]
127            if "anyOf" in property_object:
128                schema["properties"]["streams"]["items"]["properties"][property_to_change][
129                    "type"
130                ] = "object"
131                schema["properties"]["streams"]["items"]["properties"][property_to_change][
132                    "oneOf"
133                ] = property_object.pop("anyOf")
134            AbstractFileBasedSpec.move_enum_to_root(property_object)
135
136        csv_format_schemas = list(
137            filter(
138                lambda format: format["properties"]["filetype"]["default"] == "csv",
139                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
140            )
141        )
142        if len(csv_format_schemas) != 1:
143            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
144        csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
145            "properties"
146        ]["header_definition"].pop("anyOf", [])
147        csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
148        return schema
149
150    @staticmethod
151    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
152        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
153            object_property["enum"] = object_property["allOf"][0]["enum"]
154            object_property.pop("allOf")

Used during spec; allows the developer to configure the cloud provider-specific options that are needed when users configure a file-based source.

start_date: Optional[str]
streams: List[FileBasedStreamConfig]
@classmethod
@abstractmethod
def documentation_url(cls) -> pydantic.v1.networks.AnyUrl:
81    @classmethod
82    @abstractmethod
83    def documentation_url(cls) -> AnyUrl:
84        """
85        :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
86        """
Returns

link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
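For illustration, a minimal concrete spec might look like the sketch below. The class name and the connector-specific field (bucket) are hypothetical; real specs add whatever options their provider needs (credentials, endpoint, ...).

from typing import Any, Dict

from pydantic.v1 import AnyUrl, Field

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec


class MySourceSpec(AbstractFileBasedSpec):
    # Hypothetical connector-specific option
    bucket: str = Field(title="Bucket", description="Name of the bucket to read from.")

    @classmethod
    def documentation_url(cls) -> AnyUrl:
        return AnyUrl("https://docs.airbyte.com/integrations/sources/s3", scheme="https")


# schema() applies the transformations described below (expand refs,
# replace anyOf/allOf, drop discriminators) before the spec is emitted.
spec_schema: Dict[str, Any] = MySourceSpec.schema()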

@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
88    @classmethod
89    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
90        """
91        Generates the mapping comprised of the config fields
92        """
93        schema = super().schema(*args, **kwargs)
94        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
95        schema_helpers.expand_refs(transformed_schema)
96        cls.replace_enum_allOf_and_anyOf(transformed_schema)
97        cls.remove_discriminator(transformed_schema)
98
99        return transformed_schema

Generates the mapping comprised of the config fields

@staticmethod
def remove_discriminator(schema: Dict[str, Any]) -> None:
101    @staticmethod
102    def remove_discriminator(schema: Dict[str, Any]) -> None:
103        """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
104        dpath.delete(schema, "properties/**/discriminator")

pydantic adds "discriminator" to the schema for oneOfs, which the platform does not handle correctly because we inline all references

@staticmethod
def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
106    @staticmethod
107    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
108        """
109        allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
110        Unpacks the enums under allOf and moves them up a level under the enum key
111        anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
112        additional validation that an incoming config only matches exactly one of a field's types.
113        """
114        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
115        objects_to_check["type"] = "object"
116        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
117        for format in objects_to_check["oneOf"]:
118            for key in format["properties"]:
119                object_property = format["properties"][key]
120                AbstractFileBasedSpec.move_enum_to_root(object_property)
121
122        properties_to_change = ["validation_policy"]
123        for property_to_change in properties_to_change:
124            property_object = schema["properties"]["streams"]["items"]["properties"][
125                property_to_change
126            ]
127            if "anyOf" in property_object:
128                schema["properties"]["streams"]["items"]["properties"][property_to_change][
129                    "type"
130                ] = "object"
131                schema["properties"]["streams"]["items"]["properties"][property_to_change][
132                    "oneOf"
133                ] = property_object.pop("anyOf")
134            AbstractFileBasedSpec.move_enum_to_root(property_object)
135
136        csv_format_schemas = list(
137            filter(
138                lambda format: format["properties"]["filetype"]["default"] == "csv",
139                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
140            )
141        )
142        if len(csv_format_schemas) != 1:
143            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
144        csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
145            "properties"
146        ]["header_definition"].pop("anyOf", [])
147        csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
148        return schema

allOfs are not supported by the UI, but pydantic is automatically writing them for enums. Unpacks the enums under allOf and moves them up a level under the enum key. anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the additional validation that an incoming config only matches exactly one of a field's types.
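As a concrete illustration of the allOf unpacking, this is roughly what move_enum_to_root does to a single property schema (the enum values here are just an example):

# Hypothetical property schema, roughly as pydantic v1 emits it for an enum field
object_property = {
    "title": "Validation Policy",
    "allOf": [{"enum": ["Emit Record", "Skip Record", "Wait for Discover"]}],
}

AbstractFileBasedSpec.move_enum_to_root(object_property)

# The enum is hoisted to the top level and the allOf wrapper is dropped:
# {"title": "Validation Policy", "enum": ["Emit Record", "Skip Record", "Wait for Discover"]}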

@staticmethod
def move_enum_to_root(object_property: Dict[str, Any]) -> None:
150    @staticmethod
151    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
152        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
153            object_property["enum"] = object_property["allOf"][0]["enum"]
154            object_property.pop("allOf")
class AbstractFileBasedStreamReader(abc.ABC):
 35class AbstractFileBasedStreamReader(ABC):
 36    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
 37    FILE_RELATIVE_PATH = "file_relative_path"
 38    FILE_NAME = "file_name"
 39    LOCAL_FILE_PATH = "local_file_path"
 40    FILE_FOLDER = "file_folder"
 41    FILE_SIZE_LIMIT = 1_500_000_000
 42
 43    def __init__(self) -> None:
 44        self._config = None
 45
 46    @property
 47    def config(self) -> Optional[AbstractFileBasedSpec]:
 48        return self._config
 49
 50    @config.setter
 51    @abstractmethod
 52    def config(self, value: AbstractFileBasedSpec) -> None:
 53        """
 54        FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.
 55
 56        Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader
 57        will require keys that (for example) allow it to authenticate with the 3rd party.
 58
 59        Therefore, concrete implementations of AbstractFileBasedStreamReader's config setter should assert that `value` is of the correct
 60        config type for that type of StreamReader.
 61        """
 62        ...
 63
 64    @abstractmethod
 65    def open_file(
 66        self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
 67    ) -> IOBase:
 68        """
 69        Return a file handle for reading.
 70
 71        Many sources will be able to use smart_open to implement this method,
 72        for example:
 73
 74        client = boto3.Session(...)
 75        return smart_open.open(remote_file.uri, transport_params={"client": client})
 76        """
 77        ...
 78
 79    @abstractmethod
 80    def get_matching_files(
 81        self,
 82        globs: List[str],
 83        prefix: Optional[str],
 84        logger: logging.Logger,
 85    ) -> Iterable[RemoteFile]:
 86        """
 87        Return all files that match any of the globs.
 88
 89        Example:
 90
 91        The source has files "a.json", "foo/a.json", "foo/bar/a.json"
 92
 93        If globs = ["*.json"] then this method returns ["a.json"].
 94
 95        If globs = ["foo/*.json"] then this method returns ["foo/a.json"].
 96
 97        Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
 98        are available, which may be helpful when implementing this method.
 99        """
100        ...
101
102    def filter_files_by_globs_and_start_date(
103        self, files: List[RemoteFile], globs: List[str]
104    ) -> Iterable[RemoteFile]:
105        """
106        Utility method for filtering files based on globs.
107        """
108        start_date = (
109            datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
110            if self.config and self.config.start_date
111            else None
112        )
113        seen = set()
114
115        for file in files:
116            if self.file_matches_globs(file, globs):
117                if file.uri not in seen and (not start_date or file.last_modified >= start_date):
118                    seen.add(file.uri)
119                    yield file
120
121    @staticmethod
122    def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
123        # Use the GLOBSTAR flag to enable recursive ** matching
124        # (https://facelessuser.github.io/wcmatch/wcmatch/#globstar)
125        return any(globmatch(file.uri, g, flags=GLOBSTAR) for g in globs)
126
127    @staticmethod
128    def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
129        """
130        Utility method for extracting prefixes from the globs.
131        """
132        prefixes = {glob.split("*")[0] for glob in globs}
133        return set(filter(lambda x: bool(x), prefixes))
134
135    def use_file_transfer(self) -> bool:
136        if self.config:
137            return use_file_transfer(self.config)
138        return False
139
140    def preserve_directory_structure(self) -> bool:
141        # fall back to preserve subdirectories if config is not present or incomplete
142        if self.config:
143            return preserve_directory_structure(self.config)
144        return True
145
146    def include_identities_stream(self) -> bool:
147        if self.config:
148            return include_identities_stream(self.config)
149        return False
150
151    def upload(
152        self, file: UploadableRemoteFile, local_directory: str, logger: logging.Logger
153    ) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
154        """
155        This is required for connectors that will support writing to
156        files. It will handle the logic to download,get,read,acquire or
157        whatever is more efficient to get a file from the source.
158
159        Args:
160               file (RemoteFile): The remote file object containing URI and metadata.
161               local_directory (str): The local directory path where the file will be downloaded.
162               logger (logging.Logger): Logger for logging information and errors.
163
164           Returns:
165               AirbyteRecordMessageFileReference: A file reference object containing:
166                   - staging_file_url (str): The absolute path to the referenced file in the staging area.
167                   - file_size_bytes (int): The size of the referenced file in bytes.
168                   - source_file_relative_path (str): The relative path to the referenced file in source.
169        """
170        if not isinstance(file, UploadableRemoteFile):
171            raise TypeError(f"Expected UploadableRemoteFile, got {type(file)}")
172
173        file_size = file.size
174
175        if file_size > self.FILE_SIZE_LIMIT:
176            message = (
177                f"File size exceeds the {self.FILE_SIZE_LIMIT / 1e9} GB limit. File URI: {file.uri}"
178            )
179            raise FileSizeLimitError(
180                message=message, internal_message=message, failure_type=FailureType.config_error
181            )
182
183        file_paths = self._get_file_transfer_paths(
184            source_file_relative_path=file.source_file_relative_path,
185            staging_directory=local_directory,
186        )
187        local_file_path = file_paths[self.LOCAL_FILE_PATH]
188        file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
189        file_name = file_paths[self.FILE_NAME]
190
191        logger.info(
192            f"Starting to download the file {file.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
193        )
194        start_download_time = time.time()
195
196        file.download_to_local_directory(local_file_path)
197
198        write_duration = time.time() - start_download_time
199        logger.info(
200            f"Finished downloading the file {file.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
201        )
202
203        file_record_data = FileRecordData(
204            folder=file_paths[self.FILE_FOLDER],
205            file_name=file_name,
206            bytes=file_size,
207            id=file.id,
208            mime_type=file.mime_type,
209            created_at=file.created_at,
210            updated_at=file.updated_at,
211            source_uri=file.uri,
212        )
213        file_reference = AirbyteRecordMessageFileReference(
214            staging_file_url=local_file_path,
215            source_file_relative_path=file_relative_path,
216            file_size_bytes=file_size,
217        )
218        return file_record_data, file_reference
219
220    def _get_file_transfer_paths(
221        self, source_file_relative_path: str, staging_directory: str
222    ) -> MutableMapping[str, Any]:
223        """
224        This method is used to get the file transfer paths for a given source file relative path and local directory.
225        It returns a dictionary with the following keys:
226            - FILE_RELATIVE_PATH: The relative path to file in reference to the staging directory.
227            - LOCAL_FILE_PATH: The absolute path to the file.
228            - FILE_NAME: The name of the referenced file.
229            - FILE_FOLDER: The folder of the referenced file.
230        """
231        preserve_directory_structure = self.preserve_directory_structure()
232
233        file_name = path.basename(source_file_relative_path)
234        file_folder = path.dirname(source_file_relative_path)
235        if preserve_directory_structure:
236            # Remove left slashes from source path format to make relative path for writing locally
237            file_relative_path = source_file_relative_path.lstrip("/")
238        else:
239            file_relative_path = file_name
240        local_file_path = path.join(staging_directory, file_relative_path)
241        # Ensure the local directory exists
242        makedirs(path.dirname(local_file_path), exist_ok=True)
243
244        file_paths = {
245            self.FILE_RELATIVE_PATH: file_relative_path,
246            self.LOCAL_FILE_PATH: local_file_path,
247            self.FILE_NAME: file_name,
248            self.FILE_FOLDER: file_folder,
249        }
250        return file_paths

Abstract base class for stream readers. Concrete implementations know how to list the files available in a specific source and open them for reading.

DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
FILE_RELATIVE_PATH = 'file_relative_path'
FILE_NAME = 'file_name'
LOCAL_FILE_PATH = 'local_file_path'
FILE_FOLDER = 'file_folder'
FILE_SIZE_LIMIT = 1500000000
config: Optional[AbstractFileBasedSpec]
46    @property
47    def config(self) -> Optional[AbstractFileBasedSpec]:
48        return self._config

FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.

Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader will require keys that (for example) allow it to authenticate with the 3rd party.

Therefore, concrete implementations of AbstractFileBasedStreamReader's config setter should assert that value is of the correct config type for that type of StreamReader.

@abstractmethod
def open_file( self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> io.IOBase:
64    @abstractmethod
65    def open_file(
66        self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
67    ) -> IOBase:
68        """
69        Return a file handle for reading.
70
71        Many sources will be able to use smart_open to implement this method,
72        for example:
73
74        client = boto3.Session(...)
75        return smart_open.open(remote_file.uri, transport_params={"client": client})
76        """
77        ...

Return a file handle for reading.

Many sources will be able to use smart_open to implement this method, for example:

client = boto3.Session(...)
return smart_open.open(remote_file.uri, transport_params={"client": client})

@abstractmethod
def get_matching_files( self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[RemoteFile]:
 79    @abstractmethod
 80    def get_matching_files(
 81        self,
 82        globs: List[str],
 83        prefix: Optional[str],
 84        logger: logging.Logger,
 85    ) -> Iterable[RemoteFile]:
 86        """
 87        Return all files that match any of the globs.
 88
 89        Example:
 90
 91        The source has files "a.json", "foo/a.json", "foo/bar/a.json"
 92
 93        If globs = ["*.json"] then this method returns ["a.json"].
 94
 95        If globs = ["foo/*.json"] then this method returns ["foo/a.json"].
 96
 97        Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
 98        are available, which may be helpful when implementing this method.
 99        """
100        ...

Return all files that match any of the globs.

Example:

The source has files "a.json", "foo/a.json", "foo/bar/a.json"

If globs = ["*.json"] then this method returns ["a.json"].

If globs = ["foo/*.json"] then this method returns ["foo/a.json"].

Utility methods self.filter_files_by_globs and self.get_prefixes_from_globs are available, which may be helpful when implementing this method.
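Putting open_file and get_matching_files together, a minimal concrete reader might look like the sketch below. The class is hypothetical and reads a local directory rather than a cloud store; a real connector would open remote files (for example with smart_open, as noted above), but the structure is the same.

import logging
from datetime import datetime
from io import IOBase
from os import path, walk
from typing import Iterable, List, Optional

from airbyte_cdk.sources.file_based import (
    AbstractFileBasedSpec,
    AbstractFileBasedStreamReader,
    FileReadMode,
    RemoteFile,
)


class LocalDirectoryStreamReader(AbstractFileBasedStreamReader):
    """Hypothetical reader that treats a local directory as the "remote" source."""

    def __init__(self, root: str = ".") -> None:
        super().__init__()
        self._root = root

    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        return self._config

    @config.setter
    def config(self, value: AbstractFileBasedSpec) -> None:
        # A real reader would assert that `value` is its own spec subclass.
        assert isinstance(value, AbstractFileBasedSpec)
        self._config = value

    def open_file(
        self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
    ) -> IOBase:
        # Assumes FileReadMode values map onto built-in open() modes ("r" / "rb");
        # a cloud reader would call smart_open.open() here instead.
        return open(file.uri, mode=mode.value, encoding=encoding)

    def get_matching_files(
        self, globs: List[str], prefix: Optional[str], logger: logging.Logger
    ) -> Iterable[RemoteFile]:
        all_files = [
            RemoteFile(
                uri=path.join(root, name),
                last_modified=datetime.fromtimestamp(path.getmtime(path.join(root, name))),
            )
            for root, _, names in walk(prefix or self._root)
            for name in names
        ]
        # The base class helper applies the glob and start_date filtering.
        yield from self.filter_files_by_globs_and_start_date(all_files, globs)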

def filter_files_by_globs_and_start_date( self, files: List[RemoteFile], globs: List[str]) -> Iterable[RemoteFile]:
102    def filter_files_by_globs_and_start_date(
103        self, files: List[RemoteFile], globs: List[str]
104    ) -> Iterable[RemoteFile]:
105        """
106        Utility method for filtering files based on globs.
107        """
108        start_date = (
109            datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
110            if self.config and self.config.start_date
111            else None
112        )
113        seen = set()
114
115        for file in files:
116            if self.file_matches_globs(file, globs):
117                if file.uri not in seen and (not start_date or file.last_modified >= start_date):
118                    seen.add(file.uri)
119                    yield file

Utility method for filtering files based on globs and, if configured, the start_date.

@staticmethod
def file_matches_globs( file: RemoteFile, globs: List[str]) -> bool:
121    @staticmethod
122    def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
123        # Use the GLOBSTAR flag to enable recursive ** matching
124        # (https://facelessuser.github.io/wcmatch/wcmatch/#globstar)
125        return any(globmatch(file.uri, g, flags=GLOBSTAR) for g in globs)
@staticmethod
def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
127    @staticmethod
128    def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
129        """
130        Utility method for extracting prefixes from the globs.
131        """
132        prefixes = {glob.split("*")[0] for glob in globs}
133        return set(filter(lambda x: bool(x), prefixes))

Utility method for extracting prefixes from the globs.
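For instance, given the implementation above:

prefixes = AbstractFileBasedStreamReader.get_prefixes_from_globs(
    ["folder/*.csv", "folder/sub/**/*.jsonl", "*.avro"]
)
# Everything before the first "*" is kept, and empty prefixes are dropped:
# prefixes == {"folder/", "folder/sub/"}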

def use_file_transfer(self) -> bool:
135    def use_file_transfer(self) -> bool:
136        if self.config:
137            return use_file_transfer(self.config)
138        return False
def preserve_directory_structure(self) -> bool:
140    def preserve_directory_structure(self) -> bool:
141        # fall back to preserve subdirectories if config is not present or incomplete
142        if self.config:
143            return preserve_directory_structure(self.config)
144        return True
def include_identities_stream(self) -> bool:
146    def include_identities_stream(self) -> bool:
147        if self.config:
148            return include_identities_stream(self.config)
149        return False
def upload( self, file: airbyte_cdk.sources.file_based.remote_file.UploadableRemoteFile, local_directory: str, logger: logging.Logger) -> Tuple[airbyte_cdk.sources.file_based.file_record_data.FileRecordData, airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessageFileReference]:
151    def upload(
152        self, file: UploadableRemoteFile, local_directory: str, logger: logging.Logger
153    ) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
154        """
155        This is required for connectors that will support writing to
156        files. It will handle the logic to download,get,read,acquire or
157        whatever is more efficient to get a file from the source.
158
159        Args:
160               file (RemoteFile): The remote file object containing URI and metadata.
161               local_directory (str): The local directory path where the file will be downloaded.
162               logger (logging.Logger): Logger for logging information and errors.
163
164           Returns:
165               AirbyteRecordMessageFileReference: A file reference object containing:
166                   - staging_file_url (str): The absolute path to the referenced file in the staging area.
167                   - file_size_bytes (int): The size of the referenced file in bytes.
168                   - source_file_relative_path (str): The relative path to the referenced file in source.
169        """
170        if not isinstance(file, UploadableRemoteFile):
171            raise TypeError(f"Expected UploadableRemoteFile, got {type(file)}")
172
173        file_size = file.size
174
175        if file_size > self.FILE_SIZE_LIMIT:
176            message = (
177                f"File size exceeds the {self.FILE_SIZE_LIMIT / 1e9} GB limit. File URI: {file.uri}"
178            )
179            raise FileSizeLimitError(
180                message=message, internal_message=message, failure_type=FailureType.config_error
181            )
182
183        file_paths = self._get_file_transfer_paths(
184            source_file_relative_path=file.source_file_relative_path,
185            staging_directory=local_directory,
186        )
187        local_file_path = file_paths[self.LOCAL_FILE_PATH]
188        file_relative_path = file_paths[self.FILE_RELATIVE_PATH]
189        file_name = file_paths[self.FILE_NAME]
190
191        logger.info(
192            f"Starting to download the file {file.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)"
193        )
194        start_download_time = time.time()
195
196        file.download_to_local_directory(local_file_path)
197
198        write_duration = time.time() - start_download_time
199        logger.info(
200            f"Finished downloading the file {file.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds."
201        )
202
203        file_record_data = FileRecordData(
204            folder=file_paths[self.FILE_FOLDER],
205            file_name=file_name,
206            bytes=file_size,
207            id=file.id,
208            mime_type=file.mime_type,
209            created_at=file.created_at,
210            updated_at=file.updated_at,
211            source_uri=file.uri,
212        )
213        file_reference = AirbyteRecordMessageFileReference(
214            staging_file_url=local_file_path,
215            source_file_relative_path=file_relative_path,
216            file_size_bytes=file_size,
217        )
218        return file_record_data, file_reference

This is required for connectors that support file transfer. It handles the logic to download, get, read, or otherwise acquire a file from the source, whichever is most efficient.

Arguments:
  • file (UploadableRemoteFile): The remote file object containing URI and metadata.
  • local_directory (str): The local directory path where the file will be downloaded.
  • logger (logging.Logger): Logger for logging information and errors.

Returns

A tuple of FileRecordData and AirbyteRecordMessageFileReference, where the file reference contains:
  • staging_file_url (str): The absolute path to the referenced file in the staging area.
  • file_size_bytes (int): The size of the referenced file in bytes.
  • source_file_relative_path (str): The relative path to the referenced file in source.
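upload delegates the staging-path computation to _get_file_transfer_paths (shown in the class source above). A quick sketch of the equivalent path math, using hypothetical inputs:

from os import path

# Hypothetical inputs
source_file_relative_path = "folder/2024/data.csv"
staging_directory = "/tmp/airbyte-staging"

# With preserve_directory_structure=True (the default fallback):
file_relative_path = source_file_relative_path.lstrip("/")          # "folder/2024/data.csv"
local_file_path = path.join(staging_directory, file_relative_path)  # "/tmp/airbyte-staging/folder/2024/data.csv"
file_name = path.basename(source_file_relative_path)                # "data.csv"
file_folder = path.dirname(source_file_relative_path)               # "folder/2024"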
class CsvFormat(pydantic.v1.main.BaseModel):
 85class CsvFormat(BaseModel):
 86    class Config(OneOfOptionConfig):
 87        title = "CSV Format"
 88        discriminator = "filetype"
 89
 90    filetype: str = Field(
 91        "csv",
 92        const=True,
 93    )
 94    delimiter: str = Field(
 95        title="Delimiter",
 96        description="The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
 97        default=",",
 98    )
 99    quote_char: str = Field(
100        title="Quote Character",
101        default='"',
102        description="The character used for quoting CSV values. To disallow quoting, make this field blank.",
103    )
104    escape_char: Optional[str] = Field(
105        title="Escape Character",
106        default=None,
107        description="The character used for escaping special characters. To disallow escaping, leave this field blank.",
108    )
109    encoding: Optional[str] = Field(
110        default="utf8",
111        description='The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href="https://docs.python.org/3/library/codecs.html#standard-encodings" target="_blank">list of python encodings</a> for allowable options.',
112    )
113    double_quote: bool = Field(
114        title="Double Quote",
115        default=True,
116        description="Whether two quotes in a quoted CSV value denote a single quote in the data.",
117    )
118    null_values: Set[str] = Field(
119        title="Null Values",
120        default=[],
121        description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
122    )
123    strings_can_be_null: bool = Field(
124        title="Strings Can Be Null",
125        default=True,
126        description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
127    )
128    skip_rows_before_header: int = Field(
129        title="Skip Rows Before Header",
130        default=0,
131        description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
132    )
133    skip_rows_after_header: int = Field(
134        title="Skip Rows After Header",
135        default=0,
136        description="The number of rows to skip after the header row.",
137    )
138    header_definition: Union[CsvHeaderFromCsv, CsvHeaderAutogenerated, CsvHeaderUserProvided] = (
139        Field(
140            title="CSV Header Definition",
141            default=CsvHeaderFromCsv(header_definition_type=CsvHeaderDefinitionType.FROM_CSV.value),
142            description="How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.",
143        )
144    )
145    true_values: Set[str] = Field(
146        title="True Values",
147        default=DEFAULT_TRUE_VALUES,
148        description="A set of case-sensitive strings that should be interpreted as true values.",
149    )
150    false_values: Set[str] = Field(
151        title="False Values",
152        default=DEFAULT_FALSE_VALUES,
153        description="A set of case-sensitive strings that should be interpreted as false values.",
154    )
155    inference_type: InferenceType = Field(
156        title="Inference Type",
157        default=InferenceType.NONE,
158        description="How to infer the types of the columns. If none, inference default to strings.",
159        airbyte_hidden=True,
160    )
161    ignore_errors_on_fields_mismatch: bool = Field(
162        title="Ignore errors on field mismatch",
163        default=False,
164        description="Whether to ignore errors that occur when the number of fields in the CSV does not match the number of columns in the schema.",
165    )
166
167    @validator("delimiter")
168    def validate_delimiter(cls, v: str) -> str:
169        if v == r"\t":
170            v = "\t"
171        if len(v) != 1:
172            raise ValueError("delimiter should only be one character")
173        if v in {"\r", "\n"}:
174            raise ValueError(f"delimiter cannot be {v}")
175        return v
176
177    @validator("quote_char")
178    def validate_quote_char(cls, v: str) -> str:
179        if len(v) != 1:
180            raise ValueError("quote_char should only be one character")
181        return v
182
183    @validator("escape_char")
184    def validate_escape_char(cls, v: str) -> str:
185        if v is not None and len(v) != 1:
186            raise ValueError("escape_char should only be one character")
187        return v
188
189    @validator("encoding")
190    def validate_encoding(cls, v: str) -> str:
191        try:
192            codecs.lookup(v)
193        except LookupError:
194            raise ValueError(f"invalid encoding format: {v}")
195        return v
196
197    @root_validator
198    def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
199        definition_type = values.get("header_definition_type")
200        column_names = values.get("user_provided_column_names")
201        if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
202            raise ValidationError(
203                "`user_provided_column_names` should be defined if the definition 'User Provided'.",
204                model=CsvFormat,
205            )
206        if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
207            raise ValidationError(
208                "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
209                model=CsvFormat,
210            )
211        return values
filetype: str
delimiter: str
quote_char: str
escape_char: Optional[str]
encoding: Optional[str]
double_quote: bool
null_values: Set[str]
strings_can_be_null: bool
skip_rows_before_header: int
skip_rows_after_header: int
true_values: Set[str]
false_values: Set[str]
ignore_errors_on_fields_mismatch: bool
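All fields above have defaults, so a CsvFormat only needs the options being overridden. A quick sketch of constructing a tab-delimited format (the delimiter validator normalizes a literal backslash-t into a real tab):

from airbyte_cdk.sources.file_based import CsvFormat

fmt = CsvFormat(delimiter=r"\t", skip_rows_before_header=1)
assert fmt.delimiter == "\t"   # "\\t" is rewritten to an actual tab character
assert fmt.filetype == "csv"   # constant discriminator value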
@validator('delimiter')
def validate_delimiter(cls, v: str) -> str:
167    @validator("delimiter")
168    def validate_delimiter(cls, v: str) -> str:
169        if v == r"\t":
170            v = "\t"
171        if len(v) != 1:
172            raise ValueError("delimiter should only be one character")
173        if v in {"\r", "\n"}:
174            raise ValueError(f"delimiter cannot be {v}")
175        return v
@validator('quote_char')
def validate_quote_char(cls, v: str) -> str:
177    @validator("quote_char")
178    def validate_quote_char(cls, v: str) -> str:
179        if len(v) != 1:
180            raise ValueError("quote_char should only be one character")
181        return v
@validator('escape_char')
def validate_escape_char(cls, v: str) -> str:
183    @validator("escape_char")
184    def validate_escape_char(cls, v: str) -> str:
185        if v is not None and len(v) != 1:
186            raise ValueError("escape_char should only be one character")
187        return v
@validator('encoding')
def validate_encoding(cls, v: str) -> str:
189    @validator("encoding")
190    def validate_encoding(cls, v: str) -> str:
191        try:
192            codecs.lookup(v)
193        except LookupError:
194            raise ValueError(f"invalid encoding format: {v}")
195        return v
@root_validator
def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
197    @root_validator
198    def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
199        definition_type = values.get("header_definition_type")
200        column_names = values.get("user_provided_column_names")
201        if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
202            raise ValidationError(
203                "`user_provided_column_names` should be defined if the definition 'User Provided'.",
204                model=CsvFormat,
205            )
206        if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
207            raise ValidationError(
208                "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
209                model=CsvFormat,
210            )
211        return values
class CsvFormat.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
86    class Config(OneOfOptionConfig):
87        title = "CSV Format"
88        discriminator = "filetype"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this makes sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'CSV Format'
discriminator = 'filetype'
class CustomFileBasedException(airbyte_cdk.utils.traced_exception.AirbyteTracedException):
150class CustomFileBasedException(AirbyteTracedException):
151    """
152    A specialized exception for file-based connectors.
153
154    This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
155    """
156
157    pass

A specialized exception for file-based connectors.

This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
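A connector can raise it directly to surface a user-facing message; a minimal sketch (the message text is illustrative):

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based import CustomFileBasedException

raise CustomFileBasedException(
    message="The selected folder is not accessible with the provided credentials.",
    failure_type=FailureType.config_error,
)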

 18class DefaultFileBasedCursor(AbstractFileBasedCursor):
 19    DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
 20    DEFAULT_MAX_HISTORY_SIZE = 10_000
 21    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
 22    CURSOR_FIELD = "_ab_source_file_last_modified"
 23
 24    def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
 25        super().__init__(stream_config)  # type: ignore [safe-super]
 26        self._file_to_datetime_history: MutableMapping[str, str] = {}
 27        self._time_window_if_history_is_full = timedelta(
 28            days=stream_config.days_to_sync_if_history_is_full
 29            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
 30        )
 31
 32        if self._time_window_if_history_is_full <= timedelta():
 33            raise ValueError(
 34                f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
 35            )
 36
 37        self._start_time = self._compute_start_time()
 38        self._initial_earliest_file_in_history: Optional[RemoteFile] = None
 39
 40    def set_initial_state(self, value: StreamState) -> None:
 41        self._file_to_datetime_history = value.get("history", {})
 42        self._start_time = self._compute_start_time()
 43        self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()
 44
 45    def add_file(self, file: RemoteFile) -> None:
 46        self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
 47            self.DATE_TIME_FORMAT
 48        )
 49        if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
 50            # Get the earliest file based on its last modified date and its uri
 51            oldest_file = self._compute_earliest_file_in_history()
 52            if oldest_file:
 53                del self._file_to_datetime_history[oldest_file.uri]
 54            else:
 55                raise Exception(
 56                    "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
 57                )
 58
 59    def get_state(self) -> StreamState:
 60        state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
 61        return state
 62
 63    def _get_cursor(self) -> Optional[str]:
 64        """
 65        Returns the cursor value.
 66
 67        Files are synced in order of last-modified with secondary sort on filename, so the cursor value is
 68        a string joining the last-modified timestamp of the last synced file and the name of the file.
 69        """
 70        if self._file_to_datetime_history.items():
 71            filename, timestamp = max(
 72                self._file_to_datetime_history.items(), key=lambda x: (x[1], x[0])
 73            )
 74            return f"{timestamp}_{filename}"
 75        return None
 76
 77    def _is_history_full(self) -> bool:
 78        """
 79        Returns true if the state's history is full, meaning new entries will start to replace old entries.
 80        """
 81        return len(self._file_to_datetime_history) >= self.DEFAULT_MAX_HISTORY_SIZE
 82
 83    def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
 84        if file.uri in self._file_to_datetime_history:
 85            # If the file's uri is in the history, we should sync the file if it has been modified since it was synced
 86            updated_at_from_history = datetime.strptime(
 87                self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT
 88            )
 89            if file.last_modified < updated_at_from_history:
 90                logger.warning(
 91                    f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file."
 92                )
 93            else:
 94                return file.last_modified > updated_at_from_history
 95            return file.last_modified > updated_at_from_history
 96        if self._is_history_full():
 97            if self._initial_earliest_file_in_history is None:
 98                return True
 99            if file.last_modified > self._initial_earliest_file_in_history.last_modified:
100                # If the history is partial and the file's datetime is strictly greater than the earliest file in the history,
101                # we should sync it
102                return True
103            elif file.last_modified == self._initial_earliest_file_in_history.last_modified:
104                # If the history is partial and the file's datetime is equal to the earliest file in the history,
105                # we should sync it if its uri is strictly greater than the earliest file in the history
106                return file.uri > self._initial_earliest_file_in_history.uri
107            else:
108                # Otherwise, only sync the file if it has been modified since the start of the time window
109                return file.last_modified >= self.get_start_time()
110        else:
111            # The file is not in the history and the history is complete. We know we need to sync the file
112            return True
113
114    def get_files_to_sync(
115        self, all_files: Iterable[RemoteFile], logger: logging.Logger
116    ) -> Iterable[RemoteFile]:
117        if self._is_history_full():
118            logger.warning(
119                f"The state history is full. "
120                f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
121                f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
122            )
123        for f in all_files:
124            if self._should_sync_file(f, logger):
125                yield f
126
127    def get_start_time(self) -> datetime:
128        return self._start_time
129
130    def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
131        if self._file_to_datetime_history:
132            filename, last_modified = min(
133                self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
134            )
135            return RemoteFile(
136                uri=filename, last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT)
137            )
138        else:
139            return None
140
141    def _compute_start_time(self) -> datetime:
142        if not self._file_to_datetime_history:
143            return datetime.min
144        else:
145            earliest = min(self._file_to_datetime_history.values())
146            earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT)
147            if self._is_history_full():
148                time_window = datetime.now() - self._time_window_if_history_is_full
149                earliest_dt = min(earliest_dt, time_window)
150            return earliest_dt

Abstract base class for cursors used by file-based streams.

DefaultFileBasedCursor( stream_config: FileBasedStreamConfig, **_: Any)
24    def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
25        super().__init__(stream_config)  # type: ignore [safe-super]
26        self._file_to_datetime_history: MutableMapping[str, str] = {}
27        self._time_window_if_history_is_full = timedelta(
28            days=stream_config.days_to_sync_if_history_is_full
29            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
30        )
31
32        if self._time_window_if_history_is_full <= timedelta():
33            raise ValueError(
34                f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
35            )
36
37        self._start_time = self._compute_start_time()
38        self._initial_earliest_file_in_history: Optional[RemoteFile] = None

Common interface for all cursors.

DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
DEFAULT_MAX_HISTORY_SIZE = 10000
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
CURSOR_FIELD = '_ab_source_file_last_modified'
def set_initial_state(self, value: MutableMapping[str, Any]) -> None:
40    def set_initial_state(self, value: StreamState) -> None:
41        self._file_to_datetime_history = value.get("history", {})
42        self._start_time = self._compute_start_time()
43        self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()

Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.

Parameters
  • value: The stream state
def add_file( self, file: RemoteFile) -> None:
45    def add_file(self, file: RemoteFile) -> None:
46        self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
47            self.DATE_TIME_FORMAT
48        )
49        if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
50            # Get the earliest file based on its last modified date and its uri
51            oldest_file = self._compute_earliest_file_in_history()
52            if oldest_file:
53                del self._file_to_datetime_history[oldest_file.uri]
54            else:
55                raise Exception(
56                    "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
57                )

Add a file to the cursor. This method is called when a file is processed by the stream.

Parameters
  • file: The file to add
def get_state(self) -> MutableMapping[str, Any]:
59    def get_state(self) -> StreamState:
60        state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
61        return state

Get the state of the cursor.
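The emitted state holds a "history" mapping of file URI to last-modified timestamp plus the cursor field; roughly (values are illustrative):

state = {
    "history": {
        "folder/a.csv": "2024-01-01T00:00:00.000000Z",
        "folder/b.csv": "2024-01-02T00:00:00.000000Z",
    },
    # f"{timestamp}_{filename}" of the most recently modified file
    "_ab_source_file_last_modified": "2024-01-02T00:00:00.000000Z_folder/b.csv",
}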

def get_files_to_sync( self, all_files: Iterable[RemoteFile], logger: logging.Logger) -> Iterable[RemoteFile]:
114    def get_files_to_sync(
115        self, all_files: Iterable[RemoteFile], logger: logging.Logger
116    ) -> Iterable[RemoteFile]:
117        if self._is_history_full():
118            logger.warning(
119                f"The state history is full. "
120                f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
121                f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
122            )
123        for f in all_files:
124            if self._should_sync_file(f, logger):
125                yield f

Given the list of files in the source, return the files that should be synced.

Parameters
  • all_files: All files in the source
  • logger:
Returns

The files that should be synced

def get_start_time(self) -> datetime.datetime:
127    def get_start_time(self) -> datetime:
128        return self._start_time

Returns the start time of the current sync.

112class ErrorListingFiles(BaseFileBasedSourceError):
113    pass

Raised when an error occurs while listing files from the source.

 87class FileBasedSource(ConcurrentSourceAdapter, ABC):
 88    # We make each source override the concurrency level to give control over when they are upgraded.
 89    _concurrency_level = None
 90
 91    def __init__(
 92        self,
 93        stream_reader: AbstractFileBasedStreamReader,
 94        spec_class: Type[AbstractFileBasedSpec],
 95        catalog: Optional[ConfiguredAirbyteCatalog],
 96        config: Optional[Mapping[str, Any]],
 97        state: Optional[List[AirbyteStateMessage]],
 98        availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None,
 99        discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(),
100        parsers: Mapping[Type[Any], FileTypeParser] = default_parsers,
101        validation_policies: Mapping[
102            ValidationPolicy, AbstractSchemaValidationPolicy
103        ] = DEFAULT_SCHEMA_VALIDATION_POLICIES,
104        cursor_cls: Type[
105            Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor]
106        ] = FileBasedConcurrentCursor,
107        stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None,
108    ):
109        self.stream_reader = stream_reader
110        self.stream_permissions_reader = stream_permissions_reader
111        self.spec_class = spec_class
112        self.config = config
113        self.catalog = catalog
114        self.state = state
115        self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(
116            stream_reader
117        )
118        self.discovery_policy = discovery_policy
119        self.parsers = parsers
120        self.validation_policies = validation_policies
121        self.stream_schemas = (
122            {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {}
123        )
124        self.cursor_cls = cursor_cls
125        self.logger = init_logger(f"airbyte.{self.name}")
126        self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector()
127        self._message_repository: Optional[MessageRepository] = None
128        concurrent_source = ConcurrentSource.create(
129            MAX_CONCURRENCY,
130            INITIAL_N_PARTITIONS,
131            self.logger,
132            self._slice_logger,
133            self.message_repository,
134        )
135        self._state = None
136        super().__init__(concurrent_source)
137
138    @property
139    def message_repository(self) -> MessageRepository:
140        if self._message_repository is None:
141            self._message_repository = InMemoryMessageRepository(
142                Level(AirbyteLogFormatter.level_mapping[self.logger.level])
143            )
144        return self._message_repository
145
146    def check_connection(
147        self, logger: logging.Logger, config: Mapping[str, Any]
148    ) -> Tuple[bool, Optional[Any]]:
149        """
150        Check that the source can be accessed using the user-provided configuration.
151
152        For each stream, verify that we can list and read files.
153
154        Returns (True, None) if the connection check is successful.
155
156        Otherwise, the "error" object should describe what went wrong.
157        """
158        try:
159            streams = self.streams(config)
160        except ConfigValidationError as config_exception:
161            raise AirbyteTracedException(
162                internal_message="Please check the logged errors for more information.",
163                message=str(config_exception),
164                exception=AirbyteTracedException(exception=config_exception),
165                failure_type=FailureType.config_error,
166            )
167        except Exception as exp:
168            raise AirbyteTracedException(
169                internal_message="Please check the logged errors for more information.",
170                message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
171                exception=AirbyteTracedException(exception=exp),
172                failure_type=FailureType.config_error,
173            )
174        if len(streams) == 0:
175            return (
176                False,
177                f"No streams are available for source {self.name}. This is probably an issue with the connector. Please verify that your "
178                f"configuration provides permissions to list and read files from the source. Contact support if you are unable to "
179                f"resolve this issue.",
180            )
181
182        errors = []
183        tracebacks = []
184        for stream in streams:
185            if isinstance(stream, FileIdentitiesStream):
186                identity = next(iter(stream.load_identity_groups()))
187                if not identity:
188                    errors.append(
189                        "Unable to get identities for current configuration, please check your credentials"
190                    )
191                continue
192            if not isinstance(stream, AbstractFileBasedStream):
193                raise ValueError(f"Stream {stream} is not a file-based stream.")
194            try:
195                parsed_config = self._get_parsed_config(config)
196                availability_method = (
197                    stream.availability_strategy.check_availability
198                    if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config)
199                    else stream.availability_strategy.check_availability_and_parsability
200                )
201                (
202                    stream_is_available,
203                    reason,
204                ) = availability_method(stream, logger, self)
205            except AirbyteTracedException as ate:
206                errors.append(f"Unable to connect to stream {stream.name} - {ate.message}")
207                tracebacks.append(traceback.format_exc())
208            except Exception:
209                errors.append(f"Unable to connect to stream {stream.name}")
210                tracebacks.append(traceback.format_exc())
211            else:
212                if not stream_is_available and reason:
213                    errors.append(reason)
214
215        if len(errors) == 1 and len(tracebacks) == 1:
216            raise AirbyteTracedException(
217                internal_message=tracebacks[0],
218                message=f"{errors[0]}",
219                failure_type=FailureType.config_error,
220            )
221        if len(errors) == 1 and len(tracebacks) == 0:
222            raise AirbyteTracedException(
223                message=f"{errors[0]}",
224                failure_type=FailureType.config_error,
225            )
226        elif len(errors) > 1:
227            raise AirbyteTracedException(
228                internal_message="\n".join(tracebacks),
229                message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}",
230                failure_type=FailureType.config_error,
231            )
232
233        return not bool(errors), (errors or None)
234
235    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
236        """
237        Return a list of this source's streams.
238        """
239
240        if self.catalog:
241            state_manager = ConnectorStateManager(state=self.state)
242        else:
243            # During `check` operations we don't have a catalog so cannot create a state manager.
244            # Since the state manager is only required for incremental syncs, this is fine.
245            state_manager = None
246
247        try:
248            parsed_config = self._get_parsed_config(config)
249            self.stream_reader.config = parsed_config
250            if self.stream_permissions_reader:
251                self.stream_permissions_reader.config = parsed_config
252            streams: List[Stream] = []
253            for stream_config in parsed_config.streams:
254                # Like state_manager, `catalog_stream` may be None during `check`
255                catalog_stream = self._get_stream_from_catalog(stream_config)
256                stream_state = (
257                    state_manager.get_stream_state(catalog_stream.name, catalog_stream.namespace)
258                    if (state_manager and catalog_stream)
259                    else None
260                )
261
262                sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
263
264                if (
265                    sync_mode == SyncMode.full_refresh
266                    and hasattr(self, "_concurrency_level")
267                    and self._concurrency_level is not None
268                ):
269                    cursor = FileBasedFinalStateCursor(
270                        stream_config=stream_config,
271                        stream_namespace=None,
272                        message_repository=self.message_repository,
273                    )
274                    stream = FileBasedStreamFacade.create_from_stream(
275                        stream=self._make_file_based_stream(
276                            stream_config=stream_config,
277                            cursor=cursor,
278                            parsed_config=parsed_config,
279                        ),
280                        source=self,
281                        logger=self.logger,
282                        state=stream_state,
283                        cursor=cursor,
284                    )
285
286                elif (
287                    sync_mode == SyncMode.incremental
288                    and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
289                    and hasattr(self, "_concurrency_level")
290                    and self._concurrency_level is not None
291                ):
292                    assert state_manager is not None, (
293                        "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."
294                    )
295
296                    cursor = self.cursor_cls(
297                        stream_config,
298                        stream_config.name,
299                        None,
300                        stream_state,
301                        self.message_repository,
302                        state_manager,
303                        CursorField(DefaultFileBasedStream.ab_last_mod_col),
304                    )
305                    stream = FileBasedStreamFacade.create_from_stream(
306                        stream=self._make_file_based_stream(
307                            stream_config=stream_config,
308                            cursor=cursor,
309                            parsed_config=parsed_config,
310                        ),
311                        source=self,
312                        logger=self.logger,
313                        state=stream_state,
314                        cursor=cursor,
315                    )
316                else:
317                    cursor = self.cursor_cls(stream_config)
318                    stream = self._make_file_based_stream(
319                        stream_config=stream_config,
320                        cursor=cursor,
321                        parsed_config=parsed_config,
322                    )
323
324                streams.append(stream)
325
326            if include_identities_stream(parsed_config):
327                identities_stream = self._make_identities_stream()
328                streams.append(identities_stream)
329            return streams
330
331        except ValidationError as exc:
332            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc
333
334    def _make_default_stream(
335        self,
336        stream_config: FileBasedStreamConfig,
337        cursor: Optional[AbstractFileBasedCursor],
338        parsed_config: AbstractFileBasedSpec,
339    ) -> AbstractFileBasedStream:
340        return DefaultFileBasedStream(
341            config=stream_config,
342            catalog_schema=self.stream_schemas.get(stream_config.name),
343            stream_reader=self.stream_reader,
344            availability_strategy=self.availability_strategy,
345            discovery_policy=self.discovery_policy,
346            parsers=self.parsers,
347            validation_policy=self._validate_and_get_validation_policy(stream_config),
348            errors_collector=self.errors_collector,
349            cursor=cursor,
350            use_file_transfer=use_file_transfer(parsed_config),
351            preserve_directory_structure=preserve_directory_structure(parsed_config),
352        )
353
354    def _ensure_permissions_reader_available(self) -> None:
355        """
356        Validates that a stream permissions reader is available.
357        Raises a ValueError if the reader is not provided.
358        """
359        if not self.stream_permissions_reader:
360            raise ValueError(
361                "Stream permissions reader is required for streams that use permissions transfer mode."
362            )
363
364    def _make_permissions_stream(
365        self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
366    ) -> AbstractFileBasedStream:
367        """
368        Creates a stream that reads permissions from files.
369        """
370        self._ensure_permissions_reader_available()
371        return PermissionsFileBasedStream(
372            config=stream_config,
373            catalog_schema=self.stream_schemas.get(stream_config.name),
374            stream_reader=self.stream_reader,
375            availability_strategy=self.availability_strategy,
376            discovery_policy=self.discovery_policy,
377            parsers=self.parsers,
378            validation_policy=self._validate_and_get_validation_policy(stream_config),
379            errors_collector=self.errors_collector,
380            cursor=cursor,
381            stream_permissions_reader=self.stream_permissions_reader,  # type: ignore
382        )
383
384    def _make_file_based_stream(
385        self,
386        stream_config: FileBasedStreamConfig,
387        cursor: Optional[AbstractFileBasedCursor],
388        parsed_config: AbstractFileBasedSpec,
389    ) -> AbstractFileBasedStream:
390        """
391        Creates different streams depending on the type of the transfer mode selected
392        """
393        if use_permissions_transfer(parsed_config):
394            return self._make_permissions_stream(stream_config, cursor)
395        # we should have a dedicated stream for file transfer mode, to decouple it from DefaultFileBasedStream
396        else:
397            return self._make_default_stream(stream_config, cursor, parsed_config)
398
399    def _make_identities_stream(
400        self,
401    ) -> Stream:
402        self._ensure_permissions_reader_available()
403        return FileIdentitiesStream(
404            catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME),
405            stream_permissions_reader=self.stream_permissions_reader,  # type: ignore
406            discovery_policy=self.discovery_policy,
407            errors_collector=self.errors_collector,
408        )
409
410    def _get_stream_from_catalog(
411        self, stream_config: FileBasedStreamConfig
412    ) -> Optional[AirbyteStream]:
413        if self.catalog:
414            for stream in self.catalog.streams or []:
415                if stream.stream.name == stream_config.name:
416                    return stream.stream
417        return None
418
419    def _get_sync_mode_from_catalog(self, stream_name: str) -> Optional[SyncMode]:
420        if self.catalog:
421            for catalog_stream in self.catalog.streams:
422                if stream_name == catalog_stream.stream.name:
423                    return catalog_stream.sync_mode
424            self.logger.warning(f"No sync mode was found for {stream_name}.")
425        return None
426
427    def read(
428        self,
429        logger: logging.Logger,
430        config: Mapping[str, Any],
431        catalog: ConfiguredAirbyteCatalog,
432        state: Optional[List[AirbyteStateMessage]] = None,
433    ) -> Iterator[AirbyteMessage]:
434        yield from super().read(logger, config, catalog, state)
435        # emit all the errors collected
436        yield from self.errors_collector.yield_and_raise_collected()
437        # count streams using a certain parser
438        parsed_config = self._get_parsed_config(config)
439        for parser, count in Counter(
440            stream.format.filetype for stream in parsed_config.streams
441        ).items():
442            yield create_analytics_message(f"file-cdk-{parser}-stream-count", count)
443
444    def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
445        """
446        Returns the specification describing what fields can be configured by a user when setting up a file-based source.
447        """
448
449        return ConnectorSpecification(
450            documentationUrl=self.spec_class.documentation_url(),
451            connectionSpecification=self.spec_class.schema(),
452        )
453
454    def _get_parsed_config(self, config: Mapping[str, Any]) -> AbstractFileBasedSpec:
455        return self.spec_class(**config)
456
457    def _validate_and_get_validation_policy(
458        self, stream_config: FileBasedStreamConfig
459    ) -> AbstractSchemaValidationPolicy:
460        if stream_config.validation_policy not in self.validation_policies:
461            # This should never happen because we validate the config against the schema's validation_policy enum
462            raise ValidationError(
463                f"`validation_policy` must be one of {list(self.validation_policies.keys())}",
464                model=FileBasedStreamConfig,
465            )
466        return self.validation_policies[stream_config.validation_policy]

Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.

FileBasedSource( stream_reader: AbstractFileBasedStreamReader, spec_class: Type[AbstractFileBasedSpec], catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]], availability_strategy: Optional[airbyte_cdk.sources.file_based.availability_strategy.AbstractFileBasedAvailabilityStrategy] = None, discovery_policy: airbyte_cdk.sources.file_based.discovery_policy.AbstractDiscoveryPolicy = <airbyte_cdk.sources.file_based.discovery_policy.DefaultDiscoveryPolicy object>, parsers: Mapping[Type[Any], airbyte_cdk.sources.file_based.file_types.file_type_parser.FileTypeParser] = {<class 'airbyte_cdk.sources.file_based.config.avro_format.AvroFormat'>: <airbyte_cdk.sources.file_based.file_types.AvroParser object>, <class 'CsvFormat'>: <airbyte_cdk.sources.file_based.file_types.CsvParser object>, <class 'airbyte_cdk.sources.file_based.config.excel_format.ExcelFormat'>: <airbyte_cdk.sources.file_based.file_types.ExcelParser object>, <class 'JsonlFormat'>: <airbyte_cdk.sources.file_based.file_types.JsonlParser object>, <class 'airbyte_cdk.sources.file_based.config.parquet_format.ParquetFormat'>: <airbyte_cdk.sources.file_based.file_types.ParquetParser object>, <class 'airbyte_cdk.sources.file_based.config.unstructured_format.UnstructuredFormat'>: <airbyte_cdk.sources.file_based.file_types.UnstructuredParser object>}, validation_policies: Mapping[airbyte_cdk.sources.file_based.config.file_based_stream_config.ValidationPolicy, airbyte_cdk.sources.file_based.schema_validation_policies.AbstractSchemaValidationPolicy] = {<ValidationPolicy.emit_record: 'Emit Record'>: <airbyte_cdk.sources.file_based.schema_validation_policies.EmitRecordPolicy object>, <ValidationPolicy.skip_record: 'Skip Record'>: <airbyte_cdk.sources.file_based.schema_validation_policies.SkipRecordPolicy object>, <ValidationPolicy.wait_for_discover: 'Wait for Discover'>: <airbyte_cdk.sources.file_based.schema_validation_policies.WaitForDiscoverPolicy object>}, cursor_cls: Type[Union[airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor, airbyte_cdk.sources.file_based.stream.cursor.AbstractFileBasedCursor]] = <class 'airbyte_cdk.sources.file_based.stream.concurrent.cursor.FileBasedConcurrentCursor'>, stream_permissions_reader: Optional[airbyte_cdk.sources.file_based.file_based_stream_permissions_reader.AbstractFileBasedStreamPermissionsReader] = None)
 91    def __init__(
 92        self,
 93        stream_reader: AbstractFileBasedStreamReader,
 94        spec_class: Type[AbstractFileBasedSpec],
 95        catalog: Optional[ConfiguredAirbyteCatalog],
 96        config: Optional[Mapping[str, Any]],
 97        state: Optional[List[AirbyteStateMessage]],
 98        availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None,
 99        discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(),
100        parsers: Mapping[Type[Any], FileTypeParser] = default_parsers,
101        validation_policies: Mapping[
102            ValidationPolicy, AbstractSchemaValidationPolicy
103        ] = DEFAULT_SCHEMA_VALIDATION_POLICIES,
104        cursor_cls: Type[
105            Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor]
106        ] = FileBasedConcurrentCursor,
107        stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None,
108    ):
109        self.stream_reader = stream_reader
110        self.stream_permissions_reader = stream_permissions_reader
111        self.spec_class = spec_class
112        self.config = config
113        self.catalog = catalog
114        self.state = state
115        self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(
116            stream_reader
117        )
118        self.discovery_policy = discovery_policy
119        self.parsers = parsers
120        self.validation_policies = validation_policies
121        self.stream_schemas = (
122            {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {}
123        )
124        self.cursor_cls = cursor_cls
125        self.logger = init_logger(f"airbyte.{self.name}")
126        self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector()
127        self._message_repository: Optional[MessageRepository] = None
128        concurrent_source = ConcurrentSource.create(
129            MAX_CONCURRENCY,
130            INITIAL_N_PARTITIONS,
131            self.logger,
132            self._slice_logger,
133            self.message_repository,
134        )
135        self._state = None
136        super().__init__(concurrent_source)

ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.

The source's streams are still defined through the streams() method. Streams wrapped in a StreamFacade will be processed concurrently. Other streams will be processed sequentially as a later step.
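As a construction sketch only (not taken from a real connector): `MyStreamReader` and `MySpec` below are hypothetical placeholders for a connector-specific AbstractFileBasedStreamReader implementation and AbstractFileBasedSpec subclass; the remaining arguments mirror the constructor signature above.

from airbyte_cdk.sources.file_based import FileBasedSource

# MyStreamReader and MySpec are hypothetical connector-specific classes.
source = FileBasedSource(
    stream_reader=MyStreamReader(),   # AbstractFileBasedStreamReader implementation
    spec_class=MySpec,                # AbstractFileBasedSpec subclass
    catalog=configured_catalog,       # may be None during spec/check/discover
    config=raw_config,                # the user-provided Mapping[str, Any]
    state=state_messages,             # None on a first sync
)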

stream_reader
stream_permissions_reader
spec_class
config
catalog
state
availability_strategy
discovery_policy
parsers
validation_policies
stream_schemas
cursor_cls
logger
message_repository: airbyte_cdk.MessageRepository
138    @property
139    def message_repository(self) -> MessageRepository:
140        if self._message_repository is None:
141            self._message_repository = InMemoryMessageRepository(
142                Level(AirbyteLogFormatter.level_mapping[self.logger.level])
143            )
144        return self._message_repository
def check_connection( self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
146    def check_connection(
147        self, logger: logging.Logger, config: Mapping[str, Any]
148    ) -> Tuple[bool, Optional[Any]]:
149        """
150        Check that the source can be accessed using the user-provided configuration.
151
152        For each stream, verify that we can list and read files.
153
154        Returns (True, None) if the connection check is successful.
155
156        Otherwise, the "error" object should describe what went wrong.
157        """
158        try:
159            streams = self.streams(config)
160        except ConfigValidationError as config_exception:
161            raise AirbyteTracedException(
162                internal_message="Please check the logged errors for more information.",
163                message=str(config_exception),
164                exception=AirbyteTracedException(exception=config_exception),
165                failure_type=FailureType.config_error,
166            )
167        except Exception as exp:
168            raise AirbyteTracedException(
169                internal_message="Please check the logged errors for more information.",
170                message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
171                exception=AirbyteTracedException(exception=exp),
172                failure_type=FailureType.config_error,
173            )
174        if len(streams) == 0:
175            return (
176                False,
177                f"No streams are available for source {self.name}. This is probably an issue with the connector. Please verify that your "
178                f"configuration provides permissions to list and read files from the source. Contact support if you are unable to "
179                f"resolve this issue.",
180            )
181
182        errors = []
183        tracebacks = []
184        for stream in streams:
185            if isinstance(stream, FileIdentitiesStream):
186                identity = next(iter(stream.load_identity_groups()))
187                if not identity:
188                    errors.append(
189                        "Unable to get identities for current configuration, please check your credentials"
190                    )
191                continue
192            if not isinstance(stream, AbstractFileBasedStream):
193                raise ValueError(f"Stream {stream} is not a file-based stream.")
194            try:
195                parsed_config = self._get_parsed_config(config)
196                availability_method = (
197                    stream.availability_strategy.check_availability
198                    if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config)
199                    else stream.availability_strategy.check_availability_and_parsability
200                )
201                (
202                    stream_is_available,
203                    reason,
204                ) = availability_method(stream, logger, self)
205            except AirbyteTracedException as ate:
206                errors.append(f"Unable to connect to stream {stream.name} - {ate.message}")
207                tracebacks.append(traceback.format_exc())
208            except Exception:
209                errors.append(f"Unable to connect to stream {stream.name}")
210                tracebacks.append(traceback.format_exc())
211            else:
212                if not stream_is_available and reason:
213                    errors.append(reason)
214
215        if len(errors) == 1 and len(tracebacks) == 1:
216            raise AirbyteTracedException(
217                internal_message=tracebacks[0],
218                message=f"{errors[0]}",
219                failure_type=FailureType.config_error,
220            )
221        if len(errors) == 1 and len(tracebacks) == 0:
222            raise AirbyteTracedException(
223                message=f"{errors[0]}",
224                failure_type=FailureType.config_error,
225            )
226        elif len(errors) > 1:
227            raise AirbyteTracedException(
228                internal_message="\n".join(tracebacks),
229                message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}",
230                failure_type=FailureType.config_error,
231            )
232
233        return not bool(errors), (errors or None)

Check that the source can be accessed using the user-provided configuration.

For each stream, verify that we can list and read files.

Returns (True, None) if the connection check is successful.

Otherwise, the "error" object should describe what went wrong.
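For example, a minimal sketch of invoking the check, assuming `source` is an already-constructed FileBasedSource and `config` is the user-provided mapping:

import logging

# Most failures surface as AirbyteTracedException rather than a False return
# (see the source above); a (False, message) tuple is returned e.g. when no
# streams are available.
ok, error = source.check_connection(logging.getLogger("airbyte"), config)
if not ok:
    print(f"Connection check failed: {error}")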

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
235    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
236        """
237        Return a list of this source's streams.
238        """
239
240        if self.catalog:
241            state_manager = ConnectorStateManager(state=self.state)
242        else:
243            # During `check` operations we don't have a catalog so cannot create a state manager.
244            # Since the state manager is only required for incremental syncs, this is fine.
245            state_manager = None
246
247        try:
248            parsed_config = self._get_parsed_config(config)
249            self.stream_reader.config = parsed_config
250            if self.stream_permissions_reader:
251                self.stream_permissions_reader.config = parsed_config
252            streams: List[Stream] = []
253            for stream_config in parsed_config.streams:
254                # Like state_manager, `catalog_stream` may be None during `check`
255                catalog_stream = self._get_stream_from_catalog(stream_config)
256                stream_state = (
257                    state_manager.get_stream_state(catalog_stream.name, catalog_stream.namespace)
258                    if (state_manager and catalog_stream)
259                    else None
260                )
261
262                sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
263
264                if (
265                    sync_mode == SyncMode.full_refresh
266                    and hasattr(self, "_concurrency_level")
267                    and self._concurrency_level is not None
268                ):
269                    cursor = FileBasedFinalStateCursor(
270                        stream_config=stream_config,
271                        stream_namespace=None,
272                        message_repository=self.message_repository,
273                    )
274                    stream = FileBasedStreamFacade.create_from_stream(
275                        stream=self._make_file_based_stream(
276                            stream_config=stream_config,
277                            cursor=cursor,
278                            parsed_config=parsed_config,
279                        ),
280                        source=self,
281                        logger=self.logger,
282                        state=stream_state,
283                        cursor=cursor,
284                    )
285
286                elif (
287                    sync_mode == SyncMode.incremental
288                    and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
289                    and hasattr(self, "_concurrency_level")
290                    and self._concurrency_level is not None
291                ):
292                    assert state_manager is not None, (
293                        "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."
294                    )
295
296                    cursor = self.cursor_cls(
297                        stream_config,
298                        stream_config.name,
299                        None,
300                        stream_state,
301                        self.message_repository,
302                        state_manager,
303                        CursorField(DefaultFileBasedStream.ab_last_mod_col),
304                    )
305                    stream = FileBasedStreamFacade.create_from_stream(
306                        stream=self._make_file_based_stream(
307                            stream_config=stream_config,
308                            cursor=cursor,
309                            parsed_config=parsed_config,
310                        ),
311                        source=self,
312                        logger=self.logger,
313                        state=stream_state,
314                        cursor=cursor,
315                    )
316                else:
317                    cursor = self.cursor_cls(stream_config)
318                    stream = self._make_file_based_stream(
319                        stream_config=stream_config,
320                        cursor=cursor,
321                        parsed_config=parsed_config,
322                    )
323
324                streams.append(stream)
325
326            if include_identities_stream(parsed_config):
327                identities_stream = self._make_identities_stream()
328                streams.append(identities_stream)
329            return streams
330
331        except ValidationError as exc:
332            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc

Return a list of this source's streams.
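A brief usage sketch, assuming the same `source` and user-provided `config` as above:

streams = source.streams(config)
print([stream.name for stream in streams])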

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
427    def read(
428        self,
429        logger: logging.Logger,
430        config: Mapping[str, Any],
431        catalog: ConfiguredAirbyteCatalog,
432        state: Optional[List[AirbyteStateMessage]] = None,
433    ) -> Iterator[AirbyteMessage]:
434        yield from super().read(logger, config, catalog, state)
435        # emit all the errors collected
436        yield from self.errors_collector.yield_and_raise_collected()
437        # count streams using a certain parser
438        parsed_config = self._get_parsed_config(config)
439        for parser, count in Counter(
440            stream.format.filetype for stream in parsed_config.streams
441        ).items():
442            yield create_analytics_message(f"file-cdk-{parser}-stream-count", count)

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
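A sketch of driving a read, assuming `source`, `config`, and a `configured_catalog` (a ConfiguredAirbyteCatalog built for this source's streams):

import logging

for message in source.read(logging.getLogger("airbyte"), config, configured_catalog, state=None):
    # Records, state, logs, and the per-parser analytics messages emitted above
    # all come through this iterator.
    print(message.type)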

def spec( self, *args: Any, **kwargs: Any) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
444    def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
445        """
446        Returns the specification describing what fields can be configured by a user when setting up a file-based source.
447        """
448
449        return ConnectorSpecification(
450            documentationUrl=self.spec_class.documentation_url(),
451            connectionSpecification=self.spec_class.schema(),
452        )

Returns the specification describing what fields can be configured by a user when setting up a file-based source.
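For example, a small sketch of inspecting the returned specification (the attribute names come from the Airbyte protocol's ConnectorSpecification; the "properties" key is the usual top-level key of a pydantic-generated JSON schema):

specification = source.spec()
print(specification.documentationUrl)
print(sorted(specification.connectionSpecification.get("properties", {}).keys()))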

class FileBasedSourceError(enum.Enum):
13class FileBasedSourceError(Enum):
14    EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
15    GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
16    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
17    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
18    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
19    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
20    ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files."
21    ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
22    ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
23    ERROR_PARSING_USER_PROVIDED_SCHEMA = (
24        "The provided schema could not be transformed into valid JSON Schema."
25    )
26    ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
27    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
28    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
29    ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
30    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
31    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
32    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
33    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
34    INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
35    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
36    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
37    UNDEFINED_PARSER = "No parser is defined for this file type."
38    UNDEFINED_VALIDATION_POLICY = (
39        "The validation policy defined in the config does not exist for the source."
40    )
EMPTY_STREAM = <FileBasedSourceError.EMPTY_STREAM: 'No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict.'>
GLOB_PARSE_ERROR = <FileBasedSourceError.GLOB_PARSE_ERROR: 'Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split.'>
ENCODING_ERROR = <FileBasedSourceError.ENCODING_ERROR: 'File encoding error. The configured encoding must match file encoding.'>
ERROR_CASTING_VALUE = <FileBasedSourceError.ERROR_CASTING_VALUE: 'Could not cast the value to the expected type.'>
ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = <FileBasedSourceError.ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE: 'Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string.'>
ERROR_DECODING_VALUE = <FileBasedSourceError.ERROR_DECODING_VALUE: 'Expected a JSON-decodeable value but could not decode record.'>
ERROR_LISTING_FILES = <FileBasedSourceError.ERROR_LISTING_FILES: 'Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files.'>
ERROR_READING_FILE = <FileBasedSourceError.ERROR_READING_FILE: 'Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files.'>
ERROR_PARSING_RECORD = <FileBasedSourceError.ERROR_PARSING_RECORD: "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable.">
ERROR_PARSING_USER_PROVIDED_SCHEMA = <FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA: 'The provided schema could not be transformed into valid JSON Schema.'>
ERROR_VALIDATING_RECORD = <FileBasedSourceError.ERROR_VALIDATING_RECORD: 'One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy.'>
ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = <FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS: "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time.">
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = <FileBasedSourceError.ERROR_PARSING_RECORD_MISMATCHED_COLUMNS: "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows.">
ERROR_PARSING_RECORD_MISMATCHED_ROWS = <FileBasedSourceError.ERROR_PARSING_RECORD_MISMATCHED_ROWS: "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows.">
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = <FileBasedSourceError.STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY: 'Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema.'>
NULL_VALUE_IN_SCHEMA = <FileBasedSourceError.NULL_VALUE_IN_SCHEMA: 'Error during schema inference: no type was detected for key.'>
UNRECOGNIZED_TYPE = <FileBasedSourceError.UNRECOGNIZED_TYPE: 'Error during schema inference: unrecognized type.'>
SCHEMA_INFERENCE_ERROR = <FileBasedSourceError.SCHEMA_INFERENCE_ERROR: 'Error inferring schema from files. Are the files valid?'>
INVALID_SCHEMA_ERROR = <FileBasedSourceError.INVALID_SCHEMA_ERROR: "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns.">
CONFIG_VALIDATION_ERROR = <FileBasedSourceError.CONFIG_VALIDATION_ERROR: 'Error creating stream config object.'>
MISSING_SCHEMA = <FileBasedSourceError.MISSING_SCHEMA: 'Expected `json_schema` in the configured catalog but it is missing.'>
UNDEFINED_PARSER = <FileBasedSourceError.UNDEFINED_PARSER: 'No parser is defined for this file type.'>
UNDEFINED_VALIDATION_POLICY = <FileBasedSourceError.UNDEFINED_VALIDATION_POLICY: 'The validation policy defined in the config does not exist for the source.'>
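Each member wraps a user-facing message string accessed via .value, as check_connection does above with CONFIG_VALIDATION_ERROR; a minimal sketch:

from airbyte_cdk.sources.file_based import FileBasedSourceError

print(FileBasedSourceError.ERROR_LISTING_FILES.value)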
class FileBasedStreamConfig(pydantic.v1.main.BaseModel):
 29class FileBasedStreamConfig(BaseModel):
 30    name: str = Field(title="Name", description="The name of the stream.")
 31    globs: Optional[List[str]] = Field(
 32        default=["**"],
 33        title="Globs",
 34        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
 35        order=1,
 36    )
 37    legacy_prefix: Optional[str] = Field(
 38        title="Legacy Prefix",
 39        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
 40        airbyte_hidden=True,
 41    )
 42    validation_policy: ValidationPolicy = Field(
 43        title="Validation Policy",
 44        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
 45        default=ValidationPolicy.emit_record,
 46    )
 47    input_schema: Optional[str] = Field(
 48        title="Input Schema",
 49        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
 50    )
 51    primary_key: Optional[str] = Field(
 52        title="Primary Key",
 53        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
 54        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
 55    )
 56    days_to_sync_if_history_is_full: int = Field(
 57        title="Days To Sync If History Is Full",
 58        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
 59        default=3,
 60    )
 61    format: Union[
 62        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
 63    ] = Field(
 64        title="Format",
 65        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
 66    )
 67    schemaless: bool = Field(
 68        title="Schemaless",
 69        description="When enabled, syncs will not validate or structure records against the stream's schema.",
 70        default=False,
 71    )
 72    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
 73        title="Files To Read For Schema Discover",
 74        description="The number of resent files which will be used to discover the schema for this stream.",
 75        default=None,
 76        gt=0,
 77    )
 78    use_first_found_file_for_schema_discovery: bool = Field(
 79        title="Use First Found File For Schema Discover",
 80        description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
 81        default=False,
 82    )
 83
 84    @validator("input_schema", pre=True)
 85    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
 86        if v:
 87            if type_mapping_to_jsonschema(v):
 88                return v
 89            else:
 90                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
 91        return None
 92
 93    @root_validator
 94    def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
 95        """
 96        Please update this validation when new related to schema discovery field is added.
 97        Validates schema discovery options compatibility.
 98        Note, that initially the recent_n_files_to_read_for_schema_discovery was added without a validation if schemaless or input_schema were provided.
 99        So this method doesn't check it to do not break already created connections.
100        If recent_n_files_to_read_for_schema_discovery and schemaless or recent_n_files_to_read_for_schema_discovery and input_schema were provided,
101        recent_n_files_to_read_for_schema_discovery will be ignored and second option will be used by default.
102        """
103        input_schema = values["input_schema"] is not None
104        schemaless = values["schemaless"]
105        recent_n_files_to_read_for_schema_discovery = (
106            values["recent_n_files_to_read_for_schema_discovery"] is not None
107        )
108        use_first_found_file_for_schema_discovery = values[
109            "use_first_found_file_for_schema_discovery"
110        ]
111
112        if (
113            recent_n_files_to_read_for_schema_discovery
114            and use_first_found_file_for_schema_discovery
115        ) or [schemaless, input_schema, use_first_found_file_for_schema_discovery].count(True) > 1:
116            raise ConfigValidationError(
117                FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
118            )
119
120        return values
121
122    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
123        """
124        User defined input_schema is defined as a string in the config. This method takes the string representation
125        and converts it into a Mapping[str, Any] which is used by file-based CDK components.
126        """
127        if self.input_schema:
128            schema = type_mapping_to_jsonschema(self.input_schema)
129            if not schema:
130                raise ValueError(
131                    f"Unable to create JSON schema from input schema {self.input_schema}"
132                )
133            return schema
134        return None
name: str
globs: Optional[List[str]]
legacy_prefix: Optional[str]
input_schema: Optional[str]
primary_key: Optional[str]
days_to_sync_if_history_is_full: int
schemaless: bool
recent_n_files_to_read_for_schema_discovery: Optional[int]
use_first_found_file_for_schema_discovery: bool
@validator('input_schema', pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
84    @validator("input_schema", pre=True)
85    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
86        if v:
87            if type_mapping_to_jsonschema(v):
88                return v
89            else:
90                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
91        return None
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
122    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
123        """
124        User defined input_schema is defined as a string in the config. This method takes the string representation
125        and converts it into a Mapping[str, Any] which is used by file-based CDK components.
126        """
127        if self.input_schema:
128            schema = type_mapping_to_jsonschema(self.input_schema)
129            if not schema:
130                raise ValueError(
131                    f"Unable to create JSON schema from input schema {self.input_schema}"
132                )
133            return schema
134        return None

User defined input_schema is defined as a string in the config. This method takes the string representation and converts it into a Mapping[str, Any] which is used by file-based CDK components.
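A hedged sketch, assuming the input_schema string is a JSON mapping of column names to primitive type names (the stream and column names below are invented for illustration):

from airbyte_cdk.sources.file_based import FileBasedStreamConfig, JsonlFormat

stream_config = FileBasedStreamConfig(
    name="users",                                         # illustrative stream name
    format=JsonlFormat(),
    input_schema='{"id": "integer", "email": "string"}',  # assumed string format
)
print(stream_config.get_input_schema())  # a JSON-schema mapping, or None when unset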

class FileReadMode(enum.Enum):
30class FileReadMode(Enum):
31    READ = "r"
32    READ_BINARY = "rb"
READ = <FileReadMode.READ: 'r'>
READ_BINARY = <FileReadMode.READ_BINARY: 'rb'>
class JsonlFormat(pydantic.v1.main.BaseModel):
11class JsonlFormat(BaseModel):
12    class Config(OneOfOptionConfig):
13        title = "Jsonl Format"
14        discriminator = "filetype"
15
16    filetype: str = Field(
17        "jsonl",
18        const=True,
19    )
filetype: str
class JsonlFormat.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
12    class Config(OneOfOptionConfig):
13        title = "Jsonl Format"
14        discriminator = "filetype"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this ensures it is marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'Jsonl Format'
discriminator = 'filetype'
class RemoteFile(pydantic.v1.main.BaseModel):
12class RemoteFile(BaseModel):
13    """
14    A file in a file-based stream.
15    """
16
17    uri: str
18    last_modified: datetime
19    mime_type: Optional[str] = None

A file in a file-based stream.

uri: str
last_modified: datetime.datetime
mime_type: Optional[str]
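A small construction sketch, similar to what a stream reader implementation might yield when listing files (the uri and mime type are illustrative):

from datetime import datetime, timezone

from airbyte_cdk.sources.file_based import RemoteFile

remote_file = RemoteFile(
    uri="landing/2024-01-01/users.jsonl",  # illustrative object key
    last_modified=datetime(2024, 1, 1, tzinfo=timezone.utc),
    mime_type="application/x-ndjson",
)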