airbyte_cdk.sources.file_based.config.file_based_stream_config

 1#
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 3#
 4
 5from enum import Enum
 6from typing import Any, List, Mapping, Optional, Union
 7
 8from pydantic.v1 import BaseModel, Field, validator
 9
10from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
11from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
12from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
13from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
14from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
15from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
16from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
17from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema
18
# Alias for a primary-key value: a single column name, a list of column names,
# or None when no key is given.
PrimaryKeyType = Union[str, List[str], None]
20
21
class ValidationPolicy(Enum):
    # Closed set of validation-policy choices. Each member's value is the
    # human-readable label for that policy, so lookups by label
    # (e.g. ValidationPolicy("Skip Record")) resolve to the matching member.
    emit_record = "Emit Record"
    skip_record = "Skip Record"
    wait_for_discover = "Wait for Discover"
26
27
class FileBasedStreamConfig(BaseModel):
    # Per-stream configuration model for a file-based source. Each attribute is
    # declared through a pydantic ``Field`` carrying its title, description and
    # default. The documentation is kept in ``#`` comments rather than a class
    # docstring so the model's generated schema description stays unchanged.

    name: str = Field(title="Name", description="The name of the stream.")
    # Glob patterns selecting the files of this stream; defaults to matching everything.
    globs: Optional[List[str]] = Field(
        default=["**"],
        title="Globs",
        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
        order=1,
    )
    legacy_prefix: Optional[str] = Field(
        title="Legacy Prefix",
        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
        airbyte_hidden=True,
    )
    validation_policy: ValidationPolicy = Field(
        title="Validation Policy",
        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
        default=ValidationPolicy.emit_record,
    )
    # Kept as a string in the config; see get_input_schema() for the parsed mapping.
    input_schema: Optional[str] = Field(
        title="Input Schema",
        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
    )
    # NOTE(review): the description mentions composite keys, yet the field is typed
    # Optional[str] and the module-level PrimaryKeyType alias is not used here.
    # Confirm whether that is intentional before widening the type, since doing so
    # would change validation and the generated spec.
    primary_key: Optional[str] = Field(
        title="Primary Key",
        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
    )
    days_to_sync_if_history_is_full: int = Field(
        title="Days To Sync If History Is Full",
        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
        default=3,
    )
    format: Union[
        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
    ] = Field(
        title="Format",
        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
    )
    schemaless: bool = Field(
        title="Schemaless",
        description="When enabled, syncs will not validate or structure records against the stream's schema.",
        default=False,
    )
    # Optional; when provided it must be strictly positive (gt=0).
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
        title="Files To Read For Schema Discover",
        description="The number of recent files which will be used to discover the schema for this stream.",
        default=None,
        gt=0,
    )

    @validator("input_schema", pre=True)
    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
        """
        Check a user-provided input schema before pydantic's own field validation runs.

        :param v: the raw ``input_schema`` value from the config.
        :return: ``v`` unchanged when ``type_mapping_to_jsonschema`` yields a truthy result for it;
            ``None`` when ``v`` is falsy (missing or empty).
        :raises ConfigValidationError: when ``v`` is provided but ``type_mapping_to_jsonschema``
            returns a falsy result for it.
        """
        # A missing or empty value is normalised to None.
        if not v:
            return None
        # The conversion result is only used as a validity check here; the raw value is what gets stored.
        if not type_mapping_to_jsonschema(v):
            raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
        return v

    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
        """
        User defined input_schema is defined as a string in the config. This method takes the string representation
        and converts it into a Mapping[str, Any] which is used by file-based CDK components.

        :return: the mapping produced by ``type_mapping_to_jsonschema``, or ``None`` when no input schema is set.
        :raises ValueError: when an input schema is set but the conversion yields a falsy result.
        """
        if not self.input_schema:
            return None

        schema = type_mapping_to_jsonschema(self.input_schema)
        if not schema:
            raise ValueError(
                f"Unable to create JSON schema from input schema {self.input_schema}"
            )
        return schema
