airbyte_cdk.sources.file_based.config.file_based_stream_config
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from enum import Enum
from typing import Any, Dict, List, Mapping, Optional, Union

from pydantic.v1 import BaseModel, Field, root_validator, validator

from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema

PrimaryKeyType = Optional[Union[str, List[str]]]


# Policy applied when a record does not adhere to the stream schema
# (selected through `FileBasedStreamConfig.validation_policy`).
# No class docstring: pydantic v1 would copy it into the generated JSON schema as a description.
class ValidationPolicy(Enum):
    emit_record = "Emit Record"
    skip_record = "Skip Record"
    wait_for_discover = "Wait for Discover"


# Configuration of a single stream in a file-based source.
# No class docstring on purpose: pydantic v1 copies a model's docstring into the generated
# JSON schema as its description, which would change the connector spec.
class FileBasedStreamConfig(BaseModel):
    name: str = Field(title="Name", description="The name of the stream.")
    globs: Optional[List[str]] = Field(
        default=["**"],
        title="Globs",
        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
        order=1,
    )
    legacy_prefix: Optional[str] = Field(
        title="Legacy Prefix",
        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
        airbyte_hidden=True,
    )
    validation_policy: ValidationPolicy = Field(
        title="Validation Policy",
        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
        default=ValidationPolicy.emit_record,
    )
    input_schema: Optional[str] = Field(
        title="Input Schema",
        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
    )
    # NOTE(review): typed as Optional[str] even though the description mentions composite keys
    # and `PrimaryKeyType` (str or list of str) exists above; confirm before widening the type,
    # since that would change the generated spec.
    primary_key: Optional[str] = Field(
        title="Primary Key",
        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
    )
    days_to_sync_if_history_is_full: int = Field(
        title="Days To Sync If History Is Full",
        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
        default=3,
    )
    format: Union[
        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
    ] = Field(
        title="Format",
        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
    )
    schemaless: bool = Field(
        title="Schemaless",
        description="When enabled, syncs will not validate or structure records against the stream's schema.",
        default=False,
    )
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
        title="Files To Read For Schema Discover",
        description="The number of recent files which will be used to discover the schema for this stream.",
        default=None,
        gt=0,
    )
    use_first_found_file_for_schema_discovery: bool = Field(
        title="Use First Found File For Schema Discover",
        description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
        default=False,
    )

    @validator("input_schema", pre=True)
    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
        """
        Check that a non-empty `input_schema` can be converted by `type_mapping_to_jsonschema`.

        Returns:
            The unchanged value when it converts successfully, or `None` when it is empty/unset.

        Raises:
            ConfigValidationError: If the conversion yields a falsy result.
        """
        if v:
            if type_mapping_to_jsonschema(v):
                return v
            else:
                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
        return None

    @root_validator
    def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate that the schema discovery options are compatible with each other.

        Update this validation whenever a new schema-discovery-related field is added.

        `recent_n_files_to_read_for_schema_discovery` was originally added without being checked
        against `schemaless` or `input_schema`. This method therefore deliberately does not reject
        those combinations, so that existing connections keep working. When it is combined with
        `schemaless` or `input_schema`, it is ignored and the other option is used.

        Rejected combinations:
          * `recent_n_files_to_read_for_schema_discovery` together with
            `use_first_found_file_for_schema_discovery`;
          * more than one of `schemaless`, `input_schema`, `use_first_found_file_for_schema_discovery`.

        Raises:
            ConfigValidationError: If an incompatible combination is configured.
        """
        # This is a pydantic v1 post root validator without `skip_on_failure`, so it also runs
        # after field errors. A field that failed its own validation is absent from `values`.
        # Read with `.get()` so pydantic can report the original field error instead of this
        # method crashing with a KeyError.
        has_input_schema = values.get("input_schema") is not None
        schemaless = bool(values.get("schemaless"))
        has_recent_n_files = values.get("recent_n_files_to_read_for_schema_discovery") is not None
        use_first_found_file = bool(values.get("use_first_found_file_for_schema_discovery"))

        # "Read the N most recent files" and "read only the first found file" contradict each other.
        conflicting_file_selection = has_recent_n_files and use_first_found_file
        # At most one of these discovery strategies may be enabled at a time.
        enabled_exclusive_options = sum((schemaless, has_input_schema, use_first_found_file))

        if conflicting_file_selection or enabled_exclusive_options > 1:
            raise ConfigValidationError(
                FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
            )

        return values

    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
        """
        User defined input_schema is defined as a string in the config. This method takes the string representation
        and converts it into a Mapping[str, Any] which is used by file-based CDK components.

        Returns:
            The converted schema, or `None` when no (or an empty) input schema is configured.

        Raises:
            ValueError: If `type_mapping_to_jsonschema` yields a falsy result.
        """
        if self.input_schema:
            schema = type_mapping_to_jsonschema(self.input_schema)
            if not schema:
                raise ValueError(
                    f"Unable to create JSON schema from input schema {self.input_schema}"
                )
            return schema
        return None
PrimaryKeyType =
typing.Union[str, typing.List[str], NoneType]
class
ValidationPolicy(enum.Enum):
# Closed set of behaviours applied when a record does not adhere to the stream schema.
# Comment instead of a docstring: pydantic v1 copies an enum's docstring into the JSON schema.
class ValidationPolicy(Enum):
    # Member order is significant for iteration; values are the user-facing labels.
    emit_record = "Emit Record"
    skip_record = "Skip Record"
    wait_for_discover = "Wait for Discover"
emit_record =
<ValidationPolicy.emit_record: 'Emit Record'>
skip_record =
<ValidationPolicy.skip_record: 'Skip Record'>
wait_for_discover =
<ValidationPolicy.wait_for_discover: 'Wait for Discover'>
class
FileBasedStreamConfig(pydantic.v1.main.BaseModel):
# Configuration of a single stream in a file-based source.
# No class docstring on purpose: pydantic v1 copies a model's docstring into the generated
# JSON schema as its description, which would change the connector spec.
class FileBasedStreamConfig(BaseModel):
    name: str = Field(title="Name", description="The name of the stream.")
    globs: Optional[List[str]] = Field(
        default=["**"],
        title="Globs",
        description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
        order=1,
    )
    legacy_prefix: Optional[str] = Field(
        title="Legacy Prefix",
        description="The path prefix configured in v3 versions of the S3 connector. This option is deprecated in favor of a single glob.",
        airbyte_hidden=True,
    )
    validation_policy: ValidationPolicy = Field(
        title="Validation Policy",
        description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
        default=ValidationPolicy.emit_record,
    )
    input_schema: Optional[str] = Field(
        title="Input Schema",
        description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
    )
    # NOTE(review): typed as Optional[str] although the description mentions composite keys;
    # confirm before widening the type, since that would change the generated spec.
    primary_key: Optional[str] = Field(
        title="Primary Key",
        description="The column or columns (for a composite key) that serves as the unique identifier of a record. If empty, the primary key will default to the parser's default primary key.",
        airbyte_hidden=True,  # Users can create/modify primary keys in the connection configuration so we shouldn't duplicate it here.
    )
    days_to_sync_if_history_is_full: int = Field(
        title="Days To Sync If History Is Full",
        description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
        default=3,
    )
    format: Union[
        AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
    ] = Field(
        title="Format",
        description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
    )
    schemaless: bool = Field(
        title="Schemaless",
        description="When enabled, syncs will not validate or structure records against the stream's schema.",
        default=False,
    )
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
        title="Files To Read For Schema Discover",
        description="The number of recent files which will be used to discover the schema for this stream.",
        default=None,
        gt=0,
    )
    use_first_found_file_for_schema_discovery: bool = Field(
        title="Use First Found File For Schema Discover",
        description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step.",
        default=False,
    )

    @validator("input_schema", pre=True)
    def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
        """
        Check that a non-empty `input_schema` can be converted by `type_mapping_to_jsonschema`.

        Returns:
            The unchanged value when it converts successfully, or `None` when it is empty/unset.

        Raises:
            ConfigValidationError: If the conversion yields a falsy result.
        """
        if v:
            if type_mapping_to_jsonschema(v):
                return v
            else:
                raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
        return None

    @root_validator
    def validate_discovery_related_fields(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate that the schema discovery options are compatible with each other.

        Update this validation whenever a new schema-discovery-related field is added.

        `recent_n_files_to_read_for_schema_discovery` was originally added without being checked
        against `schemaless` or `input_schema`. This method therefore deliberately does not reject
        those combinations, so that existing connections keep working. When it is combined with
        `schemaless` or `input_schema`, it is ignored and the other option is used.

        Rejected combinations:
          * `recent_n_files_to_read_for_schema_discovery` together with
            `use_first_found_file_for_schema_discovery`;
          * more than one of `schemaless`, `input_schema`, `use_first_found_file_for_schema_discovery`.

        Raises:
            ConfigValidationError: If an incompatible combination is configured.
        """
        # This is a pydantic v1 post root validator without `skip_on_failure`, so it also runs
        # after field errors. A field that failed its own validation is absent from `values`.
        # Read with `.get()` so pydantic can report the original field error instead of this
        # method crashing with a KeyError.
        has_input_schema = values.get("input_schema") is not None
        schemaless = bool(values.get("schemaless"))
        has_recent_n_files = values.get("recent_n_files_to_read_for_schema_discovery") is not None
        use_first_found_file = bool(values.get("use_first_found_file_for_schema_discovery"))

        # "Read the N most recent files" and "read only the first found file" contradict each other.
        conflicting_file_selection = has_recent_n_files and use_first_found_file
        # At most one of these discovery strategies may be enabled at a time.
        enabled_exclusive_options = sum((schemaless, has_input_schema, use_first_found_file))

        if conflicting_file_selection or enabled_exclusive_options > 1:
            raise ConfigValidationError(
                FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
            )

        return values

    def get_input_schema(self) -> Optional[Mapping[str, Any]]:
        """
        User defined input_schema is defined as a string in the config. This method takes the string representation
        and converts it into a Mapping[str, Any] which is used by file-based CDK components.

        Returns:
            The converted schema, or `None` when no (or an empty) input schema is configured.

        Raises:
            ValueError: If `type_mapping_to_jsonschema` yields a falsy result.
        """
        if self.input_schema:
            schema = type_mapping_to_jsonschema(self.input_schema)
            if not schema:
                raise ValueError(
                    f"Unable to create JSON schema from input schema {self.input_schema}"
                )
            return schema
        return None
