airbyte_cdk.sources.file_based.config.abstract_file_based_spec
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
from abc import abstractmethod
from typing import Any, Dict, List, Literal, Optional, Union

import dpath
from pydantic.v1 import AnyUrl, BaseModel, Field, validator

from airbyte_cdk import OneOfOptionConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions
from airbyte_cdk.sources.utils import schema_helpers
from airbyte_cdk.utils.datetime_helpers import ab_datetime_try_parse


class DeliverRecords(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Replicate Records"
        description = "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination."
        discriminator = "delivery_type"

    delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True)


class DeliverRawFiles(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Copy Raw Files"
        description = "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files."
        discriminator = "delivery_type"

    delivery_type: Literal["use_file_transfer"] = Field("use_file_transfer", const=True)

    preserve_directory_structure: bool = Field(
        title="Preserve Sub-Directories in File Paths",
        description=(
            "If enabled, sends subdirectory folder structure "
            "along with source file names to the destination. "
            "Otherwise, files will be synced by their names only. "
            "This option is ignored when file-based replication is not enabled."
        ),
        default=True,
    )


class AbstractFileBasedSpec(BaseModel):
    """
    Used during spec; allows the developer to configure the cloud provider specific options
    that are needed when users configure a file-based source.
    """

    start_date: Optional[str] = Field(
        title="Start Date",
        description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
        examples=[
            "2021-01-01",
            "2021-01-01T00:00:00Z",
            "2021-01-01T00:00:00.000Z",
            "2021-01-01T00:00:00.000000Z",
        ],
        format="date-time",
        pattern=r"^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|[+-][0-9]{2}:[0-9]{2})?)?$",
        pattern_descriptor="YYYY-MM-DD, YYYY-MM-DDTHH:mm:ssZ, or YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
        order=1,
    )

    @validator("start_date", pre=True)
    def validate_start_date(
        cls,  # noqa: N805  # Pydantic validators use cls, not self
        v: Optional[str],
    ) -> Optional[str]:
        """Validate that start_date is a parseable datetime string.

        Uses ab_datetime_try_parse which accepts any common ISO8601/RFC3339 format,
        including formats with or without microseconds (e.g., both
        '2021-01-01T00:00:00Z' and '2021-01-01T00:00:00.000000Z' are valid).
        """
        if v is None:
            return v
        parsed = ab_datetime_try_parse(v)
        if parsed is None:
            raise ValueError(
                f"'{v}' is not a valid datetime string. "
                "Please use a format like '2021-01-01T00:00:00Z' or '2021-01-01T00:00:00.000000Z'."
            )
        return v

    streams: List[FileBasedStreamConfig] = Field(
        title="The list of streams to sync",
        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
        order=10,
    )

    delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
        title="Delivery Method",
        discriminator="delivery_type",
        type="object",
        order=7,
        display_type="radio",
        group="advanced",
        default="use_records_transfer",
        airbyte_hidden=True,
    )

    @classmethod
    @abstractmethod
    def documentation_url(cls) -> AnyUrl:
        """
        :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
        """

    @classmethod
    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Generates the mapping comprised of the config fields
        """
        schema = super().schema(*args, **kwargs)
        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
        schema_helpers.expand_refs(transformed_schema)
        cls.replace_enum_allOf_and_anyOf(transformed_schema)
        cls.remove_discriminator(transformed_schema)

        return transformed_schema

    @staticmethod
    def remove_discriminator(schema: Dict[str, Any]) -> None:
        """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
        dpath.delete(schema, "properties/**/discriminator")

    @staticmethod
    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
        Unpacks the enums under allOf and moves them up a level under the enum key
        anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
        additional validation that an incoming config only matches exactly one of a field's types.
        """
        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
        objects_to_check["type"] = "object"
        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
        for format in objects_to_check["oneOf"]:
            for key in format["properties"]:
                object_property = format["properties"][key]
                AbstractFileBasedSpec.move_enum_to_root(object_property)

        properties_to_change = ["validation_policy"]
        for property_to_change in properties_to_change:
            property_object = schema["properties"]["streams"]["items"]["properties"][
                property_to_change
            ]
            if "anyOf" in property_object:
                schema["properties"]["streams"]["items"]["properties"][property_to_change][
                    "type"
                ] = "object"
                schema["properties"]["streams"]["items"]["properties"][property_to_change][
                    "oneOf"
                ] = property_object.pop("anyOf")
            AbstractFileBasedSpec.move_enum_to_root(property_object)

        csv_format_schemas = list(
            filter(
                lambda format: format["properties"]["filetype"]["default"] == "csv",
                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
            )
        )
        if len(csv_format_schemas) != 1:
            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
        csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
            "properties"
        ]["header_definition"].pop("anyOf", [])
        csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
        return schema

    @staticmethod
    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
            object_property["enum"] = object_property["allOf"][0]["enum"]
            object_property.pop("allOf")
class DeliverRecords(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Replicate Records"
        description = "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination."
        discriminator = "delivery_type"

    delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True)
class Config(OneOfOptionConfig):
    title = "Replicate Records"
    description = "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination."
    discriminator = "delivery_type"
Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.
Inherit from this class in a model's nested Config class and set title and description (these show up in the UI) and discriminator (this ensures the discriminator field is marked as required in the schema).
Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
class DeliverRawFiles(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Copy Raw Files"
        description = "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files."
        discriminator = "delivery_type"

    delivery_type: Literal["use_file_transfer"] = Field("use_file_transfer", const=True)

    preserve_directory_structure: bool = Field(
        title="Preserve Sub-Directories in File Paths",
        description=(
            "If enabled, sends subdirectory folder structure "
            "along with source file names to the destination. "
            "Otherwise, files will be synced by their names only. "
            "This option is ignored when file-based replication is not enabled."
        ),
        default=True,
    )
class Config(OneOfOptionConfig):
    title = "Copy Raw Files"
    description = "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files."
    discriminator = "delivery_type"
class AbstractFileBasedSpec(BaseModel):
    """
    Used during spec; allows the developer to configure the cloud provider specific options
    that are needed when users configure a file-based source.
    """

    start_date: Optional[str] = Field(
        title="Start Date",
        description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
        examples=[
            "2021-01-01",
            "2021-01-01T00:00:00Z",
            "2021-01-01T00:00:00.000Z",
            "2021-01-01T00:00:00.000000Z",
        ],
        format="date-time",
        pattern=r"^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|[+-][0-9]{2}:[0-9]{2})?)?$",
        pattern_descriptor="YYYY-MM-DD, YYYY-MM-DDTHH:mm:ssZ, or YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
        order=1,
    )

    @validator("start_date", pre=True)
    def validate_start_date(
        cls,  # noqa: N805  # Pydantic validators use cls, not self
        v: Optional[str],
    ) -> Optional[str]:
        """Validate that start_date is a parseable datetime string.

        Uses ab_datetime_try_parse which accepts any common ISO8601/RFC3339 format,
        including formats with or without microseconds (e.g., both
        '2021-01-01T00:00:00Z' and '2021-01-01T00:00:00.000000Z' are valid).
        """
        if v is None:
            return v
        parsed = ab_datetime_try_parse(v)
        if parsed is None:
            raise ValueError(
                f"'{v}' is not a valid datetime string. "
                "Please use a format like '2021-01-01T00:00:00Z' or '2021-01-01T00:00:00.000000Z'."
            )
        return v

    streams: List[FileBasedStreamConfig] = Field(
        title="The list of streams to sync",
        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
        order=10,
    )

    delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
        title="Delivery Method",
        discriminator="delivery_type",
        type="object",
        order=7,
        display_type="radio",
        group="advanced",
        default="use_records_transfer",
        airbyte_hidden=True,
    )

    @classmethod
    @abstractmethod
    def documentation_url(cls) -> AnyUrl:
        """
        :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
        """

    @classmethod
    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Generates the mapping comprised of the config fields
        """
        schema = super().schema(*args, **kwargs)
        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
        schema_helpers.expand_refs(transformed_schema)
        cls.replace_enum_allOf_and_anyOf(transformed_schema)
        cls.remove_discriminator(transformed_schema)

        return transformed_schema

    @staticmethod
    def remove_discriminator(schema: Dict[str, Any]) -> None:
        """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
        dpath.delete(schema, "properties/**/discriminator")

    @staticmethod
    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
        Unpacks the enums under allOf and moves them up a level under the enum key
        anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
        additional validation that an incoming config only matches exactly one of a field's types.
        """
        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
        objects_to_check["type"] = "object"
        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
        for format in objects_to_check["oneOf"]:
            for key in format["properties"]:
                object_property = format["properties"][key]
                AbstractFileBasedSpec.move_enum_to_root(object_property)

        properties_to_change = ["validation_policy"]
        for property_to_change in properties_to_change:
            property_object = schema["properties"]["streams"]["items"]["properties"][
                property_to_change
            ]
            if "anyOf" in property_object:
                schema["properties"]["streams"]["items"]["properties"][property_to_change][
                    "type"
                ] = "object"
                schema["properties"]["streams"]["items"]["properties"][property_to_change][
                    "oneOf"
                ] = property_object.pop("anyOf")
            AbstractFileBasedSpec.move_enum_to_root(property_object)

        csv_format_schemas = list(
            filter(
                lambda format: format["properties"]["filetype"]["default"] == "csv",
                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
            )
        )
        if len(csv_format_schemas) != 1:
            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
        csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
            "properties"
        ]["header_definition"].pop("anyOf", [])
        csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
        return schema

    @staticmethod
    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
            object_property["enum"] = object_property["allOf"][0]["enum"]
            object_property.pop("allOf")
Used during spec; allows the developer to configure the cloud provider specific options that are needed when users configure a file-based source.
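As an illustration only, here is a minimal sketch of a concrete subclass; SourceFooSpec, its bucket field, and the docs URL are hypothetical, not part of the CDK:

from pydantic.v1 import AnyUrl, Field

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec


class SourceFooSpec(AbstractFileBasedSpec):
    """Hypothetical spec for a file-based "foo" source."""

    # Provider-specific option; streams, start_date, and delivery_method are
    # inherited from AbstractFileBasedSpec.
    bucket: str = Field(title="Bucket", description="Name of the bucket to sync from.", order=0)

    @classmethod
    def documentation_url(cls) -> AnyUrl:
        # pydantic v1 URL types take the scheme keyword when constructed directly.
        return AnyUrl("https://docs.airbyte.com/integrations/sources/foo", scheme="https")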
@validator("start_date", pre=True)
def validate_start_date(
    cls,  # noqa: N805  # Pydantic validators use cls, not self
    v: Optional[str],
) -> Optional[str]:
    """Validate that start_date is a parseable datetime string.

    Uses ab_datetime_try_parse which accepts any common ISO8601/RFC3339 format,
    including formats with or without microseconds (e.g., both
    '2021-01-01T00:00:00Z' and '2021-01-01T00:00:00.000000Z' are valid).
    """
    if v is None:
        return v
    parsed = ab_datetime_try_parse(v)
    if parsed is None:
        raise ValueError(
            f"'{v}' is not a valid datetime string. "
            "Please use a format like '2021-01-01T00:00:00Z' or '2021-01-01T00:00:00.000000Z'."
        )
    return v
Validate that start_date is a parseable datetime string.
Uses ab_datetime_try_parse which accepts any common ISO8601/RFC3339 format, including formats with or without microseconds (e.g., both '2021-01-01T00:00:00Z' and '2021-01-01T00:00:00.000000Z' are valid).
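A minimal sketch of the accepted and rejected inputs, using only names shown in the source above; the sample strings are illustrative:

from airbyte_cdk.utils.datetime_helpers import ab_datetime_try_parse

# Any common ISO8601/RFC3339 form parses, with or without microseconds...
assert ab_datetime_try_parse("2021-01-01T00:00:00Z") is not None
assert ab_datetime_try_parse("2021-01-01T00:00:00.000000Z") is not None
# ...while unparseable input returns None, which makes the validator raise ValueError.
assert ab_datetime_try_parse("not-a-date") is None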
@classmethod
@abstractmethod
def documentation_url(cls) -> AnyUrl:
    """
    :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
    """
Returns
Link to the docs page for this source, e.g. "https://docs.airbyte.com/integrations/sources/s3".
@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
    """
    Generates the mapping comprised of the config fields
    """
    schema = super().schema(*args, **kwargs)
    transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
    schema_helpers.expand_refs(transformed_schema)
    cls.replace_enum_allOf_and_anyOf(transformed_schema)
    cls.remove_discriminator(transformed_schema)

    return transformed_schema
Generates the mapping composed of the config fields, with $refs expanded and UI-unsupported constructs (enum allOfs, anyOfs, discriminators) normalized.
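A usage sketch, assuming a hypothetical concrete subclass such as the SourceFooSpec shown earlier:

spec_schema = SourceFooSpec.schema()

# $refs are inlined and pydantic's oneOf "discriminator" keys are stripped.
assert "discriminator" not in spec_schema["properties"]["delivery_method"]

# The streams' format options are re-expressed as a oneOf rather than an anyOf.
format_schema = spec_schema["properties"]["streams"]["items"]["properties"]["format"]
assert "oneOf" in format_schema and "anyOf" not in format_schema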
@staticmethod
def remove_discriminator(schema: Dict[str, Any]) -> None:
    """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
    dpath.delete(schema, "properties/**/discriminator")
Pydantic adds "discriminator" to the schema for oneOfs, which the platform does not handle correctly because all references are inlined; this strips those keys.
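A minimal sketch of the effect on a hand-built schema fragment (the shape is illustrative):

schema = {
    "properties": {
        "delivery_method": {
            "oneOf": [{"title": "Replicate Records"}, {"title": "Copy Raw Files"}],
            "discriminator": {"propertyName": "delivery_type"},
        }
    }
}
AbstractFileBasedSpec.remove_discriminator(schema)

# The glob "properties/**/discriminator" deleted the key at any nesting depth.
assert "discriminator" not in schema["properties"]["delivery_method"]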
@staticmethod
def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
    """
    allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
    Unpacks the enums under allOf and moves them up a level under the enum key
    anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
    additional validation that an incoming config only matches exactly one of a field's types.
    """
    objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
    objects_to_check["type"] = "object"
    objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
    for format in objects_to_check["oneOf"]:
        for key in format["properties"]:
            object_property = format["properties"][key]
            AbstractFileBasedSpec.move_enum_to_root(object_property)

    properties_to_change = ["validation_policy"]
    for property_to_change in properties_to_change:
        property_object = schema["properties"]["streams"]["items"]["properties"][
            property_to_change
        ]
        if "anyOf" in property_object:
            schema["properties"]["streams"]["items"]["properties"][property_to_change][
                "type"
            ] = "object"
            schema["properties"]["streams"]["items"]["properties"][property_to_change][
                "oneOf"
            ] = property_object.pop("anyOf")
        AbstractFileBasedSpec.move_enum_to_root(property_object)

    csv_format_schemas = list(
        filter(
            lambda format: format["properties"]["filetype"]["default"] == "csv",
            schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
        )
    )
    if len(csv_format_schemas) != 1:
        raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
    csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
        "properties"
    ]["header_definition"].pop("anyOf", [])
    csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
    return schema
allOfs are not supported by the UI, but pydantic automatically writes them for enums; this unpacks the enums nested under allOf and moves them up a level, under the enum key. anyOfs are also not supported by the UI, so we replace them with the similar oneOf, which adds the validation that an incoming config matches exactly one of a field's types.
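For example, the allOf-to-enum hoisting delegated to move_enum_to_root (defined in the class source above) behaves like this on a hand-built property:

prop = {"title": "Filetype", "allOf": [{"enum": ["csv", "jsonl", "parquet"]}]}
AbstractFileBasedSpec.move_enum_to_root(prop)

# The enum moved up a level and the wrapping allOf was dropped.
assert prop == {"title": "Filetype", "enum": ["csv", "jsonl", "parquet"]}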