airbyte_cdk.sources.file_based
from .config.abstract_file_based_spec import AbstractFileBasedSpec
from .config.csv_format import CsvFormat
from .config.file_based_stream_config import FileBasedStreamConfig
from .config.jsonl_format import JsonlFormat
from .exceptions import CustomFileBasedException, ErrorListingFiles, FileBasedSourceError
from .file_based_source import DEFAULT_CONCURRENCY, FileBasedSource
from .file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from .remote_file import RemoteFile
from .stream.cursor import DefaultFileBasedCursor

__all__ = [
    "AbstractFileBasedSpec",
    "AbstractFileBasedStreamReader",
    "CsvFormat",
    "CustomFileBasedException",
    "DefaultFileBasedCursor",
    "ErrorListingFiles",
    "FileBasedSource",
    "FileBasedSourceError",
    "FileBasedStreamConfig",
    "FileReadMode",
    "JsonlFormat",
    "RemoteFile",
]
48class AbstractFileBasedSpec(BaseModel): 49 """ 50 Used during spec; allows the developer to configure the cloud provider specific options 51 that are needed when users configure a file-based source. 52 """ 53 54 start_date: Optional[str] = Field( 55 title="Start Date", 56 description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.", 57 examples=["2021-01-01T00:00:00.000000Z"], 58 format="date-time", 59 pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$", 60 pattern_descriptor="YYYY-MM-DDTHH:mm:ss.SSSSSSZ", 61 order=1, 62 ) 63 64 streams: List[FileBasedStreamConfig] = Field( 65 title="The list of streams to sync", 66 description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.', 67 order=10, 68 ) 69 70 delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field( 71 title="Delivery Method", 72 discriminator="delivery_type", 73 type="object", 74 order=7, 75 display_type="radio", 76 group="advanced", 77 default="use_records_transfer", 78 airbyte_hidden=True, 79 ) 80 81 @classmethod 82 @abstractmethod 83 def documentation_url(cls) -> AnyUrl: 84 """ 85 :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3" 86 """ 87 88 @classmethod 89 def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]: 90 """ 91 Generates the mapping comprised of the config fields 92 """ 93 schema = super().schema(*args, **kwargs) 94 transformed_schema: Dict[str, Any] = copy.deepcopy(schema) 95 schema_helpers.expand_refs(transformed_schema) 96 cls.replace_enum_allOf_and_anyOf(transformed_schema) 97 cls.remove_discriminator(transformed_schema) 98 99 return transformed_schema 100 101 @staticmethod 102 def remove_discriminator(schema: Dict[str, Any]) -> None: 103 """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references""" 104 dpath.delete(schema, "properties/**/discriminator") 105 106 @staticmethod 107 def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]: 108 """ 109 allOfs are not supported by the UI, but pydantic is automatically writing them for enums. 110 Unpacks the enums under allOf and moves them up a level under the enum key 111 anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the 112 additional validation that an incoming config only matches exactly one of a field's types. 
113 """ 114 objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"] 115 objects_to_check["type"] = "object" 116 objects_to_check["oneOf"] = objects_to_check.pop("anyOf", []) 117 for format in objects_to_check["oneOf"]: 118 for key in format["properties"]: 119 object_property = format["properties"][key] 120 AbstractFileBasedSpec.move_enum_to_root(object_property) 121 122 properties_to_change = ["validation_policy"] 123 for property_to_change in properties_to_change: 124 property_object = schema["properties"]["streams"]["items"]["properties"][ 125 property_to_change 126 ] 127 if "anyOf" in property_object: 128 schema["properties"]["streams"]["items"]["properties"][property_to_change][ 129 "type" 130 ] = "object" 131 schema["properties"]["streams"]["items"]["properties"][property_to_change][ 132 "oneOf" 133 ] = property_object.pop("anyOf") 134 AbstractFileBasedSpec.move_enum_to_root(property_object) 135 136 csv_format_schemas = list( 137 filter( 138 lambda format: format["properties"]["filetype"]["default"] == "csv", 139 schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"], 140 ) 141 ) 142 if len(csv_format_schemas) != 1: 143 raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}") 144 csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][ 145 "properties" 146 ]["header_definition"].pop("anyOf", []) 147 csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object" 148 return schema 149 150 @staticmethod 151 def move_enum_to_root(object_property: Dict[str, Any]) -> None: 152 if "allOf" in object_property and "enum" in object_property["allOf"][0]: 153 object_property["enum"] = object_property["allOf"][0]["enum"] 154 object_property.pop("allOf")
Used during spec; allows the developer to configure the cloud provider specific options that are needed when users configure a file-based source.
@classmethod
@abstractmethod
def documentation_url(cls) -> AnyUrl:
    """
    :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
    """
Returns
link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
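For illustration, a concrete spec might satisfy this contract roughly as follows. This is a minimal sketch: the class name and URL are hypothetical, and depending on the CDK version AnyUrl may be imported from pydantic directly rather than pydantic.v1.

from pydantic.v1 import AnyUrl  # older CDK versions: from pydantic import AnyUrl

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec

class InMemorySpec(AbstractFileBasedSpec):
    @classmethod
    def documentation_url(cls) -> AnyUrl:
        # Point users at the connector's documentation page (hypothetical URL).
        return AnyUrl("https://docs.airbyte.com/integrations/sources/example", scheme="https")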
@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
    """
    Generates the mapping comprised of the config fields
    """
    schema = super().schema(*args, **kwargs)
    transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
    schema_helpers.expand_refs(transformed_schema)
    cls.replace_enum_allOf_and_anyOf(transformed_schema)
    cls.remove_discriminator(transformed_schema)

    return transformed_schema
Generates the mapping comprised of the config fields
@staticmethod
def remove_discriminator(schema: Dict[str, Any]) -> None:
    """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
    dpath.delete(schema, "properties/**/discriminator")
pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references
@staticmethod
def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
    """
    allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
    Unpacks the enums under allOf and moves them up a level under the enum key
    anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
    additional validation that an incoming config only matches exactly one of a field's types.
    """
    objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
    objects_to_check["type"] = "object"
    objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
    for format in objects_to_check["oneOf"]:
        for key in format["properties"]:
            object_property = format["properties"][key]
            AbstractFileBasedSpec.move_enum_to_root(object_property)

    properties_to_change = ["validation_policy"]
    for property_to_change in properties_to_change:
        property_object = schema["properties"]["streams"]["items"]["properties"][
            property_to_change
        ]
        if "anyOf" in property_object:
            schema["properties"]["streams"]["items"]["properties"][property_to_change][
                "type"
            ] = "object"
            schema["properties"]["streams"]["items"]["properties"][property_to_change][
                "oneOf"
            ] = property_object.pop("anyOf")
        AbstractFileBasedSpec.move_enum_to_root(property_object)

    csv_format_schemas = list(
        filter(
            lambda format: format["properties"]["filetype"]["default"] == "csv",
            schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
        )
    )
    if len(csv_format_schemas) != 1:
        raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
    csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
        "properties"
    ]["header_definition"].pop("anyOf", [])
    csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
    return schema
allOfs are not supported by the UI, but pydantic automatically writes them for enums. Unpacks the enums under allOf and moves them up a level under the enum key. anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the additional validation that an incoming config only matches exactly one of a field's types.
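As a concrete illustration of the allOf unpacking step, consider the following hypothetical pydantic-generated schema fragment (the property contents are made up, but the transformation matches the code above):

# Hypothetical pydantic-generated property before the transformation:
object_property = {"title": "Filetype", "allOf": [{"enum": ["csv", "jsonl"]}]}

AbstractFileBasedSpec.move_enum_to_root(object_property)

# The enum now sits at the root of the property and the allOf wrapper is gone:
assert object_property == {"title": "Filetype", "enum": ["csv", "jsonl"]}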
30class AbstractFileBasedStreamReader(ABC): 31 DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" 32 33 def __init__(self) -> None: 34 self._config = None 35 36 @property 37 def config(self) -> Optional[AbstractFileBasedSpec]: 38 return self._config 39 40 @config.setter 41 @abstractmethod 42 def config(self, value: AbstractFileBasedSpec) -> None: 43 """ 44 FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader. 45 46 Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader 47 will require keys that (for example) allow it to authenticate with the 3rd party. 48 49 Therefore, concrete implementations of AbstractFileBasedStreamReader's config setter should assert that `value` is of the correct 50 config type for that type of StreamReader. 51 """ 52 ... 53 54 @abstractmethod 55 def open_file( 56 self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger 57 ) -> IOBase: 58 """ 59 Return a file handle for reading. 60 61 Many sources will be able to use smart_open to implement this method, 62 for example: 63 64 client = boto3.Session(...) 65 return smart_open.open(remote_file.uri, transport_params={"client": client}) 66 """ 67 ... 68 69 @abstractmethod 70 def get_matching_files( 71 self, 72 globs: List[str], 73 prefix: Optional[str], 74 logger: logging.Logger, 75 ) -> Iterable[RemoteFile]: 76 """ 77 Return all files that match any of the globs. 78 79 Example: 80 81 The source has files "a.json", "foo/a.json", "foo/bar/a.json" 82 83 If globs = ["*.json"] then this method returns ["a.json"]. 84 85 If globs = ["foo/*.json"] then this method returns ["foo/a.json"]. 86 87 Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs` 88 are available, which may be helpful when implementing this method. 89 """ 90 ... 91 92 def filter_files_by_globs_and_start_date( 93 self, files: List[RemoteFile], globs: List[str] 94 ) -> Iterable[RemoteFile]: 95 """ 96 Utility method for filtering files based on globs. 97 """ 98 start_date = ( 99 datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT) 100 if self.config and self.config.start_date 101 else None 102 ) 103 seen = set() 104 105 for file in files: 106 if self.file_matches_globs(file, globs): 107 if file.uri not in seen and (not start_date or file.last_modified >= start_date): 108 seen.add(file.uri) 109 yield file 110 111 @abstractmethod 112 def file_size(self, file: RemoteFile) -> int: 113 """Utility method to get size of the remote file. 114 115 This is required for connectors that will support writing to 116 files. If the connector does not support writing files, then the 117 subclass can simply `return 0`. 118 """ 119 ... 120 121 @staticmethod 122 def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool: 123 # Use the GLOBSTAR flag to enable recursive ** matching 124 # (https://facelessuser.github.io/wcmatch/wcmatch/#globstar) 125 return any(globmatch(file.uri, g, flags=GLOBSTAR) for g in globs) 126 127 @staticmethod 128 def get_prefixes_from_globs(globs: List[str]) -> Set[str]: 129 """ 130 Utility method for extracting prefixes from the globs. 
131 """ 132 prefixes = {glob.split("*")[0] for glob in globs} 133 return set(filter(lambda x: bool(x), prefixes)) 134 135 def use_file_transfer(self) -> bool: 136 if self.config: 137 return use_file_transfer(self.config) 138 return False 139 140 def preserve_directory_structure(self) -> bool: 141 # fall back to preserve subdirectories if config is not present or incomplete 142 if self.config: 143 return preserve_directory_structure(self.config) 144 return True 145 146 def include_identities_stream(self) -> bool: 147 if self.config: 148 return include_identities_stream(self.config) 149 return False 150 151 @abstractmethod 152 def get_file( 153 self, file: RemoteFile, local_directory: str, logger: logging.Logger 154 ) -> Dict[str, Any]: 155 """ 156 This is required for connectors that will support writing to 157 files. It will handle the logic to download,get,read,acquire or 158 whatever is more efficient to get a file from the source. 159 160 Args: 161 file (RemoteFile): The remote file object containing URI and metadata. 162 local_directory (str): The local directory path where the file will be downloaded. 163 logger (logging.Logger): Logger for logging information and errors. 164 165 Returns: 166 dict: A dictionary containing the following: 167 - "file_url" (str): The absolute path of the downloaded file. 168 - "bytes" (int): The file size in bytes. 169 - "file_relative_path" (str): The relative path of the file for local storage. Is relative to local_directory as 170 this a mounted volume in the pod container. 171 172 """ 173 ... 174 175 def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]: 176 preserve_directory_structure = self.preserve_directory_structure() 177 if preserve_directory_structure: 178 # Remove left slashes from source path format to make relative path for writing locally 179 file_relative_path = file.uri.lstrip("/") 180 else: 181 file_relative_path = path.basename(file.uri) 182 local_file_path = path.join(local_directory, file_relative_path) 183 184 # Ensure the local directory exists 185 makedirs(path.dirname(local_file_path), exist_ok=True) 186 absolute_file_path = path.abspath(local_file_path) 187 return [file_relative_path, local_file_path, absolute_file_path]
Helper class that provides a standard way to create an ABC using inheritance.
FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.
Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader will require keys that (for example) allow it to authenticate with the 3rd party.
Therefore, concrete implementations of AbstractFileBasedStreamReader's config setter should assert that value is of the correct config type for that type of StreamReader.
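A concrete reader's setter could enforce the expected config type roughly like this. This is a partial sketch: InMemorySpec and InMemoryStreamReader are hypothetical names (InMemorySpec from the earlier example), and the reader's other abstract methods are omitted.

from typing import Optional

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec, AbstractFileBasedStreamReader

class InMemoryStreamReader(AbstractFileBasedStreamReader):
    # Only the config property and setter are sketched; open_file, get_matching_files,
    # file_size, and get_file still need to be implemented.
    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        return self._config

    @config.setter
    def config(self, value: AbstractFileBasedSpec) -> None:
        # Guard against being wired up with a spec belonging to a different connector.
        assert isinstance(value, InMemorySpec), f"Expected InMemorySpec, got {type(value)}"
        self._config = value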
@abstractmethod
def open_file(
    self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
) -> IOBase:
    """
    Return a file handle for reading.

    Many sources will be able to use smart_open to implement this method,
    for example:

    client = boto3.Session(...)
    return smart_open.open(remote_file.uri, transport_params={"client": client})
    """
    ...
Return a file handle for reading.
Many sources will be able to use smart_open to implement this method, for example:
client = boto3.Session(...)
return smart_open.open(remote_file.uri, transport_params={"client": client})
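Expanded into a fuller sketch for an S3-style reader, assuming boto3 and smart_open are available; the class name is hypothetical and credential wiring (normally read from self.config) is omitted.

import logging
from io import IOBase
from typing import Optional

import boto3
import smart_open

from airbyte_cdk.sources.file_based import (
    AbstractFileBasedStreamReader,
    FileReadMode,
    RemoteFile,
)

class S3LikeStreamReader(AbstractFileBasedStreamReader):  # other abstract methods omitted
    def open_file(
        self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
    ) -> IOBase:
        # smart_open understands s3:// URIs when handed an authenticated boto3 client.
        client = boto3.Session().client("s3")  # credentials would normally come from self.config
        return smart_open.open(
            file.uri, mode=mode.value, encoding=encoding, transport_params={"client": client}
        )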
@abstractmethod
def get_matching_files(
    self,
    globs: List[str],
    prefix: Optional[str],
    logger: logging.Logger,
) -> Iterable[RemoteFile]:
    """
    Return all files that match any of the globs.

    Example:

    The source has files "a.json", "foo/a.json", "foo/bar/a.json"

    If globs = ["*.json"] then this method returns ["a.json"].

    If globs = ["foo/*.json"] then this method returns ["foo/a.json"].

    Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
    are available, which may be helpful when implementing this method.
    """
    ...
Return all files that match any of the globs.
Example:
The source has files "a.json", "foo/a.json", "foo/bar/a.json"
If globs = ["*.json"] then this method returns ["a.json"].
If globs = ["foo/*.json"] then this method returns ["foo/a.json"].
Utility methods self.filter_files_by_globs and self.get_prefixes_from_globs are available, which may be helpful when implementing this method.
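One possible shape for an implementation, leaning on the utilities above. This is a method-body sketch for a reader such as the hypothetical S3LikeStreamReader; self._list_objects is a hypothetical helper that yields a RemoteFile for every object the connector can see.

import logging
from typing import Iterable, List, Optional

from airbyte_cdk.sources.file_based import RemoteFile

def get_matching_files(
    self,
    globs: List[str],
    prefix: Optional[str],
    logger: logging.Logger,
) -> Iterable[RemoteFile]:
    # List every candidate file, then let the CDK utility apply the globs and the start_date filter.
    all_files = list(self._list_objects(prefix))  # hypothetical listing helper
    yield from self.filter_files_by_globs_and_start_date(all_files, globs)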
def filter_files_by_globs_and_start_date(
    self, files: List[RemoteFile], globs: List[str]
) -> Iterable[RemoteFile]:
    """
    Utility method for filtering files based on globs.
    """
    start_date = (
        datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
        if self.config and self.config.start_date
        else None
    )
    seen = set()

    for file in files:
        if self.file_matches_globs(file, globs):
            if file.uri not in seen and (not start_date or file.last_modified >= start_date):
                seen.add(file.uri)
                yield file
Utility method for filtering files based on globs.
@abstractmethod
def file_size(self, file: RemoteFile) -> int:
    """Utility method to get size of the remote file.

    This is required for connectors that will support writing to
    files. If the connector does not support writing files, then the
    subclass can simply `return 0`.
    """
    ...
Utility method to get size of the remote file.
This is required for connectors that will support writing to files. If the connector does not support writing files, then the subclass can simply return 0.
@staticmethod
def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
    """
    Utility method for extracting prefixes from the globs.
    """
    prefixes = {glob.split("*")[0] for glob in globs}
    return set(filter(lambda x: bool(x), prefixes))
Utility method for extracting prefixes from the globs.
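For example, with illustrative glob patterns:

AbstractFileBasedStreamReader.get_prefixes_from_globs(["foo/*.json", "bar/baz/**", "*.csv"])
# -> {"foo/", "bar/baz/"}   ("*.csv" has no prefix before the wildcard, so it contributes nothing)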
@abstractmethod
def get_file(
    self, file: RemoteFile, local_directory: str, logger: logging.Logger
) -> Dict[str, Any]:
    """
    This is required for connectors that will support writing to
    files. It will handle the logic to download,get,read,acquire or
    whatever is more efficient to get a file from the source.

    Args:
        file (RemoteFile): The remote file object containing URI and metadata.
        local_directory (str): The local directory path where the file will be downloaded.
        logger (logging.Logger): Logger for logging information and errors.

    Returns:
        dict: A dictionary containing the following:
            - "file_url" (str): The absolute path of the downloaded file.
            - "bytes" (int): The file size in bytes.
            - "file_relative_path" (str): The relative path of the file for local storage. Is relative to local_directory as
              this a mounted volume in the pod container.
    """
    ...
This is required for connectors that will support writing to files. It handles the logic to download, get, read, or acquire the file from the source, whichever is most efficient.
Arguments:

- file (RemoteFile): The remote file object containing URI and metadata.
- local_directory (str): The local directory path where the file will be downloaded.
- logger (logging.Logger): Logger for logging information and errors.

Returns:

dict: A dictionary containing the following:

- "file_url" (str): The absolute path of the downloaded file.
- "bytes" (int): The file size in bytes.
- "file_relative_path" (str): The relative path of the file for local storage. It is relative to local_directory, as this is a mounted volume in the pod container.
85class CsvFormat(BaseModel): 86 class Config(OneOfOptionConfig): 87 title = "CSV Format" 88 discriminator = "filetype" 89 90 filetype: str = Field( 91 "csv", 92 const=True, 93 ) 94 delimiter: str = Field( 95 title="Delimiter", 96 description="The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.", 97 default=",", 98 ) 99 quote_char: str = Field( 100 title="Quote Character", 101 default='"', 102 description="The character used for quoting CSV values. To disallow quoting, make this field blank.", 103 ) 104 escape_char: Optional[str] = Field( 105 title="Escape Character", 106 default=None, 107 description="The character used for escaping special characters. To disallow escaping, leave this field blank.", 108 ) 109 encoding: Optional[str] = Field( 110 default="utf8", 111 description='The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href="https://docs.python.org/3/library/codecs.html#standard-encodings" target="_blank">list of python encodings</a> for allowable options.', 112 ) 113 double_quote: bool = Field( 114 title="Double Quote", 115 default=True, 116 description="Whether two quotes in a quoted CSV value denote a single quote in the data.", 117 ) 118 null_values: Set[str] = Field( 119 title="Null Values", 120 default=[], 121 description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.", 122 ) 123 strings_can_be_null: bool = Field( 124 title="Strings Can Be Null", 125 default=True, 126 description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.", 127 ) 128 skip_rows_before_header: int = Field( 129 title="Skip Rows Before Header", 130 default=0, 131 description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.", 132 ) 133 skip_rows_after_header: int = Field( 134 title="Skip Rows After Header", 135 default=0, 136 description="The number of rows to skip after the header row.", 137 ) 138 header_definition: Union[CsvHeaderFromCsv, CsvHeaderAutogenerated, CsvHeaderUserProvided] = ( 139 Field( 140 title="CSV Header Definition", 141 default=CsvHeaderFromCsv(header_definition_type=CsvHeaderDefinitionType.FROM_CSV.value), 142 description="How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. 
If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.", 143 ) 144 ) 145 true_values: Set[str] = Field( 146 title="True Values", 147 default=DEFAULT_TRUE_VALUES, 148 description="A set of case-sensitive strings that should be interpreted as true values.", 149 ) 150 false_values: Set[str] = Field( 151 title="False Values", 152 default=DEFAULT_FALSE_VALUES, 153 description="A set of case-sensitive strings that should be interpreted as false values.", 154 ) 155 inference_type: InferenceType = Field( 156 title="Inference Type", 157 default=InferenceType.NONE, 158 description="How to infer the types of the columns. If none, inference default to strings.", 159 airbyte_hidden=True, 160 ) 161 ignore_errors_on_fields_mismatch: bool = Field( 162 title="Ignore errors on field mismatch", 163 default=False, 164 description="Whether to ignore errors that occur when the number of fields in the CSV does not match the number of columns in the schema.", 165 ) 166 167 @validator("delimiter") 168 def validate_delimiter(cls, v: str) -> str: 169 if v == r"\t": 170 v = "\t" 171 if len(v) != 1: 172 raise ValueError("delimiter should only be one character") 173 if v in {"\r", "\n"}: 174 raise ValueError(f"delimiter cannot be {v}") 175 return v 176 177 @validator("quote_char") 178 def validate_quote_char(cls, v: str) -> str: 179 if len(v) != 1: 180 raise ValueError("quote_char should only be one character") 181 return v 182 183 @validator("escape_char") 184 def validate_escape_char(cls, v: str) -> str: 185 if v is not None and len(v) != 1: 186 raise ValueError("escape_char should only be one character") 187 return v 188 189 @validator("encoding") 190 def validate_encoding(cls, v: str) -> str: 191 try: 192 codecs.lookup(v) 193 except LookupError: 194 raise ValueError(f"invalid encoding format: {v}") 195 return v 196 197 @root_validator 198 def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]: 199 definition_type = values.get("header_definition_type") 200 column_names = values.get("user_provided_column_names") 201 if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names: 202 raise ValidationError( 203 "`user_provided_column_names` should be defined if the definition 'User Provided'.", 204 model=CsvFormat, 205 ) 206 if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names: 207 raise ValidationError( 208 "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.", 209 model=CsvFormat, 210 ) 211 return values
@root_validator
def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    definition_type = values.get("header_definition_type")
    column_names = values.get("user_provided_column_names")
    if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
        raise ValidationError(
            "`user_provided_column_names` should be defined if the definition 'User Provided'.",
            model=CsvFormat,
        )
    if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
        raise ValidationError(
            "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
            model=CsvFormat,
        )
    return values
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).
Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
class CustomFileBasedException(AirbyteTracedException):
    """
    A specialized exception for file-based connectors.

    This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
    """

    pass
A specialized exception for file-based connectors.
This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
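For example, a custom parser might surface a user-facing message like this (illustrative only; the exact import path of FailureType may vary by CDK version):

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based import CustomFileBasedException

raise CustomFileBasedException(
    message="This file layout is not supported; see the connector documentation.",  # illustrative message
    failure_type=FailureType.config_error,
)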
18class DefaultFileBasedCursor(AbstractFileBasedCursor): 19 DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3 20 DEFAULT_MAX_HISTORY_SIZE = 10_000 21 DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" 22 CURSOR_FIELD = "_ab_source_file_last_modified" 23 24 def __init__(self, stream_config: FileBasedStreamConfig, **_: Any): 25 super().__init__(stream_config) # type: ignore [safe-super] 26 self._file_to_datetime_history: MutableMapping[str, str] = {} 27 self._time_window_if_history_is_full = timedelta( 28 days=stream_config.days_to_sync_if_history_is_full 29 or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL 30 ) 31 32 if self._time_window_if_history_is_full <= timedelta(): 33 raise ValueError( 34 f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}" 35 ) 36 37 self._start_time = self._compute_start_time() 38 self._initial_earliest_file_in_history: Optional[RemoteFile] = None 39 40 def set_initial_state(self, value: StreamState) -> None: 41 self._file_to_datetime_history = value.get("history", {}) 42 self._start_time = self._compute_start_time() 43 self._initial_earliest_file_in_history = self._compute_earliest_file_in_history() 44 45 def add_file(self, file: RemoteFile) -> None: 46 self._file_to_datetime_history[file.uri] = file.last_modified.strftime( 47 self.DATE_TIME_FORMAT 48 ) 49 if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE: 50 # Get the earliest file based on its last modified date and its uri 51 oldest_file = self._compute_earliest_file_in_history() 52 if oldest_file: 53 del self._file_to_datetime_history[oldest_file.uri] 54 else: 55 raise Exception( 56 "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK." 57 ) 58 59 def get_state(self) -> StreamState: 60 state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()} 61 return state 62 63 def _get_cursor(self) -> Optional[str]: 64 """ 65 Returns the cursor value. 66 67 Files are synced in order of last-modified with secondary sort on filename, so the cursor value is 68 a string joining the last-modified timestamp of the last synced file and the name of the file. 69 """ 70 if self._file_to_datetime_history.items(): 71 filename, timestamp = max( 72 self._file_to_datetime_history.items(), key=lambda x: (x[1], x[0]) 73 ) 74 return f"{timestamp}_{filename}" 75 return None 76 77 def _is_history_full(self) -> bool: 78 """ 79 Returns true if the state's history is full, meaning new entries will start to replace old entries. 80 """ 81 return len(self._file_to_datetime_history) >= self.DEFAULT_MAX_HISTORY_SIZE 82 83 def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool: 84 if file.uri in self._file_to_datetime_history: 85 # If the file's uri is in the history, we should sync the file if it has been modified since it was synced 86 updated_at_from_history = datetime.strptime( 87 self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT 88 ) 89 if file.last_modified < updated_at_from_history: 90 logger.warning( 91 f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file." 
92 ) 93 else: 94 return file.last_modified > updated_at_from_history 95 return file.last_modified > updated_at_from_history 96 if self._is_history_full(): 97 if self._initial_earliest_file_in_history is None: 98 return True 99 if file.last_modified > self._initial_earliest_file_in_history.last_modified: 100 # If the history is partial and the file's datetime is strictly greater than the earliest file in the history, 101 # we should sync it 102 return True 103 elif file.last_modified == self._initial_earliest_file_in_history.last_modified: 104 # If the history is partial and the file's datetime is equal to the earliest file in the history, 105 # we should sync it if its uri is strictly greater than the earliest file in the history 106 return file.uri > self._initial_earliest_file_in_history.uri 107 else: 108 # Otherwise, only sync the file if it has been modified since the start of the time window 109 return file.last_modified >= self.get_start_time() 110 else: 111 # The file is not in the history and the history is complete. We know we need to sync the file 112 return True 113 114 def get_files_to_sync( 115 self, all_files: Iterable[RemoteFile], logger: logging.Logger 116 ) -> Iterable[RemoteFile]: 117 if self._is_history_full(): 118 logger.warning( 119 f"The state history is full. " 120 f"This sync and future syncs won't be able to use the history to filter out duplicate files. " 121 f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files." 122 ) 123 for f in all_files: 124 if self._should_sync_file(f, logger): 125 yield f 126 127 def get_start_time(self) -> datetime: 128 return self._start_time 129 130 def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]: 131 if self._file_to_datetime_history: 132 filename, last_modified = min( 133 self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0]) 134 ) 135 return RemoteFile( 136 uri=filename, last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT) 137 ) 138 else: 139 return None 140 141 def _compute_start_time(self) -> datetime: 142 if not self._file_to_datetime_history: 143 return datetime.min 144 else: 145 earliest = min(self._file_to_datetime_history.values()) 146 earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT) 147 if self._is_history_full(): 148 time_window = datetime.now() - self._time_window_if_history_is_full 149 earliest_dt = min(earliest_dt, time_window) 150 return earliest_dt
Abstract base class for cursors used by file-based streams.
def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
    super().__init__(stream_config)  # type: ignore [safe-super]
    self._file_to_datetime_history: MutableMapping[str, str] = {}
    self._time_window_if_history_is_full = timedelta(
        days=stream_config.days_to_sync_if_history_is_full
        or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
    )

    if self._time_window_if_history_is_full <= timedelta():
        raise ValueError(
            f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
        )

    self._start_time = self._compute_start_time()
    self._initial_earliest_file_in_history: Optional[RemoteFile] = None
Common interface for all cursors.
def set_initial_state(self, value: StreamState) -> None:
    self._file_to_datetime_history = value.get("history", {})
    self._start_time = self._compute_start_time()
    self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()
Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
Parameters
- value: The stream state
def add_file(self, file: RemoteFile) -> None:
    self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
        self.DATE_TIME_FORMAT
    )
    if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
        # Get the earliest file based on its last modified date and its uri
        oldest_file = self._compute_earliest_file_in_history()
        if oldest_file:
            del self._file_to_datetime_history[oldest_file.uri]
        else:
            raise Exception(
                "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
            )
Add a file to the cursor. This method is called when a file is processed by the stream.
Parameters
- file: The file to add
def get_state(self) -> StreamState:
    state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
    return state
Get the state of the cursor.
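The resulting state payload looks roughly like this (illustrative file names and timestamps):

{
    "history": {
        "inventory/2024-05-01.csv": "2024-05-01T10:00:00.000000Z",
        "inventory/2024-05-02.csv": "2024-05-02T10:00:00.000000Z",
    },
    "_ab_source_file_last_modified": "2024-05-02T10:00:00.000000Z_inventory/2024-05-02.csv",
}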
def get_files_to_sync(
    self, all_files: Iterable[RemoteFile], logger: logging.Logger
) -> Iterable[RemoteFile]:
    if self._is_history_full():
        logger.warning(
            f"The state history is full. "
            f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
            f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
        )
    for f in all_files:
        if self._should_sync_file(f, logger):
            yield f
Given the list of files in the source, return the files that should be synced.
Parameters
- all_files: All files in the source
- logger:
Returns
The files that should be synced
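Put together, a stream would typically drive the cursor roughly like this. This is a sketch: reader, stream_config, globs, logger, and previous_state are assumed to be set up by the surrounding connector code.

cursor = DefaultFileBasedCursor(stream_config)   # stream_config: a FileBasedStreamConfig
cursor.set_initial_state(previous_state or {})   # state persisted from an earlier sync, if any

all_files = reader.get_matching_files(globs, None, logger)  # reader: an AbstractFileBasedStreamReader
for file in cursor.get_files_to_sync(all_files, logger):
    ...  # parse and emit records for `file`
    cursor.add_file(file)

next_state = cursor.get_state()  # persisted and handed back on the next sync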
Common base class for all non-exit exceptions.
87class FileBasedSource(ConcurrentSourceAdapter, ABC): 88 # We make each source override the concurrency level to give control over when they are upgraded. 89 _concurrency_level = None 90 91 def __init__( 92 self, 93 stream_reader: AbstractFileBasedStreamReader, 94 spec_class: Type[AbstractFileBasedSpec], 95 catalog: Optional[ConfiguredAirbyteCatalog], 96 config: Optional[Mapping[str, Any]], 97 state: Optional[List[AirbyteStateMessage]], 98 availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None, 99 discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(), 100 parsers: Mapping[Type[Any], FileTypeParser] = default_parsers, 101 validation_policies: Mapping[ 102 ValidationPolicy, AbstractSchemaValidationPolicy 103 ] = DEFAULT_SCHEMA_VALIDATION_POLICIES, 104 cursor_cls: Type[ 105 Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor] 106 ] = FileBasedConcurrentCursor, 107 stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None, 108 ): 109 self.stream_reader = stream_reader 110 self.stream_permissions_reader = stream_permissions_reader 111 self.spec_class = spec_class 112 self.config = config 113 self.catalog = catalog 114 self.state = state 115 self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy( 116 stream_reader 117 ) 118 self.discovery_policy = discovery_policy 119 self.parsers = parsers 120 self.validation_policies = validation_policies 121 self.stream_schemas = ( 122 {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {} 123 ) 124 self.cursor_cls = cursor_cls 125 self.logger = init_logger(f"airbyte.{self.name}") 126 self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector() 127 self._message_repository: Optional[MessageRepository] = None 128 concurrent_source = ConcurrentSource.create( 129 MAX_CONCURRENCY, 130 INITIAL_N_PARTITIONS, 131 self.logger, 132 self._slice_logger, 133 self.message_repository, 134 ) 135 self._state = None 136 super().__init__(concurrent_source) 137 138 @property 139 def message_repository(self) -> MessageRepository: 140 if self._message_repository is None: 141 self._message_repository = InMemoryMessageRepository( 142 Level(AirbyteLogFormatter.level_mapping[self.logger.level]) 143 ) 144 return self._message_repository 145 146 def check_connection( 147 self, logger: logging.Logger, config: Mapping[str, Any] 148 ) -> Tuple[bool, Optional[Any]]: 149 """ 150 Check that the source can be accessed using the user-provided configuration. 151 152 For each stream, verify that we can list and read files. 153 154 Returns (True, None) if the connection check is successful. 155 156 Otherwise, the "error" object should describe what went wrong. 157 """ 158 try: 159 streams = self.streams(config) 160 except Exception as config_exception: 161 raise AirbyteTracedException( 162 internal_message="Please check the logged errors for more information.", 163 message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value, 164 exception=AirbyteTracedException(exception=config_exception), 165 failure_type=FailureType.config_error, 166 ) 167 if len(streams) == 0: 168 return ( 169 False, 170 f"No streams are available for source {self.name}. This is probably an issue with the connector. Please verify that your " 171 f"configuration provides permissions to list and read files from the source. 
Contact support if you are unable to " 172 f"resolve this issue.", 173 ) 174 175 errors = [] 176 tracebacks = [] 177 for stream in streams: 178 if isinstance(stream, FileIdentitiesStream): 179 identity = next(iter(stream.load_identity_groups())) 180 if not identity: 181 errors.append( 182 "Unable to get identities for current configuration, please check your credentials" 183 ) 184 continue 185 if not isinstance(stream, AbstractFileBasedStream): 186 raise ValueError(f"Stream {stream} is not a file-based stream.") 187 try: 188 parsed_config = self._get_parsed_config(config) 189 availability_method = ( 190 stream.availability_strategy.check_availability 191 if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config) 192 else stream.availability_strategy.check_availability_and_parsability 193 ) 194 ( 195 stream_is_available, 196 reason, 197 ) = availability_method(stream, logger, self) 198 except AirbyteTracedException as ate: 199 errors.append(f"Unable to connect to stream {stream.name} - {ate.message}") 200 tracebacks.append(traceback.format_exc()) 201 except Exception: 202 errors.append(f"Unable to connect to stream {stream.name}") 203 tracebacks.append(traceback.format_exc()) 204 else: 205 if not stream_is_available and reason: 206 errors.append(reason) 207 208 if len(errors) == 1 and len(tracebacks) == 1: 209 raise AirbyteTracedException( 210 internal_message=tracebacks[0], 211 message=f"{errors[0]}", 212 failure_type=FailureType.config_error, 213 ) 214 if len(errors) == 1 and len(tracebacks) == 0: 215 raise AirbyteTracedException( 216 message=f"{errors[0]}", 217 failure_type=FailureType.config_error, 218 ) 219 elif len(errors) > 1: 220 raise AirbyteTracedException( 221 internal_message="\n".join(tracebacks), 222 message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}", 223 failure_type=FailureType.config_error, 224 ) 225 226 return not bool(errors), (errors or None) 227 228 def streams(self, config: Mapping[str, Any]) -> List[Stream]: 229 """ 230 Return a list of this source's streams. 231 """ 232 233 if self.catalog: 234 state_manager = ConnectorStateManager(state=self.state) 235 else: 236 # During `check` operations we don't have a catalog so cannot create a state manager. 237 # Since the state manager is only required for incremental syncs, this is fine. 
238 state_manager = None 239 240 try: 241 parsed_config = self._get_parsed_config(config) 242 self.stream_reader.config = parsed_config 243 if self.stream_permissions_reader: 244 self.stream_permissions_reader.config = parsed_config 245 streams: List[Stream] = [] 246 for stream_config in parsed_config.streams: 247 # Like state_manager, `catalog_stream` may be None during `check` 248 catalog_stream = self._get_stream_from_catalog(stream_config) 249 stream_state = ( 250 state_manager.get_stream_state(catalog_stream.name, catalog_stream.namespace) 251 if (state_manager and catalog_stream) 252 else None 253 ) 254 self._validate_input_schema(stream_config) 255 256 sync_mode = self._get_sync_mode_from_catalog(stream_config.name) 257 258 if ( 259 sync_mode == SyncMode.full_refresh 260 and hasattr(self, "_concurrency_level") 261 and self._concurrency_level is not None 262 ): 263 cursor = FileBasedFinalStateCursor( 264 stream_config=stream_config, 265 stream_namespace=None, 266 message_repository=self.message_repository, 267 ) 268 stream = FileBasedStreamFacade.create_from_stream( 269 stream=self._make_file_based_stream( 270 stream_config=stream_config, 271 cursor=cursor, 272 parsed_config=parsed_config, 273 ), 274 source=self, 275 logger=self.logger, 276 state=stream_state, 277 cursor=cursor, 278 ) 279 280 elif ( 281 sync_mode == SyncMode.incremental 282 and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor) 283 and hasattr(self, "_concurrency_level") 284 and self._concurrency_level is not None 285 ): 286 assert ( 287 state_manager is not None 288 ), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support." 289 290 cursor = self.cursor_cls( 291 stream_config, 292 stream_config.name, 293 None, 294 stream_state, 295 self.message_repository, 296 state_manager, 297 CursorField(DefaultFileBasedStream.ab_last_mod_col), 298 ) 299 stream = FileBasedStreamFacade.create_from_stream( 300 stream=self._make_file_based_stream( 301 stream_config=stream_config, 302 cursor=cursor, 303 parsed_config=parsed_config, 304 ), 305 source=self, 306 logger=self.logger, 307 state=stream_state, 308 cursor=cursor, 309 ) 310 else: 311 cursor = self.cursor_cls(stream_config) 312 stream = self._make_file_based_stream( 313 stream_config=stream_config, 314 cursor=cursor, 315 parsed_config=parsed_config, 316 ) 317 318 streams.append(stream) 319 320 if include_identities_stream(parsed_config): 321 identities_stream = self._make_identities_stream() 322 streams.append(identities_stream) 323 return streams 324 325 except ValidationError as exc: 326 raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc 327 328 def _make_default_stream( 329 self, 330 stream_config: FileBasedStreamConfig, 331 cursor: Optional[AbstractFileBasedCursor], 332 parsed_config: AbstractFileBasedSpec, 333 ) -> AbstractFileBasedStream: 334 return DefaultFileBasedStream( 335 config=stream_config, 336 catalog_schema=self.stream_schemas.get(stream_config.name), 337 stream_reader=self.stream_reader, 338 availability_strategy=self.availability_strategy, 339 discovery_policy=self.discovery_policy, 340 parsers=self.parsers, 341 validation_policy=self._validate_and_get_validation_policy(stream_config), 342 errors_collector=self.errors_collector, 343 cursor=cursor, 344 use_file_transfer=use_file_transfer(parsed_config), 345 preserve_directory_structure=preserve_directory_structure(parsed_config), 346 ) 347 348 def _ensure_permissions_reader_available(self) -> None: 
349 """ 350 Validates that a stream permissions reader is available. 351 Raises a ValueError if the reader is not provided. 352 """ 353 if not self.stream_permissions_reader: 354 raise ValueError( 355 "Stream permissions reader is required for streams that use permissions transfer mode." 356 ) 357 358 def _make_permissions_stream( 359 self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor] 360 ) -> AbstractFileBasedStream: 361 """ 362 Creates a stream that reads permissions from files. 363 """ 364 self._ensure_permissions_reader_available() 365 return PermissionsFileBasedStream( 366 config=stream_config, 367 catalog_schema=self.stream_schemas.get(stream_config.name), 368 stream_reader=self.stream_reader, 369 availability_strategy=self.availability_strategy, 370 discovery_policy=self.discovery_policy, 371 parsers=self.parsers, 372 validation_policy=self._validate_and_get_validation_policy(stream_config), 373 errors_collector=self.errors_collector, 374 cursor=cursor, 375 stream_permissions_reader=self.stream_permissions_reader, # type: ignore 376 ) 377 378 def _make_file_based_stream( 379 self, 380 stream_config: FileBasedStreamConfig, 381 cursor: Optional[AbstractFileBasedCursor], 382 parsed_config: AbstractFileBasedSpec, 383 ) -> AbstractFileBasedStream: 384 """ 385 Creates different streams depending on the type of the transfer mode selected 386 """ 387 if use_permissions_transfer(parsed_config): 388 return self._make_permissions_stream(stream_config, cursor) 389 # we should have a stream for File transfer mode to decouple from DefaultFileBasedStream 390 else: 391 return self._make_default_stream(stream_config, cursor, parsed_config) 392 393 def _make_identities_stream( 394 self, 395 ) -> Stream: 396 self._ensure_permissions_reader_available() 397 return FileIdentitiesStream( 398 catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME), 399 stream_permissions_reader=self.stream_permissions_reader, # type: ignore 400 discovery_policy=self.discovery_policy, 401 errors_collector=self.errors_collector, 402 ) 403 404 def _get_stream_from_catalog( 405 self, stream_config: FileBasedStreamConfig 406 ) -> Optional[AirbyteStream]: 407 if self.catalog: 408 for stream in self.catalog.streams or []: 409 if stream.stream.name == stream_config.name: 410 return stream.stream 411 return None 412 413 def _get_sync_mode_from_catalog(self, stream_name: str) -> Optional[SyncMode]: 414 if self.catalog: 415 for catalog_stream in self.catalog.streams: 416 if stream_name == catalog_stream.stream.name: 417 return catalog_stream.sync_mode 418 self.logger.warning(f"No sync mode was found for {stream_name}.") 419 return None 420 421 def read( 422 self, 423 logger: logging.Logger, 424 config: Mapping[str, Any], 425 catalog: ConfiguredAirbyteCatalog, 426 state: Optional[List[AirbyteStateMessage]] = None, 427 ) -> Iterator[AirbyteMessage]: 428 yield from super().read(logger, config, catalog, state) 429 # emit all the errors collected 430 yield from self.errors_collector.yield_and_raise_collected() 431 # count streams using a certain parser 432 parsed_config = self._get_parsed_config(config) 433 for parser, count in Counter( 434 stream.format.filetype for stream in parsed_config.streams 435 ).items(): 436 yield create_analytics_message(f"file-cdk-{parser}-stream-count", count) 437 438 def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification: 439 """ 440 Returns the specification describing what fields can be configured by a user when setting up a 
file-based source. 441 """ 442 443 return ConnectorSpecification( 444 documentationUrl=self.spec_class.documentation_url(), 445 connectionSpecification=self.spec_class.schema(), 446 ) 447 448 def _get_parsed_config(self, config: Mapping[str, Any]) -> AbstractFileBasedSpec: 449 return self.spec_class(**config) 450 451 def _validate_and_get_validation_policy( 452 self, stream_config: FileBasedStreamConfig 453 ) -> AbstractSchemaValidationPolicy: 454 if stream_config.validation_policy not in self.validation_policies: 455 # This should never happen because we validate the config against the schema's validation_policy enum 456 raise ValidationError( 457 f"`validation_policy` must be one of {list(self.validation_policies.keys())}", 458 model=FileBasedStreamConfig, 459 ) 460 return self.validation_policies[stream_config.validation_policy] 461 462 def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None: 463 if stream_config.schemaless and stream_config.input_schema: 464 raise ValidationError( 465 "`input_schema` and `schemaless` options cannot both be set", 466 model=FileBasedStreamConfig, 467 )
Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.
def __init__(
    self,
    stream_reader: AbstractFileBasedStreamReader,
    spec_class: Type[AbstractFileBasedSpec],
    catalog: Optional[ConfiguredAirbyteCatalog],
    config: Optional[Mapping[str, Any]],
    state: Optional[List[AirbyteStateMessage]],
    availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None,
    discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(),
    parsers: Mapping[Type[Any], FileTypeParser] = default_parsers,
    validation_policies: Mapping[
        ValidationPolicy, AbstractSchemaValidationPolicy
    ] = DEFAULT_SCHEMA_VALIDATION_POLICIES,
    cursor_cls: Type[
        Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor]
    ] = FileBasedConcurrentCursor,
    stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None,
):
    self.stream_reader = stream_reader
    self.stream_permissions_reader = stream_permissions_reader
    self.spec_class = spec_class
    self.config = config
    self.catalog = catalog
    self.state = state
    self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(
        stream_reader
    )
    self.discovery_policy = discovery_policy
    self.parsers = parsers
    self.validation_policies = validation_policies
    self.stream_schemas = (
        {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {}
    )
    self.cursor_cls = cursor_cls
    self.logger = init_logger(f"airbyte.{self.name}")
    self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector()
    self._message_repository: Optional[MessageRepository] = None
    concurrent_source = ConcurrentSource.create(
        MAX_CONCURRENCY,
        INITIAL_N_PARTITIONS,
        self.logger,
        self._slice_logger,
        self.message_repository,
    )
    self._state = None
    super().__init__(concurrent_source)
ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.
The source's streams are still defined through the streams() method. Streams wrapped in a StreamFacade will be processed concurrently. Other streams will be processed sequentially as a later step.
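A concrete connector typically just wires its own spec and stream reader into this constructor, along the lines of the sketch below; InMemoryFilesSource, InMemorySpec, and InMemoryStreamReader are the hypothetical classes from the earlier examples.

from airbyte_cdk.sources.file_based import FileBasedSource

class InMemoryFilesSource(FileBasedSource):
    def __init__(self, config, catalog, state):
        super().__init__(
            stream_reader=InMemoryStreamReader(),  # hypothetical reader from the earlier sketch
            spec_class=InMemorySpec,               # hypothetical spec from the earlier sketch
            catalog=catalog,
            config=config,
            state=state,
        )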
146 def check_connection( 147 self, logger: logging.Logger, config: Mapping[str, Any] 148 ) -> Tuple[bool, Optional[Any]]: 149 """ 150 Check that the source can be accessed using the user-provided configuration. 151 152 For each stream, verify that we can list and read files. 153 154 Returns (True, None) if the connection check is successful. 155 156 Otherwise, the "error" object should describe what went wrong. 157 """ 158 try: 159 streams = self.streams(config) 160 except Exception as config_exception: 161 raise AirbyteTracedException( 162 internal_message="Please check the logged errors for more information.", 163 message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value, 164 exception=AirbyteTracedException(exception=config_exception), 165 failure_type=FailureType.config_error, 166 ) 167 if len(streams) == 0: 168 return ( 169 False, 170 f"No streams are available for source {self.name}. This is probably an issue with the connector. Please verify that your " 171 f"configuration provides permissions to list and read files from the source. Contact support if you are unable to " 172 f"resolve this issue.", 173 ) 174 175 errors = [] 176 tracebacks = [] 177 for stream in streams: 178 if isinstance(stream, FileIdentitiesStream): 179 identity = next(iter(stream.load_identity_groups())) 180 if not identity: 181 errors.append( 182 "Unable to get identities for current configuration, please check your credentials" 183 ) 184 continue 185 if not isinstance(stream, AbstractFileBasedStream): 186 raise ValueError(f"Stream {stream} is not a file-based stream.") 187 try: 188 parsed_config = self._get_parsed_config(config) 189 availability_method = ( 190 stream.availability_strategy.check_availability 191 if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config) 192 else stream.availability_strategy.check_availability_and_parsability 193 ) 194 ( 195 stream_is_available, 196 reason, 197 ) = availability_method(stream, logger, self) 198 except AirbyteTracedException as ate: 199 errors.append(f"Unable to connect to stream {stream.name} - {ate.message}") 200 tracebacks.append(traceback.format_exc()) 201 except Exception: 202 errors.append(f"Unable to connect to stream {stream.name}") 203 tracebacks.append(traceback.format_exc()) 204 else: 205 if not stream_is_available and reason: 206 errors.append(reason) 207 208 if len(errors) == 1 and len(tracebacks) == 1: 209 raise AirbyteTracedException( 210 internal_message=tracebacks[0], 211 message=f"{errors[0]}", 212 failure_type=FailureType.config_error, 213 ) 214 if len(errors) == 1 and len(tracebacks) == 0: 215 raise AirbyteTracedException( 216 message=f"{errors[0]}", 217 failure_type=FailureType.config_error, 218 ) 219 elif len(errors) > 1: 220 raise AirbyteTracedException( 221 internal_message="\n".join(tracebacks), 222 message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}", 223 failure_type=FailureType.config_error, 224 ) 225 226 return not bool(errors), (errors or None)
Check that the source can be accessed using the user-provided configuration.
For each stream, verify that we can list and read files.
Returns (True, None) if the connection check is successful.
Otherwise, the "error" object should describe what went wrong.
228 def streams(self, config: Mapping[str, Any]) -> List[Stream]: 229 """ 230 Return a list of this source's streams. 231 """ 232 233 if self.catalog: 234 state_manager = ConnectorStateManager(state=self.state) 235 else: 236 # During `check` operations we don't have a catalog so cannot create a state manager. 237 # Since the state manager is only required for incremental syncs, this is fine. 238 state_manager = None 239 240 try: 241 parsed_config = self._get_parsed_config(config) 242 self.stream_reader.config = parsed_config 243 if self.stream_permissions_reader: 244 self.stream_permissions_reader.config = parsed_config 245 streams: List[Stream] = [] 246 for stream_config in parsed_config.streams: 247 # Like state_manager, `catalog_stream` may be None during `check` 248 catalog_stream = self._get_stream_from_catalog(stream_config) 249 stream_state = ( 250 state_manager.get_stream_state(catalog_stream.name, catalog_stream.namespace) 251 if (state_manager and catalog_stream) 252 else None 253 ) 254 self._validate_input_schema(stream_config) 255 256 sync_mode = self._get_sync_mode_from_catalog(stream_config.name) 257 258 if ( 259 sync_mode == SyncMode.full_refresh 260 and hasattr(self, "_concurrency_level") 261 and self._concurrency_level is not None 262 ): 263 cursor = FileBasedFinalStateCursor( 264 stream_config=stream_config, 265 stream_namespace=None, 266 message_repository=self.message_repository, 267 ) 268 stream = FileBasedStreamFacade.create_from_stream( 269 stream=self._make_file_based_stream( 270 stream_config=stream_config, 271 cursor=cursor, 272 parsed_config=parsed_config, 273 ), 274 source=self, 275 logger=self.logger, 276 state=stream_state, 277 cursor=cursor, 278 ) 279 280 elif ( 281 sync_mode == SyncMode.incremental 282 and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor) 283 and hasattr(self, "_concurrency_level") 284 and self._concurrency_level is not None 285 ): 286 assert ( 287 state_manager is not None 288 ), "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support." 289 290 cursor = self.cursor_cls( 291 stream_config, 292 stream_config.name, 293 None, 294 stream_state, 295 self.message_repository, 296 state_manager, 297 CursorField(DefaultFileBasedStream.ab_last_mod_col), 298 ) 299 stream = FileBasedStreamFacade.create_from_stream( 300 stream=self._make_file_based_stream( 301 stream_config=stream_config, 302 cursor=cursor, 303 parsed_config=parsed_config, 304 ), 305 source=self, 306 logger=self.logger, 307 state=stream_state, 308 cursor=cursor, 309 ) 310 else: 311 cursor = self.cursor_cls(stream_config) 312 stream = self._make_file_based_stream( 313 stream_config=stream_config, 314 cursor=cursor, 315 parsed_config=parsed_config, 316 ) 317 318 streams.append(stream) 319 320 if include_identities_stream(parsed_config): 321 identities_stream = self._make_identities_stream() 322 streams.append(identities_stream) 323 return streams 324 325 except ValidationError as exc: 326 raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc
Return a list of this source's streams.
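A minimal sketch of a config that drives streams(), assuming the source and logger from the previous example. Provider-specific fields required by a concrete connector's spec (credentials, bucket names, and so on) are omitted, and the "Emit Record" validation-policy value is an assumption to verify against your CDK version.

config = {
    "streams": [
        {
            "name": "users",
            "globs": ["users/*.csv"],
            "format": {"filetype": "csv"},       # parsed into a CsvFormat
            "validation_policy": "Emit Record",  # assumed human-readable enum value
        }
    ],
}

for stream in source.streams(config):
    # One Stream (possibly wrapped in a FileBasedStreamFacade) per configured stream.
    print(stream.name)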
421 def read( 422 self, 423 logger: logging.Logger, 424 config: Mapping[str, Any], 425 catalog: ConfiguredAirbyteCatalog, 426 state: Optional[List[AirbyteStateMessage]] = None, 427 ) -> Iterator[AirbyteMessage]: 428 yield from super().read(logger, config, catalog, state) 429 # emit all the errors collected 430 yield from self.errors_collector.yield_and_raise_collected() 431 # count streams using a certain parser 432 parsed_config = self._get_parsed_config(config) 433 for parser, count in Counter( 434 stream.format.filetype for stream in parsed_config.streams 435 ).items(): 436 yield create_analytics_message(f"file-cdk-{parser}-stream-count", count)
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
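A minimal sketch of driving a full read, assuming source, config, and logger as above and a configured stream named "users". The catalog models come from airbyte_cdk.models and follow the Airbyte protocol models bundled with the CDK, though import paths can differ between versions.

from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="users",  # must match a stream declared in config["streams"]
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh],
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
    ]
)

for message in source.read(logger, config, catalog, state=None):
    if message.type == Type.RECORD:
        print(message.record.data)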
438 def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification: 439 """ 440 Returns the specification describing what fields can be configured by a user when setting up a file-based source. 441 """ 442 443 return ConnectorSpecification( 444 documentationUrl=self.spec_class.documentation_url(), 445 connectionSpecification=self.spec_class.schema(), 446 )
Returns the specification describing what fields can be configured by a user when setting up a file-based source.
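A minimal sketch of inspecting the rendered specification, assuming the same hypothetical source. ConnectorSpecification.connectionSpecification is the JSON-schema mapping produced by AbstractFileBasedSpec.schema().

spec = source.spec(logger)

print(spec.documentationUrl)
# Top-level config fields exposed to users, e.g. ['start_date', 'streams', ...]
print(sorted(spec.connectionSpecification["properties"].keys()))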
13class FileBasedSourceError(Enum): 14 EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict." 15 GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split." 16 ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding." 17 ERROR_CASTING_VALUE = "Could not cast the value to the expected type." 18 ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string." 19 ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record." 20 ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files." 21 ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files." 22 ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable." 23 ERROR_PARSING_USER_PROVIDED_SCHEMA = ( 24 "The provided schema could not be transformed into valid JSON Schema." 25 ) 26 ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy." 27 ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows." 28 ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows." 29 STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema." 30 NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key." 31 UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type." 32 SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?" 33 INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns." 34 CONFIG_VALIDATION_ERROR = "Error creating stream config object." 35 MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing." 36 UNDEFINED_PARSER = "No parser is defined for this file type." 37 UNDEFINED_VALIDATION_POLICY = ( 38 "The validation policy defined in the config does not exist for the source." 39 )
An enumeration of the user-facing error messages raised by file-based sources.
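A minimal sketch of how these messages are typically surfaced: the enum value becomes the user-facing message of a traced config error, mirroring the pattern used in check_connection above. FailureType and AirbyteTracedException ship with the CDK; the import paths are assumptions that may vary by version, and the helper and internal message below are illustrative.

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based import FileBasedSourceError
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

def fail_listing(details: str) -> None:
    # Hypothetical helper: wrap a low-level listing failure in a user-facing error.
    raise AirbyteTracedException(
        message=FileBasedSourceError.ERROR_LISTING_FILES.value,
        internal_message=details,
        failure_type=FailureType.config_error,
    )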
29 class FileBasedStreamConfig(BaseModel):
30     name: str = Field(title="Name", description="The name of the stream.")
31     globs: Optional[List[str]] = Field(
32         default=["**"],
33         title="Globs",
34         description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
35         order=1,
36     )
37     legacy_prefix: Optional[str] = Field(
38         title="Legacy Prefix",
39         description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
40         airbyte_hidden=True,
41     )
42     validation_policy: ValidationPolicy = Field(
43         title="Validation Policy",
44         description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
45         default=ValidationPolicy.emit_record,
46     )
47     input_schema: Optional[str] = Field(
48         title="Input Schema",
49         description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
50     )
51     primary_key: Optional[str] = Field(
52         title="Primary Key",
53         description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
54         airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
55     )
56     days_to_sync_if_history_is_full: int = Field(
57         title="Days To Sync If History Is Full",
58         description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
59         default=3,
60     )
61     format: Union[
62         AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
63     ] = Field(
64         title="Format",
65         description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
66     )
67     schemaless: bool = Field(
68         title="Schemaless",
69         description="When enabled, syncs will not validate or structure records against the stream's schema.",
70         default=False,
71     )
72     recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
73         title="Files To Read For Schema Discovery",
74         description="The number of recent files which will be used to discover the schema for this stream.",
75         default=None,
76         gt=0,
77     )
78
79     @validator("input_schema", pre=True)
80     def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
81         if v:
82             if type_mapping_to_jsonschema(v):
83                 return v
84             else:
85                 raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
86         return None
87
88     def get_input_schema(self) -> Optional[Mapping[str, Any]]:
89         """
90         User defined input_schema is defined as a string in the config. This method takes the string representation
91         and converts it into a Mapping[str, Any] which is used by file-based CDK components.
92         """
93         if self.input_schema:
94             schema = type_mapping_to_jsonschema(self.input_schema)
95             if not schema:
96                 raise ValueError(
97                     f"Unable to create JSON schema from input schema {self.input_schema}"
98                 )
99             return schema
100        return None
88 def get_input_schema(self) -> Optional[Mapping[str, Any]]: 89 """ 90 User defined input_schema is defined as a string in the config. This method takes the string representation 91 and converts it into a Mapping[str, Any] which is used by file-based CDK components. 92 """ 93 if self.input_schema: 94 schema = type_mapping_to_jsonschema(self.input_schema) 95 if not schema: 96 raise ValueError( 97 f"Unable to create JSON schema from input schema {self.input_schema}" 98 ) 99 return schema 100 return None
User defined input_schema is defined as a string in the config. This method takes the string representation and converts it into a Mapping[str, Any] which is used by file-based CDK components.
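A minimal sketch of constructing a stream config programmatically and converting a user-supplied input_schema string into a JSON-schema mapping. The '{"column": "type"}' string format follows validate_input_schema above; the exact output shape of get_input_schema() is an assumption to verify against your CDK version.

from airbyte_cdk.sources.file_based import FileBasedStreamConfig, JsonlFormat

stream_config = FileBasedStreamConfig(
    name="users",
    globs=["users/**/*.jsonl"],
    format=JsonlFormat(),
    input_schema='{"id": "integer", "email": "string"}',  # airbyte types, e.g. integer / string / number / boolean
)

# Expected to return a JSON-schema style mapping, e.g. {"type": "object", "properties": {...}}
print(stream_config.get_input_schema())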
An enumeration.
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in a model's nested Config class and set title and description (these show up in the UI) and discriminator (this ensures the discriminator field is marked as required in the schema).
Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
12class RemoteFile(BaseModel): 13 """ 14 A file in a file-based stream. 15 """ 16 17 uri: str 18 last_modified: datetime 19 mime_type: Optional[str] = None
A file in a file-based stream.
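A minimal sketch of constructing a RemoteFile, e.g. from inside a custom AbstractFileBasedStreamReader implementation when listing matching files. Field names come from the model above; the concrete URI and timestamp are illustrative.

from datetime import datetime, timezone

from airbyte_cdk.sources.file_based import RemoteFile

remote_file = RemoteFile(
    uri="data/users/2024-01-01.csv",
    last_modified=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    mime_type="text/csv",
)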