airbyte_cdk.sources.file_based.config.abstract_file_based_spec

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
from abc import abstractmethod
from typing import Any, Dict, List, Literal, Optional, Union

import dpath
from pydantic.v1 import AnyUrl, BaseModel, Field

from airbyte_cdk import OneOfOptionConfig
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.specs.transfer_modes import DeliverPermissions
from airbyte_cdk.sources.utils import schema_helpers


class DeliverRecords(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Replicate Records"
        description = "Recommended - Extract and load structured records into your destination of choice. This is the classic method of moving data in Airbyte. It allows for blocking and hashing individual fields or files from a structured schema. Data can be flattened, typed and deduped depending on the destination."
        discriminator = "delivery_type"

    delivery_type: Literal["use_records_transfer"] = Field("use_records_transfer", const=True)


class DeliverRawFiles(BaseModel):
    class Config(OneOfOptionConfig):
        title = "Copy Raw Files"
        description = "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files."
        discriminator = "delivery_type"

    delivery_type: Literal["use_file_transfer"] = Field("use_file_transfer", const=True)

    preserve_directory_structure: bool = Field(
        title="Preserve Sub-Directories in File Paths",
        description=(
            "If enabled, sends subdirectory folder structure "
            "along with source file names to the destination. "
            "Otherwise, files will be synced by their names only. "
            "This option is ignored when file-based replication is not enabled."
        ),
        default=True,
    )

class AbstractFileBasedSpec(BaseModel):
    """
    Used during spec; allows the developer to configure the cloud-provider-specific
    options that are needed when users configure a file-based source.
    """

    start_date: Optional[str] = Field(
        title="Start Date",
        description="UTC date and time in the format 2017-01-25T00:00:00.000000Z. Any file modified before this date will not be replicated.",
        examples=["2021-01-01T00:00:00.000000Z"],
        format="date-time",
        pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{6}Z$",
        pattern_descriptor="YYYY-MM-DDTHH:mm:ss.SSSSSSZ",
        order=1,
    )

    streams: List[FileBasedStreamConfig] = Field(
        title="The list of streams to sync",
        description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to a warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
        order=10,
    )

    delivery_method: Union[DeliverRecords, DeliverRawFiles, DeliverPermissions] = Field(
        title="Delivery Method",
        discriminator="delivery_type",
        type="object",
        order=7,
        display_type="radio",
        group="advanced",
        default="use_records_transfer",
        airbyte_hidden=True,
    )

    @classmethod
    @abstractmethod
    def documentation_url(cls) -> AnyUrl:
        """
        :return: link to the docs page for this source, e.g. "https://docs.airbyte.com/integrations/sources/s3"
        """

    @classmethod
    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """
        Generates the mapping composed of the config fields
        """
        schema = super().schema(*args, **kwargs)
        transformed_schema: Dict[str, Any] = copy.deepcopy(schema)
        schema_helpers.expand_refs(transformed_schema)
        cls.replace_enum_allOf_and_anyOf(transformed_schema)
        cls.remove_discriminator(transformed_schema)

        return transformed_schema

    @staticmethod
    def remove_discriminator(schema: Dict[str, Any]) -> None:
        """pydantic adds "discriminator" to the schema for oneOfs, which the platform does not handle correctly because we inline all references"""
        dpath.delete(schema, "properties/**/discriminator")

    @staticmethod
    def replace_enum_allOf_and_anyOf(schema: Dict[str, Any]) -> Dict[str, Any]:
        """
        allOfs are not supported by the UI, but pydantic automatically writes them for enums.
        This unpacks the enums nested under allOf and moves them up a level under the enum key.
        anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with
        the additional validation that an incoming config matches exactly one of a field's types.
        """
        objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
        objects_to_check["type"] = "object"
        objects_to_check["oneOf"] = objects_to_check.pop("anyOf", [])
        for format_schema in objects_to_check["oneOf"]:
            for key in format_schema["properties"]:
                object_property = format_schema["properties"][key]
                AbstractFileBasedSpec.move_enum_to_root(object_property)

        properties_to_change = ["validation_policy"]
        for property_to_change in properties_to_change:
            property_object = schema["properties"]["streams"]["items"]["properties"][
                property_to_change
            ]
            if "anyOf" in property_object:
                property_object["type"] = "object"
                property_object["oneOf"] = property_object.pop("anyOf")
            AbstractFileBasedSpec.move_enum_to_root(property_object)

        csv_format_schemas = list(
            filter(
                lambda format_schema: format_schema["properties"]["filetype"]["default"] == "csv",
                schema["properties"]["streams"]["items"]["properties"]["format"]["oneOf"],
            )
        )
        if len(csv_format_schemas) != 1:
            raise ValueError(f"Expecting only one CSV format but got {csv_format_schemas}")
        header_definition = csv_format_schemas[0]["properties"]["header_definition"]
        header_definition["oneOf"] = header_definition.pop("anyOf", [])
        header_definition["type"] = "object"
        return schema

    @staticmethod
    def move_enum_to_root(object_property: Dict[str, Any]) -> None:
        if "allOf" in object_property and "enum" in object_property["allOf"][0]:
            object_property["enum"] = object_property["allOf"][0]["enum"]
            object_property.pop("allOf")
Note: the nested Config classes above inherit from
airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig, the base class used to
configure a Pydantic model that serves as a oneOf option in a parent model in a way
that is compatible with all Airbyte consumers. Inherit from it in a model's nested
Config class and set title and description (these show up in the UI) and
discriminator (this makes sure the discriminator field is marked as required in the
schema).

Usage:

class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
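
To see the oneOf options in action, here is a sketch of how a parent model resolves
the right option by its delivery_type literal. The DeliveryHolder wrapper is
hypothetical and exists only for illustration (AbstractFileBasedSpec declares its
delivery_method field the same way), and it assumes pydantic v1's discriminated-union
support, which the CDK relies on:

from typing import Union

from pydantic.v1 import BaseModel, Field

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import (
    DeliverRawFiles,
    DeliverRecords,
)


class DeliveryHolder(BaseModel):
    # Hypothetical wrapper model, for illustration only.
    delivery_method: Union[DeliverRecords, DeliverRawFiles] = Field(
        discriminator="delivery_type"
    )


holder = DeliveryHolder.parse_obj(
    {"delivery_method": {"delivery_type": "use_file_transfer"}}
)
assert isinstance(holder.delivery_method, DeliverRawFiles)
assert holder.delivery_method.preserve_directory_structure is True  # the default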
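The schema post-processing helpers are plain dict transforms, so their effect can be
shown directly. A minimal sketch of what move_enum_to_root does to a single property
node; the input shape is what pydantic v1 emits for an enum-typed field once refs are
expanded, and the enum values here are made up (the anyOf-to-oneOf rewrite in
replace_enum_allOf_and_anyOf is the same kind of in-place key rewrite):

from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import (
    AbstractFileBasedSpec,
)

# Input: the enum nested under allOf, as pydantic v1 generates it.
prop = {"allOf": [{"enum": ["csv", "jsonl", "parquet"]}]}
AbstractFileBasedSpec.move_enum_to_root(prop)
# Output: the enum lifted to the property's root, allOf removed.
assert prop == {"enum": ["csv", "jsonl", "parquet"]}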
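
Similarly, remove_discriminator is a single glob delete with dpath. A self-contained
sketch with a hand-built schema fragment (not the real generated spec), using the same
glob the method uses:

import dpath

schema = {
    "properties": {
        "delivery_method": {
            "oneOf": [{"title": "Replicate Records"}, {"title": "Copy Raw Files"}],
            "discriminator": {"propertyName": "delivery_type"},
        }
    }
}

# Deletes every "discriminator" key nested anywhere under "properties".
dpath.delete(schema, "properties/**/discriminator")
assert "discriminator" not in schema["properties"]["delivery_method"]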