airbyte.caches.snowflake

A Snowflake implementation of the PyAirbyte cache.

Usage Example

Password connection:

import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myaccount",
    username="myusername",
    password=ab.get_secret("SNOWFLAKE_PASSWORD"), # optional
    warehouse="mywarehouse",
    database="mydatabase",
    role="myrole",
    schema_name="myschema",
)

Private key connection:

import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myaccount",
    username="myusername",
    private_key=ab.get_secret("SNOWFLAKE_PRIVATE_KEY"),
    private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional
    warehouse="mywarehouse",
    database="mydatabase",
    role="myrole",
    schema_name="myschema",
)

Private key path connection:

import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myaccount",
    username="myusername",
    private_key_path="path/to/my/private_key.pem",
    private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional
    warehouse="mywarehouse",
    database="mydatabase",
    role="myrole",
    schema_name="myschema",
)
 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2"""A Snowflake implementation of the PyAirbyte cache.
 3
 4## Usage Example
 5
 6### Password connection:
 7
 8```python
 9import airbyte as ab
10from airbyte.caches import SnowflakeCache
11
12cache = SnowflakeCache(
13    account="myaccount",
14    username="myusername",
15    password=ab.get_secret("SNOWFLAKE_PASSWORD"), # optional
16    warehouse="mywarehouse",
17    database="mydatabase",
18    role="myrole",
19    schema_name="myschema",
20)
21```
22
23### Private key connection:
24
25```python
26import airbyte as ab
27from airbyte.caches import SnowflakeCache
28
29cache = SnowflakeCache(
30    account="myaccount",
31    username="myusername",
32    private_key=ab.get_secret("SNOWFLAKE_PRIVATE_KEY"),
33    private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional
34    warehouse="mywarehouse",
35    database="mydatabase",
36    role="myrole",
37    schema_name="myschema",
38)
39```
40
41### Private key path connection:
42
43```python
44import airbyte as ab
45from airbyte.caches import SnowflakeCache
46
47cache = SnowflakeCache(
48    account="myaccount",
49    username="myusername",
50    private_key_path="path/to/my/private_key.pem",
51    private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional
52    warehouse="mywarehouse",
53    database="mydatabase",
54    role="myrole",
55    schema_name="myschema",
56)
57```
58"""
59
60from __future__ import annotations
61
62from typing import ClassVar
63
64from airbyte_api.models import DestinationSnowflake
65
66from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor
67from airbyte.caches.base import CacheBase
68from airbyte.destinations._translate_cache_to_dest import (
69    snowflake_cache_to_destination_configuration,
70)
71from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase
72
73
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache.

    Combines the Snowflake connection settings from ``SnowflakeConfig`` with the
    generic cache behavior from ``CacheBase``. SQL work is delegated to
    ``SnowflakeSqlProcessor``, and the default ``dedupe_mode`` is
    ``RecordDedupeMode.APPEND``.
    """

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # Must name the Snowflake destination so it agrees with
    # `paired_destination_config_class` (DestinationSnowflake) below.
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the Snowflake destination configuration equivalent to this cache.

        Returns:
            The ``DestinationSnowflake`` model produced by
            ``snowflake_cache_to_destination_configuration`` for this cache.
        """
        return snowflake_cache_to_destination_configuration(cache=self)
88
89
# Public API of this module: the cache class plus the config class it extends.
__all__ = ["SnowflakeCache", "SnowflakeConfig"]
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache.

    Combines the Snowflake connection settings from ``SnowflakeConfig`` with the
    generic cache behavior from ``CacheBase``. SQL work is delegated to
    ``SnowflakeSqlProcessor``, and the default ``dedupe_mode`` is
    ``RecordDedupeMode.APPEND``.
    """

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # Must name the Snowflake destination so it agrees with
    # `paired_destination_config_class` (DestinationSnowflake) below.
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the Snowflake destination configuration equivalent to this cache.

        Returns:
            The ``DestinationSnowflake`` model produced by
            ``snowflake_cache_to_destination_configuration`` for this cache.
        """
        return snowflake_cache_to_destination_configuration(cache=self)

Configuration for the Snowflake cache.

dedupe_mode: airbyte.shared.sql_processor.RecordDedupeMode
paired_destination_name: ClassVar[str | None] = 'destination-bigquery'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_snowflake.DestinationSnowflake'>
paired_destination_config: airbyte_api.models.destination_snowflake.DestinationSnowflake
@property
def paired_destination_config(self) -> DestinationSnowflake:
    """Return the Snowflake destination configuration equivalent to this cache.

    Returns:
        The ``DestinationSnowflake`` model produced by
        ``snowflake_cache_to_destination_configuration`` for this cache.
    """
    return snowflake_cache_to_destination_configuration(cache=self)

