airbyte_cdk.sources.file_based.config.abstract_file_based_spec

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
from abc import abstractmethod
from typing import Any, Dict, List, Literal, Optional, Union

import dpath
from pydantic.v1 import AnyUrl, BaseModel, Field, validator

from airbyte_cdk import OneOfOptionConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions
from airbyte_cdk.sources.utils import schema_helpers
from airbyte_cdk.utils.datetime_helpers import ab_datetime_try_parse


class DeliverRecords(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Replicate Records"
        description = "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination."
        discriminator = "delivery_type"

    delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True)


class DeliverRawFiles(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Copy Raw Files"
        description = "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files."
        discriminator = "delivery_type"

    delivery_type: Literal["use_file_transfer"] = Field("use_file_transfer", const=True)

    preserve_directory_structure: bool = Field(
        title="Preserve Sub-Directories in File Paths",
        description=(
            "If enabled, sends subdirectory folder structure "
            "along with source file names to the destination. "
            "Otherwise, files will be synced by their names only. "
            "This option is ignored when file-based replication is not enabled."
        ),
        default=True,
    )


class AbstractFileBasedSpec(BaseModel):
    """
    Used during spec; allows the developer to configure the cloud-provider-specific options
    that are needed when users configure a file-based source.
    """

    start_date: Optional[str] = Field(
        title="Start Date",
        description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
        examples=[
            "2021-01-01",
            "2021-01-01T00:00:00Z",
            "2021-01-01T00:00:00.000Z",
            "2021-01-01T00:00:00.000000Z",
        ],
        format="date-time",
        pattern=r"^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]+)?(Z|[+-][0-9]{2}:[0-9]{2})?)?$",
        pattern_descriptor="YYYY-MM-DD, YYYY-MM-DDTHH:mm:ssZ, or YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
        order=1,
    )

    @validator("start_date", pre=True)
    def validate_start_date(
        cls,  # noqa: N805  # Pydantic validators use cls, not self
        v: Optional[str],
    ) -> Optional[str]:
        """Validate that start_date is a parseable datetime string.

        Uses ab_datetime_try_parse, which accepts any common ISO8601/RFC3339 format,
        including formats with or without microseconds (e.g., both
        '2021-01-01T00:00:00Z' and '2021-01-01T00:00:00.000000Z' are valid).
        """
        if v is None:
            return v
        parsed = ab_datetime_try_parse(v)
        if parsed is None:
            raise ValueError(
                f"'{v}' is not a valid datetime string. "
                "Please use a format like '2021-01-01T00:00:00Z' or '2021-01-01T00:00:00.000000Z'."
            )
        return v

    streams: List[FileBasedStreamConfig] = Field(
        title="The list of streams to sync",
        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to a warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
        order=10,
    )

    delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
        title="Delivery Method",
        discriminator="delivery_type",
        type="object",
        order=7,
        display_type="radio",
        group="advanced",
        default="use_records_transfer",
        airbyte_hidden=True,
    )

    @classmethod
    @abstractmethod
    def documentation_url(cls) -> AnyUrl:
        """
        :return: link to the docs page for this source, e.g. "https://docs.airbyte.com/integrations/sources/s3"
        """

    @classmethod
    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Generates the JSON schema mapping composed of the config fields
        """
        schema = super().schema(*args, **kwargs)
        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
        schema_helpers.expand_refs(transformed_schema)
        cls.replace_enum_allOf_and_anyOf(transformed_schema)
        cls.remove_discriminator(transformed_schema)

        return transformed_schema

    @staticmethod
    def remove_discriminator(schema: Dict[str, Any]) -> None:
        """Pydantic adds "discriminator" to the schema for oneOfs; the platform does not handle it correctly because we inline all references."""
        dpath.delete(schema, "properties/**/discriminator")

    @staticmethod
    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        allOfs are not supported by the UI, but pydantic automatically writes them for enums;
        this unpacks the enums under allOf and moves them up a level under the enum key.
        anyOfs are also not supported by the UI, so we replace them with the similar oneOf,
        with the additional validation that an incoming config matches exactly one of a field's types.
        """
        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
        objects_to_check["type"] = "object"
        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
        for format in objects_to_check["oneOf"]:
            for key in format["properties"]:
                object_property = format["properties"][key]
                AbstractFileBasedSpec.move_enum_to_root(object_property)

        properties_to_change = ["validation_policy"]
        for property_to_change in properties_to_change:
            property_object = schema["properties"]["streams"]["items"]["properties"][
                property_to_change
            ]
            if "anyOf" in property_object:
                schema["properties"]["streams"]["items"]["properties"][property_to_change][
                    "type"
                ] = "object"
                schema["properties"]["streams"]["items"]["properties"][property_to_change][
                    "oneOf"
                ] = property_object.pop("anyOf")
            AbstractFileBasedSpec.move_enum_to_root(property_object)

        csv_format_schemas = list(
            filter(
                lambda format: format["properties"]["filetype"]["default"] == "csv",
                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
            )
        )
        if len(csv_format_schemas) != 1:
            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
        csv_format_schemas[0]["properties"]["header_definition"]["oneOf"] = csv_format_schemas[0][
            "properties"
        ]["header_definition"].pop("anyOf", [])
        csv_format_schemas[0]["properties"]["header_definition"]["type"] = "object"
        return schema

    @staticmethod
    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
        """If a property wraps a single enum in an allOf, hoist the enum values to the property's top-level "enum" key and drop the allOf."""
        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
            object_property["enum"] = object_property["allOf"][0]["enum"]
            object_property.pop("allOf")
class DeliverRecords(pydantic.v1.main.BaseModel):
delivery_type: Literal['use_records_transfer']
class DeliverRecords.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in a model's nested Config class and set title and description (these show up in the UI) and discriminator (which ensures the field is marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'Replicate Records'
description = 'Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination.'
discriminator = 'delivery_type'
class DeliverRawFiles(pydantic.v1.main.BaseModel):
delivery_type: Literal['use_file_transfer']
preserve_directory_structure: bool
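
For orientation, a minimal sketch of what this option serializes to (defaults come from the fields above; the surrounding source-specific config keys are omitted):

DeliverRawFiles().dict()
# -> {"delivery_type": "use_file_transfer", "preserve_directory_structure": True}
DeliverRawFiles(preserve_directory_structure=False).dict()
# -> {"delivery_type": "use_file_transfer", "preserve_directory_structure": False}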
class DeliverRawFiles.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):

Inherits from OneOfOptionConfig; see the base-class notes and usage example under DeliverRecords.Config above.
title = 'Copy Raw Files'
description = 'Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.'
discriminator = 'delivery_type'
class AbstractFileBasedSpec(pydantic.v1.main.BaseModel):

Used during spec; allows the developer to configure the cloud-provider-specific options that are needed when users configure a file-based source.

start_date: Optional[str]
@validator('start_date', pre=True)
def validate_start_date(cls, v: Optional[str]) -> Optional[str]:

Validate that start_date is a parseable datetime string.

Uses ab_datetime_try_parse, which accepts any common ISO8601/RFC3339 format, including formats with or without microseconds (e.g., both '2021-01-01T00:00:00Z' and '2021-01-01T00:00:00.000000Z' are valid).
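
The parse helper can be probed directly. A quick sketch of the accept/reject behavior (hedged: the exact set of accepted formats is whatever ab_datetime_try_parse supports; returning None is the failure signal the validator checks):

from airbyte_cdk.utils.datetime_helpers import ab_datetime_try_parse

ab_datetime_try_parse("2021-01-01T00:00:00Z")         # parses -> validator returns the string unchanged
ab_datetime_try_parse("2021-01-01T00:00:00.000000Z")  # microseconds form is also accepted, per the docstring
ab_datetime_try_parse("not-a-date")                   # None -> validator raises ValueError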

@classmethod
@abstractmethod
def documentation_url(cls) -> pydantic.v1.networks.AnyUrl:
Returns

link to the docs page for this source, e.g. "https://docs.airbyte.com/integrations/sources/s3"
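
A concrete source spec must provide this override; a hypothetical sketch (the class name and URL are illustrative only):

from pydantic.v1 import AnyUrl

class MySourceSpec(AbstractFileBasedSpec):
    @classmethod
    def documentation_url(cls) -> AnyUrl:
        # AnyUrl.build is pydantic v1's URL constructor
        return AnyUrl.build(scheme="https", host="docs.airbyte.com", path="/integrations/sources/s3")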

@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:

Generates the JSON schema mapping composed of the config fields
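
Callers normally just use the transformed result; the helper methods below run as part of this call. A rough sketch, reusing the hypothetical MySourceSpec above:

spec_schema = MySourceSpec.schema()
# $refs inlined by expand_refs, the streams format anyOf rewritten to oneOf,
# enums hoisted out of allOf, and discriminator keys stripped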

@staticmethod
def remove_discriminator(schema: Dict[str, Any]) -> None:

Pydantic adds "discriminator" to the schema for oneOfs; the platform does not handle it correctly because we inline all references.
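
A toy illustration of the dpath.delete call on a schema-shaped dict (standalone; not a real connector schema):

import dpath

schema = {
    "properties": {
        "delivery_method": {
            "discriminator": {"propertyName": "delivery_type"},
            "oneOf": [],
        }
    }
}
dpath.delete(schema, "properties/**/discriminator")
# the "discriminator" key under delivery_method is gone; "oneOf" is untouched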

@staticmethod
def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:

allOfs are not supported by the UI, but pydantic automatically writes them for enums; this unpacks the enums under allOf and moves them up a level under the enum key. anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the additional validation that an incoming config matches exactly one of a field's types.
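
A minimal, runnable illustration with a toy schema fragment shaped like the paths this method touches (real generated schemas carry many more keys):

toy = {
    "properties": {
        "streams": {
            "items": {
                "properties": {
                    "format": {
                        "anyOf": [
                            {
                                "properties": {
                                    "filetype": {"default": "csv"},
                                    "header_definition": {"anyOf": [{"type": "object"}]},
                                }
                            }
                        ]
                    },
                    "validation_policy": {"allOf": [{"enum": ["Emit Record", "Skip Record"]}]},
                }
            }
        }
    }
}
AbstractFileBasedSpec.replace_enum_allOf_and_anyOf(toy)
fmt = toy["properties"]["streams"]["items"]["properties"]["format"]
assert "oneOf" in fmt and "anyOf" not in fmt  # anyOf rewritten to oneOf
# validation_policy's enum was hoisted out of its allOf wrapper:
assert toy["properties"]["streams"]["items"]["properties"]["validation_policy"] == {
    "enum": ["Emit Record", "Skip Record"]
}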

@staticmethod
def move_enum_to_root(object_property: Dict[str, Any]) -> None:

If a property wraps a single enum in an allOf, hoists the enum values to the property's top-level "enum" key and drops the allOf.
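
And the enum hoist in isolation, with a property object shaped like what pydantic emits for an enum-typed field after refs are expanded:

prop = {"allOf": [{"title": "Filetype", "enum": ["csv", "jsonl", "parquet"]}]}
AbstractFileBasedSpec.move_enum_to_root(prop)
assert prop == {"enum": ["csv", "jsonl", "parquet"]}  # allOf removed, enum at the root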