airbyte_cdk.sources.file_based.config.file_based_stream_config
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from enum import Enum
from typing import Any, List, Mapping, Optional, Union

from pydantic.v1 import BaseModel, Field, validator

from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema

# A primary key given either as a single column name or as a list of column names.
PrimaryKeyType = Optional[Union[str, List[str]]]


class ValidationPolicy(Enum):
    # How a sync treats a record that does not adhere to the stream schema
    # (FileBasedStreamConfig.validation_policy defaults to emit_record).
    # Documented with comments rather than a docstring: pydantic v1 uses an
    # Enum's docstring as the "description" in the generated JSON schema.
    emit_record = "Emit Record"
    skip_record = "Skip Record"
    wait_for_discover = "Wait for Discover"


class FileBasedStreamConfig(BaseModel):
    # Configuration for a single stream of a file-based source.
    # Field(...) metadata (title, description, default, and extra keys such as
    # `order` and `airbyte_hidden`) is emitted into the JSON schema that
    # pydantic v1 generates for this model. Documentation lives in `#` comments
    # rather than a class docstring because pydantic v1 copies a model's
    # docstring into that schema's "description", which would alter the spec.

    name: str = Field(title="Name", description="The name of the stream.")
    globs: Optional[List[str]] = Field(
        default=["**"],
        title="Globs",
        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
        order=1,
    )
    legacy_prefix: Optional[str] = Field(
        title="Legacy Prefix",
        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
        airbyte_hidden=True,
    )
    validation_policy: ValidationPolicy = Field(
        title="Validation Policy",
        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
        default=ValidationPolicy.emit_record,
    )
    input_schema: Optional[str] = Field(
        title="Input Schema",
        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
    )
    # NOTE(review): the description mentions composite keys and this module defines
    # PrimaryKeyType = Optional[Union[str, List[str]]], yet this field is typed
    # Optional[str]. Confirm whether list-valued primary keys should be accepted here.
    primary_key: Optional[str] = Field(
        title="Primary Key",
        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
    )
    days_to_sync_if_history_is_full: int = Field(
        title="Days To Sync If History Is Full",
        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
        default=3,
    )
    # pydantic v1 tries Union members left to right when parsing, so this order is significant.
    format: Union[
        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
    ] = Field(
        title="Format",
        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
    )
    schemaless: bool = Field(
        title="Schemaless",
        description="When enabled, syncs will not validate or structure records against the stream's schema.",
        default=False,
    )
    # gt=0 rejects zero and negative values; the field defaults to None.
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
        title="Files To Read For Schema Discovery",
        description="The number of recent files which will be used to discover the schema for this stream.",
        default=None,
        gt=0,
    )

    @validator("input_schema", pre=True)
    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
        """
        Check that a user-provided input_schema can be converted to a JSON schema.

        Runs with ``pre=True``, i.e. on the raw input before pydantic's own
        coercion to ``str``.

        Returns:
            None when ``v`` is falsy (None or empty); otherwise ``v`` unchanged.

        Raises:
            ConfigValidationError: If ``type_mapping_to_jsonschema(v)`` returns a
                falsy result for a non-empty ``v``.
        """
        if not v:
            return None
        if not type_mapping_to_jsonschema(v):
            raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
        return v

    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
        """
        User defined input_schema is defined as a string in the config. This method takes the string representation
        and converts it into a Mapping[str, Any] which is used by file-based CDK components.

        Returns:
            The converted schema, or None when no input_schema is set.

        Raises:
            ValueError: If ``type_mapping_to_jsonschema`` returns a falsy result.
        """
        if self.input_schema:
            schema = type_mapping_to_jsonschema(self.input_schema)
            if not schema:
                raise ValueError(
                    f"Unable to create JSON schema from input schema {self.input_schema}"
                )
            return schema
        return None
PrimaryKeyType =
typing.Union[str, typing.List[str], NoneType]
class
ValidationPolicy(enum.Enum):
class ValidationPolicy(Enum):
    # Sync behavior for a record that does not adhere to the stream schema.
    # FileBasedStreamConfig.validation_policy defaults to `emit_record`.
    # (No docstring on purpose: pydantic v1 reuses an Enum's docstring as the
    # JSON-schema "description", so adding one would change the emitted spec.)
    emit_record = "Emit Record"  # member 1
    skip_record = "Skip Record"  # member 2
    wait_for_discover = "Wait for Discover"  # member 3
