airbyte_cdk.sources.file_based.config.csv_format

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import codecs
  6from enum import Enum
  7from typing import Any, Dict, List, Optional, Set, Union
  8
  9from pydantic.v1 import BaseModel, Field, root_validator, validator
 10from pydantic.v1.error_wrappers import ValidationError
 11
 12from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig
 13
 14
class InferenceType(Enum):
    """How column types should be inferred when parsing CSV data."""

    # No inference: columns default to strings (per the CsvFormat field description).
    NONE = "None"
    # Infer primitive types only.
    PRIMITIVE_TYPES_ONLY = "Primitive Types Only"
 18
 19
class CsvHeaderDefinitionType(Enum):
    """Where the CSV column headers come from.

    Values double as the discriminator strings for the `header_definition`
    oneOf options on `CsvFormat`.
    """

    FROM_CSV = "From CSV"
    AUTOGENERATED = "Autogenerated"
    USER_PROVIDED = "User Provided"
 24
 25
class CsvHeaderFromCsv(BaseModel):
    """Header-definition option: column names are taken from the CSV file's own header row."""

    class Config(OneOfOptionConfig):
        title = "From CSV"
        discriminator = "header_definition_type"

    # Discriminator field; `const=True` pins it to the "From CSV" value.
    header_definition_type: str = Field(
        CsvHeaderDefinitionType.FROM_CSV.value,
        const=True,
    )

    def has_header_row(self) -> bool:
        """Return True: the CSV file is expected to contain a header row."""
        return True
 38
 39
class CsvHeaderAutogenerated(BaseModel):
    """Header-definition option: the CSV has no header row and column names are
    autogenerated by the CDK (as `f{i}` per the `header_definition` description)."""

    class Config(OneOfOptionConfig):
        title = "Autogenerated"
        discriminator = "header_definition_type"

    # Discriminator field; `const=True` pins it to the "Autogenerated" value.
    header_definition_type: str = Field(
        CsvHeaderDefinitionType.AUTOGENERATED.value,
        const=True,
    )

    def has_header_row(self) -> bool:
        """Return False: the CSV file is assumed not to contain a header row."""
        return False
 52
 53
class CsvHeaderUserProvided(BaseModel):
    """Header-definition option: the CSV has no header row and the user supplies
    the column names explicitly via `column_names`."""

    class Config(OneOfOptionConfig):
        title = "User Provided"
        discriminator = "header_definition_type"

    # Discriminator field; `const=True` pins it to the "User Provided" value.
    header_definition_type: str = Field(
        CsvHeaderDefinitionType.USER_PROVIDED.value,
        const=True,
    )
    # Required: the names to use for the emitted record fields.
    column_names: List[str] = Field(
        title="Column Names",
        description="The column names that will be used while emitting the CSV records",
    )

    def has_header_row(self) -> bool:
        """Return False: the CSV file is assumed not to contain a header row."""
        return False

    @validator("column_names")
    def validate_column_names(cls, v: List[str]) -> List[str]:
        """Reject an empty column-name list; at least one name is required."""
        if not v:
            raise ValueError(
                "At least one column name needs to be provided when using user provided headers"
            )
        return v
 78
 79
