airbyte.caches.snowflake

A Snowflake implementation of the PyAirbyte cache.

Usage Example

import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myaccount",
    username="myusername",
    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
    warehouse="mywarehouse",
    database="mydatabase",
    role="myrole",
    schema_name="myschema",
)
 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2"""A Snowflake implementation of the PyAirbyte cache.
 3
 4## Usage Example
 5
 6```python
 7import airbyte as ab
 8from airbyte.caches import SnowflakeCache
 9
10cache = SnowflakeCache(
11    account="myaccount",
12    username="myusername",
13    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
14    warehouse="mywarehouse",
15    database="mydatabase",
16    role="myrole",
17    schema_name="myschema",
18)
19```
20"""
21
22from __future__ import annotations
23
24from pydantic import PrivateAttr
25
26from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor
27from airbyte.caches.base import CacheBase
28from airbyte.shared.sql_processor import RecordDedupeMode
29
30
31class SnowflakeCache(SnowflakeConfig, CacheBase):
32    """Configuration for the Snowflake cache."""
33
34    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND
35
36    _sql_processor_class = PrivateAttr(default=SnowflakeSqlProcessor)
37
38
39# Expose the Cache class and also the Config class.
40__all__ = [
41    "SnowflakeCache",
42    "SnowflakeConfig",
43]
32class SnowflakeCache(SnowflakeConfig, CacheBase):
33    """Configuration for the Snowflake cache."""
34
35    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND
36
37    _sql_processor_class = PrivateAttr(default=SnowflakeSqlProcessor)

Configuration for the Snowflake cache.

dedupe_mode: airbyte.shared.sql_processor.RecordDedupeMode
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; this should be a dictionary conforming to `pydantic.config.ConfigDict`.

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'account': FieldInfo(annotation=str, required=True), 'username': FieldInfo(annotation=str, required=True), 'password': FieldInfo(annotation=SecretString, required=True), 'warehouse': FieldInfo(annotation=str, required=True), 'database': FieldInfo(annotation=str, required=True), 'role': FieldInfo(annotation=str, required=True), 'data_retention_time_in_days': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'dedupe_mode': FieldInfo(annotation=RecordDedupeMode, required=False, default=<RecordDedupeMode.APPEND: 'append'>)}

Metadata about the fields defined on the model: a mapping of field names to `pydantic.fields.FieldInfo` objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
124                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
125                        """We need to both initialize private attributes and call the user-defined model_post_init
126                        method.
127                        """
128                        init_private_attributes(self, context)
129                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
SnowflakeConfig
account
username
password
warehouse
database
role
schema_name
data_retention_time_in_days
get_create_table_extra_clauses
get_database_name
get_sql_alchemy_url
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class SnowflakeConfig(airbyte.shared.sql_processor.SqlConfig):
36class SnowflakeConfig(SqlConfig):
37    """Configuration for the Snowflake cache."""
38
39    account: str
40    username: str
41    password: SecretString
42    warehouse: str
43    database: str
44    role: str
45    schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME)
46    data_retention_time_in_days: int | None = None
47
48    @overrides
49    def get_create_table_extra_clauses(self) -> list[str]:
50        """Return a list of clauses to append on CREATE TABLE statements."""
51        clauses = []
52
53        if self.data_retention_time_in_days is not None:
54            clauses.append(f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}")
55
56        return clauses
57
58    @overrides
59    def get_database_name(self) -> str:
60        """Return the name of the database."""
61        return self.database
62
63    @overrides
64    def get_sql_alchemy_url(self) -> SecretString:
65        """Return the SQLAlchemy URL to use."""
66        return SecretString(
67            URL(
68                account=self.account,
69                user=self.username,
70                password=self.password,
71                database=self.database,
72                warehouse=self.warehouse,
73                schema=self.schema_name,
74                role=self.role,
75            )
76        )
77
78    def get_vendor_client(self) -> object:
79        """Return the Snowflake connection object."""
80        return connector.connect(
81            user=self.username,
82            password=self.password,
83            account=self.account,
84            warehouse=self.warehouse,
85            database=self.database,
86            schema=self.schema_name,
87            role=self.role,
88        )

Configuration for the Snowflake cache.

account: str
username: str
warehouse: str
database: str
role: str
schema_name: str

The name of the schema to write to.

data_retention_time_in_days: int | None
@overrides
def get_create_table_extra_clauses(self) -> list[str]:
48    @overrides
49    def get_create_table_extra_clauses(self) -> list[str]:
50        """Return a list of clauses to append on CREATE TABLE statements."""
51        clauses = []
52
53        if self.data_retention_time_in_days is not None:
54            clauses.append(f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}")
55
56        return clauses

Return a list of clauses to append on CREATE TABLE statements.

@overrides
def get_database_name(self) -> str:
58    @overrides
59    def get_database_name(self) -> str:
60        """Return the name of the database."""
61        return self.database

Return the name of the database.

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
63    @overrides
64    def get_sql_alchemy_url(self) -> SecretString:
65        """Return the SQLAlchemy URL to use."""
66        return SecretString(
67            URL(
68                account=self.account,
69                user=self.username,
70                password=self.password,
71                database=self.database,
72                warehouse=self.warehouse,
73                schema=self.schema_name,
74                role=self.role,
75            )
76        )

Return the SQLAlchemy URL to use.

def get_vendor_client(self) -> object:
78    def get_vendor_client(self) -> object:
79        """Return the Snowflake connection object."""
80        return connector.connect(
81            user=self.username,
82            password=self.password,
83            account=self.account,
84            warehouse=self.warehouse,
85            database=self.database,
86            schema=self.schema_name,
87            role=self.role,
88        )

Return the Snowflake connection object.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; this should be a dictionary conforming to `pydantic.config.ConfigDict`.

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'account': FieldInfo(annotation=str, required=True), 'username': FieldInfo(annotation=str, required=True), 'password': FieldInfo(annotation=SecretString, required=True), 'warehouse': FieldInfo(annotation=str, required=True), 'database': FieldInfo(annotation=str, required=True), 'role': FieldInfo(annotation=str, required=True), 'data_retention_time_in_days': FieldInfo(annotation=Union[int, NoneType], required=False, default=None)}

Metadata about the fields defined on the model: a mapping of field names to `pydantic.fields.FieldInfo` objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_sql_engine