Return the `DestinationSnowflake` destination configuration equivalent to this cache.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; this should be a dictionary conforming to `pydantic.config.ConfigDict`.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """Initialize private attributes, then run the user-defined ``model_post_init``.

    Both hooks receive the same ``self`` and ``context`` arguments, in this order.
    """
    for hook in (init_private_attributes, original_model_post_init):
        hook(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
create_source_tables
SnowflakeConfig
account
username
password
private_key
private_key_path
private_key_passphrase
warehouse
database
role
schema_name
data_retention_time_in_days
get_sql_alchemy_connect_args
get_create_table_extra_clauses
get_database_name
get_sql_alchemy_url
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class SnowflakeConfig(airbyte.shared.sql_processor.SqlConfig):
 37class SnowflakeConfig(SqlConfig):
 38    """Configuration for the Snowflake cache."""
 39
 40    account: str
 41    username: str
 42    password: SecretString | None = None
 43
 44    private_key: SecretString | None = None
 45    private_key_path: str | None = None
 46    private_key_passphrase: SecretString | None = None
 47
 48    warehouse: str
 49    database: str
 50    role: str
 51    schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME)
 52    data_retention_time_in_days: int | None = None
 53
 54    def _validate_authentication_config(self) -> None:
 55        """Validate that authentication configuration is correct."""
 56        auth_methods = {
 57            "password": self.password is not None,
 58            "private_key": self.private_key is not None,
 59            "private_key_path": self.private_key_path is not None,
 60        }
 61        has_passphrase = self.private_key_passphrase is not None
 62        primary_auth_count = sum(auth_methods.values())
 63
 64        if primary_auth_count == 0:
 65            if has_passphrase:
 66                raise ValueError(
 67                    "You have to provide a primary authentication method "
 68                    "if you want to use a private key passphrase."
 69                )
 70            return
 71
 72        if primary_auth_count > 1:
 73            provided_methods = [method for method, has_method in auth_methods.items() if has_method]
 74            raise ValueError(
 75                f"Multiple primary authentication methods provided: {', '.join(provided_methods)}. "
 76                "Please provide only one of: 'password', 'private_key', or 'private_key_path'."
 77            )
 78
 79        if has_passphrase and auth_methods["password"]:
 80            raise ValueError(
 81                "private_key_passphrase cannot be used with password authentication. "
 82                "It can only be used with 'private_key' or 'private_key_path'."
 83            )
 84
 85    def _get_private_key_content(self) -> bytes:
 86        """Get the private key content from either private_key or private_key_path."""
 87        if self.private_key:
 88            return str(self.private_key).encode("utf-8")
 89        if self.private_key_path:
 90            return Path(self.private_key_path).read_bytes()
 91        raise ValueError("No private key provided")
 92
 93    def _get_private_key_bytes(self) -> bytes:
 94        private_key_content = self._get_private_key_content()
 95
 96        passphrase = None
 97        if self.private_key_passphrase:
 98            passphrase = str(self.private_key_passphrase).encode("utf-8")
 99
100        private_key = serialization.load_pem_private_key(
101            private_key_content,
102            password=passphrase,
103            backend=default_backend(),
104        )
105
106        return private_key.private_bytes(
107            encoding=serialization.Encoding.DER,
108            format=serialization.PrivateFormat.PKCS8,
109            encryption_algorithm=serialization.NoEncryption(),
110        )
111
112    @overrides
113    def get_sql_alchemy_connect_args(self) -> dict[str, Any]:
114        """Return the SQL Alchemy connect_args."""
115        if self.private_key is None and self.private_key_path is None:
116            return {}
117        return {"private_key": self._get_private_key_bytes()}
118
119    @overrides
120    def get_create_table_extra_clauses(self) -> list[str]:
121        """Return a list of clauses to append on CREATE TABLE statements."""
122        clauses = []
123
124        if self.data_retention_time_in_days is not None:
125            clauses.append(f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}")
126
127        return clauses
128
129    @overrides
130    def get_database_name(self) -> str:
131        """Return the name of the database."""
132        return self.database
133
134    @overrides
135    def get_sql_alchemy_url(self) -> SecretString:
136        """Return the SQLAlchemy URL to use."""
137        self._validate_authentication_config()
138
139        config = {
140            "account": self.account,
141            "user": self.username,
142            "database": self.database,
143            "warehouse": self.warehouse,
144            "schema": self.schema_name,
145            "role": self.role,
146        }
147        # password is absent when using key pair authentication
148        if self.password:
149            config["password"] = self.password
150        return SecretString(URL(**config))
151
152    def get_vendor_client(self) -> object:
153        """Return the Snowflake connection object."""
154        self._validate_authentication_config()
155
156        connection_config: dict[str, Any] = {
157            "user": self.username,
158            "account": self.account,
159            "warehouse": self.warehouse,
160            "database": self.database,
161            "schema": self.schema_name,
162            "role": self.role,
163        }
164
165        if self.password:
166            connection_config["password"] = self.password
167        elif self.private_key_path:
168            connection_config["private_key_file"] = self.private_key_path
169            if self.private_key_passphrase:
170                connection_config["private_key_file_pwd"] = self.private_key_passphrase
171            connection_config["authenticator"] = "SNOWFLAKE_JWT"
172        else:
173            connection_config["private_key"] = self._get_private_key_bytes()
174            connection_config["authenticator"] = "SNOWFLAKE_JWT"
175
176        return connector.connect(**connection_config)

