airbyte.caches.snowflake

A Snowflake implementation of the PyAirbyte cache.

Usage Example

import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myaccount",
    username="myusername",
    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
    warehouse="mywarehouse",
    database="mydatabase",
    role="myrole",
    schema_name="myschema",
)
 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2"""A Snowflake implementation of the PyAirbyte cache.
 3
 4## Usage Example
 5
 6```python
 7import airbyte as ab
 8from airbyte.caches import SnowflakeCache
 9
10cache = SnowflakeCache(
11    account="myaccount",
12    username="myusername",
13    password=ab.get_secret("SNOWFLAKE_PASSWORD"),
14    warehouse="mywarehouse",
15    database="mydatabase",
16    role="myrole",
17    schema_name="myschema",
18)
19```
20"""
21
22from __future__ import annotations
23
24from typing import ClassVar
25
26from airbyte_api.models import DestinationSnowflake
27
28from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor
29from airbyte.caches.base import CacheBase
30from airbyte.destinations._translate_cache_to_dest import (
31    snowflake_cache_to_destination_configuration,
32)
33from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase
34
35
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    # Dedupe mode declared for this cache; the default is append.
    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    # SQL processor class associated with this cache type.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # The paired destination must be the Snowflake connector so that it agrees
    # with `paired_destination_config_class` and with the translation helper
    # used by `paired_destination_config` below.
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the paired destination configuration for this cache.

        The value is produced by `snowflake_cache_to_destination_configuration`
        and is annotated as a `DestinationSnowflake` object, not a dictionary.
        """
        return snowflake_cache_to_destination_configuration(cache=self)
51
52# Expose the Cache class and also the Config class.
53__all__ = [
54    "SnowflakeCache",
55    "SnowflakeConfig",
56]
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    # Dedupe mode declared for this cache; the default is append.
    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    # SQL processor class associated with this cache type.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # The paired destination must be the Snowflake connector so that it agrees
    # with `paired_destination_config_class` and with the translation helper
    # used by `paired_destination_config` below.
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the paired destination configuration for this cache.

        The value is produced by `snowflake_cache_to_destination_configuration`
        and is annotated as a `DestinationSnowflake` object, not a dictionary.
        """
        return snowflake_cache_to_destination_configuration(cache=self)

Configuration for the Snowflake cache.

dedupe_mode: airbyte.shared.sql_processor.RecordDedupeMode
paired_destination_name: ClassVar[str | None] = 'destination-snowflake'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_snowflake.DestinationSnowflake'>
paired_destination_config: airbyte_api.models.destination_snowflake.DestinationSnowflake
47    @property
48    def paired_destination_config(self) -> DestinationSnowflake:
49        """Return a dictionary of destination configuration values."""
50        return snowflake_cache_to_destination_configuration(cache=self)

Return the paired destination configuration (a DestinationSnowflake object) for this cache.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
SnowflakeConfig
account
username
password
warehouse
database
role
schema_name
data_retention_time_in_days
get_create_table_extra_clauses
get_database_name
get_sql_alchemy_url
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class SnowflakeConfig(airbyte.shared.sql_processor.SqlConfig):
class SnowflakeConfig(SqlConfig):
    """Configuration for the Snowflake cache."""

    # Snowflake connection settings.
    account: str
    username: str
    password: SecretString
    warehouse: str
    database: str
    role: str
    schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME)
    # Optional retention period; when None, no retention clause is emitted.
    data_retention_time_in_days: int | None = None

    @overrides
    def get_create_table_extra_clauses(self) -> list[str]:
        """Build the extra clauses appended to CREATE TABLE statements.

        Returns:
            A one-element list holding the `DATA_RETENTION_TIME_IN_DAYS` clause
            when a retention period is configured, otherwise an empty list.
        """
        retention_days = self.data_retention_time_in_days
        if retention_days is None:
            return []
        return [f"DATA_RETENTION_TIME_IN_DAYS = {retention_days}"]

    @overrides
    def get_database_name(self) -> str:
        """Return the configured database name."""
        database_name: str = self.database
        return database_name

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Assemble the SQLAlchemy URL from the configured settings.

        The URL includes the password and is returned wrapped in `SecretString`.
        """
        connection_url = URL(
            account=self.account,
            user=self.username,
            password=self.password,
            database=self.database,
            warehouse=self.warehouse,
            schema=self.schema_name,
            role=self.role,
        )
        return SecretString(connection_url)

    def get_vendor_client(self) -> object:
        """Open a connection through `connector.connect` using the configured settings."""
        connect_kwargs = {
            "user": self.username,
            "password": self.password,
            "account": self.account,
            "warehouse": self.warehouse,
            "database": self.database,
            "schema": self.schema_name,
            "role": self.role,
        }
        return connector.connect(**connect_kwargs)

Configuration for the Snowflake cache.

account: str
username: str
warehouse: str
database: str
role: str
schema_name: str

The name of the schema to write to.

data_retention_time_in_days: int | None
@overrides
def get_create_table_extra_clauses(self) -> list[str]:
48    @overrides
49    def get_create_table_extra_clauses(self) -> list[str]:
50        """Return a list of clauses to append on CREATE TABLE statements."""
51        clauses = []
52
53        if self.data_retention_time_in_days is not None:
54            clauses.append(f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}")
55
56        return clauses

Return a list of clauses to append on CREATE TABLE statements.

@overrides
def get_database_name(self) -> str:
58    @overrides
59    def get_database_name(self) -> str:
60        """Return the name of the database."""
61        return self.database

Return the name of the database.

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
63    @overrides
64    def get_sql_alchemy_url(self) -> SecretString:
65        """Return the SQLAlchemy URL to use."""
66        return SecretString(
67            URL(
68                account=self.account,
69                user=self.username,
70                password=self.password,
71                database=self.database,
72                warehouse=self.warehouse,
73                schema=self.schema_name,
74                role=self.role,
75            )
76        )

Return the SQLAlchemy URL to use.

def get_vendor_client(self) -> object:
78    def get_vendor_client(self) -> object:
79        """Return the Snowflake connection object."""
80        return connector.connect(
81            user=self.username,
82            password=self.password,
83            account=self.account,
84            warehouse=self.warehouse,
85            database=self.database,
86            schema=self.schema_name,
87            role=self.role,
88        )

Return the Snowflake connection object.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_sql_engine