airbyte_cdk.sources.file_based.config.file_based_stream_config

  1#
  2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  3#
  4
  5from enum import Enum
  6from typing import Any, Dict, List, Mapping, Optional, Union
  7
  8from pydantic.v1 import BaseModel, Field, root_validator, validator
  9
 10from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
 11from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
 12from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
 13from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
 14from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
 15from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
 16from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
 17from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema
 18
 19PrimaryKeyType = Optional[Union[str, List[str]]]
 20
 21
 22class ValidationPolicy(Enum):
 23    emit_record = "Emit Record"
 24    skip_record = "Skip Record"
 25    wait_for_discover = "Wait for Discover"
 26
 27
 28class FileBasedStreamConfig(BaseModel):
 29    name: str = Field(title="Name", description="The name of the stream.")
 30    globs: Optional[List[str]] = Field(
 31        default=["**"],
 32        title="Globs",
 33        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
 34        order=1,
 35    )
 36    legacy_prefix: Optional[str] = Field(
 37        title="Legacy Prefix",
 38        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
 39        airbyte_hidden=True,
 40    )
 41    validation_policy: ValidationPolicy = Field(
 42        title="Validation Policy",
 43        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
 44        default=ValidationPolicy.emit_record,
 45    )
 46    input_schema: Optional[str] = Field(
 47        title="Input Schema",
 48        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
 49    )
 50    primary_key: Optional[str] = Field(
 51        title="Primary Key",
 52        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
 53        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
 54    )
 55    days_to_sync_if_history_is_full: int = Field(
 56        title="Days To Sync If History Is Full",
 57        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
 58        default=3,
 59    )
 60    format: Union[
 61        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
 62    ] = Field(
 63        title="Format",
 64        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
 65    )
 66    schemaless: bool = Field(
 67        title="Schemaless",
 68        description="When enabled, syncs will not validate or structure records against the stream's schema.",
 69        default=False,
 70    )
 71    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
 72        title="Files To Read For Schema Discover",
 73        description="The number of resent files which will be used to discover the schema for this stream.",
 74        default=None,
 75        gt=0,
 76    )
 77    use_first_found_file_for_schema_discovery: bool = Field(
 78        title="Use First Found File For Schema Discover",
 79        description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
 80        default=False,
 81    )
 82
 83    @validator("input_schema", pre=True)
 84    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
 85        if v:
 86            if type_mapping_to_jsonschema(v):
 87                return v
 88            else:
 89                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
 90        return None
 91
 92    @root_validator
 93    def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
 94        """
 95        Please update this validation when new related to schema discovery field is added.
 96        Validates schema discovery options compatibility.
 97        Note, that initially the recent_n_files_to_read_for_schema_discovery was added without a validation if schemaless or input_schema were provided.
 98        So this method doesn't check it to do not break already created connections.
 99        If recent_n_files_to_read_for_schema_discovery and schemaless or recent_n_files_to_read_for_schema_discovery and input_schema were provided,
100        recent_n_files_to_read_for_schema_discovery will be ignored and second option will be used by default.
101        """
102        input_schema = values["input_schema"] is not None
103        schemaless = values["schemaless"]
104        recent_n_files_to_read_for_schema_discovery = (
105            values["recent_n_files_to_read_for_schema_discovery"] is not None
106        )
107        use_first_found_file_for_schema_discovery = values[
108            "use_first_found_file_for_schema_discovery"
109        ]
110
111        if (
112            recent_n_files_to_read_for_schema_discovery
113            and use_first_found_file_for_schema_discovery
114        ) or [schemaless, input_schema, use_first_found_file_for_schema_discovery].count(True) > 1:
115            raise ConfigValidationError(
116                FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
117            )
118
119        return values
120
121    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
122        """
123        User defined input_schema is defined as a string in the config. This method takes the string representation
124        and converts it into a Mapping[str, Any] which is used by file-based CDK components.
125        """
126        if self.input_schema:
127            schema = type_mapping_to_jsonschema(self.input_schema)
128            if not schema:
129                raise ValueError(
130                    f"Unable to create JSON schema from input schema {self.input_schema}"
131                )
132            return schema
133        return None
# Single column name, list of column names, or None.
PrimaryKeyType = typing.Optional[typing.Union[str, typing.List[str]]]
class ValidationPolicy(Enum):
    """
    Sync behavior applied when a record does not adhere to the stream schema.

    Members (value shown in parentheses): ``emit_record`` ("Emit Record"),
    ``skip_record`` ("Skip Record"), ``wait_for_discover`` ("Wait for Discover").
    """

    emit_record = "Emit Record"
    skip_record = "Skip Record"
    wait_for_discover = "Wait for Discover"