# Default case-sensitive strings interpreted as boolean true/false when parsing CSV cells.
DEFAULT_TRUE_VALUES = ["y", "yes", "t", "true", "on", "1"]
DEFAULT_FALSE_VALUES = ["n", "no", "f", "false", "off", "0"]
 82
 83
 84class CsvFormat(BaseModel):
 85    class Config(OneOfOptionConfig):
 86        title = "CSV Format"
 87        discriminator = "filetype"
 88
 89    filetype: str = Field(
 90        "csv",
 91        const=True,
 92    )
 93    delimiter: str = Field(
 94        title="Delimiter",
 95        description="The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
 96        default=",",
 97    )
 98    quote_char: str = Field(
 99        title="Quote Character",
100        default='"',
101        description="The character used for quoting CSV values. To disallow quoting, make this field blank.",
102    )
103    escape_char: Optional[str] = Field(
104        title="Escape Character",
105        default=None,
106        description="The character used for escaping special characters. To disallow escaping, leave this field blank.",
107    )
108    encoding: Optional[str] = Field(
109        default="utf8",
110        description='The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href="https://docs.python.org/3/library/codecs.html#standard-encodings" target="_blank">list of python encodings</a> for allowable options.',
111    )
112    double_quote: bool = Field(
113        title="Double Quote",
114        default=True,
115        description="Whether two quotes in a quoted CSV value denote a single quote in the data.",
116    )
117    null_values: Set[str] = Field(
118        title="Null Values",
119        default=[],
120        description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
121    )
122    strings_can_be_null: bool = Field(
123        title="Strings Can Be Null",
124        default=True,
125        description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
126    )
127    skip_rows_before_header: int = Field(
128        title="Skip Rows Before Header",
129        default=0,
130        description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
131    )
132    skip_rows_after_header: int = Field(
133        title="Skip Rows After Header",
134        default=0,
135        description="The number of rows to skip after the header row.",
136    )
137    header_definition: Union[CsvHeaderFromCsv, CsvHeaderAutogenerated, CsvHeaderUserProvided] = (
138        Field(
139            title="CSV Header Definition",
140            default=CsvHeaderFromCsv(header_definition_type=CsvHeaderDefinitionType.FROM_CSV.value),
141            description="How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.",
142        )
143    )
144    true_values: Set[str] = Field(
145        title="True Values",
146        default=DEFAULT_TRUE_VALUES,
147        description="A set of case-sensitive strings that should be interpreted as true values.",
148    )
149    false_values: Set[str] = Field(
150        title="False Values",
151        default=DEFAULT_FALSE_VALUES,
152        description="A set of case-sensitive strings that should be interpreted as false values.",
153    )
154    inference_type: InferenceType = Field(
155        title="Inference Type",
156        default=InferenceType.NONE,
157        description="How to infer the types of the columns. If none, inference default to strings.",
158        airbyte_hidden=True,
159    )
160    ignore_errors_on_fields_mismatch: bool = Field(
161        title="Ignore errors on field mismatch",
162        default=False,
163        description="Whether to ignore errors that occur when the number of fields in the CSV does not match the number of columns in the schema.",
164    )
165
166    @validator("delimiter")
167    def validate_delimiter(cls, v: str) -> str:
168        if v == r"\t":
169            v = "\t"
170        if len(v) != 1:
171            raise ValueError("delimiter should only be one character")
172        if v in {"\r", "\n"}:
173            raise ValueError(f"delimiter cannot be {v}")
174        return v
175
176    @validator("quote_char")
177    def validate_quote_char(cls, v: str) -> str:
178        if len(v) != 1:
179            raise ValueError("quote_char should only be one character")
180        return v
181
182    @validator("escape_char")
183    def validate_escape_char(cls, v: str) -> str:
184        if v is not None and len(v) != 1:
185            raise ValueError("escape_char should only be one character")
186        return v
187
188    @validator("encoding")
189    def validate_encoding(cls, v: str) -> str:
190        try:
191            codecs.lookup(v)
192        except LookupError:
193            raise ValueError(f"invalid encoding format: {v}")
194        return v
195
196    @root_validator
197    def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
198        definition_type = values.get("header_definition_type")
199        column_names = values.get("user_provided_column_names")
200        if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
201            raise ValidationError(
202                "`user_provided_column_names` should be defined if the definition 'User Provided'.",
203                model=CsvFormat,
204            )
205        if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
206            raise ValidationError(
207                "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
208                model=CsvFormat,
209            )
210        return values
class InferenceType(enum.Enum):
16class InferenceType(Enum):
17    NONE = "None"
18    PRIMITIVE_TYPES_ONLY = "Primitive Types Only"

An enumeration.

NONE = <InferenceType.NONE: 'None'>
PRIMITIVE_TYPES_ONLY = <InferenceType.PRIMITIVE_TYPES_ONLY: 'Primitive Types Only'>
class CsvHeaderDefinitionType(enum.Enum):
21class CsvHeaderDefinitionType(Enum):
22    FROM_CSV = "From CSV"
23    AUTOGENERATED = "Autogenerated"
24    USER_PROVIDED = "User Provided"

An enumeration.