validation_policy: ValidationPolicy
format: Union[airbyte_cdk.sources.file_based.config.avro_format.AvroFormat, airbyte_cdk.sources.file_based.CsvFormat, airbyte_cdk.sources.file_based.JsonlFormat, airbyte_cdk.sources.file_based.config.parquet_format.ParquetFormat, airbyte_cdk.sources.file_based.config.unstructured_format.UnstructuredFormat, airbyte_cdk.sources.file_based.config.excel_format.ExcelFormat]
@validator('input_schema', pre=True)
def
validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
def
get_input_schema(self) -> Optional[Mapping[str, Any]]:
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
    """
    Convert the user-provided ``input_schema`` string into a schema mapping.

    ``input_schema`` is stored in the config as a string; file-based CDK components consume it
    as a ``Mapping[str, Any]``, produced here by ``type_mapping_to_jsonschema``.

    Returns:
        The converted schema, or ``None`` when ``input_schema`` is unset or empty.

    Raises:
        ValueError: If the conversion yields a falsy result.
    """
    if not self.input_schema:
        return None
    converted = type_mapping_to_jsonschema(self.input_schema)
    if converted:
        return converted
    raise ValueError(f"Unable to create JSON schema from input schema {self.input_schema}")
The user-defined input_schema is stored as a string in the config. This method takes that string representation and converts it into a Mapping[str, Any], which is used by file-based CDK components.