airbyte_cdk.sources.file_based.config.abstract_file_based_spec
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
from abc import abstractmethod
from typing import Any, Dict, List, Literal, Optional, Union

import dpath
from pydantic.v1 import AnyUrl, BaseModel, Field

from airbyte_cdk import OneOfOptionConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions
from airbyte_cdk.sources.utils import schema_helpers


class DeliverRecords(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Replicate Records"
        description = "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination."
        discriminator = "delivery_type"

    delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True)


class DeliverRawFiles(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Copy Raw Files"
        description = "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files."
        discriminator = "delivery_type"

    delivery_type: Literal["use_file_transfer"] = Field("use_file_transfer", const=True)

    preserve_directory_structure: bool = Field(
        title="Preserve Sub-Directories in File Paths",
        description=(
            "If enabled, sends subdirectory folder structure "
            "along with source file names to the destination. "
            "Otherwise, files will be synced by their names only. "
            "This option is ignored when file-based replication is not enabled."
        ),
        default=True,
    )


class AbstractFileBasedSpec(BaseModel):
    """
    Used during spec; allows the developer to configure the cloud provider specific options
    that are needed when users configure a file-based source.
    """

    start_date: Optional[str] = Field(
        title="Start Date",
        description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
        examples=["2021-01-01T00:00:00.000000Z"],
        format="date-time",
        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$",
        pattern_descriptor="YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
        order=1,
    )

    streams: List[FileBasedStreamConfig] = Field(
        title="The list of streams to sync",
        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
        order=10,
    )

    delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
        title="Delivery Method",
        discriminator="delivery_type",
        type="object",
        order=7,
        display_type="radio",
        group="advanced",
        default="use_records_transfer",
        airbyte_hidden=True,
    )

    @classmethod
    @abstractmethod
    def documentation_url(cls) -> AnyUrl:
        """
        :return: link to docs page for this source e.g. "https://docs.airbyte.com/integrations/sources/s3"
        """

    @classmethod
    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Generates the mapping comprised of the config fields
        """
        schema = super().schema(*args, **kwargs)
        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
        schema_helpers.expand_refs(transformed_schema)
        cls.replace_enum_allOf_and_anyOf(transformed_schema)
        cls.remove_discriminator(transformed_schema)

        return transformed_schema

    @staticmethod
    def remove_discriminator(schema: Dict[str, Any]) -> None:
        """pydantic adds "discriminator" to the schema for oneOfs, which is not treated right by the platform as we inline all references"""
        dpath.delete(schema, "properties/**/discriminator")

    @staticmethod
    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
        Unpacks the enums under allOf and moves them up a level under the enum key
        anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
        additional validation that an incoming config only matches exactly one of a field's types.
        """
        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
        objects_to_check["type"] = "object"
        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
        for format in objects_to_check["oneOf"]:
            for key in format["properties"]:
                object_property = format["properties"][key]
                AbstractFileBasedSpec.move_enum_to_root(object_property)

        properties_to_change = ["validation_policy"]
        for property_to_change in properties_to_change:
            property_object = schema["properties"]["streams"]["items"]["properties"][
                property_to_change
            ]
            if "anyOf" in property_object:
                schema["properties"]["streams"]["items"]["properties"][property_to_change][
                    "type"
                ] = "object"
                schema["properties"]["streams"]["items"]["properties"][property_to_change][
                    "oneOf"
                ] = property_object.pop("anyOf")
            AbstractFileBasedSpec.move_enum_to_root(property_object)

        csv_format_schemas = list(
            filter(
                lambda format: format["properties"]["filetype"]["default"] == "csv",
                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
            )
        )
        if len(csv_format_schemas) != 1:
            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
        csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
            "properties"
        ]["header_definition"].pop("anyOf", [])
        csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
        return schema

    @staticmethod
    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
            object_property["enum"] = object_property["allOf"][0]["enum"]
            object_property.pop("allOf")
class DeliverRecords(BaseModel)

Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.
Base class for configuring a Pydantic model that is used as a oneOf option in a parent model, in a way that is compatible with all Airbyte consumers.

Inherit from this class in the model's nested Config class, then set title and description (these show up in the UI) and discriminator (this ensures the discriminator field is marked as required in the schema).

Usage:

    class OptionModel(BaseModel):
        mode: Literal["option_a"] = Field("option_a", const=True)
        option_a_field: str = Field(...)

        class Config(OneOfOptionConfig):
            title = "Option A"
            description = "Option A description"
            discriminator = "mode"
class DeliverRawFiles(BaseModel)

Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.
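As a quick sketch, selecting this mode in a config payload and validating it against the model might look like the following. The payload values are illustrative; parse_obj is the standard pydantic v1 entry point.

    from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import DeliverRawFiles

    # Illustrative payload: choose raw-file transfer and sync files by name only.
    delivery_method = {
        "delivery_type": "use_file_transfer",
        "preserve_directory_structure": False,
    }

    parsed = DeliverRawFiles.parse_obj(delivery_method)
    assert parsed.preserve_directory_structure is False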
class AbstractFileBasedSpec(BaseModel)
Used during spec; allows the developer to configure the cloud-provider-specific options that are needed when users configure a file-based source.
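For orientation, a user-supplied config that validates against a concrete subclass of this spec has roughly the following shape. This is a hedged sketch only: the stream name and glob are made up, and FileBasedStreamConfig defines more fields than shown here.

    config = {
        "start_date": "2021-01-01T00:00:00.000000Z",
        "streams": [
            {
                "name": "my_stream",            # hypothetical stream name
                "globs": ["**/*.csv"],          # illustrative file selection
                "format": {"filetype": "csv"},
            }
        ],
        # Optional; record-based replication is the default delivery method.
        "delivery_method": {"delivery_type": "use_records_transfer"},
    }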
@classmethod
@abstractmethod
def documentation_url(cls) -> AnyUrl
Returns
a link to the docs page for this source, e.g. "https://docs.airbyte.com/integrations/sources/s3".
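Every concrete spec must implement this method. A minimal hypothetical subclass (DemoSpec is invented for illustration and is not part of the CDK) could look like:

    from pydantic.v1 import AnyUrl

    from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import (
        AbstractFileBasedSpec,
    )


    class DemoSpec(AbstractFileBasedSpec):
        """Hypothetical spec, for illustration only."""

        @classmethod
        def documentation_url(cls) -> AnyUrl:
            # AnyUrl is a str subclass in pydantic v1; constructing it directly
            # with an explicit scheme is the pattern used by existing connectors.
            return AnyUrl("https://docs.airbyte.com/integrations/sources/s3", scheme="https")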
@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]
Generates the mapping comprising the config fields, then expands $refs, rewrites enum allOf/anyOf constructs, and removes discriminator entries so the schema renders correctly in the Airbyte UI.
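Continuing the hypothetical DemoSpec sketch above, the transformed schema can be inspected directly:

    spec_schema = DemoSpec.schema()

    # After expand_refs, replace_enum_allOf_and_anyOf and remove_discriminator,
    # the stream "format" union is expressed as oneOf rather than anyOf, and no
    # "discriminator" entries should remain anywhere in the schema.
    fmt = spec_schema["properties"]["streams"]["items"]["properties"]["format"]
    assert "oneOf" in fmt and "anyOf" not in fmt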
@staticmethod
def remove_discriminator(schema: Dict[str, Any]) -> None
Pydantic adds "discriminator" to the schema for oneOf fields, which the platform does not handle correctly because all references are inlined; this method deletes those entries.
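A small sketch of the effect on a toy schema (the structure below is illustrative; dpath's "**" glob matches keys at any depth under "properties"):

    import dpath

    schema = {
        "properties": {
            "delivery_method": {
                "discriminator": {"propertyName": "delivery_type"},
                "oneOf": [],
            }
        }
    }

    # Deletes every "discriminator" key nested anywhere below "properties".
    dpath.delete(schema, "properties/**/discriminator")
    assert "discriminator" not in schema["properties"]["delivery_method"]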
@staticmethod
def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]
allOf is not supported by the UI, but pydantic automatically writes it for enums; this unpacks the enums under allOf and moves them up a level under the enum key. anyOf is also not supported by the UI, so it is replaced with the similar oneOf, which adds the validation that an incoming config matches exactly one of a field's types.
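To illustrate the allOf unpacking performed by the move_enum_to_root helper, here is a toy property dict (the title and enum values are made up):

    from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import (
        AbstractFileBasedSpec,
    )

    # Toy property shaped like what pydantic emits for an enum-typed field.
    object_property = {
        "title": "My Enum Field",
        "allOf": [{"enum": ["a", "b", "c"]}],
    }

    AbstractFileBasedSpec.move_enum_to_root(object_property)

    # The enum values are hoisted to the top level and allOf is removed.
    assert object_property == {"title": "My Enum Field", "enum": ["a", "b", "c"]}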