FROM_CSV = <CsvHeaderDefinitionType.FROM_CSV: 'From CSV'>
AUTOGENERATED = <CsvHeaderDefinitionType.AUTOGENERATED: 'Autogenerated'>
USER_PROVIDED = <CsvHeaderDefinitionType.USER_PROVIDED: 'User Provided'>
class CsvHeaderFromCsv(pydantic.v1.main.BaseModel):
27class CsvHeaderFromCsv(BaseModel):
28    class Config(OneOfOptionConfig):
29        title = "From CSV"
30        discriminator = "header_definition_type"
31
32    header_definition_type: str = Field(
33        CsvHeaderDefinitionType.FROM_CSV.value,
34        const=True,
35    )
36
37    def has_header_row(self) -> bool:
38        return True
header_definition_type: str
def has_header_row(self) -> bool:
37    def has_header_row(self) -> bool:
38        return True
class CsvHeaderFromCsv.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
28    class Config(OneOfOptionConfig):
29        title = "From CSV"
30        discriminator = "header_definition_type"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'From CSV'
discriminator = 'header_definition_type'
class CsvHeaderAutogenerated(pydantic.v1.main.BaseModel):
41class CsvHeaderAutogenerated(BaseModel):
42    class Config(OneOfOptionConfig):
43        title = "Autogenerated"
44        discriminator = "header_definition_type"
45
46    header_definition_type: str = Field(
47        CsvHeaderDefinitionType.AUTOGENERATED.value,
48        const=True,
49    )
50
51    def has_header_row(self) -> bool:
52        return False
header_definition_type: str
def has_header_row(self) -> bool:
51    def has_header_row(self) -> bool:
52        return False
class CsvHeaderAutogenerated.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
42    class Config(OneOfOptionConfig):
43        title = "Autogenerated"
44        discriminator = "header_definition_type"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'Autogenerated'
discriminator = 'header_definition_type'
class CsvHeaderUserProvided(pydantic.v1.main.BaseModel):
55class CsvHeaderUserProvided(BaseModel):
56    class Config(OneOfOptionConfig):
57        title = "User Provided"
58        discriminator = "header_definition_type"
59
60    header_definition_type: str = Field(
61        CsvHeaderDefinitionType.USER_PROVIDED.value,
62        const=True,
63    )
64    column_names: List[str] = Field(
65        title="Column Names",
66        description="The column names that will be used while emitting the CSV records",
67    )
68
69    def has_header_row(self) -> bool:
70        return False
71
72    @validator("column_names")
73    def validate_column_names(cls, v: List[str]) -> List[str]:
74        if not v:
75            raise ValueError(
76                "At least one column name needs to be provided when using user provided headers"
77            )
78        return v
header_definition_type: str
column_names: List[str]
def has_header_row(self) -> bool:
69    def has_header_row(self) -> bool:
70        return False
@validator('column_names')
def validate_column_names(cls, v: List[str]) -> List[str]:
72    @validator("column_names")
73    def validate_column_names(cls, v: List[str]) -> List[str]:
74        if not v:
75            raise ValueError(
76                "At least one column name needs to be provided when using user provided headers"
77            )
78        return v
class CsvHeaderUserProvided.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
56    class Config(OneOfOptionConfig):
57        title = "User Provided"
58        discriminator = "header_definition_type"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'User Provided'
discriminator = 'header_definition_type'
DEFAULT_TRUE_VALUES = ['y', 'yes', 't', 'true', 'on', '1']
DEFAULT_FALSE_VALUES = ['n', 'no', 'f', 'false', 'off', '0']
class CsvFormat(pydantic.v1.main.BaseModel):
 85class CsvFormat(BaseModel):
 86    class Config(OneOfOptionConfig):
 87        title = "CSV Format"
 88        discriminator = "filetype"
 89
 90    filetype: str = Field(
 91        "csv",
 92        const=True,
 93    )
 94    delimiter: str = Field(
 95        title="Delimiter",
 96        description="The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
 97        default=",",
 98    )
 99    quote_char: str = Field(
100        title="Quote Character",
101        default='"',
102        description="The character used for quoting CSV values. To disallow quoting, make this field blank.",
103    )
104    escape_char: Optional[str] = Field(
105        title="Escape Character",
106        default=None,
107        description="The character used for escaping special characters. To disallow escaping, leave this field blank.",
108    )
109    encoding: Optional[str] = Field(
110        default="utf8",
111        description='The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href="https://docs.python.org/3/library/codecs.html#standard-encodings" target="_blank">list of python encodings</a> for allowable options.',
112    )
113    double_quote: bool = Field(
114        title="Double Quote",
115        default=True,
116        description="Whether two quotes in a quoted CSV value denote a single quote in the data.",
117    )
118    null_values: Set[str] = Field(
119        title="Null Values",
120        default=[],
121        description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
122    )
123    strings_can_be_null: bool = Field(
124        title="Strings Can Be Null",
125        default=True,
126        description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
127    )
128    skip_rows_before_header: int = Field(
129        title="Skip Rows Before Header",
130        default=0,
131        description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
132    )
133    skip_rows_after_header: int = Field(
134        title="Skip Rows After Header",
135        default=0,
136        description="The number of rows to skip after the header row.",
137    )
138    header_definition: Union[CsvHeaderFromCsv, CsvHeaderAutogenerated, CsvHeaderUserProvided] = (
139        Field(
140            title="CSV Header Definition",
141            default=CsvHeaderFromCsv(header_definition_type=CsvHeaderDefinitionType.FROM_CSV.value),
142            description="How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.",
143        )
144    )
145    true_values: Set[str] = Field(
146        title="True Values",
147        default=DEFAULT_TRUE_VALUES,
148        description="A set of case-sensitive strings that should be interpreted as true values.",
149    )
150    false_values: Set[str] = Field(
151        title="False Values",
152        default=DEFAULT_FALSE_VALUES,
153        description="A set of case-sensitive strings that should be interpreted as false values.",
154    )
155    inference_type: InferenceType = Field(
156        title="Inference Type",
157        default=InferenceType.NONE,
158        description="How to infer the types of the columns. If none, inference default to strings.",
159        airbyte_hidden=True,
160    )
161    ignore_errors_on_fields_mismatch: bool = Field(
162        title="Ignore errors on field mismatch",
163        default=False,
164        description="Whether to ignore errors that occur when the number of fields in the CSV does not match the number of columns in the schema.",
165    )
166
167    @validator("delimiter")
168    def validate_delimiter(cls, v: str) -> str:
169        if v == r"\t":
170            v = "\t"
171        if len(v) != 1:
172            raise ValueError("delimiter should only be one character")
173        if v in {"\r", "\n"}:
174            raise ValueError(f"delimiter cannot be {v}")
175        return v
176
177    @validator("quote_char")
178    def validate_quote_char(cls, v: str) -> str:
179        if len(v) != 1:
180            raise ValueError("quote_char should only be one character")
181        return v
182
183    @validator("escape_char")
184    def validate_escape_char(cls, v: str) -> str:
185        if v is not None and len(v) != 1:
186            raise ValueError("escape_char should only be one character")
187        return v
188
189    @validator("encoding")
190    def validate_encoding(cls, v: str) -> str:
191        try:
192            codecs.lookup(v)
193        except LookupError:
194            raise ValueError(f"invalid encoding format: {v}")
195        return v
196
197    @root_validator
198    def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
199        definition_type = values.get("header_definition_type")
200        column_names = values.get("user_provided_column_names")
201        if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
202            raise ValidationError(
203                "`user_provided_column_names` should be defined if the definition 'User Provided'.",
204                model=CsvFormat,
205            )
206        if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
207            raise ValidationError(
208                "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
209                model=CsvFormat,
210            )
211        return values
filetype: str
delimiter: str
quote_char: str
escape_char: Optional[str]
encoding: Optional[str]
double_quote: bool
null_values: Set[str]
strings_can_be_null: bool
skip_rows_before_header: int
skip_rows_after_header: int
true_values: Set[str]
false_values: Set[str]
inference_type: InferenceType
ignore_errors_on_fields_mismatch: bool
@validator('delimiter')
def validate_delimiter(cls, v: str) -> str:
167    @validator("delimiter")
168    def validate_delimiter(cls, v: str) -> str:
169        if v == r"\t":
170            v = "\t"
171        if len(v) != 1:
172            raise ValueError("delimiter should only be one character")
173        if v in {"\r", "\n"}:
174            raise ValueError(f"delimiter cannot be {v}")
175        return v
@validator('quote_char')
def validate_quote_char(cls, v: str) -> str:
177    @validator("quote_char")
178    def validate_quote_char(cls, v: str) -> str:
179        if len(v) != 1:
180            raise ValueError("quote_char should only be one character")
181        return v
@validator('escape_char')
def validate_escape_char(cls, v: str) -> str:
183    @validator("escape_char")
184    def validate_escape_char(cls, v: str) -> str:
185        if v is not None and len(v) != 1:
186            raise ValueError("escape_char should only be one character")
187        return v
@validator('encoding')
def validate_encoding(cls, v: str) -> str:
189    @validator("encoding")
190    def validate_encoding(cls, v: str) -> str:
191        try:
192            codecs.lookup(v)
193        except LookupError:
194            raise ValueError(f"invalid encoding format: {v}")
195        return v
@root_validator
def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
197    @root_validator
198    def validate_optional_args(cls, values: Dict[str, Any]) -> Dict[str, Any]:
199        definition_type = values.get("header_definition_type")
200        column_names = values.get("user_provided_column_names")
201        if definition_type == CsvHeaderDefinitionType.USER_PROVIDED and not column_names:
202            raise ValidationError(
203                "`user_provided_column_names` should be defined if the definition 'User Provided'.",
204                model=CsvFormat,
205            )
206        if definition_type != CsvHeaderDefinitionType.USER_PROVIDED and column_names:
207            raise ValidationError(
208                "`user_provided_column_names` should not be defined if the definition is not 'User Provided'.",
209                model=CsvFormat,
210            )
211        return values
class CsvFormat.Config(airbyte_cdk.utils.oneof_option_config.OneOfOptionConfig):
86    class Config(OneOfOptionConfig):
87        title = "CSV Format"
88        discriminator = "filetype"

Base class to configure a Pydantic model that's used as a oneOf option in a parent model in a way that's compatible with all Airbyte consumers.

Inherit from this class in the nested Config class in a model and set title and description (these show up in the UI) and discriminator (this is making sure it's marked as required in the schema).

Usage:
class OptionModel(BaseModel):
    mode: Literal["option_a"] = Field("option_a", const=True)
    option_a_field: str = Field(...)

    class Config(OneOfOptionConfig):
        title = "Option A"
        description = "Option A description"
        discriminator = "mode"
title = 'CSV Format'
discriminator = 'filetype'