Configuration for the Snowflake cache.

account: str
username: str
password: airbyte.secrets.SecretString | None
private_key: airbyte.secrets.SecretString | None
private_key_path: str | None
private_key_passphrase: airbyte.secrets.SecretString | None
warehouse: str
database: str
role: str
schema_name: str

The name of the schema to write to.

data_retention_time_in_days: int | None
@overrides
def get_sql_alchemy_connect_args(self) -> dict[str, typing.Any]:
@overrides
def get_sql_alchemy_connect_args(self) -> dict[str, Any]:
    """Return the SQL Alchemy connect_args.

    When key-pair credentials are configured, the decrypted key bytes are
    passed as ``private_key``; otherwise no extra arguments are needed.
    """
    uses_key_pair = self.private_key is not None or self.private_key_path is not None
    if not uses_key_pair:
        return {}
    return {"private_key": self._get_private_key_bytes()}

Return the SQL Alchemy connect_args.

@overrides
def get_create_table_extra_clauses(self) -> list[str]:
@overrides
def get_create_table_extra_clauses(self) -> list[str]:
    """Return extra clauses to append to CREATE TABLE statements.

    Only ``DATA_RETENTION_TIME_IN_DAYS`` is produced, and only when
    ``data_retention_time_in_days`` is set.
    """
    retention = self.data_retention_time_in_days
    if retention is None:
        return []
    return [f"DATA_RETENTION_TIME_IN_DAYS = {retention}"]

Return a list of clauses to append on CREATE TABLE statements.

@overrides
def get_database_name(self) -> str:
@overrides
def get_database_name(self) -> str:
    """Return the configured Snowflake database name."""
    database_name = self.database
    return database_name

Return the name of the database.

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Build the SQLAlchemy URL for this Snowflake connection.

    The credential combination is validated first. The password is embedded
    only when one is set; key-pair credentials go through connect args instead.
    """
    self._validate_authentication_config()

    url_parts = dict(
        account=self.account,
        user=self.username,
        database=self.database,
        warehouse=self.warehouse,
        schema=self.schema_name,
        role=self.role,
    )
    if self.password:
        url_parts["password"] = self.password
    return SecretString(URL(**url_parts))

Return the SQLAlchemy URL to use.

def get_vendor_client(self) -> object:
def get_vendor_client(self) -> object:
    """Return the connection object created by ``connector.connect``.

    Password auth is used when a password is set. Otherwise SNOWFLAKE_JWT
    key-pair auth is configured, either from the key file path (plus an
    optional passphrase) or from the decrypted key bytes. If no credential is
    configured at all, loading the key raises ``ValueError``.
    """
    self._validate_authentication_config()

    connection_config: dict[str, Any] = {
        "user": self.username,
        "account": self.account,
        "warehouse": self.warehouse,
        "database": self.database,
        "schema": self.schema_name,
        "role": self.role,
    }

    if self.password:
        connection_config["password"] = self.password
    else:
        if self.private_key_path:
            connection_config["private_key_file"] = self.private_key_path
            if self.private_key_passphrase:
                connection_config["private_key_file_pwd"] = self.private_key_passphrase
        else:
            connection_config["private_key"] = self._get_private_key_bytes()
        # Both key-pair variants authenticate via JWT.
        connection_config["authenticator"] = "SNOWFLAKE_JWT"

    return connector.connect(**connection_config)

Return the Snowflake connection object.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; this should be a dictionary conforming to `pydantic.config.ConfigDict`.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_sql_engine