class FileBasedStreamConfig(BaseModel):
    """
    Configuration of a single stream in a file-based source.

    Covers which files belong to the stream (``globs``, ``legacy_prefix``), how they are
    parsed (``format``), how records are checked against the schema
    (``validation_policy``, ``input_schema``, ``schemaless``) and how the schema is
    discovered (``recent_n_files_to_read_for_schema_discovery``,
    ``use_first_found_file_for_schema_discovery``).
    """

    name: str = Field(title="Name", description="The name of the stream.")
    globs: Optional[List[str]] = Field(
        default=["**"],
        title="Globs",
        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
        order=1,
    )
    legacy_prefix: Optional[str] = Field(
        title="Legacy Prefix",
        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
        airbyte_hidden=True,
    )
    validation_policy: ValidationPolicy = Field(
        title="Validation Policy",
        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
        default=ValidationPolicy.emit_record,
    )
    input_schema: Optional[str] = Field(
        title="Input Schema",
        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
    )
    primary_key: Optional[str] = Field(
        title="Primary Key",
        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
    )
    days_to_sync_if_history_is_full: int = Field(
        title="Days To Sync If History Is Full",
        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
        default=3,
    )
    format: Union[
        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
    ] = Field(
        title="Format",
        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
    )
    schemaless: bool = Field(
        title="Schemaless",
        description="When enabled, syncs will not validate or structure records against the stream's schema.",
        default=False,
    )
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
        title="Files To Read For Schema Discover",
        description="The number of recent files which will be used to discover the schema for this stream.",
        default=None,
        gt=0,
    )
    use_first_found_file_for_schema_discovery: bool = Field(
        title="Use First Found File For Schema Discover",
        description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
        default=False,
    )

    @validator("input_schema", pre=True)
    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
        """
        Check that a user-provided input schema can be converted to a JSON schema.

        Runs before type coercion (``pre=True``), so ``v`` is the raw config value.
        Falsy values (``None``, ``""``) are normalized to ``None``.

        Raises:
            ConfigValidationError: if ``type_mapping_to_jsonschema`` returns a falsy
                result for a non-empty ``v``.
        """
        if v:
            if type_mapping_to_jsonschema(v):
                return v
            else:
                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
        return None

    @root_validator
    def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate that the schema discovery options are compatible with each other.

        Rejected combinations (raise ``ConfigValidationError``):
        - ``recent_n_files_to_read_for_schema_discovery`` together with
          ``use_first_found_file_for_schema_discovery``;
        - more than one of ``schemaless``, ``input_schema`` and
          ``use_first_found_file_for_schema_discovery``.

        ``recent_n_files_to_read_for_schema_discovery`` was originally added without a
        check against ``schemaless`` or ``input_schema``. Those combinations are
        therefore still accepted so already-created connections keep working; in that
        case ``recent_n_files_to_read_for_schema_discovery`` is ignored and the other
        option is used.

        Update this validator whenever a new schema-discovery-related field is added.
        """
        # This post root validator has no ``skip_on_failure``, so it also runs after a
        # field failed its own validation; such a field is then absent from ``values``.
        # Fall back to the field defaults instead of raising a bare KeyError, so pydantic
        # can report the original field error as a ValidationError.
        has_input_schema = values.get("input_schema") is not None
        schemaless = values.get("schemaless", False)
        has_recent_n_files = values.get("recent_n_files_to_read_for_schema_discovery") is not None
        use_first_found_file = values.get("use_first_found_file_for_schema_discovery", False)

        if (has_recent_n_files and use_first_found_file) or [
            schemaless,
            has_input_schema,
            use_first_found_file,
        ].count(True) > 1:
            raise ConfigValidationError(
                FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
            )

        return values

    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
        """
        Convert the user-defined ``input_schema`` string into a schema mapping.

        Returns:
            The mapping produced by ``type_mapping_to_jsonschema``, or ``None`` when no
            ``input_schema`` is configured.

        Raises:
            ValueError: if ``type_mapping_to_jsonschema`` returns a falsy result.
        """
        if self.input_schema:
            schema = type_mapping_to_jsonschema(self.input_schema)
            if not schema:
                raise ValueError(
                    f"Unable to create JSON schema from input schema {self.input_schema}"
                )
            return schema
        return None
@validator("input_schema", pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
    """
    Pre-validator for ``input_schema``.

    An empty or missing value becomes ``None``. A non-empty value is returned
    unchanged when ``type_mapping_to_jsonschema`` accepts it. Otherwise
    ``ConfigValidationError`` is raised.
    """
    if not v:
        return None
    if not type_mapping_to_jsonschema(v):
        raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
    return v
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
    """
    Parse the configured ``input_schema`` string into a schema mapping.

    Returns ``None`` when ``input_schema`` is unset or empty. Raises ``ValueError``
    when ``type_mapping_to_jsonschema`` yields a falsy result.
    """
    raw_schema = self.input_schema
    if not raw_schema:
        return None
    parsed_schema = type_mapping_to_jsonschema(raw_schema)
    if not parsed_schema:
        raise ValueError(
            f"Unable to create JSON schema from input schema {raw_schema}"
        )
    return parsed_schema

The user-defined input_schema is provided as a string in the config. This method converts that string representation into a Mapping[str, Any], which is used by file-based CDK components. It returns None when no input_schema is set, and raises ValueError if the string cannot be converted.