airbyte_cdk.sources.file_based

 1from .config.abstract_file_based_spec import AbstractFileBasedSpec
 2from .config.csv_format import CsvFormat
 3from .config.file_based_stream_config import FileBasedStreamConfig
 4from .config.jsonl_format import JsonlFormat
 5from .exceptions import CustomFileBasedException, ErrorListingFiles, FileBasedSourceError
 6from .file_based_source import DEFAULT_CONCURRENCY, FileBasedSource
 7from .file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
 8from .remote_file import RemoteFile
 9from .stream.cursor import DefaultFileBasedCursor
10
11__all__ = [
12    "AbstractFileBasedSpec",
13    "AbstractFileBasedStreamReader",
14    "CsvFormat",
15    "CustomFileBasedException",
16    "DefaultFileBasedCursor",
17    "ErrorListingFiles",
18    "FileBasedSource",
19    "FileBasedSourceError",
20    "FileBasedStreamConfig",
21    "FileReadMode",
22    "JsonlFormat",
23    "RemoteFile",
24]
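These exports are the building blocks a concrete file-based connector wires together. The sketch below is illustrative only: MySpec, MyStreamReader, and MySource are hypothetical names, and the omitted bodies must implement the abstract members documented further down.

from typing import Any, List, Mapping, Optional

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.file_based import (
    AbstractFileBasedSpec,
    AbstractFileBasedStreamReader,
    FileBasedSource,
)


class MySpec(AbstractFileBasedSpec):
    """Hypothetical spec; see AbstractFileBasedSpec below for the required members."""


class MyStreamReader(AbstractFileBasedStreamReader):
    """Hypothetical reader; see AbstractFileBasedStreamReader below for the required members."""


class MySource(FileBasedSource):
    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: Optional[List[AirbyteStateMessage]],
    ):
        super().__init__(
            stream_reader=MyStreamReader(),  # fails until the abstract members are implemented
            spec_class=MySpec,
            catalog=catalog,
            config=config,
            state=state,
        )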
class AbstractFileBasedSpec(pydantic.v1.main.BaseModel):
 48class AbstractFileBasedSpec(BaseModel):
 49    """
 50    Used during spec; allows the developer to configure the cloud provider specific options
 51    that are needed when users configure a file-based source.
 52    """
 53
 54    start_date: Optional[str] = Field(
 55        title="Start Date",
 56        description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
 57        examples=["2021-01-01T00:00:00.000000Z"],
 58        format="date-time",
 59        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$",
 60        pattern_descriptor="YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
 61        order=1,
 62    )
 63
 64    streams: List[FileBasedStreamConfig] = Field(
 65        title="The list of streams to sync",
 66        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
 67        order=10,
 68    )
 69
 70    delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
 71        title="Delivery Method",
 72        discriminator="delivery_type",
 73        type="object",
 74        order=7,
 75        display_type="radio",
 76        group="advanced",
 77        default="use_records_transfer",
 78        airbyte_hidden=True,
 79    )
 80
 81    @classmethod
 82    @abstractmethod
 83    def documentation_url(cls) -> AnyUrl:
 84        """
 85        :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
 86        """
 87
 88    @classmethod
 89    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
 90        """
 91        Generates the mapping comprised of the config fields
 92        """
 93        schema = super().schema(*args, **kwargs)
 94        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
 95        schema_helpers.expand_refs(transformed_schema)
 96        cls.replace_enum_allOf_and_anyOf(transformed_schema)
 97        cls.remove_discriminator(transformed_schema)
 98
 99        return transformed_schema
100
101    @staticmethod
102    def remove_discriminator(schema: Dict[str, Any]) -> None:
103        """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
104        dpath.delete(schema, "properties/**/discriminator")
105
106    @staticmethod
107    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
108        """
109        allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
110        Unpacks the enums under allOf and moves them up a level under the enum key
111        anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
112        additional validation that an incoming config only matches exactly one of a field's types.
113        """
114        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
115        objects_to_check["type"] = "object"
116        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
117        for format in objects_to_check["oneOf"]:
118            for key in format["properties"]:
119                object_property = format["properties"][key]
120                AbstractFileBasedSpec.move_enum_to_root(object_property)
121
122        properties_to_change = ["validation_policy"]
123        for property_to_change in properties_to_change:
124            property_object = schema["properties"]["streams"]["items"]["properties"][
125                property_to_change
126            ]
127            if "anyOf" in property_object:
128                schema["properties"]["streams"]["items"]["properties"][property_to_change][
129                    "type"
130                ] = "object"
131                schema["properties"]["streams"]["items"]["properties"][property_to_change][
132                    "oneOf"
133                ] = property_object.pop("anyOf")
134            AbstractFileBasedSpec.move_enum_to_root(property_object)
135
136        csv_format_schemas = list(
137            filter(
138                lambda format: format["properties"]["filetype"]["default"] == "csv",
139                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
140            )
141        )
142        if len(csv_format_schemas) != 1:
143            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
144        csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
145            "properties"
146        ]["header_definition"].pop("anyOf", [])
147        csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
148        return schema
149
150    @staticmethod
151    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
152        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
153            object_property["enum"] = object_property["allOf"][0]["enum"]
154            object_property.pop("allOf")

Used during spec; allows the developer to configure the cloud-provider-specific options that are needed when users configure a file-based source.

start_date: Optional[str]
streams: List[FileBasedStreamConfig]
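A minimal concrete spec is sketched below; the MyS3Spec name and the bucket field are hypothetical, and documentation_url is the only member that must be implemented beyond the inherited fields.

from pydantic.v1 import AnyUrl, Field

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec


class MyS3Spec(AbstractFileBasedSpec):
    # Hypothetical provider-specific option; real specs add credentials, endpoints, etc.
    bucket: str = Field(title="Bucket", description="Name of the bucket to read from.", order=0)

    @classmethod
    def documentation_url(cls) -> AnyUrl:
        # Assumes pydantic v1's AnyUrl(url, scheme=...) constructor, as used by existing connectors.
        return AnyUrl("https://docs.airbyte.com/integrations/sources/s3", scheme="https")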
@classmethod
@abstractmethod
def documentation_url(cls) -> pydantic.v1.networks.AnyUrl:
81    @classmethod
82    @abstractmethod
83    def documentation_url(cls) -> AnyUrl:
84        """
85        :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
86        """
Returns

link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"

@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
88    @classmethod
89    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
90        """
91        Generates the mapping comprised of the config fields
92        """
93        schema = super().schema(*args, **kwargs)
94        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
95        schema_helpers.expand_refs(transformed_schema)
96        cls.replace_enum_allOf_and_anyOf(transformed_schema)
97        cls.remove_discriminator(transformed_schema)
98
99        return transformed_schema

Generates the mapping composed of the config fields.

@staticmethod
def remove_discriminator(schema: Dict[str, Any]) -> None:
101    @staticmethod
102    def remove_discriminator(schema: Dict[str, Any]) -> None:
103        """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
104        dpath.delete(schema, "properties/**/discriminator")

pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references

@staticmethod
def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
106    @staticmethod
107    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
108        """
109        allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
110        Unpacks the enums under allOf and moves them up a level under the enum key
111        anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
112        additional validation that an incoming config only matches exactly one of a field's types.
113        """
114        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
115        objects_to_check["type"] = "object"
116        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
117        for format in objects_to_check["oneOf"]:
118            for key in format["properties"]:
119                object_property = format["properties"][key]
120                AbstractFileBasedSpec.move_enum_to_root(object_property)
121
122        properties_to_change = ["validation_policy"]
123        for property_to_change in properties_to_change:
124            property_object = schema["properties"]["streams"]["items"]["properties"][
125                property_to_change
126            ]
127            if "anyOf" in property_object:
128                schema["properties"]["streams"]["items"]["properties"][property_to_change][
129                    "type"
130                ] = "object"
131                schema["properties"]["streams"]["items"]["properties"][property_to_change][
132                    "oneOf"
133                ] = property_object.pop("anyOf")
134            AbstractFileBasedSpec.move_enum_to_root(property_object)
135
136        csv_format_schemas = list(
137            filter(
138                lambda format: format["properties"]["filetype"]["default"] == "csv",
139                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
140            )
141        )
142        if len(csv_format_schemas) != 1:
143            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
144        csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
145            "properties"
146        ]["header_definition"].pop("anyOf", [])
147        csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
148        return schema

allOfs are not supported by the UI, but pydantic automatically writes them for enums. Unpacks the enums under allOf and moves them up a level under the enum key. anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the additional validation that an incoming config only matches exactly one of a field's types.

@staticmethod
def move_enum_to_root(object_property: Dict[str, Any]) -> None:
150    @staticmethod
151    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
152        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
153            object_property["enum"] = object_property["allOf"][0]["enum"]
154            object_property.pop("allOf")
class AbstractFileBasedStreamReader(abc.ABC):
 30class AbstractFileBasedStreamReader(ABC):
 31    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
 32
 33    def __init__(self) -> None:
 34        self._config = None
 35
 36    @property
 37    def config(self) -> Optional[AbstractFileBasedSpec]:
 38        return self._config
 39
 40    @config.setter
 41    @abstractmethod
 42    def config(self, value: AbstractFileBasedSpec) -> None:
 43        """
 44        FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.
 45
 46        Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader
 47        will require keys that (for example) allow it to authenticate with the 3rd party.
 48
 49        Therefore, concrete implementations of AbstractFileBasedStreamReader's config setter should assert that `value` is of the correct
 50        config type for that type of StreamReader.
 51        """
 52        ...
 53
 54    @abstractmethod
 55    def open_file(
 56        self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
 57    ) -> IOBase:
 58        """
 59        Return a file handle for reading.
 60
 61        Many sources will be able to use smart_open to implement this method,
 62        for example:
 63
 64        client = boto3.Session(...)
 65        return smart_open.open(remote_file.uri, transport_params={"client": client})
 66        """
 67        ...
 68
 69    @abstractmethod
 70    def get_matching_files(
 71        self,
 72        globs: List[str],
 73        prefix: Optional[str],
 74        logger: logging.Logger,
 75    ) -> Iterable[RemoteFile]:
 76        """
 77        Return all files that match any of the globs.
 78
 79        Example:
 80
 81        The source has files "a.json", "foo/a.json", "foo/bar/a.json"
 82
 83        If globs = ["*.json"] then this method returns ["a.json"].
 84
 85        If globs = ["foo/*.json"] then this method returns ["foo/a.json"].
 86
 87        Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
 88        are available, which may be helpful when implementing this method.
 89        """
 90        ...
 91
 92    def filter_files_by_globs_and_start_date(
 93        self, files: List[RemoteFile], globs: List[str]
 94    ) -> Iterable[RemoteFile]:
 95        """
 96        Utility method for filtering files based on globs.
 97        """
 98        start_date = (
 99            datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
100            if self.config and self.config.start_date
101            else None
102        )
103        seen = set()
104
105        for file in files:
106            if self.file_matches_globs(file, globs):
107                if file.uri not in seen and (not start_date or file.last_modified >= start_date):
108                    seen.add(file.uri)
109                    yield file
110
111    @abstractmethod
112    def file_size(self, file: RemoteFile) -> int:
113        """Utility method to get size of the remote file.
114
115        This is required for connectors that will support writing to
116        files. If the connector does not support writing files, then the
117        subclass can simply `return 0`.
118        """
119        ...
120
121    @staticmethod
122    def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
123        # Use the GLOBSTAR flag to enable recursive ** matching
124        # (https://facelessuser.github.io/wcmatch/wcmatch/#globstar)
125        return any(globmatch(file.uri, g, flags=GLOBSTAR) for g in globs)
126
127    @staticmethod
128    def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
129        """
130        Utility method for extracting prefixes from the globs.
131        """
132        prefixes = {glob.split("*")[0] for glob in globs}
133        return set(filter(lambda x: bool(x), prefixes))
134
135    def use_file_transfer(self) -> bool:
136        if self.config:
137            return use_file_transfer(self.config)
138        return False
139
140    def preserve_directory_structure(self) -> bool:
141        # fall back to preserve subdirectories if config is not present or incomplete
142        if self.config:
143            return preserve_directory_structure(self.config)
144        return True
145
146    def include_identities_stream(self) -> bool:
147        if self.config:
148            return include_identities_stream(self.config)
149        return False
150
151    @abstractmethod
152    def get_file(
153        self, file: RemoteFile, local_directory: str, logger: logging.Logger
154    ) -> Dict[str, Any]:
155        """
156        This is required for connectors that will support writing to
157        files. It will handle the logic to download,get,read,acquire or
158        whatever is more efficient to get a file from the source.
159
160        Args:
161               file (RemoteFile): The remote file object containing URI and metadata.
162               local_directory (str): The local directory path where the file will be downloaded.
163               logger (logging.Logger): Logger for logging information and errors.
164
165           Returns:
166               dict: A dictionary containing the following:
167                   - "file_url" (str): The absolute path of the downloaded file.
168                   - "bytes" (int): The file size in bytes.
169                   - "file_relative_path" (str): The relative path of the file for local storage. Is relative to local_directory as
170                   this a mounted volume in the pod container.
171
172        """
173        ...
174
175    def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]:
176        preserve_directory_structure = self.preserve_directory_structure()
177        if preserve_directory_structure:
178            # Remove left slashes from source path format to make relative path for writing locally
179            file_relative_path = file.uri.lstrip("/")
180        else:
181            file_relative_path = path.basename(file.uri)
182        local_file_path = path.join(local_directory, file_relative_path)
183
184        # Ensure the local directory exists
185        makedirs(path.dirname(local_file_path), exist_ok=True)
186        absolute_file_path = path.abspath(local_file_path)
187        return [file_relative_path, local_file_path, absolute_file_path]

Abstract base class for stream readers. Concrete implementations know how to list files in the remote source and open them for reading.

DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
config: Optional[AbstractFileBasedSpec]
36    @property
37    def config(self) -> Optional[AbstractFileBasedSpec]:
38        return self._config

FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.

Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader will require keys that (for example) allow it to authenticate with the 3rd party.

Therefore, concrete implementations of AbstractFileBasedStreamReader's config setter should assert that value is of the correct config type for that type of StreamReader.

@abstractmethod
def open_file( self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger) -> io.IOBase:
54    @abstractmethod
55    def open_file(
56        self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
57    ) -> IOBase:
58        """
59        Return a file handle for reading.
60
61        Many sources will be able to use smart_open to implement this method,
62        for example:
63
64        client = boto3.Session(...)
65        return smart_open.open(remote_file.uri, transport_params={"client": client})
66        """
67        ...

Return a file handle for reading.

Many sources will be able to use smart_open to implement this method, for example:

client = boto3.Session(...)
return smart_open.open(remote_file.uri, transport_params={"client": client})
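A sketch of such an implementation for an S3-style source is shown below. It assumes boto3 and smart_open are installed and that FileReadMode enum values are standard Python mode strings; the surrounding class and its other abstract members are omitted.

import logging
from io import IOBase
from typing import Optional

import boto3
import smart_open

from airbyte_cdk.sources.file_based import FileReadMode, RemoteFile


def open_file(
    self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
) -> IOBase:
    # Build an S3 client and let smart_open stream the object lazily.
    client = boto3.Session().client("s3")
    return smart_open.open(
        file.uri,
        mode=mode.value,
        encoding=encoding,
        transport_params={"client": client},
    )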

@abstractmethod
def get_matching_files( self, globs: List[str], prefix: Optional[str], logger: logging.Logger) -> Iterable[RemoteFile]:
69    @abstractmethod
70    def get_matching_files(
71        self,
72        globs: List[str],
73        prefix: Optional[str],
74        logger: logging.Logger,
75    ) -> Iterable[RemoteFile]:
76        """
77        Return all files that match any of the globs.
78
79        Example:
80
81        The source has files "a.json", "foo/a.json", "foo/bar/a.json"
82
83        If globs = ["*.json"] then this method returns ["a.json"].
84
85        If globs = ["foo/*.json"] then this method returns ["foo/a.json"].
86
87        Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
88        are available, which may be helpful when implementing this method.
89        """
90        ...

Return all files that match any of the globs.

Example:

The source has files "a.json", "foo/a.json", "foo/bar/a.json"

If globs = ["*.json"] then this method returns ["a.json"].

If globs = ["foo/*.json"] then this method returns ["foo/a.json"].

Utility method self.filter_files_by_globs and self.get_prefixes_from_globs are available, which may be helpful when implementing this method.
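A sketch of an implementation that leans on those utilities is shown below; _list_all_files is a hypothetical, transport-specific helper that yields RemoteFile objects under the given prefix.

import logging
from typing import Iterable, List, Optional

from airbyte_cdk.sources.file_based import RemoteFile


def get_matching_files(
    self, globs: List[str], prefix: Optional[str], logger: logging.Logger
) -> Iterable[RemoteFile]:
    # List everything under the prefix, then let the CDK utility apply globs and start_date.
    all_files = list(self._list_all_files(prefix))
    yield from self.filter_files_by_globs_and_start_date(all_files, globs)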

def filter_files_by_globs_and_start_date( self, files: List[RemoteFile], globs: List[str]) -> Iterable[RemoteFile]:
 92    def filter_files_by_globs_and_start_date(
 93        self, files: List[RemoteFile], globs: List[str]
 94    ) -> Iterable[RemoteFile]:
 95        """
 96        Utility method for filtering files based on globs.
 97        """
 98        start_date = (
 99            datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
100            if self.config and self.config.start_date
101            else None
102        )
103        seen = set()
104
105        for file in files:
106            if self.file_matches_globs(file, globs):
107                if file.uri not in seen and (not start_date or file.last_modified >= start_date):
108                    seen.add(file.uri)
109                    yield file

Utility method for filtering files based on globs and, when a start date is configured, on the file's last-modified time; duplicate URIs are skipped.

@abstractmethod
def file_size(self, file: RemoteFile) -> int:
111    @abstractmethod
112    def file_size(self, file: RemoteFile) -> int:
113        """Utility method to get size of the remote file.
114
115        This is required for connectors that will support writing to
116        files. If the connector does not support writing files, then the
117        subclass can simply `return 0`.
118        """
119        ...

Utility method to get size of the remote file.

This is required for connectors that will support writing to files. If the connector does not support writing files, then the subclass can simply return 0.

@staticmethod
def file_matches_globs( file: RemoteFile, globs: List[str]) -> bool:
121    @staticmethod
122    def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool:
123        # Use the GLOBSTAR flag to enable recursive ** matching
124        # (https://facelessuser.github.io/wcmatch/wcmatch/#globstar)
125        return any(globmatch(file.uri, g, flags=GLOBSTAR) for g in globs)
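file_matches_globs can be exercised directly as a staticmethod; an illustrative example (values are made up, and GLOBSTAR makes ** span directory separators):

from datetime import datetime

from airbyte_cdk.sources.file_based import AbstractFileBasedStreamReader, RemoteFile

file = RemoteFile(uri="foo/bar/a.json", last_modified=datetime(2024, 1, 1))
AbstractFileBasedStreamReader.file_matches_globs(file, ["foo/**/*.json"])  # True
AbstractFileBasedStreamReader.file_matches_globs(file, ["*.json"])  # False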
@staticmethod
def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
127    @staticmethod
128    def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
129        """
130        Utility method for extracting prefixes from the globs.
131        """
132        prefixes = {glob.split("*")[0] for glob in globs}
133        return set(filter(lambda x: bool(x), prefixes))

Utility method for extracting prefixes from the globs.
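For example (the globs are illustrative; only globs with a literal prefix before the first wildcard contribute):

from airbyte_cdk.sources.file_based import AbstractFileBasedStreamReader

AbstractFileBasedStreamReader.get_prefixes_from_globs(["foo/*.csv", "*.jsonl"])  # {"foo/"}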

def use_file_transfer(self) -> bool:
135    def use_file_transfer(self) -> bool:
136        if self.config:
137            return use_file_transfer(self.config)
138        return False
def preserve_directory_structure(self) -> bool:
140    def preserve_directory_structure(self) -> bool:
141        # fall back to preserve subdirectories if config is not present or incomplete
142        if self.config:
143            return preserve_directory_structure(self.config)
144        return True
def include_identities_stream(self) -> bool:
146    def include_identities_stream(self) -> bool:
147        if self.config:
148            return include_identities_stream(self.config)
149        return False
@abstractmethod
def get_file( self, file: RemoteFile, local_directory: str, logger: logging.Logger) -> Dict[str, Any]:
151    @abstractmethod
152    def get_file(
153        self, file: RemoteFile, local_directory: str, logger: logging.Logger
154    ) -> Dict[str, Any]:
155        """
156        This is required for connectors that will support writing to
157        files. It will handle the logic to download,get,read,acquire or
158        whatever is more efficient to get a file from the source.
159
160        Args:
161               file (RemoteFile): The remote file object containing URI and metadata.
162               local_directory (str): The local directory path where the file will be downloaded.
163               logger (logging.Logger): Logger for logging information and errors.
164
165           Returns:
166               dict: A dictionary containing the following:
167                   - "file_url" (str): The absolute path of the downloaded file.
168                   - "bytes" (int): The file size in bytes.
169                   - "file_relative_path" (str): The relative path of the file for local storage. Is relative to local_directory as
170                   this a mounted volume in the pod container.
171
172        """
173        ...

This is required for connectors that will support writing to files. It handles the logic to download, get, read, or acquire the file, whichever is most efficient for the source.

Arguments:
  • file (RemoteFile): The remote file object containing URI and metadata.
  • local_directory (str): The local directory path where the file will be downloaded.
  • logger (logging.Logger): Logger for logging information and errors.

Returns:
  dict: A dictionary containing the following:
    • "file_url" (str): The absolute path of the downloaded file.
    • "bytes" (int): The file size in bytes.
    • "file_relative_path" (str): The path of the file relative to local_directory, which is a mounted volume in the pod container.
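A sketch of an implementation built on _get_file_transfer_paths is shown below; _download stands in for the transport-specific fetch and is not a CDK helper, and the surrounding class is omitted.

import logging
from typing import Any, Dict

from airbyte_cdk.sources.file_based import RemoteFile


def get_file(
    self, file: RemoteFile, local_directory: str, logger: logging.Logger
) -> Dict[str, Any]:
    # The CDK helper computes the relative, local, and absolute paths and creates the directories.
    file_relative_path, local_file_path, absolute_file_path = self._get_file_transfer_paths(
        file, local_directory
    )
    self._download(file.uri, local_file_path)  # hypothetical transport-specific fetch
    return {
        "file_url": absolute_file_path,
        "bytes": self.file_size(file),
        "file_relative_path": file_relative_path,
    }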
class CsvFormat(pydantic.v1.main.BaseModel):
 85class CsvFormat(BaseModel):
 86    class Config(OneOfOptionConfig):
 87        title = "CSV Format"
 88        discriminator = "filetype"
 89
 90    filetype: str = Field(
 91        "csv",
 92        const=True,
 93    )
 94    delimiter: str = Field(
 95        title="Delimiter",
 96        description="The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
 97        default=",",
 98    )
 99    quote_char: str = Field(
100        title="Quote Character",
101        default='"',
102        description="The character used for quoting CSV values. To disallow quoting, make this field blank.",
103    )
104    escape_char: Optional[str] = Field(
105        title="Escape Character",
106        default=None,
107        description="The character used for escaping special characters. To disallow escaping, leave this field blank.",
108    )
109    encoding: Optional[str] = Field(
110        default="utf8",
111        description='The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href="https://docs.python.org/3/library/codecs.html#standard-encodings" target="_blank">list of python encodings</a> for allowable options.',
112    )
113    double_quote: bool = Field(
114        title="Double Quote",
115        default=True,
116        description="Whether two quotes in a quoted CSV value denote a single quote in the data.",
117    )
118    null_values: Set[str] = Field(
119        title="Null Values",
120        default=[],
121        description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
122    )
123    strings_can_be_null: bool = Field(
124        title="Strings Can Be Null",
125        default=True,
126        description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
127    )
128    skip_rows_before_header: int = Field(
129        title="Skip Rows Before Header",
130        default=0,
131        description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
132    )
133    skip_rows_after_header: int = Field(
134        title="Skip Rows After Header",
135        default=0,
136        description="The number of rows to skip after the header row.",
137    )
138    header_definition: Union[CsvHeaderFromCsv, CsvHeaderAutogenerated, CsvHeaderUserProvided] = (
139        Field(
140            title="CSV Header Definition",
141            default=CsvHeaderFromCsv(header_definition_type=CsvHeaderDefinitionType.FROM_CSV.value),
142            description="How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.",
143        )
144    )
145    true_values: Set[str] = Field(
146        title="True Values",
147        default=DEFAULT_TRUE_VALUES,
148        description="A set of case-sensitive strings that should be interpreted as true values.",
149    )
150    false_values: Set[str] = Field(
151        title="False Values",
152        default=DEFAULT_FALSE_VALUES,
153        description="A set of case-sensitive strings that should be interpreted as false values.",
154    )
155    inference_type: InferenceType = Field(
156        title="Inference Type",
157        default=InferenceType.NONE,
158        description="How to infer the types of the columns. If none, inference default to strings.",
159        airbyte_hidden=True,
160    )
161    ignore_errors_on_fields_mismatch: bool = Field(
162        title="Ignore errors on field mismatch",
163        default=False,
164        description="Whether to ignore errors that occur when the number of fields in the CSV does not match the number of columns in the schema.",
165    )
166
167    @validator("delimiter")
168    def validate_delimiter(cls, v: str) -> str:
169        if v == r"\t":
170            v = "\t"
171        if len(v) != 1:
172            raise ValueError("delimiter should only be one character")
173        if v in {"\r", "\n"}:
174            raise ValueError(f"delimiter cannot be {v}")
175        return v
176
177    @validator("quote_char")
178    def validate_quote_char(cls, v: str) -> str:
179        if len(v) != 1:
180            raise ValueError("quote_char should only be one character")
181        return v
182
183    @validator("escape_char")
184    def validate_escape_char(cls, v: str) -> str:
185        if v is not None and len(v) != 1:
186            raise ValueError("escape_char should only be one character")
187        return v
188
189    @validator("encoding")
190    def validate_encoding(cls, v: str) -> str:
191        try:
192            codecs.lookup(v)
193        except LookupError:
194            raise ValueError(f"invalid encoding format: {v}")
195        return v
196
197    @root_validator
198    def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
199        definition_type = values.get("header_definition_type")
200        column_names = values.get("user_provided_column_names")
201        if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
202            raise ValidationError(
203                "`user_provided_column_names` should be defined if the definition 'User Provided'.",
204                model=CsvFormat,
205            )
206        if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
207            raise ValidationError(
208                "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
209                model=CsvFormat,
210            )
211        return values
filetype: str
delimiter: str
quote_char: str
escape_char: Optional[str]
encoding: Optional[str]
double_quote: bool
null_values: Set[str]
strings_can_be_null: bool
skip_rows_before_header: int
skip_rows_after_header: int
true_values: Set[str]
false_values: Set[str]
ignore_errors_on_fields_mismatch: bool
@validator('delimiter')
def validate_delimiter(cls, v: str) -> str:
167    @validator("delimiter")
168    def validate_delimiter(cls, v: str) -> str:
169        if v == r"\t":
170            v = "\t"
171        if len(v) != 1:
172            raise ValueError("delimiter should only be one character")
173        if v in {"\r", "\n"}:
174            raise ValueError(f"delimiter cannot be {v}")
175        return v
@validator('quote_char')
def validate_quote_char(cls, v: str) -> str:
177    @validator("quote_char")
178    def validate_quote_char(cls, v: str) -> str:
179        if len(v) != 1:
180            raise ValueError("quote_char should only be one character")
181        return v
@validator('escape_char')
def validate_escape_char(cls, v: str) -> str:
183    @validator("escape_char")
184    def validate_escape_char(cls, v: str) -> str:
185        if v is not None and len(v) != 1:
186            raise ValueError("escape_char should only be one character")
187        return v
@validator('encoding')
def validate_encoding(cls, v: str) -> str:
189    @validator("encoding")
190    def validate_encoding(cls, v: str) -> str:
191        try:
192            codecs.lookup(v)
193        except LookupError:
194            raise ValueError(f"invalid encoding format: {v}")
195        return v
@root_validator
def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
197    @root_validator
198    def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
199        definition_type = values.get("header_definition_type")
200        column_names = values.get("user_provided_column_names")
201        if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
202            raise ValidationError(
203                "`user_provided_column_names` should be defined if the definition 'User Provided'.",
204                model=CsvFormat,
205            )
206        if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
207            raise ValidationError(
208                "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
209                model=CsvFormat,
210            )
211        return values
class CsvFormat.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
86    class Config(OneOfOptionConfig):
87        title = "CSV Format"
88        discriminator = "filetype"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'CSV Format'
discriminator = 'filetype'
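As a usage sketch, a pipe-delimited CSV format with custom null handling might be configured like this (all values are illustrative; every field has a default, so any subset may be set):

from airbyte_cdk.sources.file_based import CsvFormat

csv_format = CsvFormat(
    delimiter="|",
    null_values={"NA", ""},
    skip_rows_before_header=1,
)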
class CustomFileBasedException(airbyte_cdk.utils.traced_exception.AirbyteTracedException):
149class CustomFileBasedException(AirbyteTracedException):
150    """
151    A specialized exception for file-based connectors.
152
153    This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
154    """
155
156    pass

A specialized exception for file-based connectors.

This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
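A usage sketch (the message is illustrative; the keyword arguments follow the AirbyteTracedException constructor used elsewhere in this module):

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based import CustomFileBasedException

raise CustomFileBasedException(
    message="Files could not be listed; please check the provided credentials.",
    failure_type=FailureType.config_error,
)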

 18class DefaultFileBasedCursor(AbstractFileBasedCursor):
 19    DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
 20    DEFAULT_MAX_HISTORY_SIZE = 10_000
 21    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
 22    CURSOR_FIELD = "_ab_source_file_last_modified"
 23
 24    def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
 25        super().__init__(stream_config)  # type: ignore [safe-super]
 26        self._file_to_datetime_history: MutableMapping[str, str] = {}
 27        self._time_window_if_history_is_full = timedelta(
 28            days=stream_config.days_to_sync_if_history_is_full
 29            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
 30        )
 31
 32        if self._time_window_if_history_is_full <= timedelta():
 33            raise ValueError(
 34                f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
 35            )
 36
 37        self._start_time = self._compute_start_time()
 38        self._initial_earliest_file_in_history: Optional[RemoteFile] = None
 39
 40    def set_initial_state(self, value: StreamState) -> None:
 41        self._file_to_datetime_history = value.get("history", {})
 42        self._start_time = self._compute_start_time()
 43        self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()
 44
 45    def add_file(self, file: RemoteFile) -> None:
 46        self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
 47            self.DATE_TIME_FORMAT
 48        )
 49        if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
 50            # Get the earliest file based on its last modified date and its uri
 51            oldest_file = self._compute_earliest_file_in_history()
 52            if oldest_file:
 53                del self._file_to_datetime_history[oldest_file.uri]
 54            else:
 55                raise Exception(
 56                    "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
 57                )
 58
 59    def get_state(self) -> StreamState:
 60        state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
 61        return state
 62
 63    def _get_cursor(self) -> Optional[str]:
 64        """
 65        Returns the cursor value.
 66
 67        Files are synced in order of last-modified with secondary sort on filename, so the cursor value is
 68        a string joining the last-modified timestamp of the last synced file and the name of the file.
 69        """
 70        if self._file_to_datetime_history.items():
 71            filename, timestamp = max(
 72                self._file_to_datetime_history.items(), key=lambda x: (x[1], x[0])
 73            )
 74            return f"{timestamp}_{filename}"
 75        return None
 76
 77    def _is_history_full(self) -> bool:
 78        """
 79        Returns true if the state's history is full, meaning new entries will start to replace old entries.
 80        """
 81        return len(self._file_to_datetime_history) >= self.DEFAULT_MAX_HISTORY_SIZE
 82
 83    def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
 84        if file.uri in self._file_to_datetime_history:
 85            # If the file's uri is in the history, we should sync the file if it has been modified since it was synced
 86            updated_at_from_history = datetime.strptime(
 87                self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT
 88            )
 89            if file.last_modified < updated_at_from_history:
 90                logger.warning(
 91                    f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file."
 92                )
 93            else:
 94                return file.last_modified > updated_at_from_history
 95            return file.last_modified > updated_at_from_history
 96        if self._is_history_full():
 97            if self._initial_earliest_file_in_history is None:
 98                return True
 99            if file.last_modified > self._initial_earliest_file_in_history.last_modified:
100                # If the history is partial and the file's datetime is strictly greater than the earliest file in the history,
101                # we should sync it
102                return True
103            elif file.last_modified == self._initial_earliest_file_in_history.last_modified:
104                # If the history is partial and the file's datetime is equal to the earliest file in the history,
105                # we should sync it if its uri is strictly greater than the earliest file in the history
106                return file.uri > self._initial_earliest_file_in_history.uri
107            else:
108                # Otherwise, only sync the file if it has been modified since the start of the time window
109                return file.last_modified >= self.get_start_time()
110        else:
111            # The file is not in the history and the history is complete. We know we need to sync the file
112            return True
113
114    def get_files_to_sync(
115        self, all_files: Iterable[RemoteFile], logger: logging.Logger
116    ) -> Iterable[RemoteFile]:
117        if self._is_history_full():
118            logger.warning(
119                f"The state history is full. "
120                f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
121                f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
122            )
123        for f in all_files:
124            if self._should_sync_file(f, logger):
125                yield f
126
127    def get_start_time(self) -> datetime:
128        return self._start_time
129
130    def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
131        if self._file_to_datetime_history:
132            filename, last_modified = min(
133                self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
134            )
135            return RemoteFile(
136                uri=filename, last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT)
137            )
138        else:
139            return None
140
141    def _compute_start_time(self) -> datetime:
142        if not self._file_to_datetime_history:
143            return datetime.min
144        else:
145            earliest = min(self._file_to_datetime_history.values())
146            earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT)
147            if self._is_history_full():
148                time_window = datetime.now() - self._time_window_if_history_is_full
149                earliest_dt = min(earliest_dt, time_window)
150            return earliest_dt

Default cursor for file-based streams: tracks a bounded history of synced files keyed by their last-modified time.

DefaultFileBasedCursor( stream_config: FileBasedStreamConfig, **_: Any)
24    def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
25        super().__init__(stream_config)  # type: ignore [safe-super]
26        self._file_to_datetime_history: MutableMapping[str, str] = {}
27        self._time_window_if_history_is_full = timedelta(
28            days=stream_config.days_to_sync_if_history_is_full
29            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
30        )
31
32        if self._time_window_if_history_is_full <= timedelta():
33            raise ValueError(
34                f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
35            )
36
37        self._start_time = self._compute_start_time()
38        self._initial_earliest_file_in_history: Optional[RemoteFile] = None

Common interface for all cursors.

DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
DEFAULT_MAX_HISTORY_SIZE = 10000
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
CURSOR_FIELD = '_ab_source_file_last_modified'
def set_initial_state(self, value: MutableMapping[str, Any]) -> None:
40    def set_initial_state(self, value: StreamState) -> None:
41        self._file_to_datetime_history = value.get("history", {})
42        self._start_time = self._compute_start_time()
43        self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()

Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.

Parameters
  • value: The stream state
def add_file( self, file: RemoteFile) -> None:
45    def add_file(self, file: RemoteFile) -> None:
46        self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
47            self.DATE_TIME_FORMAT
48        )
49        if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
50            # Get the earliest file based on its last modified date and its uri
51            oldest_file = self._compute_earliest_file_in_history()
52            if oldest_file:
53                del self._file_to_datetime_history[oldest_file.uri]
54            else:
55                raise Exception(
56                    "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
57                )

Add a file to the cursor. This method is called when a file is processed by the stream.

Parameters
  • file: The file to add
def get_state(self) -> MutableMapping[str, Any]:
59    def get_state(self) -> StreamState:
60        state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
61        return state

Get the state of the cursor.
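The returned mapping has the shape sketched below (values are illustrative):

{
    "history": {"inbox/a.csv": "2023-06-05T03:54:07.000000Z"},
    "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z_inbox/a.csv",
}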

def get_files_to_sync( self, all_files: Iterable[RemoteFile], logger: logging.Logger) -> Iterable[RemoteFile]:
114    def get_files_to_sync(
115        self, all_files: Iterable[RemoteFile], logger: logging.Logger
116    ) -> Iterable[RemoteFile]:
117        if self._is_history_full():
118            logger.warning(
119                f"The state history is full. "
120                f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
121                f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
122            )
123        for f in all_files:
124            if self._should_sync_file(f, logger):
125                yield f

Given the list of files in the source, return the files that should be synced.

Parameters
  • all_files: All files in the source
  • logger:
Returns

The files that should be synced

def get_start_time(self) -> datetime.datetime:
127    def get_start_time(self) -> datetime:
128        return self._start_time

Returns the start time of the current sync.

111class ErrorListingFiles(BaseFileBasedSourceError):
112    pass

Raised when an error occurs while listing files from the source.

 87class FileBasedSource(ConcurrentSourceAdapter, ABC):
 88    # We make each source override the concurrency level to give control over when they are upgraded.
 89    _concurrency_level = None
 90
 91    def __init__(
 92        self,
 93        stream_reader: AbstractFileBasedStreamReader,
 94        spec_class: Type[AbstractFileBasedSpec],
 95        catalog: Optional[ConfiguredAirbyteCatalog],
 96        config: Optional[Mapping[str, Any]],
 97        state: Optional[List[AirbyteStateMessage]],
 98        availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None,
 99        discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(),
100        parsers: Mapping[Type[Any], FileTypeParser] = default_parsers,
101        validation_policies: Mapping[
102            ValidationPolicy, AbstractSchemaValidationPolicy
103        ] = DEFAULT_SCHEMA_VALIDATION_POLICIES,
104        cursor_cls: Type[
105            Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor]
106        ] = FileBasedConcurrentCursor,
107        stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None,
108    ):
109        self.stream_reader = stream_reader
110        self.stream_permissions_reader = stream_permissions_reader
111        self.spec_class = spec_class
112        self.config = config
113        self.catalog = catalog
114        self.state = state
115        self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(
116            stream_reader
117        )
118        self.discovery_policy = discovery_policy
119        self.parsers = parsers
120        self.validation_policies = validation_policies
121        self.stream_schemas = (
122            {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {}
123        )
124        self.cursor_cls = cursor_cls
125        self.logger = init_logger(f"airbyte.{self.name}")
126        self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector()
127        self._message_repository: Optional[MessageRepository] = None
128        concurrent_source = ConcurrentSource.create(
129            MAX_CONCURRENCY,
130            INITIAL_N_PARTITIONS,
131            self.logger,
132            self._slice_logger,
133            self.message_repository,
134        )
135        self._state = None
136        super().__init__(concurrent_source)
137
138    @property
139    def message_repository(self) -> MessageRepository:
140        if self._message_repository is None:
141            self._message_repository = InMemoryMessageRepository(
142                Level(AirbyteLogFormatter.level_mapping[self.logger.level])
143            )
144        return self._message_repository
145
146    def check_connection(
147        self, logger: logging.Logger, config: Mapping[str, Any]
148    ) -> Tuple[bool, Optional[Any]]:
149        """
150        Check that the source can be accessed using the user-provided configuration.
151
152        For each stream, verify that we can list and read files.
153
154        Returns (True, None) if the connection check is successful.
155
156        Otherwise, the "error" object should describe what went wrong.
157        """
158        try:
159            streams = self.streams(config)
160        except Exception as config_exception:
161            raise AirbyteTracedException(
162                internal_message="Please check the logged errors for more information.",
163                message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
164                exception=AirbyteTracedException(exception=config_exception),
165                failure_type=FailureType.config_error,
166            )
167        if len(streams) == 0:
168            return (
169                False,
170                f"No streams are available for source {self.name}. This is probably an issue with the connector. Please verify that your "
171                f"configuration provides permissions to list and read files from the source. Contact support if you are unable to "
172                f"resolve this issue.",
173            )
174
175        errors = []
176        tracebacks = []
177        for stream in streams:
178            if isinstance(stream, FileIdentitiesStream):
179                identity = next(iter(stream.load_identity_groups()))
180                if not identity:
181                    errors.append(
182                        "Unable to get identities for current configuration, please check your credentials"
183                    )
184                continue
185            if not isinstance(stream, AbstractFileBasedStream):
186                raise ValueError(f"Stream {stream} is not a file-based stream.")
187            try:
188                parsed_config = self._get_parsed_config(config)
189                availability_method = (
190                    stream.availability_strategy.check_availability
191                    if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config)
192                    else stream.availability_strategy.check_availability_and_parsability
193                )
194                (
195                    stream_is_available,
196                    reason,
197                ) = availability_method(stream, logger, self)
198            except AirbyteTracedException as ate:
199                errors.append(f"Unable to connect to stream {stream.name} - {ate.message}")
200                tracebacks.append(traceback.format_exc())
201            except Exception:
202                errors.append(f"Unable to connect to stream {stream.name}")
203                tracebacks.append(traceback.format_exc())
204            else:
205                if not stream_is_available and reason:
206                    errors.append(reason)
207
208        if len(errors) == 1 and len(tracebacks) == 1:
209            raise AirbyteTracedException(
210                internal_message=tracebacks[0],
211                message=f"{errors[0]}",
212                failure_type=FailureType.config_error,
213            )
214        if len(errors) == 1 and len(tracebacks) == 0:
215            raise AirbyteTracedException(
216                message=f"{errors[0]}",
217                failure_type=FailureType.config_error,
218            )
219        elif len(errors) > 1:
220            raise AirbyteTracedException(
221                internal_message="\n".join(tracebacks),
222                message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}",
223                failure_type=FailureType.config_error,
224            )
225
226        return not bool(errors), (errors or None)
227
228    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
229        """
230        Return a list of this source's streams.
231        """
232
233        if self.catalog:
234            state_manager = ConnectorStateManager(state=self.state)
235        else:
236            # During `check` operations we don't have a catalog so cannot create a state manager.
237            # Since the state manager is only required for incremental syncs, this is fine.
238            state_manager = None
239
240        try:
241            parsed_config = self._get_parsed_config(config)
242            self.stream_reader.config = parsed_config
243            if self.stream_permissions_reader:
244                self.stream_permissions_reader.config = parsed_config
245            streams: List[Stream] = []
246            for stream_config in parsed_config.streams:
247                # Like state_manager, `catalog_stream` may be None during `check`
248                catalog_stream = self._get_stream_from_catalog(stream_config)
249                stream_state = (
250                    state_manager.get_stream_state(catalog_stream.name, catalog_stream.namespace)
251                    if (state_manager and catalog_stream)
252                    else None
253                )
254                self._validate_input_schema(stream_config)
255
256                sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
257
258                if (
259                    sync_mode == SyncMode.full_refresh
260                    and hasattr(self, "_concurrency_level")
261                    and self._concurrency_level is not None
262                ):
263                    cursor = FileBasedFinalStateCursor(
264                        stream_config=stream_config,
265                        stream_namespace=None,
266                        message_repository=self.message_repository,
267                    )
268                    stream = FileBasedStreamFacade.create_from_stream(
269                        stream=self._make_file_based_stream(
270                            stream_config=stream_config,
271                            cursor=cursor,
272                            parsed_config=parsed_config,
273                        ),
274                        source=self,
275                        logger=self.logger,
276                        state=stream_state,
277                        cursor=cursor,
278                    )
279
280                elif (
281                    sync_mode == SyncMode.incremental
282                    and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
283                    and hasattr(self, "_concurrency_level")
284                    and self._concurrency_level is not None
285                ):
286                    assert (
287                        state_manager is not None
288                    ), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."
289
290                    cursor = self.cursor_cls(
291                        stream_config,
292                        stream_config.name,
293                        None,
294                        stream_state,
295                        self.message_repository,
296                        state_manager,
297                        CursorField(DefaultFileBasedStream.ab_last_mod_col),
298                    )
299                    stream = FileBasedStreamFacade.create_from_stream(
300                        stream=self._make_file_based_stream(
301                            stream_config=stream_config,
302                            cursor=cursor,
303                            parsed_config=parsed_config,
304                        ),
305                        source=self,
306                        logger=self.logger,
307                        state=stream_state,
308                        cursor=cursor,
309                    )
310                else:
311                    cursor = self.cursor_cls(stream_config)
312                    stream = self._make_file_based_stream(
313                        stream_config=stream_config,
314                        cursor=cursor,
315                        parsed_config=parsed_config,
316                    )
317
318                streams.append(stream)
319
320            if include_identities_stream(parsed_config):
321                identities_stream = self._make_identities_stream()
322                streams.append(identities_stream)
323            return streams
324
325        except ValidationError as exc:
326            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc
327
328    def _make_default_stream(
329        self,
330        stream_config: FileBasedStreamConfig,
331        cursor: Optional[AbstractFileBasedCursor],
332        parsed_config: AbstractFileBasedSpec,
333    ) -> AbstractFileBasedStream:
334        return DefaultFileBasedStream(
335            config=stream_config,
336            catalog_schema=self.stream_schemas.get(stream_config.name),
337            stream_reader=self.stream_reader,
338            availability_strategy=self.availability_strategy,
339            discovery_policy=self.discovery_policy,
340            parsers=self.parsers,
341            validation_policy=self._validate_and_get_validation_policy(stream_config),
342            errors_collector=self.errors_collector,
343            cursor=cursor,
344            use_file_transfer=use_file_transfer(parsed_config),
345            preserve_directory_structure=preserve_directory_structure(parsed_config),
346        )
347
348    def _ensure_permissions_reader_available(self) -> None:
349        """
350        Validates that a stream permissions reader is available.
351        Raises a ValueError if the reader is not provided.
352        """
353        if not self.stream_permissions_reader:
354            raise ValueError(
355                "Stream permissions reader is required for streams that use permissions transfer mode."
356            )
357
358    def _make_permissions_stream(
359        self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
360    ) -> AbstractFileBasedStream:
361        """
362        Creates a stream that reads permissions from files.
363        """
364        self._ensure_permissions_reader_available()
365        return PermissionsFileBasedStream(
366            config=stream_config,
367            catalog_schema=self.stream_schemas.get(stream_config.name),
368            stream_reader=self.stream_reader,
369            availability_strategy=self.availability_strategy,
370            discovery_policy=self.discovery_policy,
371            parsers=self.parsers,
372            validation_policy=self._validate_and_get_validation_policy(stream_config),
373            errors_collector=self.errors_collector,
374            cursor=cursor,
375            stream_permissions_reader=self.stream_permissions_reader,  # type: ignore
376        )
377
378    def _make_file_based_stream(
379        self,
380        stream_config: FileBasedStreamConfig,
381        cursor: Optional[AbstractFileBasedCursor],
382        parsed_config: AbstractFileBasedSpec,
383    ) -> AbstractFileBasedStream:
384        """
385        Creates different streams depending on the type of the transfer mode selected
386        """
387        if use_permissions_transfer(parsed_config):
388            return self._make_permissions_stream(stream_config, cursor)
389        # TODO: add a dedicated stream for file transfer mode to decouple it from DefaultFileBasedStream
390        else:
391            return self._make_default_stream(stream_config, cursor, parsed_config)
392
393    def _make_identities_stream(
394        self,
395    ) -> Stream:
396        self._ensure_permissions_reader_available()
397        return FileIdentitiesStream(
398            catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME),
399            stream_permissions_reader=self.stream_permissions_reader,  # type: ignore
400            discovery_policy=self.discovery_policy,
401            errors_collector=self.errors_collector,
402        )
403
404    def _get_stream_from_catalog(
405        self, stream_config: FileBasedStreamConfig
406    ) -> Optional[AirbyteStream]:
407        if self.catalog:
408            for stream in self.catalog.streams or []:
409                if stream.stream.name == stream_config.name:
410                    return stream.stream
411        return None
412
413    def _get_sync_mode_from_catalog(self, stream_name: str) -> Optional[SyncMode]:
414        if self.catalog:
415            for catalog_stream in self.catalog.streams:
416                if stream_name == catalog_stream.stream.name:
417                    return catalog_stream.sync_mode
418            self.logger.warning(f"No sync mode was found for {stream_name}.")
419        return None
420
421    def read(
422        self,
423        logger: logging.Logger,
424        config: Mapping[str, Any],
425        catalog: ConfiguredAirbyteCatalog,
426        state: Optional[List[AirbyteStateMessage]] = None,
427    ) -> Iterator[AirbyteMessage]:
428        yield from super().read(logger, config, catalog, state)
429        # emit all the errors collected
430        yield from self.errors_collector.yield_and_raise_collected()
431        # count streams using a certain parser
432        parsed_config = self._get_parsed_config(config)
433        for parser, count in Counter(
434            stream.format.filetype for stream in parsed_config.streams
435        ).items():
436            yield create_analytics_message(f"file-cdk-{parser}-stream-count", count)
437
438    def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
439        """
440        Returns the specification describing what fields can be configured by a user when setting up a file-based source.
441        """
442
443        return ConnectorSpecification(
444            documentationUrl=self.spec_class.documentation_url(),
445            connectionSpecification=self.spec_class.schema(),
446        )
447
448    def _get_parsed_config(self, config: Mapping[str, Any]) -> AbstractFileBasedSpec:
449        return self.spec_class(**config)
450
451    def _validate_and_get_validation_policy(
452        self, stream_config: FileBasedStreamConfig
453    ) -> AbstractSchemaValidationPolicy:
454        if stream_config.validation_policy not in self.validation_policies:
455            # This should never happen because we validate the config against the schema's validation_policy enum
456            raise ValidationError(
457                f"`validation_policy` must be one of {list(self.validation_policies.keys())}",
458                model=FileBasedStreamConfig,
459            )
460        return self.validation_policies[stream_config.validation_policy]
461
462    def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None:
463        if stream_config.schemaless and stream_config.input_schema:
464            raise ValidationError(
465                "`input_schema` and `schemaless` options cannot both be set",
466                model=FileBasedStreamConfig,
467            )

Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.

FileBasedSource( stream_reader: AbstractFileBasedStreamReader, spec_class: Type[AbstractFileBasedSpec], catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]], availability_strategy: Optional[airbyte_cdk.sources.file_based.availability_strategy.AbstractFileBasedAvailabilityStrategy] = None, discovery_policy: airbyte_cdk.sources.file_based.discovery_policy.AbstractDiscoveryPolicy = <airbyte_cdk.sources.file_based.discovery_policy.DefaultDiscoveryPolicy object>, parsers: Mapping[Type[Any], airbyte_cdk.sources.file_based.file_types.file_type_parser.FileTypeParser] = {<class 'airbyte_cdk.sources.file_based.config.avro_format.AvroFormat'>: <airbyte_cdk.sources.file_based.file_types.AvroParser object>, <class 'CsvFormat'>: <airbyte_cdk.sources.file_based.file_types.CsvParser object>, <class 'airbyte_cdk.sources.file_based.config.excel_format.ExcelFormat'>: <airbyte_cdk.sources.file_based.file_types.ExcelParser object>, <class 'JsonlFormat'>: <airbyte_cdk.sources.file_based.file_types.JsonlParser object>, <class 'airbyte_cdk.sources.file_based.config.parquet_format.ParquetFormat'>: <airbyte_cdk.sources.file_based.file_types.ParquetParser object>, <class 'airbyte_cdk.sources.file_based.config.unstructured_format.UnstructuredFormat'>: <airbyte_cdk.sources.file_based.file_types.UnstructuredParser object>}, validation_policies: Mapping[airbyte_cdk.sources.file_based.config.file_based_stream_config.ValidationPolicy, airbyte_cdk.sources.file_based.schema_validation_policies.AbstractSchemaValidationPolicy] = {<ValidationPolicy.emit_record: 'Emit Record'>: <airbyte_cdk.sources.file_based.schema_validation_policies.EmitRecordPolicy object>, <ValidationPolicy.skip_record: 'Skip Record'>: <airbyte_cdk.sources.file_based.schema_validation_policies.SkipRecordPolicy object>, <ValidationPolicy.wait_for_discover: 'Wait for Discover'>: <airbyte_cdk.sources.file_based.schema_validation_policies.WaitForDiscoverPolicy object>}, cursor_cls: Type[Union[airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor, airbyte_cdk.sources.file_based.stream.cursor.AbstractFileBasedCursor]] = <class 'airbyte_cdk.sources.file_based.stream.concurrent.cursor.FileBasedConcurrentCursor'>, stream_permissions_reader: Optional[airbyte_cdk.sources.file_based.file_based_stream_permissions_reader.AbstractFileBasedStreamPermissionsReader] = None)
 91    def __init__(
 92        self,
 93        stream_reader: AbstractFileBasedStreamReader,
 94        spec_class: Type[AbstractFileBasedSpec],
 95        catalog: Optional[ConfiguredAirbyteCatalog],
 96        config: Optional[Mapping[str, Any]],
 97        state: Optional[List[AirbyteStateMessage]],
 98        availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None,
 99        discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(),
100        parsers: Mapping[Type[Any], FileTypeParser] = default_parsers,
101        validation_policies: Mapping[
102            ValidationPolicy, AbstractSchemaValidationPolicy
103        ] = DEFAULT_SCHEMA_VALIDATION_POLICIES,
104        cursor_cls: Type[
105            Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor]
106        ] = FileBasedConcurrentCursor,
107        stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None,
108    ):
109        self.stream_reader = stream_reader
110        self.stream_permissions_reader = stream_permissions_reader
111        self.spec_class = spec_class
112        self.config = config
113        self.catalog = catalog
114        self.state = state
115        self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(
116            stream_reader
117        )
118        self.discovery_policy = discovery_policy
119        self.parsers = parsers
120        self.validation_policies = validation_policies
121        self.stream_schemas = (
122            {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {}
123        )
124        self.cursor_cls = cursor_cls
125        self.logger = init_logger(f"airbyte.{self.name}")
126        self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector()
127        self._message_repository: Optional[MessageRepository] = None
128        concurrent_source = ConcurrentSource.create(
129            MAX_CONCURRENCY,
130            INITIAL_N_PARTITIONS,
131            self.logger,
132            self._slice_logger,
133            self.message_repository,
134        )
135        self._state = None
136        super().__init__(concurrent_source)

ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.

The source's streams are still defined through the streams() method. Streams wrapped in a StreamFacade will be processed concurrently. Other streams will be processed sequentially as a later step.
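
As a hedged illustration of the wiring rather than canonical usage: a connector constructs a FileBasedSource with a backend-specific stream reader and spec class. MyStreamReader and MySpec below are hypothetical placeholders for a concrete AbstractFileBasedStreamReader implementation and an AbstractFileBasedSpec subclass.

from airbyte_cdk.sources.file_based import FileBasedSource

# Sketch only: MyStreamReader and MySpec are hypothetical placeholders for a
# concrete AbstractFileBasedStreamReader and AbstractFileBasedSpec subclass.
source = FileBasedSource(
    stream_reader=MyStreamReader(),
    spec_class=MySpec,
    catalog=None,   # no configured catalog during `spec`/`check`
    config=None,
    state=None,
)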

stream_reader
stream_permissions_reader
spec_class
config
catalog
state
availability_strategy
discovery_policy
parsers
validation_policies
stream_schemas
cursor_cls
logger
message_repository: airbyte_cdk.MessageRepository
138    @property
139    def message_repository(self) -> MessageRepository:
140        if self._message_repository is None:
141            self._message_repository = InMemoryMessageRepository(
142                Level(AirbyteLogFormatter.level_mapping[self.logger.level])
143            )
144        return self._message_repository
def check_connection( self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
146    def check_connection(
147        self, logger: logging.Logger, config: Mapping[str, Any]
148    ) -> Tuple[bool, Optional[Any]]:
149        """
150        Check that the source can be accessed using the user-provided configuration.
151
152        For each stream, verify that we can list and read files.
153
154        Returns (True, None) if the connection check is successful.
155
156        Otherwise, the "error" object should describe what went wrong.
157        """
158        try:
159            streams = self.streams(config)
160        except Exception as config_exception:
161            raise AirbyteTracedException(
162                internal_message="Please check the logged errors for more information.",
163                message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
164                exception=AirbyteTracedException(exception=config_exception),
165                failure_type=FailureType.config_error,
166            )
167        if len(streams) == 0:
168            return (
169                False,
170                f"No streams are available for source {self.name}. This is probably an issue with the connector. Please verify that your "
171                f"configuration provides permissions to list and read files from the source. Contact support if you are unable to "
172                f"resolve this issue.",
173            )
174
175        errors = []
176        tracebacks = []
177        for stream in streams:
178            if isinstance(stream, FileIdentitiesStream):
179                identity = next(iter(stream.load_identity_groups()), None)
180                if not identity:
181                    errors.append(
182                        "Unable to get identities for current configuration, please check your credentials"
183                    )
184                continue
185            if not isinstance(stream, AbstractFileBasedStream):
186                raise ValueError(f"Stream {stream} is not a file-based stream.")
187            try:
188                parsed_config = self._get_parsed_config(config)
189                availability_method = (
190                    stream.availability_strategy.check_availability
191                    if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config)
192                    else stream.availability_strategy.check_availability_and_parsability
193                )
194                (
195                    stream_is_available,
196                    reason,
197                ) = availability_method(stream, logger, self)
198            except AirbyteTracedException as ate:
199                errors.append(f"Unable to connect to stream {stream.name} - {ate.message}")
200                tracebacks.append(traceback.format_exc())
201            except Exception:
202                errors.append(f"Unable to connect to stream {stream.name}")
203                tracebacks.append(traceback.format_exc())
204            else:
205                if not stream_is_available and reason:
206                    errors.append(reason)
207
208        if len(errors) == 1 and len(tracebacks) == 1:
209            raise AirbyteTracedException(
210                internal_message=tracebacks[0],
211                message=f"{errors[0]}",
212                failure_type=FailureType.config_error,
213            )
214        if len(errors) == 1 and len(tracebacks) == 0:
215            raise AirbyteTracedException(
216                message=f"{errors[0]}",
217                failure_type=FailureType.config_error,
218            )
219        elif len(errors) > 1:
220            raise AirbyteTracedException(
221                internal_message="\n".join(tracebacks),
222                message=f"{len(errors)} streams with errors: {', '.join(errors)}",
223                failure_type=FailureType.config_error,
224            )
225
226        return not bool(errors), (errors or None)

Check that the source can be accessed using the user-provided configuration.

For each stream, verify that we can list and read files.

Returns (True, None) if the connection check is successful.

Otherwise, the "error" object should describe what went wrong.
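
For orientation, a minimal sketch of driving this check from Python, assuming `source` is an already-constructed FileBasedSource and `config` is a mapping that validates against its spec_class:

import logging

# Sketch: `source` (a FileBasedSource) and `config` (a mapping matching its
# spec) are assumed to exist already.
logger = logging.getLogger("airbyte")
ok, error = source.check_connection(logger, config)
if not ok:
    logger.error("Connection check failed: %s", error)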

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
228    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
229        """
230        Return a list of this source's streams.
231        """
232
233        if self.catalog:
234            state_manager = ConnectorStateManager(state=self.state)
235        else:
236            # During `check` operations we don't have a catalog so cannot create a state manager.
237            # Since the state manager is only required for incremental syncs, this is fine.
238            state_manager = None
239
240        try:
241            parsed_config = self._get_parsed_config(config)
242            self.stream_reader.config = parsed_config
243            if self.stream_permissions_reader:
244                self.stream_permissions_reader.config = parsed_config
245            streams: List[Stream] = []
246            for stream_config in parsed_config.streams:
247                # Like state_manager, `catalog_stream` may be None during `check`
248                catalog_stream = self._get_stream_from_catalog(stream_config)
249                stream_state = (
250                    state_manager.get_stream_state(catalog_stream.name, catalog_stream.namespace)
251                    if (state_manager and catalog_stream)
252                    else None
253                )
254                self._validate_input_schema(stream_config)
255
256                sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
257
258                if (
259                    sync_mode == SyncMode.full_refresh
260                    and hasattr(self, "_concurrency_level")
261                    and self._concurrency_level is not None
262                ):
263                    cursor = FileBasedFinalStateCursor(
264                        stream_config=stream_config,
265                        stream_namespace=None,
266                        message_repository=self.message_repository,
267                    )
268                    stream = FileBasedStreamFacade.create_from_stream(
269                        stream=self._make_file_based_stream(
270                            stream_config=stream_config,
271                            cursor=cursor,
272                            parsed_config=parsed_config,
273                        ),
274                        source=self,
275                        logger=self.logger,
276                        state=stream_state,
277                        cursor=cursor,
278                    )
279
280                elif (
281                    sync_mode == SyncMode.incremental
282                    and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
283                    and hasattr(self, "_concurrency_level")
284                    and self._concurrency_level is not None
285                ):
286                    assert (
287                        state_manager is not None
288                    ), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."
289
290                    cursor = self.cursor_cls(
291                        stream_config,
292                        stream_config.name,
293                        None,
294                        stream_state,
295                        self.message_repository,
296                        state_manager,
297                        CursorField(DefaultFileBasedStream.ab_last_mod_col),
298                    )
299                    stream = FileBasedStreamFacade.create_from_stream(
300                        stream=self._make_file_based_stream(
301                            stream_config=stream_config,
302                            cursor=cursor,
303                            parsed_config=parsed_config,
304                        ),
305                        source=self,
306                        logger=self.logger,
307                        state=stream_state,
308                        cursor=cursor,
309                    )
310                else:
311                    cursor = self.cursor_cls(stream_config)
312                    stream = self._make_file_based_stream(
313                        stream_config=stream_config,
314                        cursor=cursor,
315                        parsed_config=parsed_config,
316                    )
317
318                streams.append(stream)
319
320            if include_identities_stream(parsed_config):
321                identities_stream = self._make_identities_stream()
322                streams.append(identities_stream)
323            return streams
324
325        except ValidationError as exc:
326            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc

Return a list of this source's streams.
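
A hedged sketch of enumerating the result, reusing the `source` and `config` assumed in the earlier sketches:

# Sketch: list the stream names produced for a given config. `source` and
# `config` are the same assumed objects as above.
for stream in source.streams(config):
    print(stream.name)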

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
421    def read(
422        self,
423        logger: logging.Logger,
424        config: Mapping[str, Any],
425        catalog: ConfiguredAirbyteCatalog,
426        state: Optional[List[AirbyteStateMessage]] = None,
427    ) -> Iterator[AirbyteMessage]:
428        yield from super().read(logger, config, catalog, state)
429        # emit all the errors collected
430        yield from self.errors_collector.yield_and_raise_collected()
431        # count streams using a certain parser
432        parsed_config = self._get_parsed_config(config)
433        for parser, count in Counter(
434            stream.format.filetype for stream in parsed_config.streams
435        ).items():
436            yield create_analytics_message(f"file-cdk-{parser}-stream-count", count)

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
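
The per-parser analytics emitted at the end of read() follow a plain counting pattern; a standalone sketch of the same idea with stand-in data:

from collections import Counter

# Standalone sketch: tally how many configured streams use each file format.
# The list below stands in for `stream.format.filetype` values.
stream_filetypes = ["csv", "csv", "jsonl"]
for filetype, count in Counter(stream_filetypes).items():
    print(f"file-cdk-{filetype}-stream-count: {count}")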

def spec( self, *args: Any, **kwargs: Any) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
438    def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
439        """
440        Returns the specification describing what fields can be configured by a user when setting up a file-based source.
441        """
442
443        return ConnectorSpecification(
444            documentationUrl=self.spec_class.documentation_url(),
445            connectionSpecification=self.spec_class.schema(),
446        )

Returns the specification describing what fields can be configured by a user when setting up a file-based source.
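
A hedged sketch of retrieving and inspecting the rendered specification, again assuming an already-constructed `source`:

# Sketch: `source` is an assumed, already-constructed FileBasedSource.
specification = source.spec()
print(specification.documentationUrl)
print(sorted(specification.connectionSpecification.get("properties", {})))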

class FileBasedSourceError(enum.Enum):
13class FileBasedSourceError(Enum):
14    EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
15    GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
16    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
17    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
18    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
19    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
20    ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files."
21    ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
22    ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
23    ERROR_PARSING_USER_PROVIDED_SCHEMA = (
24        "The provided schema could not be transformed into valid JSON Schema."
25    )
26    ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
27    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
28    ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
29    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
30    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
31    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
32    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
33    INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
34    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
35    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
36    UNDEFINED_PARSER = "No parser is defined for this file type."
37    UNDEFINED_VALIDATION_POLICY = (
38        "The validation policy defined in the config does not exist for the source."
39    )

An enumeration.

EMPTY_STREAM = <FileBasedSourceError.EMPTY_STREAM: 'No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict.'>
GLOB_PARSE_ERROR = <FileBasedSourceError.GLOB_PARSE_ERROR: 'Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split.'>
ENCODING_ERROR = <FileBasedSourceError.ENCODING_ERROR: 'File encoding error. The configured encoding must match file encoding.'>
ERROR_CASTING_VALUE = <FileBasedSourceError.ERROR_CASTING_VALUE: 'Could not cast the value to the expected type.'>
ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = <FileBasedSourceError.ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE: 'Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string.'>
ERROR_DECODING_VALUE = <FileBasedSourceError.ERROR_DECODING_VALUE: 'Expected a JSON-decodeable value but could not decode record.'>
ERROR_LISTING_FILES = <FileBasedSourceError.ERROR_LISTING_FILES: 'Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files.'>
ERROR_READING_FILE = <FileBasedSourceError.ERROR_READING_FILE: 'Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files.'>
ERROR_PARSING_RECORD = <FileBasedSourceError.ERROR_PARSING_RECORD: "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable.">
ERROR_PARSING_USER_PROVIDED_SCHEMA = <FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA: 'The provided schema could not be transformed into valid JSON Schema.'>
ERROR_VALIDATING_RECORD = <FileBasedSourceError.ERROR_VALIDATING_RECORD: 'One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy.'>
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = <FileBasedSourceError.ERROR_PARSING_RECORD_MISMATCHED_COLUMNS: "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows.">
ERROR_PARSING_RECORD_MISMATCHED_ROWS = <FileBasedSourceError.ERROR_PARSING_RECORD_MISMATCHED_ROWS: "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows.">
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = <FileBasedSourceError.STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY: 'Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema.'>
NULL_VALUE_IN_SCHEMA = <FileBasedSourceError.NULL_VALUE_IN_SCHEMA: 'Error during schema inference: no type was detected for key.'>
UNRECOGNIZED_TYPE = <FileBasedSourceError.UNRECOGNIZED_TYPE: 'Error during schema inference: unrecognized type.'>
SCHEMA_INFERENCE_ERROR = <FileBasedSourceError.SCHEMA_INFERENCE_ERROR: 'Error inferring schema from files. Are the files valid?'>
INVALID_SCHEMA_ERROR = <FileBasedSourceError.INVALID_SCHEMA_ERROR: "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns.">
CONFIG_VALIDATION_ERROR = <FileBasedSourceError.CONFIG_VALIDATION_ERROR: 'Error creating stream config object.'>
MISSING_SCHEMA = <FileBasedSourceError.MISSING_SCHEMA: 'Expected `json_schema` in the configured catalog but it is missing.'>
UNDEFINED_PARSER = <FileBasedSourceError.UNDEFINED_PARSER: 'No parser is defined for this file type.'>
UNDEFINED_VALIDATION_POLICY = <FileBasedSourceError.UNDEFINED_VALIDATION_POLICY: 'The validation policy defined in the config does not exist for the source.'>
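
These values are typically wrapped in ConfigValidationError or AirbyteTracedException, as in the listings above. A minimal sketch of that pattern (the import path for ConfigValidationError is assumed to be the file-based exceptions module):

import json

from airbyte_cdk.sources.file_based import FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError  # assumed path

# Minimal sketch: surface a well-known FileBasedSourceError value when user
# input cannot be parsed.
def parse_user_schema(raw: str) -> dict:
    try:
        return json.loads(raw)
    except ValueError as exc:
        raise ConfigValidationError(
            FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA
        ) from exc
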
class FileBasedStreamConfig(pydantic.v1.main.BaseModel):
 29class FileBasedStreamConfig(BaseModel):
 30    name: str = Field(title="Name", description="The name of the stream.")
 31    globs: Optional[List[str]] = Field(
 32        default=["**"],
 33        title="Globs",
 34        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
 35        order=1,
 36    )
 37    legacy_prefix: Optional[str] = Field(
 38        title="Legacy Prefix",
 39        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
 40        airbyte_hidden=True,
 41    )
 42    validation_policy: ValidationPolicy = Field(
 43        title="Validation Policy",
 44        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
 45        default=ValidationPolicy.emit_record,
 46    )
 47    input_schema: Optional[str] = Field(
 48        title="Input Schema",
 49        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
 50    )
 51    primary_key: Optional[str] = Field(
 52        title="Primary Key",
 53        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
 54        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
 55    )
 56    days_to_sync_if_history_is_full: int = Field(
 57        title="Days To Sync If History Is Full",
 58        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
 59        default=3,
 60    )
 61    format: Union[
 62        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
 63    ] = Field(
 64        title="Format",
 65        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
 66    )
 67    schemaless: bool = Field(
 68        title="Schemaless",
 69        description="When enabled, syncs will not validate or structure records against the stream's schema.",
 70        default=False,
 71    )
 72    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
 73        title="Files To Read For Schema Discovery",
 74        description="The number of recent files which will be used to discover the schema for this stream.",
 75        default=None,
 76        gt=0,
 77    )
 78
 79    @validator("input_schema", pre=True)
 80    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
 81        if v:
 82            if type_mapping_to_jsonschema(v):
 83                return v
 84            else:
 85                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
 86        return None
 87
 88    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
 89        """
 90        User defined input_schema is defined as a string in the config. This method takes the string representation
 91        and converts it into a Mapping[str, Any] which is used by file-based CDK components.
 92        """
 93        if self.input_schema:
 94            schema = type_mapping_to_jsonschema(self.input_schema)
 95            if not schema:
 96                raise ValueError(
 97                    f"Unable to create JSON schema from input schema {self.input_schema}"
 98                )
 99            return schema
100        return None
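
For orientation, a hedged sketch of one stream entry built with this model; field values are illustrative and CsvFormat defaults are assumed to be acceptable:

from airbyte_cdk.sources.file_based import CsvFormat, FileBasedStreamConfig

# Sketch: a single CSV stream using the default validation policy
# (emit_record) and default CSV options. Values are illustrative only.
stream_config = FileBasedStreamConfig(
    name="invoices",
    globs=["invoices/**/*.csv"],
    format=CsvFormat(),
)
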
name: str
globs: Optional[List[str]]
legacy_prefix: Optional[str]
input_schema: Optional[str]
primary_key: Optional[str]
days_to_sync_if_history_is_full: int
schemaless: bool
recent_n_files_to_read_for_schema_discovery: Optional[int]
@validator('input_schema', pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
79    @validator("input_schema", pre=True)
80    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
81        if v:
82            if type_mapping_to_jsonschema(v):
83                return v
84            else:
85                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
86        return None
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
 88    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
 89        """
 90        User defined input_schema is defined as a string in the config. This method takes the string representation
 91        and converts it into a Mapping[str, Any] which is used by file-based CDK components.
 92        """
 93        if self.input_schema:
 94            schema = type_mapping_to_jsonschema(self.input_schema)
 95            if not schema:
 96                raise ValueError(
 97                    f"Unable to create JSON schema from input schema {self.input_schema}"
 98                )
 99            return schema
100        return None

User defined input_schema is defined as a string in the config. This method takes the string representation and converts it into a Mapping[str, Any] which is used by file-based CDK components.
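
A sketch, assuming input_schema uses the column-name-to-type mapping string format accepted by type_mapping_to_jsonschema (e.g. '{"id": "integer", "name": "string"}'):

from airbyte_cdk.sources.file_based import FileBasedStreamConfig, JsonlFormat

# Sketch: the schema string format is assumed here; the accepted type names
# are those understood by type_mapping_to_jsonschema in the CDK.
config_with_schema = FileBasedStreamConfig(
    name="users",
    format=JsonlFormat(),
    input_schema='{"id": "integer", "name": "string"}',
)
print(config_with_schema.get_input_schema())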

class FileReadMode(enum.Enum):
25class FileReadMode(Enum):
26    READ = "r"
27    READ_BINARY = "rb"

An enumeration.

READ = <FileReadMode.READ: 'r'>
READ_BINARY = <FileReadMode.READ_BINARY: 'rb'>
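
The enum values are plain "r"/"rb" mode strings; a minimal sketch of applying one (the local file path is purely illustrative):

from airbyte_cdk.sources.file_based import FileReadMode

# Minimal sketch: the mode's value can be handed to open() (or to a stream
# reader's file-opening method) directly. "example.parquet" is illustrative.
mode = FileReadMode.READ_BINARY
with open("example.parquet", mode.value) as fp:
    header = fp.read(4)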
class JsonlFormat(pydantic.v1.main.BaseModel):
11class JsonlFormat(BaseModel):
12    class Config(OneOfOptionConfig):
13        title = "Jsonl Format"
14        discriminator = "filetype"
15
16    filetype: str = Field(
17        "jsonl",
18        const=True,
19    )
filetype: str
class JsonlFormat.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
12    class Config(OneOfOptionConfig):
13        title = "Jsonl Format"
14        discriminator = "filetype"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class of a model and set title and description (these show up in the UI) and discriminator (this ensures the discriminator field is marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'Jsonl Format'
discriminator = 'filetype'
class RemoteFile(pydantic.v1.main.BaseModel):
12class RemoteFile(BaseModel):
13    """
14    A file in a file-based stream.
15    """
16
17    uri: str
18    last_modified: datetime
19    mime_type: Optional[str] = None

A file in a file-based stream.

uri: str
last_modified: datetime.datetime
mime_type: Optional[str]
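
A minimal construction sketch; all values are illustrative and uri semantics depend on the concrete stream reader:

from datetime import datetime, timezone

from airbyte_cdk.sources.file_based import RemoteFile

# Illustrative values only.
remote_file = RemoteFile(
    uri="data/2024/01/records.jsonl",
    last_modified=datetime(2024, 1, 15, tzinfo=timezone.utc),
    mime_type="application/jsonl",
)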