An enumeration.
emit_record =
<ValidationPolicy.emit_record: 'Emit Record'>
skip_record =
<ValidationPolicy.skip_record: 'Skip Record'>
wait_for_discover =
<ValidationPolicy.wait_for_discover: 'Wait for Discover'>
class
FileBasedStreamConfig(pydantic.v1.main.BaseModel):
class FileBasedStreamConfig(BaseModel):
    # Configuration for a single stream of a file-based source.
    # Field(...) metadata (title, description, default, and extra keys such as
    # `order` and `airbyte_hidden`) is emitted into the JSON schema that
    # pydantic v1 generates for this model. Documentation lives in `#` comments
    # rather than a class docstring because pydantic v1 copies a model's
    # docstring into that schema's "description", which would alter the spec.

    name: str = Field(title="Name", description="The name of the stream.")
    globs: Optional[List[str]] = Field(
        default=["**"],
        title="Globs",
        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
        order=1,
    )
    legacy_prefix: Optional[str] = Field(
        title="Legacy Prefix",
        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
        airbyte_hidden=True,
    )
    validation_policy: ValidationPolicy = Field(
        title="Validation Policy",
        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
        default=ValidationPolicy.emit_record,
    )
    input_schema: Optional[str] = Field(
        title="Input Schema",
        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
    )
    # NOTE(review): the description mentions composite keys and the module defines
    # PrimaryKeyType = Optional[Union[str, List[str]]], yet this field is typed
    # Optional[str]. Confirm whether list-valued primary keys should be accepted here.
    primary_key: Optional[str] = Field(
        title="Primary Key",
        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
    )
    days_to_sync_if_history_is_full: int = Field(
        title="Days To Sync If History Is Full",
        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
        default=3,
    )
    # pydantic v1 tries Union members left to right when parsing, so this order is significant.
    format: Union[
        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
    ] = Field(
        title="Format",
        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
    )
    schemaless: bool = Field(
        title="Schemaless",
        description="When enabled, syncs will not validate or structure records against the stream's schema.",
        default=False,
    )
    # gt=0 rejects zero and negative values; the field defaults to None.
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
        title="Files To Read For Schema Discovery",
        description="The number of recent files which will be used to discover the schema for this stream.",
        default=None,
        gt=0,
    )

    @validator("input_schema", pre=True)
    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
        """
        Check that a user-provided input_schema can be converted to a JSON schema.

        Runs with ``pre=True``, i.e. on the raw input before pydantic's own
        coercion to ``str``.

        Returns:
            None when ``v`` is falsy (None or empty); otherwise ``v`` unchanged.

        Raises:
            ConfigValidationError: If ``type_mapping_to_jsonschema(v)`` returns a
                falsy result for a non-empty ``v``.
        """
        if not v:
            return None
        if not type_mapping_to_jsonschema(v):
            raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
        return v

    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
        """
        User defined input_schema is defined as a string in the config. This method takes the string representation
        and converts it into a Mapping[str, Any] which is used by file-based CDK components.

        Returns:
            The converted schema, or None when no input_schema is set.

        Raises:
            ValueError: If ``type_mapping_to_jsonschema`` returns a falsy result.
        """
        if self.input_schema:
            schema = type_mapping_to_jsonschema(self.input_schema)
            if not schema:
                raise ValueError(
                    f"Unable to create JSON schema from input schema {self.input_schema}"
                )
            return schema
        return None
validation_policy: ValidationPolicy
format: Union[airbyte_cdk.sources.file_based.config.avro_format.AvroFormat, airbyte_cdk.sources.file_based.CsvFormat, airbyte_cdk.sources.file_based.JsonlFormat, airbyte_cdk.sources.file_based.config.parquet_format.ParquetFormat, airbyte_cdk.sources.file_based.config.unstructured_format.UnstructuredFormat, airbyte_cdk.sources.file_based.config.excel_format.ExcelFormat]
@validator('input_schema', pre=True)
def
validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
def
get_input_schema(self) -> Optional[Mapping[str, Any]]:
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
    """
    Convert the user-provided ``input_schema`` string into a mapping.

    The config stores input_schema as a string; file-based CDK components consume
    it as the Mapping[str, Any] produced by ``type_mapping_to_jsonschema``.

    Returns:
        The converted schema, or None when input_schema is unset or empty.

    Raises:
        ValueError: If ``type_mapping_to_jsonschema`` returns a falsy result.
    """
    raw_schema = self.input_schema
    if not raw_schema:
        return None

    converted = type_mapping_to_jsonschema(raw_schema)
    if not converted:
        raise ValueError(
            f"Unable to create JSON schema from input schema {self.input_schema}"
        )
    return converted
User-defined input_schema is stored as a string in the config. This method converts that string representation into a Mapping[str, Any], which is used by file-based CDK components.