airbyte_cdk.sources.file_based
from .config.abstract_file_based_spec import AbstractFileBasedSpec
from .config.csv_format import CsvFormat
from .config.file_based_stream_config import FileBasedStreamConfig
from .config.jsonl_format import JsonlFormat
from .exceptions import CustomFileBasedException, ErrorListingFiles, FileBasedSourceError
from .file_based_source import DEFAULT_CONCURRENCY, FileBasedSource
from .file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from .remote_file import RemoteFile
from .stream.cursor import DefaultFileBasedCursor

__all__ = [
    "AbstractFileBasedSpec",
    "AbstractFileBasedStreamReader",
    "CsvFormat",
    "CustomFileBasedException",
    "DefaultFileBasedCursor",
    "ErrorListingFiles",
    "FileBasedSource",
    "FileBasedSourceError",
    "FileBasedStreamConfig",
    "FileReadMode",
    "JsonlFormat",
    "RemoteFile",
]
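These names form the public surface that concrete file-based connectors build on. As a minimal, hypothetical sketch (the connector package name is illustrative), a connector typically imports the spec, stream reader, and source classes from this top-level package rather than from the submodules:

# source_example/__init__.py (hypothetical connector package)
from airbyte_cdk.sources.file_based import (
    AbstractFileBasedSpec,
    AbstractFileBasedStreamReader,
    FileBasedSource,
    FileReadMode,
    RemoteFile,
)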
48class AbstractFileBasedSpec(BaseModel): 49 """ 50 Used during spec; allows the developer to configure the cloud provider specific options 51 that are needed when users configure a file-based source. 52 """ 53 54 start_date: Optional[str] = Field( 55 title="Start Date", 56 description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.", 57 examples=["2021-01-01T00:00:00.000000Z"], 58 format="date-time", 59 pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$", 60 pattern_descriptor="YYYY-MM-DDTHH:mm:ss.SSSSSSZ", 61 order=1, 62 ) 63 64 streams: List[FileBasedStreamConfig] = Field( 65 title="The list of streams to sync", 66 description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.', 67 order=10, 68 ) 69 70 delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field( 71 title="Delivery Method", 72 discriminator="delivery_type", 73 type="object", 74 order=7, 75 display_type="radio", 76 group="advanced", 77 default="use_records_transfer", 78 airbyte_hidden=True, 79 ) 80 81 @classmethod 82 @abstractmethod 83 def documentation_url(cls) -> AnyUrl: 84 """ 85 :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3" 86 """ 87 88 @classmethod 89 def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]: 90 """ 91 Generates the mapping comprised of the config fields 92 """ 93 schema = super().schema(*args, **kwargs) 94 transformed_schema: Dict[str, Any] = copy.deepcopy(schema) 95 schema_helpers.expand_refs(transformed_schema) 96 cls.replace_enum_allOf_and_anyOf(transformed_schema) 97 cls.remove_discriminator(transformed_schema) 98 99 return transformed_schema 100 101 @staticmethod 102 def remove_discriminator(schema: Dict[str, Any]) -> None: 103 """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references""" 104 dpath.delete(schema, "properties/**/discriminator") 105 106 @staticmethod 107 def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]: 108 """ 109 allOfs are not supported by the UI, but pydantic is automatically writing them for enums. 110 Unpacks the enums under allOf and moves them up a level under the enum key 111 anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the 112 additional validation that an incoming config only matches exactly one of a field's types. 
113 """ 114 objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"] 115 objects_to_check["type"] = "object" 116 objects_to_check["oneOf"] = objects_to_check.pop("anyOf", []) 117 for format in objects_to_check["oneOf"]: 118 for key in format["properties"]: 119 object_property = format["properties"][key] 120 AbstractFileBasedSpec.move_enum_to_root(object_property) 121 122 properties_to_change = ["validation_policy"] 123 for property_to_change in properties_to_change: 124 property_object = schema["properties"]["streams"]["items"]["properties"][ 125 property_to_change 126 ] 127 if "anyOf" in property_object: 128 schema["properties"]["streams"]["items"]["properties"][property_to_change][ 129 "type" 130 ] = "object" 131 schema["properties"]["streams"]["items"]["properties"][property_to_change][ 132 "oneOf" 133 ] = property_object.pop("anyOf") 134 AbstractFileBasedSpec.move_enum_to_root(property_object) 135 136 csv_format_schemas = list( 137 filter( 138 lambda format: format["properties"]["filetype"]["default"] == "csv", 139 schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"], 140 ) 141 ) 142 if len(csv_format_schemas) != 1: 143 raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}") 144 csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][ 145 "properties" 146 ]["header_definition"].pop("anyOf", []) 147 csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object" 148 return schema 149 150 @staticmethod 151 def move_enum_to_root(object_property: Dict[str, Any]) -> None: 152 if "allOf" in object_property and "enum" in object_property["allOf"][0]: 153 object_property["enum"] = object_property["allOf"][0]["enum"] 154 object_property.pop("allOf")
Used during spec; allows the developer to configure the cloud provider specific options that are needed when users configure a file-based source.
@classmethod
@abstractmethod
def documentation_url(cls) -> AnyUrl:
    """
    :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
    """
Returns
link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
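As a sketch of what a concrete spec looks like (the `ExampleSpec` class and its documentation URL are hypothetical, and the exact pydantic import path depends on the CDK version in use):

from pydantic.v1 import AnyUrl  # assumption: v1-compatible import; may be `from pydantic import AnyUrl`

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec


class ExampleSpec(AbstractFileBasedSpec):
    """Hypothetical spec; inherits start_date, streams, and delivery_method from the base class."""

    @classmethod
    def documentation_url(cls) -> AnyUrl:
        return AnyUrl("https://docs.airbyte.com/integrations/sources/example", scheme="https")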
@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
    """
    Generates the mapping comprised of the config fields
    """
    schema = super().schema(*args, **kwargs)
    transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
    schema_helpers.expand_refs(transformed_schema)
    cls.replace_enum_allOf_and_anyOf(transformed_schema)
    cls.remove_discriminator(transformed_schema)

    return transformed_schema
Generates the mapping composed of the config fields.
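For example, with the hypothetical `ExampleSpec` above, the post-processing this method performs can be observed directly on the generated schema:

schema = ExampleSpec.schema()

# After post-processing, stream format options are exposed as oneOf rather than anyOf,
# and $refs have been inlined.
format_property = schema["properties"]["streams"]["items"]["properties"]["format"]
assert "oneOf" in format_property
assert "anyOf" not in format_property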
@staticmethod
def remove_discriminator(schema: Dict[str, Any]) -> None:
    """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
    dpath.delete(schema, "properties/**/discriminator")
pydantic adds "discriminator" to the schema for oneOfs, which the platform does not handle correctly because we inline all references.
@staticmethod
def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
    """
    allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
    Unpacks the enums under allOf and moves them up a level under the enum key
    anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
    additional validation that an incoming config only matches exactly one of a field's types.
    """
    objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
    objects_to_check["type"] = "object"
    objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
    for format in objects_to_check["oneOf"]:
        for key in format["properties"]:
            object_property = format["properties"][key]
            AbstractFileBasedSpec.move_enum_to_root(object_property)

    properties_to_change = ["validation_policy"]
    for property_to_change in properties_to_change:
        property_object = schema["properties"]["streams"]["items"]["properties"][
            property_to_change
        ]
        if "anyOf" in property_object:
            schema["properties"]["streams"]["items"]["properties"][property_to_change][
                "type"
            ] = "object"
            schema["properties"]["streams"]["items"]["properties"][property_to_change][
                "oneOf"
            ] = property_object.pop("anyOf")
        AbstractFileBasedSpec.move_enum_to_root(property_object)

    csv_format_schemas = list(
        filter(
            lambda format: format["properties"]["filetype"]["default"] == "csv",
            schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
        )
    )
    if len(csv_format_schemas) != 1:
        raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
    csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
        "properties"
    ]["header_definition"].pop("anyOf", [])
    csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
    return schema
allOfs are not supported by the UI, but pydantic automatically writes them for enums; this method unpacks the enums under allOf and moves them up a level under the enum key. anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the additional validation that an incoming config only matches exactly one of a field's types.
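The enum unpacking is easiest to see on a single property. The dictionary below is an illustrative example of what pydantic tends to emit for an enum-typed field; `move_enum_to_root` (shown further down) hoists the enum to the top of the property:

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec

# Illustrative property as pydantic renders an enum-typed field ...
object_property = {"title": "Filetype", "allOf": [{"enum": ["csv", "jsonl"]}]}
AbstractFileBasedSpec.move_enum_to_root(object_property)
# ... the allOf wrapper is removed and the enum is hoisted to the root of the property.
assert object_property == {"title": "Filetype", "enum": ["csv", "jsonl"]}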
35class AbstractFileBasedStreamReader(ABC): 36 DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" 37 FILE_RELATIVE_PATH = "file_relative_path" 38 FILE_NAME = "file_name" 39 LOCAL_FILE_PATH = "local_file_path" 40 FILE_FOLDER = "file_folder" 41 FILE_SIZE_LIMIT = 1_500_000_000 42 43 def __init__(self) -> None: 44 self._config = None 45 46 @property 47 def config(self) -> Optional[AbstractFileBasedSpec]: 48 return self._config 49 50 @config.setter 51 @abstractmethod 52 def config(self, value: AbstractFileBasedSpec) -> None: 53 """ 54 FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader. 55 56 Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader 57 will require keys that (for example) allow it to authenticate with the 3rd party. 58 59 Therefore, concrete implementations of AbstractFileBasedStreamReader's config setter should assert that `value` is of the correct 60 config type for that type of StreamReader. 61 """ 62 ... 63 64 @abstractmethod 65 def open_file( 66 self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger 67 ) -> IOBase: 68 """ 69 Return a file handle for reading. 70 71 Many sources will be able to use smart_open to implement this method, 72 for example: 73 74 client = boto3.Session(...) 75 return smart_open.open(remote_file.uri, transport_params={"client": client}) 76 """ 77 ... 78 79 @abstractmethod 80 def get_matching_files( 81 self, 82 globs: List[str], 83 prefix: Optional[str], 84 logger: logging.Logger, 85 ) -> Iterable[RemoteFile]: 86 """ 87 Return all files that match any of the globs. 88 89 Example: 90 91 The source has files "a.json", "foo/a.json", "foo/bar/a.json" 92 93 If globs = ["*.json"] then this method returns ["a.json"]. 94 95 If globs = ["foo/*.json"] then this method returns ["foo/a.json"]. 96 97 Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs` 98 are available, which may be helpful when implementing this method. 99 """ 100 ... 101 102 def filter_files_by_globs_and_start_date( 103 self, files: List[RemoteFile], globs: List[str] 104 ) -> Iterable[RemoteFile]: 105 """ 106 Utility method for filtering files based on globs. 107 """ 108 start_date = ( 109 datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT) 110 if self.config and self.config.start_date 111 else None 112 ) 113 seen = set() 114 115 for file in files: 116 if self.file_matches_globs(file, globs): 117 if file.uri not in seen and (not start_date or file.last_modified >= start_date): 118 seen.add(file.uri) 119 yield file 120 121 @staticmethod 122 def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool: 123 # Use the GLOBSTAR flag to enable recursive ** matching 124 # (https://facelessuser.github.io/wcmatch/wcmatch/#globstar) 125 return any(globmatch(file.uri, g, flags=GLOBSTAR) for g in globs) 126 127 @staticmethod 128 def get_prefixes_from_globs(globs: List[str]) -> Set[str]: 129 """ 130 Utility method for extracting prefixes from the globs. 
131 """ 132 prefixes = {glob.split("*")[0] for glob in globs} 133 return set(filter(lambda x: bool(x), prefixes)) 134 135 def use_file_transfer(self) -> bool: 136 if self.config: 137 return use_file_transfer(self.config) 138 return False 139 140 def preserve_directory_structure(self) -> bool: 141 # fall back to preserve subdirectories if config is not present or incomplete 142 if self.config: 143 return preserve_directory_structure(self.config) 144 return True 145 146 def include_identities_stream(self) -> bool: 147 if self.config: 148 return include_identities_stream(self.config) 149 return False 150 151 def upload( 152 self, file: UploadableRemoteFile, local_directory: str, logger: logging.Logger 153 ) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]: 154 """ 155 This is required for connectors that will support writing to 156 files. It will handle the logic to download,get,read,acquire or 157 whatever is more efficient to get a file from the source. 158 159 Args: 160 file (RemoteFile): The remote file object containing URI and metadata. 161 local_directory (str): The local directory path where the file will be downloaded. 162 logger (logging.Logger): Logger for logging information and errors. 163 164 Returns: 165 AirbyteRecordMessageFileReference: A file reference object containing: 166 - staging_file_url (str): The absolute path to the referenced file in the staging area. 167 - file_size_bytes (int): The size of the referenced file in bytes. 168 - source_file_relative_path (str): The relative path to the referenced file in source. 169 """ 170 if not isinstance(file, UploadableRemoteFile): 171 raise TypeError(f"Expected UploadableRemoteFile, got {type(file)}") 172 173 file_size = file.size 174 175 if file_size > self.FILE_SIZE_LIMIT: 176 message = ( 177 f"File size exceeds the {self.FILE_SIZE_LIMIT / 1e9} GB limit. File URI: {file.uri}" 178 ) 179 raise FileSizeLimitError( 180 message=message, internal_message=message, failure_type=FailureType.config_error 181 ) 182 183 file_paths = self._get_file_transfer_paths( 184 source_file_relative_path=file.source_file_relative_path, 185 staging_directory=local_directory, 186 ) 187 local_file_path = file_paths[self.LOCAL_FILE_PATH] 188 file_relative_path = file_paths[self.FILE_RELATIVE_PATH] 189 file_name = file_paths[self.FILE_NAME] 190 191 logger.info( 192 f"Starting to download the file {file.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)" 193 ) 194 start_download_time = time.time() 195 196 file.download_to_local_directory(local_file_path) 197 198 write_duration = time.time() - start_download_time 199 logger.info( 200 f"Finished downloading the file {file.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds." 
201 ) 202 203 file_record_data = FileRecordData( 204 folder=file_paths[self.FILE_FOLDER], 205 file_name=file_name, 206 bytes=file_size, 207 id=file.id, 208 mime_type=file.mime_type, 209 created_at=file.created_at, 210 updated_at=file.updated_at, 211 source_uri=file.uri, 212 ) 213 file_reference = AirbyteRecordMessageFileReference( 214 staging_file_url=local_file_path, 215 source_file_relative_path=file_relative_path, 216 file_size_bytes=file_size, 217 ) 218 return file_record_data, file_reference 219 220 def _get_file_transfer_paths( 221 self, source_file_relative_path: str, staging_directory: str 222 ) -> MutableMapping[str, Any]: 223 """ 224 This method is used to get the file transfer paths for a given source file relative path and local directory. 225 It returns a dictionary with the following keys: 226 - FILE_RELATIVE_PATH: The relative path to file in reference to the staging directory. 227 - LOCAL_FILE_PATH: The absolute path to the file. 228 - FILE_NAME: The name of the referenced file. 229 - FILE_FOLDER: The folder of the referenced file. 230 """ 231 preserve_directory_structure = self.preserve_directory_structure() 232 233 file_name = path.basename(source_file_relative_path) 234 file_folder = path.dirname(source_file_relative_path) 235 if preserve_directory_structure: 236 # Remove left slashes from source path format to make relative path for writing locally 237 file_relative_path = source_file_relative_path.lstrip("/") 238 else: 239 file_relative_path = file_name 240 local_file_path = path.join(staging_directory, file_relative_path) 241 # Ensure the local directory exists 242 makedirs(path.dirname(local_file_path), exist_ok=True) 243 244 file_paths = { 245 self.FILE_RELATIVE_PATH: file_relative_path, 246 self.LOCAL_FILE_PATH: local_file_path, 247 self.FILE_NAME: file_name, 248 self.FILE_FOLDER: file_folder, 249 } 250 return file_paths
Helper class that provides a standard way to create an ABC using inheritance.
FileBasedSource reads the config from disk and parses it, and once parsed, the source sets the config on its StreamReader.
Note: FileBasedSource only requires the keys defined in the abstract config, whereas concrete implementations of StreamReader will require keys that (for example) allow it to authenticate with the 3rd party.
Therefore, concrete implementations of AbstractFileBasedStreamReader's config setter should assert that value is of the correct config type for that type of StreamReader.
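A hedged sketch of such a setter on a hypothetical concrete reader (`ExampleStreamReader` is an illustrative name and `ExampleSpec` is the hypothetical spec sketched earlier; the remaining abstract methods are omitted here and sketched further below):

from typing import Optional

from airbyte_cdk.sources.file_based import AbstractFileBasedSpec, AbstractFileBasedStreamReader


class ExampleStreamReader(AbstractFileBasedStreamReader):
    @property
    def config(self) -> Optional[AbstractFileBasedSpec]:
        return self._config

    @config.setter
    def config(self, value: AbstractFileBasedSpec) -> None:
        # The concrete reader needs its own spec type because that is what carries the
        # provider-specific keys (credentials, bucket names, etc.).
        assert isinstance(value, ExampleSpec), "Config must be an ExampleSpec"
        self._config = value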
@abstractmethod
def open_file(
    self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
) -> IOBase:
    """
    Return a file handle for reading.

    Many sources will be able to use smart_open to implement this method,
    for example:

    client = boto3.Session(...)
    return smart_open.open(remote_file.uri, transport_params={"client": client})
    """
    ...
Return a file handle for reading.
Many sources will be able to use smart_open to implement this method, for example:
client = boto3.Session(...)
return smart_open.open(remote_file.uri, transport_params={"client": client})
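A slightly fuller, hedged sketch of this method for an S3-style reader, shown standalone; the credential field names on the config are assumptions, not fields defined by this module:

import logging
from io import IOBase
from typing import Optional

import boto3
import smart_open

from airbyte_cdk.sources.file_based import FileReadMode, RemoteFile


def open_file(
    self, file: RemoteFile, mode: FileReadMode, encoding: Optional[str], logger: logging.Logger
) -> IOBase:
    # Assumes the concrete spec exposes aws_access_key_id / aws_secret_access_key.
    client = boto3.Session(
        aws_access_key_id=self.config.aws_access_key_id,
        aws_secret_access_key=self.config.aws_secret_access_key,
    ).client("s3")
    # FileReadMode values map onto the standard "r" / "rb" open modes.
    return smart_open.open(
        file.uri, mode=mode.value, encoding=encoding, transport_params={"client": client}
    )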
@abstractmethod
def get_matching_files(
    self,
    globs: List[str],
    prefix: Optional[str],
    logger: logging.Logger,
) -> Iterable[RemoteFile]:
    """
    Return all files that match any of the globs.

    Example:

    The source has files "a.json", "foo/a.json", "foo/bar/a.json"

    If globs = ["*.json"] then this method returns ["a.json"].

    If globs = ["foo/*.json"] then this method returns ["foo/a.json"].

    Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
    are available, which may be helpful when implementing this method.
    """
    ...
Return all files that match any of the globs.
Example:
The source has files "a.json", "foo/a.json", "foo/bar/a.json"
If globs = ["*.json"] then this method returns ["a.json"].
If globs = ["foo/*.json"] then this method returns ["foo/a.json"].
Utility methods self.filter_files_by_globs_and_start_date and self.get_prefixes_from_globs are available, which may be helpful when implementing this method.
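A hedged sketch of an implementation that lists everything under a prefix and defers the filtering to the built-in helper; `self._list_all_files` is a placeholder for whatever listing API the backing store offers:

import logging
from typing import Iterable, List, Optional

from airbyte_cdk.sources.file_based import RemoteFile


def get_matching_files(
    self, globs: List[str], prefix: Optional[str], logger: logging.Logger
) -> Iterable[RemoteFile]:
    # Placeholder listing call; a real reader would page through e.g. S3 ListObjectsV2.
    all_files = [
        RemoteFile(uri=key, last_modified=last_modified)
        for key, last_modified in self._list_all_files(prefix)
    ]
    # Reuse the CDK's glob and start-date filtering instead of re-implementing it.
    yield from self.filter_files_by_globs_and_start_date(all_files, globs)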
def filter_files_by_globs_and_start_date(
    self, files: List[RemoteFile], globs: List[str]
) -> Iterable[RemoteFile]:
    """
    Utility method for filtering files based on globs.
    """
    start_date = (
        datetime.strptime(self.config.start_date, self.DATE_TIME_FORMAT)
        if self.config and self.config.start_date
        else None
    )
    seen = set()

    for file in files:
        if self.file_matches_globs(file, globs):
            if file.uri not in seen and (not start_date or file.last_modified >= start_date):
                seen.add(file.uri)
                yield file
Utility method for filtering files based on globs and, when a start date is configured, on their last-modified time.
@staticmethod
def get_prefixes_from_globs(globs: List[str]) -> Set[str]:
    """
    Utility method for extracting prefixes from the globs.
    """
    prefixes = {glob.split("*")[0] for glob in globs}
    return set(filter(lambda x: bool(x), prefixes))
Utility method for extracting prefixes from the globs.
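For example (values are illustrative), the prefix is everything before the first wildcard, and globs with no literal prefix are dropped:

from airbyte_cdk.sources.file_based import AbstractFileBasedStreamReader

prefixes = AbstractFileBasedStreamReader.get_prefixes_from_globs(
    ["logs/2024/*.jsonl", "exports/**/*.csv", "*.csv"]
)
assert prefixes == {"logs/2024/", "exports/"}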
151 def upload( 152 self, file: UploadableRemoteFile, local_directory: str, logger: logging.Logger 153 ) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]: 154 """ 155 This is required for connectors that will support writing to 156 files. It will handle the logic to download,get,read,acquire or 157 whatever is more efficient to get a file from the source. 158 159 Args: 160 file (RemoteFile): The remote file object containing URI and metadata. 161 local_directory (str): The local directory path where the file will be downloaded. 162 logger (logging.Logger): Logger for logging information and errors. 163 164 Returns: 165 AirbyteRecordMessageFileReference: A file reference object containing: 166 - staging_file_url (str): The absolute path to the referenced file in the staging area. 167 - file_size_bytes (int): The size of the referenced file in bytes. 168 - source_file_relative_path (str): The relative path to the referenced file in source. 169 """ 170 if not isinstance(file, UploadableRemoteFile): 171 raise TypeError(f"Expected UploadableRemoteFile, got {type(file)}") 172 173 file_size = file.size 174 175 if file_size > self.FILE_SIZE_LIMIT: 176 message = ( 177 f"File size exceeds the {self.FILE_SIZE_LIMIT / 1e9} GB limit. File URI: {file.uri}" 178 ) 179 raise FileSizeLimitError( 180 message=message, internal_message=message, failure_type=FailureType.config_error 181 ) 182 183 file_paths = self._get_file_transfer_paths( 184 source_file_relative_path=file.source_file_relative_path, 185 staging_directory=local_directory, 186 ) 187 local_file_path = file_paths[self.LOCAL_FILE_PATH] 188 file_relative_path = file_paths[self.FILE_RELATIVE_PATH] 189 file_name = file_paths[self.FILE_NAME] 190 191 logger.info( 192 f"Starting to download the file {file.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)" 193 ) 194 start_download_time = time.time() 195 196 file.download_to_local_directory(local_file_path) 197 198 write_duration = time.time() - start_download_time 199 logger.info( 200 f"Finished downloading the file {file.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds." 201 ) 202 203 file_record_data = FileRecordData( 204 folder=file_paths[self.FILE_FOLDER], 205 file_name=file_name, 206 bytes=file_size, 207 id=file.id, 208 mime_type=file.mime_type, 209 created_at=file.created_at, 210 updated_at=file.updated_at, 211 source_uri=file.uri, 212 ) 213 file_reference = AirbyteRecordMessageFileReference( 214 staging_file_url=local_file_path, 215 source_file_relative_path=file_relative_path, 216 file_size_bytes=file_size, 217 ) 218 return file_record_data, file_reference
This is required for connectors that support file transfer. It handles the logic to download, get, read, or otherwise acquire a file from the source in whatever way is most efficient.
Arguments:
- file (UploadableRemoteFile): The remote file object containing URI and metadata.
- local_directory (str): The local directory path where the file will be downloaded.
- logger (logging.Logger): Logger for logging information and errors.

Returns: A tuple of FileRecordData and AirbyteRecordMessageFileReference; the file reference contains:
- staging_file_url (str): The absolute path to the referenced file in the staging area.
- file_size_bytes (int): The size of the referenced file in bytes.
- source_file_relative_path (str): The relative path to the referenced file in source.
85class CsvFormat(BaseModel): 86 class Config(OneOfOptionConfig): 87 title = "CSV Format" 88 discriminator = "filetype" 89 90 filetype: str = Field( 91 "csv", 92 const=True, 93 ) 94 delimiter: str = Field( 95 title="Delimiter", 96 description="The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.", 97 default=",", 98 ) 99 quote_char: str = Field( 100 title="Quote Character", 101 default='"', 102 description="The character used for quoting CSV values. To disallow quoting, make this field blank.", 103 ) 104 escape_char: Optional[str] = Field( 105 title="Escape Character", 106 default=None, 107 description="The character used for escaping special characters. To disallow escaping, leave this field blank.", 108 ) 109 encoding: Optional[str] = Field( 110 default="utf8", 111 description='The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href="https://docs.python.org/3/library/codecs.html#standard-encodings" target="_blank">list of python encodings</a> for allowable options.', 112 ) 113 double_quote: bool = Field( 114 title="Double Quote", 115 default=True, 116 description="Whether two quotes in a quoted CSV value denote a single quote in the data.", 117 ) 118 null_values: Set[str] = Field( 119 title="Null Values", 120 default=[], 121 description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.", 122 ) 123 strings_can_be_null: bool = Field( 124 title="Strings Can Be Null", 125 default=True, 126 description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.", 127 ) 128 skip_rows_before_header: int = Field( 129 title="Skip Rows Before Header", 130 default=0, 131 description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.", 132 ) 133 skip_rows_after_header: int = Field( 134 title="Skip Rows After Header", 135 default=0, 136 description="The number of rows to skip after the header row.", 137 ) 138 header_definition: Union[CsvHeaderFromCsv, CsvHeaderAutogenerated, CsvHeaderUserProvided] = ( 139 Field( 140 title="CSV Header Definition", 141 default=CsvHeaderFromCsv(header_definition_type=CsvHeaderDefinitionType.FROM_CSV.value), 142 description="How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. 
If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.", 143 ) 144 ) 145 true_values: Set[str] = Field( 146 title="True Values", 147 default=DEFAULT_TRUE_VALUES, 148 description="A set of case-sensitive strings that should be interpreted as true values.", 149 ) 150 false_values: Set[str] = Field( 151 title="False Values", 152 default=DEFAULT_FALSE_VALUES, 153 description="A set of case-sensitive strings that should be interpreted as false values.", 154 ) 155 inference_type: InferenceType = Field( 156 title="Inference Type", 157 default=InferenceType.NONE, 158 description="How to infer the types of the columns. If none, inference default to strings.", 159 airbyte_hidden=True, 160 ) 161 ignore_errors_on_fields_mismatch: bool = Field( 162 title="Ignore errors on field mismatch", 163 default=False, 164 description="Whether to ignore errors that occur when the number of fields in the CSV does not match the number of columns in the schema.", 165 ) 166 167 @validator("delimiter") 168 def validate_delimiter(cls, v: str) -> str: 169 if v == r"\t": 170 v = "\t" 171 if len(v) != 1: 172 raise ValueError("delimiter should only be one character") 173 if v in {"\r", "\n"}: 174 raise ValueError(f"delimiter cannot be {v}") 175 return v 176 177 @validator("quote_char") 178 def validate_quote_char(cls, v: str) -> str: 179 if len(v) != 1: 180 raise ValueError("quote_char should only be one character") 181 return v 182 183 @validator("escape_char") 184 def validate_escape_char(cls, v: str) -> str: 185 if v is not None and len(v) != 1: 186 raise ValueError("escape_char should only be one character") 187 return v 188 189 @validator("encoding") 190 def validate_encoding(cls, v: str) -> str: 191 try: 192 codecs.lookup(v) 193 except LookupError: 194 raise ValueError(f"invalid encoding format: {v}") 195 return v 196 197 @root_validator 198 def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]: 199 definition_type = values.get("header_definition_type") 200 column_names = values.get("user_provided_column_names") 201 if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names: 202 raise ValidationError( 203 "`user_provided_column_names` should be defined if the definition 'User Provided'.", 204 model=CsvFormat, 205 ) 206 if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names: 207 raise ValidationError( 208 "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.", 209 model=CsvFormat, 210 ) 211 return values
@root_validator
def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    definition_type = values.get("header_definition_type")
    column_names = values.get("user_provided_column_names")
    if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
        raise ValidationError(
            "`user_provided_column_names` should be defined if the definition 'User Provided'.",
            model=CsvFormat,
        )
    if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
        raise ValidationError(
            "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
            model=CsvFormat,
        )
    return values
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class of a model and set title and description (these show up in the UI) and discriminator (this ensures it is marked as required in the schema).
Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
Inherited Members
150class CustomFileBasedException(AirbyteTracedException): 151 """ 152 A specialized exception for file-based connectors. 153 154 This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages. 155 """ 156 157 pass
A specialized exception for file-based connectors.
This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
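For example, a parser or stream reader can raise it to surface a user-facing message (the message text below is illustrative):

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based import CustomFileBasedException

raise CustomFileBasedException(
    message="The provided service account does not have access to the configured folder.",
    internal_message="Received 403 while listing files",
    failure_type=FailureType.config_error,
)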
18class DefaultFileBasedCursor(AbstractFileBasedCursor): 19 DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3 20 DEFAULT_MAX_HISTORY_SIZE = 10_000 21 DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" 22 CURSOR_FIELD = "_ab_source_file_last_modified" 23 24 def __init__(self, stream_config: FileBasedStreamConfig, **_: Any): 25 super().__init__(stream_config) # type: ignore [safe-super] 26 self._file_to_datetime_history: MutableMapping[str, str] = {} 27 self._time_window_if_history_is_full = timedelta( 28 days=stream_config.days_to_sync_if_history_is_full 29 or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL 30 ) 31 32 if self._time_window_if_history_is_full <= timedelta(): 33 raise ValueError( 34 f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}" 35 ) 36 37 self._start_time = self._compute_start_time() 38 self._initial_earliest_file_in_history: Optional[RemoteFile] = None 39 40 def set_initial_state(self, value: StreamState) -> None: 41 self._file_to_datetime_history = value.get("history", {}) 42 self._start_time = self._compute_start_time() 43 self._initial_earliest_file_in_history = self._compute_earliest_file_in_history() 44 45 def add_file(self, file: RemoteFile) -> None: 46 self._file_to_datetime_history[file.uri] = file.last_modified.strftime( 47 self.DATE_TIME_FORMAT 48 ) 49 if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE: 50 # Get the earliest file based on its last modified date and its uri 51 oldest_file = self._compute_earliest_file_in_history() 52 if oldest_file: 53 del self._file_to_datetime_history[oldest_file.uri] 54 else: 55 raise Exception( 56 "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK." 57 ) 58 59 def get_state(self) -> StreamState: 60 state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()} 61 return state 62 63 def _get_cursor(self) -> Optional[str]: 64 """ 65 Returns the cursor value. 66 67 Files are synced in order of last-modified with secondary sort on filename, so the cursor value is 68 a string joining the last-modified timestamp of the last synced file and the name of the file. 69 """ 70 if self._file_to_datetime_history.items(): 71 filename, timestamp = max( 72 self._file_to_datetime_history.items(), key=lambda x: (x[1], x[0]) 73 ) 74 return f"{timestamp}_{filename}" 75 return None 76 77 def _is_history_full(self) -> bool: 78 """ 79 Returns true if the state's history is full, meaning new entries will start to replace old entries. 80 """ 81 return len(self._file_to_datetime_history) >= self.DEFAULT_MAX_HISTORY_SIZE 82 83 def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool: 84 if file.uri in self._file_to_datetime_history: 85 # If the file's uri is in the history, we should sync the file if it has been modified since it was synced 86 updated_at_from_history = datetime.strptime( 87 self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT 88 ) 89 if file.last_modified < updated_at_from_history: 90 logger.warning( 91 f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file." 
92 ) 93 else: 94 return file.last_modified > updated_at_from_history 95 return file.last_modified > updated_at_from_history 96 if self._is_history_full(): 97 if self._initial_earliest_file_in_history is None: 98 return True 99 if file.last_modified > self._initial_earliest_file_in_history.last_modified: 100 # If the history is partial and the file's datetime is strictly greater than the earliest file in the history, 101 # we should sync it 102 return True 103 elif file.last_modified == self._initial_earliest_file_in_history.last_modified: 104 # If the history is partial and the file's datetime is equal to the earliest file in the history, 105 # we should sync it if its uri is strictly greater than the earliest file in the history 106 return file.uri > self._initial_earliest_file_in_history.uri 107 else: 108 # Otherwise, only sync the file if it has been modified since the start of the time window 109 return file.last_modified >= self.get_start_time() 110 else: 111 # The file is not in the history and the history is complete. We know we need to sync the file 112 return True 113 114 def get_files_to_sync( 115 self, all_files: Iterable[RemoteFile], logger: logging.Logger 116 ) -> Iterable[RemoteFile]: 117 if self._is_history_full(): 118 logger.warning( 119 f"The state history is full. " 120 f"This sync and future syncs won't be able to use the history to filter out duplicate files. " 121 f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files." 122 ) 123 for f in all_files: 124 if self._should_sync_file(f, logger): 125 yield f 126 127 def get_start_time(self) -> datetime: 128 return self._start_time 129 130 def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]: 131 if self._file_to_datetime_history: 132 filename, last_modified = min( 133 self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0]) 134 ) 135 return RemoteFile( 136 uri=filename, last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT) 137 ) 138 else: 139 return None 140 141 def _compute_start_time(self) -> datetime: 142 if not self._file_to_datetime_history: 143 return datetime.min 144 else: 145 earliest = min(self._file_to_datetime_history.values()) 146 earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT) 147 if self._is_history_full(): 148 time_window = datetime.now() - self._time_window_if_history_is_full 149 earliest_dt = min(earliest_dt, time_window) 150 return earliest_dt
Abstract base class for cursors used by file-based streams.
def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
    super().__init__(stream_config)  # type: ignore [safe-super]
    self._file_to_datetime_history: MutableMapping[str, str] = {}
    self._time_window_if_history_is_full = timedelta(
        days=stream_config.days_to_sync_if_history_is_full
        or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
    )

    if self._time_window_if_history_is_full <= timedelta():
        raise ValueError(
            f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
        )

    self._start_time = self._compute_start_time()
    self._initial_earliest_file_in_history: Optional[RemoteFile] = None
Common interface for all cursors.
def set_initial_state(self, value: StreamState) -> None:
    self._file_to_datetime_history = value.get("history", {})
    self._start_time = self._compute_start_time()
    self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()
Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
Parameters
- value: The stream state
def add_file(self, file: RemoteFile) -> None:
    self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
        self.DATE_TIME_FORMAT
    )
    if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
        # Get the earliest file based on its last modified date and its uri
        oldest_file = self._compute_earliest_file_in_history()
        if oldest_file:
            del self._file_to_datetime_history[oldest_file.uri]
        else:
            raise Exception(
                "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
            )
Add a file to the cursor. This method is called when a file is processed by the stream.
Parameters
- file: The file to add
def get_state(self) -> StreamState:
    state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
    return state
Get the state of the cursor.
def get_files_to_sync(
    self, all_files: Iterable[RemoteFile], logger: logging.Logger
) -> Iterable[RemoteFile]:
    if self._is_history_full():
        logger.warning(
            f"The state history is full. "
            f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
            f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
        )
    for f in all_files:
        if self._should_sync_file(f, logger):
            yield f
Given the list of files in the source, return the files that should be synced.
Parameters
- all_files: All files in the source
- logger: The logger used to report warnings (for example, when the state history is full)
Returns
The files that should be synced
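Putting the cursor's pieces together, a hedged usage sketch (the stream configuration and file values are illustrative, and other stream options are left at their defaults):

import logging
from datetime import datetime

from airbyte_cdk.sources.file_based import DefaultFileBasedCursor, FileBasedStreamConfig, RemoteFile

logger = logging.getLogger("airbyte")

stream_config = FileBasedStreamConfig(name="users", format={"filetype": "csv"})
cursor = DefaultFileBasedCursor(stream_config)
cursor.set_initial_state({"history": {}})  # resume from a previous sync's state, if any

all_files = [RemoteFile(uri="users/2024-01-01.csv", last_modified=datetime(2024, 1, 2))]
for file in cursor.get_files_to_sync(all_files, logger):
    ...  # read records from `file`, then record it so it is not re-synced next time
    cursor.add_file(file)

# state looks like {"history": {...}, "_ab_source_file_last_modified": "<timestamp>_<uri>"}
state = cursor.get_state()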
Common base class for all non-exit exceptions.
87class FileBasedSource(ConcurrentSourceAdapter, ABC): 88 # We make each source override the concurrency level to give control over when they are upgraded. 89 _concurrency_level = None 90 91 def __init__( 92 self, 93 stream_reader: AbstractFileBasedStreamReader, 94 spec_class: Type[AbstractFileBasedSpec], 95 catalog: Optional[ConfiguredAirbyteCatalog], 96 config: Optional[Mapping[str, Any]], 97 state: Optional[List[AirbyteStateMessage]], 98 availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None, 99 discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(), 100 parsers: Mapping[Type[Any], FileTypeParser] = default_parsers, 101 validation_policies: Mapping[ 102 ValidationPolicy, AbstractSchemaValidationPolicy 103 ] = DEFAULT_SCHEMA_VALIDATION_POLICIES, 104 cursor_cls: Type[ 105 Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor] 106 ] = FileBasedConcurrentCursor, 107 stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None, 108 ): 109 self.stream_reader = stream_reader 110 self.stream_permissions_reader = stream_permissions_reader 111 self.spec_class = spec_class 112 self.config = config 113 self.catalog = catalog 114 self.state = state 115 self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy( 116 stream_reader 117 ) 118 self.discovery_policy = discovery_policy 119 self.parsers = parsers 120 self.validation_policies = validation_policies 121 self.stream_schemas = ( 122 {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {} 123 ) 124 self.cursor_cls = cursor_cls 125 self.logger = init_logger(f"airbyte.{self.name}") 126 self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector() 127 self._message_repository: Optional[MessageRepository] = None 128 concurrent_source = ConcurrentSource.create( 129 MAX_CONCURRENCY, 130 INITIAL_N_PARTITIONS, 131 self.logger, 132 self._slice_logger, 133 self.message_repository, 134 ) 135 self._state = None 136 super().__init__(concurrent_source) 137 138 @property 139 def message_repository(self) -> MessageRepository: 140 if self._message_repository is None: 141 self._message_repository = InMemoryMessageRepository( 142 Level(AirbyteLogFormatter.level_mapping[self.logger.level]) 143 ) 144 return self._message_repository 145 146 def check_connection( 147 self, logger: logging.Logger, config: Mapping[str, Any] 148 ) -> Tuple[bool, Optional[Any]]: 149 """ 150 Check that the source can be accessed using the user-provided configuration. 151 152 For each stream, verify that we can list and read files. 153 154 Returns (True, None) if the connection check is successful. 155 156 Otherwise, the "error" object should describe what went wrong. 157 """ 158 try: 159 streams = self.streams(config) 160 except ConfigValidationError as config_exception: 161 raise AirbyteTracedException( 162 internal_message="Please check the logged errors for more information.", 163 message=str(config_exception), 164 exception=AirbyteTracedException(exception=config_exception), 165 failure_type=FailureType.config_error, 166 ) 167 except Exception as exp: 168 raise AirbyteTracedException( 169 internal_message="Please check the logged errors for more information.", 170 message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value, 171 exception=AirbyteTracedException(exception=exp), 172 failure_type=FailureType.config_error, 173 ) 174 if len(streams) == 0: 175 return ( 176 False, 177 f"No streams are available for source {self.name}. 
This is probably an issue with the connector. Please verify that your " 178 f"configuration provides permissions to list and read files from the source. Contact support if you are unable to " 179 f"resolve this issue.", 180 ) 181 182 errors = [] 183 tracebacks = [] 184 for stream in streams: 185 if isinstance(stream, FileIdentitiesStream): 186 identity = next(iter(stream.load_identity_groups())) 187 if not identity: 188 errors.append( 189 "Unable to get identities for current configuration, please check your credentials" 190 ) 191 continue 192 if not isinstance(stream, AbstractFileBasedStream): 193 raise ValueError(f"Stream {stream} is not a file-based stream.") 194 try: 195 parsed_config = self._get_parsed_config(config) 196 availability_method = ( 197 stream.availability_strategy.check_availability 198 if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config) 199 else stream.availability_strategy.check_availability_and_parsability 200 ) 201 ( 202 stream_is_available, 203 reason, 204 ) = availability_method(stream, logger, self) 205 except AirbyteTracedException as ate: 206 errors.append(f"Unable to connect to stream {stream.name} - {ate.message}") 207 tracebacks.append(traceback.format_exc()) 208 except Exception: 209 errors.append(f"Unable to connect to stream {stream.name}") 210 tracebacks.append(traceback.format_exc()) 211 else: 212 if not stream_is_available and reason: 213 errors.append(reason) 214 215 if len(errors) == 1 and len(tracebacks) == 1: 216 raise AirbyteTracedException( 217 internal_message=tracebacks[0], 218 message=f"{errors[0]}", 219 failure_type=FailureType.config_error, 220 ) 221 if len(errors) == 1 and len(tracebacks) == 0: 222 raise AirbyteTracedException( 223 message=f"{errors[0]}", 224 failure_type=FailureType.config_error, 225 ) 226 elif len(errors) > 1: 227 raise AirbyteTracedException( 228 internal_message="\n".join(tracebacks), 229 message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}", 230 failure_type=FailureType.config_error, 231 ) 232 233 return not bool(errors), (errors or None) 234 235 def streams(self, config: Mapping[str, Any]) -> List[Stream]: 236 """ 237 Return a list of this source's streams. 238 """ 239 240 if self.catalog: 241 state_manager = ConnectorStateManager(state=self.state) 242 else: 243 # During `check` operations we don't have a catalog so cannot create a state manager. 244 # Since the state manager is only required for incremental syncs, this is fine. 
245 state_manager = None 246 247 try: 248 parsed_config = self._get_parsed_config(config) 249 self.stream_reader.config = parsed_config 250 if self.stream_permissions_reader: 251 self.stream_permissions_reader.config = parsed_config 252 streams: List[Stream] = [] 253 for stream_config in parsed_config.streams: 254 # Like state_manager, `catalog_stream` may be None during `check` 255 catalog_stream = self._get_stream_from_catalog(stream_config) 256 stream_state = ( 257 state_manager.get_stream_state(catalog_stream.name, catalog_stream.namespace) 258 if (state_manager and catalog_stream) 259 else None 260 ) 261 262 sync_mode = self._get_sync_mode_from_catalog(stream_config.name) 263 264 if ( 265 sync_mode == SyncMode.full_refresh 266 and hasattr(self, "_concurrency_level") 267 and self._concurrency_level is not None 268 ): 269 cursor = FileBasedFinalStateCursor( 270 stream_config=stream_config, 271 stream_namespace=None, 272 message_repository=self.message_repository, 273 ) 274 stream = FileBasedStreamFacade.create_from_stream( 275 stream=self._make_file_based_stream( 276 stream_config=stream_config, 277 cursor=cursor, 278 parsed_config=parsed_config, 279 ), 280 source=self, 281 logger=self.logger, 282 state=stream_state, 283 cursor=cursor, 284 ) 285 286 elif ( 287 sync_mode == SyncMode.incremental 288 and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor) 289 and hasattr(self, "_concurrency_level") 290 and self._concurrency_level is not None 291 ): 292 assert state_manager is not None, ( 293 "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support." 294 ) 295 296 cursor = self.cursor_cls( 297 stream_config, 298 stream_config.name, 299 None, 300 stream_state, 301 self.message_repository, 302 state_manager, 303 CursorField(DefaultFileBasedStream.ab_last_mod_col), 304 ) 305 stream = FileBasedStreamFacade.create_from_stream( 306 stream=self._make_file_based_stream( 307 stream_config=stream_config, 308 cursor=cursor, 309 parsed_config=parsed_config, 310 ), 311 source=self, 312 logger=self.logger, 313 state=stream_state, 314 cursor=cursor, 315 ) 316 else: 317 cursor = self.cursor_cls(stream_config) 318 stream = self._make_file_based_stream( 319 stream_config=stream_config, 320 cursor=cursor, 321 parsed_config=parsed_config, 322 ) 323 324 streams.append(stream) 325 326 if include_identities_stream(parsed_config): 327 identities_stream = self._make_identities_stream() 328 streams.append(identities_stream) 329 return streams 330 331 except ValidationError as exc: 332 raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc 333 334 def _make_default_stream( 335 self, 336 stream_config: FileBasedStreamConfig, 337 cursor: Optional[AbstractFileBasedCursor], 338 parsed_config: AbstractFileBasedSpec, 339 ) -> AbstractFileBasedStream: 340 return DefaultFileBasedStream( 341 config=stream_config, 342 catalog_schema=self.stream_schemas.get(stream_config.name), 343 stream_reader=self.stream_reader, 344 availability_strategy=self.availability_strategy, 345 discovery_policy=self.discovery_policy, 346 parsers=self.parsers, 347 validation_policy=self._validate_and_get_validation_policy(stream_config), 348 errors_collector=self.errors_collector, 349 cursor=cursor, 350 use_file_transfer=use_file_transfer(parsed_config), 351 preserve_directory_structure=preserve_directory_structure(parsed_config), 352 ) 353 354 def _ensure_permissions_reader_available(self) -> None: 355 """ 356 Validates that a stream permissions 
reader is available. 357 Raises a ValueError if the reader is not provided. 358 """ 359 if not self.stream_permissions_reader: 360 raise ValueError( 361 "Stream permissions reader is required for streams that use permissions transfer mode." 362 ) 363 364 def _make_permissions_stream( 365 self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor] 366 ) -> AbstractFileBasedStream: 367 """ 368 Creates a stream that reads permissions from files. 369 """ 370 self._ensure_permissions_reader_available() 371 return PermissionsFileBasedStream( 372 config=stream_config, 373 catalog_schema=self.stream_schemas.get(stream_config.name), 374 stream_reader=self.stream_reader, 375 availability_strategy=self.availability_strategy, 376 discovery_policy=self.discovery_policy, 377 parsers=self.parsers, 378 validation_policy=self._validate_and_get_validation_policy(stream_config), 379 errors_collector=self.errors_collector, 380 cursor=cursor, 381 stream_permissions_reader=self.stream_permissions_reader, # type: ignore 382 ) 383 384 def _make_file_based_stream( 385 self, 386 stream_config: FileBasedStreamConfig, 387 cursor: Optional[AbstractFileBasedCursor], 388 parsed_config: AbstractFileBasedSpec, 389 ) -> AbstractFileBasedStream: 390 """ 391 Creates different streams depending on the type of the transfer mode selected 392 """ 393 if use_permissions_transfer(parsed_config): 394 return self._make_permissions_stream(stream_config, cursor) 395 # we should have a stream for File transfer mode to decouple from DefaultFileBasedStream 396 else: 397 return self._make_default_stream(stream_config, cursor, parsed_config) 398 399 def _make_identities_stream( 400 self, 401 ) -> Stream: 402 self._ensure_permissions_reader_available() 403 return FileIdentitiesStream( 404 catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME), 405 stream_permissions_reader=self.stream_permissions_reader, # type: ignore 406 discovery_policy=self.discovery_policy, 407 errors_collector=self.errors_collector, 408 ) 409 410 def _get_stream_from_catalog( 411 self, stream_config: FileBasedStreamConfig 412 ) -> Optional[AirbyteStream]: 413 if self.catalog: 414 for stream in self.catalog.streams or []: 415 if stream.stream.name == stream_config.name: 416 return stream.stream 417 return None 418 419 def _get_sync_mode_from_catalog(self, stream_name: str) -> Optional[SyncMode]: 420 if self.catalog: 421 for catalog_stream in self.catalog.streams: 422 if stream_name == catalog_stream.stream.name: 423 return catalog_stream.sync_mode 424 self.logger.warning(f"No sync mode was found for {stream_name}.") 425 return None 426 427 def read( 428 self, 429 logger: logging.Logger, 430 config: Mapping[str, Any], 431 catalog: ConfiguredAirbyteCatalog, 432 state: Optional[List[AirbyteStateMessage]] = None, 433 ) -> Iterator[AirbyteMessage]: 434 yield from super().read(logger, config, catalog, state) 435 # emit all the errors collected 436 yield from self.errors_collector.yield_and_raise_collected() 437 # count streams using a certain parser 438 parsed_config = self._get_parsed_config(config) 439 for parser, count in Counter( 440 stream.format.filetype for stream in parsed_config.streams 441 ).items(): 442 yield create_analytics_message(f"file-cdk-{parser}-stream-count", count) 443 444 def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification: 445 """ 446 Returns the specification describing what fields can be configured by a user when setting up a file-based source. 
447 """ 448 449 return ConnectorSpecification( 450 documentationUrl=self.spec_class.documentation_url(), 451 connectionSpecification=self.spec_class.schema(), 452 ) 453 454 def _get_parsed_config(self, config: Mapping[str, Any]) -> AbstractFileBasedSpec: 455 return self.spec_class(**config) 456 457 def _validate_and_get_validation_policy( 458 self, stream_config: FileBasedStreamConfig 459 ) -> AbstractSchemaValidationPolicy: 460 if stream_config.validation_policy not in self.validation_policies: 461 # This should never happen because we validate the config against the schema's validation_policy enum 462 raise ValidationError( 463 f"`validation_policy` must be one of {list(self.validation_policies.keys())}", 464 model=FileBasedStreamConfig, 465 ) 466 return self.validation_policies[stream_config.validation_policy]
Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.
91 def __init__( 92 self, 93 stream_reader: AbstractFileBasedStreamReader, 94 spec_class: Type[AbstractFileBasedSpec], 95 catalog: Optional[ConfiguredAirbyteCatalog], 96 config: Optional[Mapping[str, Any]], 97 state: Optional[List[AirbyteStateMessage]], 98 availability_strategy: Optional[AbstractFileBasedAvailabilityStrategy] = None, 99 discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(), 100 parsers: Mapping[Type[Any], FileTypeParser] = default_parsers, 101 validation_policies: Mapping[ 102 ValidationPolicy, AbstractSchemaValidationPolicy 103 ] = DEFAULT_SCHEMA_VALIDATION_POLICIES, 104 cursor_cls: Type[ 105 Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor] 106 ] = FileBasedConcurrentCursor, 107 stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None, 108 ): 109 self.stream_reader = stream_reader 110 self.stream_permissions_reader = stream_permissions_reader 111 self.spec_class = spec_class 112 self.config = config 113 self.catalog = catalog 114 self.state = state 115 self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy( 116 stream_reader 117 ) 118 self.discovery_policy = discovery_policy 119 self.parsers = parsers 120 self.validation_policies = validation_policies 121 self.stream_schemas = ( 122 {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {} 123 ) 124 self.cursor_cls = cursor_cls 125 self.logger = init_logger(f"airbyte.{self.name}") 126 self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector() 127 self._message_repository: Optional[MessageRepository] = None 128 concurrent_source = ConcurrentSource.create( 129 MAX_CONCURRENCY, 130 INITIAL_N_PARTITIONS, 131 self.logger, 132 self._slice_logger, 133 self.message_repository, 134 ) 135 self._state = None 136 super().__init__(concurrent_source)
ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.
The source's streams are still defined through the streams() method. Streams wrapped in a StreamFacade will be processed concurrently. Other streams will be processed sequentially as a later step.
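A hedged sketch of how a concrete source wires these pieces together, reusing the hypothetical `ExampleSpec` and `ExampleStreamReader` from the snippets above (a real reader must also implement the abstract `open_file` and `get_matching_files` methods as sketched earlier):

from typing import Any, List, Mapping, Optional

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.file_based import FileBasedSource


class ExampleSource(FileBasedSource):
    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: Optional[List[AirbyteStateMessage]],
    ):
        super().__init__(
            stream_reader=ExampleStreamReader(),
            spec_class=ExampleSpec,
            catalog=catalog,
            config=config,
            state=state,
        )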
146 def check_connection( 147 self, logger: logging.Logger, config: Mapping[str, Any] 148 ) -> Tuple[bool, Optional[Any]]: 149 """ 150 Check that the source can be accessed using the user-provided configuration. 151 152 For each stream, verify that we can list and read files. 153 154 Returns (True, None) if the connection check is successful. 155 156 Otherwise, the "error" object should describe what went wrong. 157 """ 158 try: 159 streams = self.streams(config) 160 except ConfigValidationError as config_exception: 161 raise AirbyteTracedException( 162 internal_message="Please check the logged errors for more information.", 163 message=str(config_exception), 164 exception=AirbyteTracedException(exception=config_exception), 165 failure_type=FailureType.config_error, 166 ) 167 except Exception as exp: 168 raise AirbyteTracedException( 169 internal_message="Please check the logged errors for more information.", 170 message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value, 171 exception=AirbyteTracedException(exception=exp), 172 failure_type=FailureType.config_error, 173 ) 174 if len(streams) == 0: 175 return ( 176 False, 177 f"No streams are available for source {self.name}. This is probably an issue with the connector. Please verify that your " 178 f"configuration provides permissions to list and read files from the source. Contact support if you are unable to " 179 f"resolve this issue.", 180 ) 181 182 errors = [] 183 tracebacks = [] 184 for stream in streams: 185 if isinstance(stream, FileIdentitiesStream): 186 identity = next(iter(stream.load_identity_groups())) 187 if not identity: 188 errors.append( 189 "Unable to get identities for current configuration, please check your credentials" 190 ) 191 continue 192 if not isinstance(stream, AbstractFileBasedStream): 193 raise ValueError(f"Stream {stream} is not a file-based stream.") 194 try: 195 parsed_config = self._get_parsed_config(config) 196 availability_method = ( 197 stream.availability_strategy.check_availability 198 if use_file_transfer(parsed_config) or use_permissions_transfer(parsed_config) 199 else stream.availability_strategy.check_availability_and_parsability 200 ) 201 ( 202 stream_is_available, 203 reason, 204 ) = availability_method(stream, logger, self) 205 except AirbyteTracedException as ate: 206 errors.append(f"Unable to connect to stream {stream.name} - {ate.message}") 207 tracebacks.append(traceback.format_exc()) 208 except Exception: 209 errors.append(f"Unable to connect to stream {stream.name}") 210 tracebacks.append(traceback.format_exc()) 211 else: 212 if not stream_is_available and reason: 213 errors.append(reason) 214 215 if len(errors) == 1 and len(tracebacks) == 1: 216 raise AirbyteTracedException( 217 internal_message=tracebacks[0], 218 message=f"{errors[0]}", 219 failure_type=FailureType.config_error, 220 ) 221 if len(errors) == 1 and len(tracebacks) == 0: 222 raise AirbyteTracedException( 223 message=f"{errors[0]}", 224 failure_type=FailureType.config_error, 225 ) 226 elif len(errors) > 1: 227 raise AirbyteTracedException( 228 internal_message="\n".join(tracebacks), 229 message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}", 230 failure_type=FailureType.config_error, 231 ) 232 233 return not bool(errors), (errors or None)
Check that the source can be accessed using the user-provided configuration.
For each stream, verify that we can list and read files.
Returns (True, None) if the connection check is successful.
Otherwise, the "error" object should describe what went wrong.
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Return a list of this source's streams.
    """

    if self.catalog:
        state_manager = ConnectorStateManager(state=self.state)
    else:
        # During `check` operations we don't have a catalog so cannot create a state manager.
        # Since the state manager is only required for incremental syncs, this is fine.
        state_manager = None

    try:
        parsed_config = self._get_parsed_config(config)
        self.stream_reader.config = parsed_config
        if self.stream_permissions_reader:
            self.stream_permissions_reader.config = parsed_config
        streams: List[Stream] = []
        for stream_config in parsed_config.streams:
            # Like state_manager, `catalog_stream` may be None during `check`
            catalog_stream = self._get_stream_from_catalog(stream_config)
            stream_state = (
                state_manager.get_stream_state(catalog_stream.name, catalog_stream.namespace)
                if (state_manager and catalog_stream)
                else None
            )

            sync_mode = self._get_sync_mode_from_catalog(stream_config.name)

            if (
                sync_mode == SyncMode.full_refresh
                and hasattr(self, "_concurrency_level")
                and self._concurrency_level is not None
            ):
                cursor = FileBasedFinalStateCursor(
                    stream_config=stream_config,
                    stream_namespace=None,
                    message_repository=self.message_repository,
                )
                stream = FileBasedStreamFacade.create_from_stream(
                    stream=self._make_file_based_stream(
                        stream_config=stream_config,
                        cursor=cursor,
                        parsed_config=parsed_config,
                    ),
                    source=self,
                    logger=self.logger,
                    state=stream_state,
                    cursor=cursor,
                )

            elif (
                sync_mode == SyncMode.incremental
                and issubclass(self.cursor_cls, AbstractConcurrentFileBasedCursor)
                and hasattr(self, "_concurrency_level")
                and self._concurrency_level is not None
            ):
                assert state_manager is not None, (
                    "No ConnectorStateManager was created, but it is required for incremental syncs. This is unexpected. Please contact Support."
                )

                cursor = self.cursor_cls(
                    stream_config,
                    stream_config.name,
                    None,
                    stream_state,
                    self.message_repository,
                    state_manager,
                    CursorField(DefaultFileBasedStream.ab_last_mod_col),
                )
                stream = FileBasedStreamFacade.create_from_stream(
                    stream=self._make_file_based_stream(
                        stream_config=stream_config,
                        cursor=cursor,
                        parsed_config=parsed_config,
                    ),
                    source=self,
                    logger=self.logger,
                    state=stream_state,
                    cursor=cursor,
                )
            else:
                cursor = self.cursor_cls(stream_config)
                stream = self._make_file_based_stream(
                    stream_config=stream_config,
                    cursor=cursor,
                    parsed_config=parsed_config,
                )

            streams.append(stream)

        if include_identities_stream(parsed_config):
            identities_stream = self._make_identities_stream()
            streams.append(identities_stream)
        return streams

    except ValidationError as exc:
        raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR) from exc
Return a list of this source's streams.
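A hedged sketch of the kind of config mapping streams() consumes. Provider-specific fields (credentials, bucket, etc.) as well as the exact validation_policy and format values depend on the concrete connector's spec and are assumptions here.

config = {
    "start_date": "2024-01-01T00:00:00.000000Z",
    "streams": [
        {
            "name": "orders",
            "globs": ["orders/*.csv"],
            "validation_policy": "Emit Record",  # assumed ValidationPolicy value
            "format": {"filetype": "csv"},
        }
    ],
}

streams = source.streams(config)  # `source` is a hypothetical FileBasedSource instance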
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    yield from super().read(logger, config, catalog, state)
    # emit all the errors collected
    yield from self.errors_collector.yield_and_raise_collected()
    # count streams using a certain parser
    parsed_config = self._get_parsed_config(config)
    for parser, count in Counter(
        stream.format.filetype for stream in parsed_config.streams
    ).items():
        yield create_analytics_message(f"file-cdk-{parser}-stream-count", count)
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
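The per-parser counting at the end of read() is a plain collections.Counter over each stream's format.filetype; a standalone illustration:

from collections import Counter

filetypes = ["csv", "csv", "jsonl"]  # stands in for stream.format.filetype values
for parser, count in Counter(filetypes).items():
    print(f"file-cdk-{parser}-stream-count", count)
# file-cdk-csv-stream-count 2
# file-cdk-jsonl-stream-count 1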
def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
    """
    Returns the specification describing what fields can be configured by a user when setting up a file-based source.
    """

    return ConnectorSpecification(
        documentationUrl=self.spec_class.documentation_url(),
        connectionSpecification=self.spec_class.schema(),
    )
Returns the specification describing what fields can be configured by a user when setting up a file-based source.
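Usage sketch, assuming source is a constructed FileBasedSource subclass; the returned ConnectorSpecification carries the docs URL and the JSON schema generated by the spec class.

specification = source.spec()  # `source` is a hypothetical FileBasedSource instance
print(specification.documentationUrl)         # the connector's documentation page
print(specification.connectionSpecification)  # the JSON schema from spec_class.schema()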
class FileBasedSourceError(Enum):
    EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
    GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
    ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files."
    ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
    ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
    ERROR_PARSING_USER_PROVIDED_SCHEMA = (
        "The provided schema could not be transformed into valid JSON Schema."
    )
    ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
    ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
    INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
    UNDEFINED_PARSER = "No parser is defined for this file type."
    UNDEFINED_VALIDATION_POLICY = (
        "The validation policy defined in the config does not exist for the source."
    )
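Each member carries a user-facing message, so callers typically surface the .value when building errors or exceptions. For example:

from airbyte_cdk.sources.file_based import FileBasedSourceError

error = FileBasedSourceError.ERROR_LISTING_FILES
print(error.name)   # "ERROR_LISTING_FILES"
print(error.value)  # "Error listing files. Please check the credentials provided in the config ..."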
class FileBasedStreamConfig(BaseModel):
    name: str = Field(title="Name", description="The name of the stream.")
    globs: Optional[List[str]] = Field(
        default=["**"],
        title="Globs",
        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
        order=1,
    )
    legacy_prefix: Optional[str] = Field(
        title="Legacy Prefix",
        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
        airbyte_hidden=True,
    )
    validation_policy: ValidationPolicy = Field(
        title="Validation Policy",
        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
        default=ValidationPolicy.emit_record,
    )
    input_schema: Optional[str] = Field(
        title="Input Schema",
        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
    )
    primary_key: Optional[str] = Field(
        title="Primary Key",
        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
    )
    days_to_sync_if_history_is_full: int = Field(
        title="Days To Sync If History Is Full",
        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
        default=3,
    )
    format: Union[
        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
    ] = Field(
        title="Format",
        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
    )
    schemaless: bool = Field(
        title="Schemaless",
        description="When enabled, syncs will not validate or structure records against the stream's schema.",
        default=False,
    )
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
        title="Files To Read For Schema Discover",
        description="The number of recent files which will be used to discover the schema for this stream.",
        default=None,
        gt=0,
    )
    use_first_found_file_for_schema_discovery: bool = Field(
        title="Use First Found File For Schema Discover",
        description="When enabled, the source will use the first found file for schema discovery. Helps to avoid a long discovery step.",
        default=False,
    )

    @validator("input_schema", pre=True)
    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
        if v:
            if type_mapping_to_jsonschema(v):
                return v
            else:
                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
        return None

    @root_validator
    def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validates that the configured schema discovery options are compatible.
        Please update this validation whenever a new schema-discovery-related field is added.
        Note that recent_n_files_to_read_for_schema_discovery was initially added without validating whether schemaless or input_schema
        were also provided, so this method does not check those combinations in order not to break existing connections.
        If recent_n_files_to_read_for_schema_discovery is provided together with schemaless or input_schema,
        recent_n_files_to_read_for_schema_discovery will be ignored and the other option is used.
        """
        input_schema = values["input_schema"] is not None
        schemaless = values["schemaless"]
        recent_n_files_to_read_for_schema_discovery = (
            values["recent_n_files_to_read_for_schema_discovery"] is not None
        )
        use_first_found_file_for_schema_discovery = values[
            "use_first_found_file_for_schema_discovery"
        ]

        if (
            recent_n_files_to_read_for_schema_discovery
            and use_first_found_file_for_schema_discovery
        ) or [schemaless, input_schema, use_first_found_file_for_schema_discovery].count(True) > 1:
            raise ConfigValidationError(
                FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
            )

        return values

    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
        """
        User defined input_schema is defined as a string in the config. This method takes the string representation
        and converts it into a Mapping[str, Any] which is used by file-based CDK components.
        """
        if self.input_schema:
            schema = type_mapping_to_jsonschema(self.input_schema)
            if not schema:
                raise ValueError(
                    f"Unable to create JSON schema from input schema {self.input_schema}"
                )
            return schema
        return None
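A hedged sketch of constructing a stream config programmatically; in practice these objects are parsed from the connector's JSON config, and CsvFormat() is assumed to be constructible with its defaults.

from airbyte_cdk.sources.file_based import CsvFormat, FileBasedStreamConfig

stream_config = FileBasedStreamConfig(
    name="orders",
    globs=["orders/**/*.csv"],
    format=CsvFormat(),  # defaults assumed acceptable for a standard comma-separated file
)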
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
    """
    User defined input_schema is defined as a string in the config. This method takes the string representation
    and converts it into a Mapping[str, Any] which is used by file-based CDK components.
    """
    if self.input_schema:
        schema = type_mapping_to_jsonschema(self.input_schema)
        if not schema:
            raise ValueError(
                f"Unable to create JSON schema from input schema {self.input_schema}"
            )
        return schema
    return None
User defined input_schema is defined as a string in the config. This method takes the string representation and converts it into a Mapping[str, Any] which is used by file-based CDK components.
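A hedged example of the string form of input_schema: a JSON object mapping column names to type names (the valid type names are those listed in FileBasedSourceError.ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE), which get_input_schema() converts into a JSON-Schema-style mapping.

from airbyte_cdk.sources.file_based import CsvFormat, FileBasedStreamConfig

config = FileBasedStreamConfig(
    name="orders",
    format=CsvFormat(),
    input_schema='{"id": "integer", "email": "string"}',  # assumed column-name-to-type format
)
print(config.get_input_schema())  # a Mapping[str, Any] describing the two columns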
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in the nested Config class in a model, and set title and description (these show up in the UI) as well as discriminator (this ensures the discriminator field is marked as required in the schema).
Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
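A follow-up sketch of how option models configured this way are typically combined in a discriminated Union on the parent model, mirroring the format field on FileBasedStreamConfig. OptionB and ParentModel are hypothetical, OptionModel is the class from the usage example above, and the imports assume a pydantic v1-compatible environment (matching the const=True usage above).

from typing import Literal, Union

from pydantic import BaseModel, Field  # pydantic v1-style API assumed


class OptionB(BaseModel):
    mode: Literal["option_b"] = Field("option_b", const=True)
    option_b_field: int = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option B"
        description = "Option B description"
        discriminator = "mode"


class ParentModel(BaseModel):
    option: Union[OptionModel, OptionB] = Field(..., discriminator="mode")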
class RemoteFile(BaseModel):
    """
    A file in a file-based stream.
    """

    uri: str
    last_modified: datetime
    mime_type: Optional[str] = None
A file in a file-based stream.
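A simple construction example; the values are illustrative.

from datetime import datetime, timezone

from airbyte_cdk.sources.file_based import RemoteFile

remote_file = RemoteFile(
    uri="orders/2024-01-01.csv",
    last_modified=datetime(2024, 1, 1, tzinfo=timezone.utc),
    mime_type="text/csv",
)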