PrimaryKeyType = typing.Union[str, typing.List[str], NoneType]
class ValidationPolicy(enum.Enum):
23class ValidationPolicy(Enum):
24    emit_record = "Emit Record"
25    skip_record = "Skip Record"
26    wait_for_discover = "Wait for Discover"

An enumeration.

emit_record = <ValidationPolicy.emit_record: 'Emit Record'>
skip_record = <ValidationPolicy.skip_record: 'Skip Record'>
wait_for_discover = <ValidationPolicy.wait_for_discover: 'Wait for Discover'>
class FileBasedStreamConfig(pydantic.v1.main.BaseModel):
 29class FileBasedStreamConfig(BaseModel):
 30    name: str = Field(title="Name", description="The name of the stream.")
 31    globs: Optional[List[str]] = Field(
 32        default=["**"],
 33        title="Globs",
 34        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
 35        order=1,
 36    )
 37    legacy_prefix: Optional[str] = Field(
 38        title="Legacy Prefix",
 39        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
 40        airbyte_hidden=True,
 41    )
 42    validation_policy: ValidationPolicy = Field(
 43        title="Validation Policy",
 44        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
 45        default=ValidationPolicy.emit_record,
 46    )
 47    input_schema: Optional[str] = Field(
 48        title="Input Schema",
 49        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
 50    )
 51    primary_key: Optional[str] = Field(
 52        title="Primary Key",
 53        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
 54        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
 55    )
 56    days_to_sync_if_history_is_full: int = Field(
 57        title="Days To Sync If History Is Full",
 58        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
 59        default=3,
 60    )
 61    format: Union[
 62        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
 63    ] = Field(
 64        title="Format",
 65        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
 66    )
 67    schemaless: bool = Field(
 68        title="Schemaless",
 69        description="When enabled, syncs will not validate or structure records against the stream's schema.",
 70        default=False,
 71    )
 72    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
 73        title="Files To Read For Schema Discover",
 74        description="The number of recent files which will be used to discover the schema for this stream.",
 75        default=None,
 76        gt=0,
 77    )
 78
 79    @validator("input_schema", pre=True)
 80    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
 81        if v:
 82            if type_mapping_to_jsonschema(v):
 83                return v
 84            else:
 85                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
 86        return None
 87
 88    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
 89        """
 90        User defined input_schema is defined as a string in the config. This method takes the string representation
 91        and converts it into a Mapping[str, Any] which is used by file-based CDK components.
 92        """
 93        if self.input_schema:
 94            schema = type_mapping_to_jsonschema(self.input_schema)
 95            if not schema:
 96                raise ValueError(
 97                    f"Unable to create JSON schema from input schema {self.input_schema}"
 98                )
 99            return schema
100        return None
name: str
globs: Optional[List[str]]
legacy_prefix: Optional[str]
validation_policy: ValidationPolicy
input_schema: Optional[str]
primary_key: Optional[str]
days_to_sync_if_history_is_full: int
schemaless: bool
recent_n_files_to_read_for_schema_discovery: Optional[int]
@validator('input_schema', pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
79    @validator("input_schema", pre=True)
80    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
81        if v:
82            if type_mapping_to_jsonschema(v):
83                return v
84            else:
85                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
86        return None
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
 88    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
 89        """
 90        User defined input_schema is defined as a string in the config. This method takes the string representation
 91        and converts it into a Mapping[str, Any] which is used by file-based CDK components.
 92        """
 93        if self.input_schema:
 94            schema = type_mapping_to_jsonschema(self.input_schema)
 95            if not schema:
 96                raise ValueError(
 97                    f"Unable to create JSON schema from input schema {self.input_schema}"
 98                )
 99            return schema
100        return None

User defined input_schema is defined as a string in the config. This method takes the string representation and converts it into a Mapping[str, Any] which is used by file-based CDK components.