airbyte.caches.snowflake
A Snowflake implementation of the PyAirbyte cache.
Usage Example
Password connection:
import airbyte as ab
from airbyte.caches import SnowflakeCache
cache = SnowflakeCache(
account="myaccount",
username="myusername",
password=ab.get_secret("SNOWFLAKE_PASSWORD"), # optional
warehouse="mywarehouse",
database="mydatabase",
role="myrole",
schema_name="myschema",
)
Private key connection:
import airbyte as ab
from airbyte.caches import SnowflakeCache
cache = SnowflakeCache(
account="myaccount",
username="myusername",
private_key=ab.get_secret("SNOWFLAKE_PRIVATE_KEY"),
private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional
warehouse="mywarehouse",
database="mydatabase",
role="myrole",
schema_name="myschema",
)
Private key path connection:
import airbyte as ab
from airbyte.caches import SnowflakeCache
cache = SnowflakeCache(
account="myaccount",
username="myusername",
private_key_path="path/to/my/private_key.pem",
private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional
warehouse="mywarehouse",
database="mydatabase",
role="myrole",
schema_name="myschema",
)
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""A Snowflake implementation of the PyAirbyte cache.

## Usage Example

# Password connection:

```python
import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myaccount",
    username="myusername",
    password=ab.get_secret("SNOWFLAKE_PASSWORD"),  # optional
    warehouse="mywarehouse",
    database="mydatabase",
    role="myrole",
    schema_name="myschema",
)
```

# Private key connection:

```python
import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myaccount",
    username="myusername",
    private_key=ab.get_secret("SNOWFLAKE_PRIVATE_KEY"),
    private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"),  # optional
    warehouse="mywarehouse",
    database="mydatabase",
    role="myrole",
    schema_name="myschema",
)
```

# Private key path connection:

```python
import airbyte as ab
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myaccount",
    username="myusername",
    private_key_path="path/to/my/private_key.pem",
    private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"),  # optional
    warehouse="mywarehouse",
    database="mydatabase",
    role="myrole",
    schema_name="myschema",
)
```
"""

from __future__ import annotations

from typing import ClassVar

from airbyte_api.models import DestinationSnowflake

from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor
from airbyte.caches.base import CacheBase
from airbyte.destinations._translate_cache_to_dest import (
    snowflake_cache_to_destination_configuration,
)
from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase


class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache.

    Combines the connection settings of `SnowflakeConfig` with the cache
    behavior of `CacheBase`.
    """

    # Records are appended by default.
    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    # SQL processor class declared for this cache type.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # Name of the destination connector paired with this cache. It has to name
    # the Snowflake destination so that it agrees with
    # `paired_destination_config_class` below.
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the paired destination configuration for this cache.

        The value is whatever `snowflake_cache_to_destination_configuration`
        produces for this cache instance; it is annotated as a
        `DestinationSnowflake` object.
        """
        return snowflake_cache_to_destination_configuration(cache=self)


# Expose the Cache class and also the Config class.
__all__ = [
    "SnowflakeCache",
    "SnowflakeConfig",
]
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache.

    Combines the connection settings of `SnowflakeConfig` with the cache
    behavior of `CacheBase`.
    """

    # Records are appended by default.
    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    # SQL processor class declared for this cache type.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # Name of the destination connector paired with this cache. It has to name
    # the Snowflake destination so that it agrees with
    # `paired_destination_config_class` below.
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the paired destination configuration for this cache.

        The value is whatever `snowflake_cache_to_destination_configuration`
        produces for this cache instance; it is annotated as a
        `DestinationSnowflake` object.
        """
        return snowflake_cache_to_destination_configuration(cache=self)
Configuration for the Snowflake cache.
paired_destination_config_class: ClassVar[type | None] =
<class 'airbyte_api.models.destination_snowflake.DestinationSnowflake'>
paired_destination_config: airbyte_api.models.destination_snowflake.DestinationSnowflake
@property
def paired_destination_config(self) -> DestinationSnowflake:
    """Return the paired destination configuration for this cache.

    The value is whatever `snowflake_cache_to_destination_configuration`
    produces for this cache instance; it is annotated as a
    `DestinationSnowflake` object.
    """
    destination_config = snowflake_cache_to_destination_configuration(cache=self)
    return destination_config
Return the paired destination configuration as a `DestinationSnowflake` object.
model_config: ClassVar[pydantic.config.ConfigDict] =
{}
Configuration for the model, should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
def
model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.

    Both steps receive the same `self` and `context` arguments.
    """
    # Private attributes are set up first, then the original hook is invoked.
    # NOTE(review): `init_private_attributes` and `original_model_post_init`
    # are not defined in this view; presumably they come from the enclosing
    # scope -- confirm against the pydantic source.
    init_private_attributes(self, context)
    original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- SnowflakeConfig
- account
- username
- password
- private_key
- private_key_path
- private_key_passphrase
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_sql_alchemy_connect_args
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class
SnowflakeConfig(airbyte.shared.sql_processor.SqlConfig):
class SnowflakeConfig(SqlConfig):
    """Configuration for the Snowflake cache.

    Holds the Snowflake connection settings and builds the SQLAlchemy URL,
    the SQLAlchemy connect args, and the vendor connection from them.
    """

    account: str
    username: str

    # Primary authentication options. `_validate_authentication_config`
    # rejects configurations where more than one of `password`, `private_key`
    # and `private_key_path` is set.
    password: SecretString | None = None

    private_key: SecretString | None = None
    private_key_path: str | None = None
    # Only accepted together with `private_key` or `private_key_path`.
    private_key_passphrase: SecretString | None = None

    warehouse: str
    database: str
    role: str
    schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME)
    # When set, emitted as a DATA_RETENTION_TIME_IN_DAYS clause by
    # `get_create_table_extra_clauses`.
    data_retention_time_in_days: int | None = None

    def _validate_authentication_config(self) -> None:
        """Validate that authentication configuration is correct.

        Raises:
            ValueError: If a passphrase is given without any primary
                authentication method, if more than one primary method is
                given, or if a passphrase is combined with a password.
        """
        auth_methods = {
            "password": self.password is not None,
            "private_key": self.private_key is not None,
            "private_key_path": self.private_key_path is not None,
        }
        has_passphrase = self.private_key_passphrase is not None
        # Booleans sum as 0/1, so this counts the methods that were provided.
        primary_auth_count = sum(auth_methods.values())

        if primary_auth_count == 0:
            if has_passphrase:
                raise ValueError(
                    "You have to provide a primary authentication method "
                    "if you want to use a private key passphrase."
                )
            # No credentials at all is accepted by this check.
            return

        if primary_auth_count > 1:
            provided_methods = [method for method, has_method in auth_methods.items() if has_method]
            raise ValueError(
                f"Multiple primary authentication methods provided: {', '.join(provided_methods)}. "
                "Please provide only one of: 'password', 'private_key', or 'private_key_path'."
            )

        # Exactly one primary method remains at this point.
        if has_passphrase and auth_methods["password"]:
            raise ValueError(
                "private_key_passphrase cannot be used with password authentication. "
                "It can only be used with 'private_key' or 'private_key_path'."
            )

    def _get_private_key_content(self) -> bytes:
        """Get the private key content from either private_key or private_key_path.

        `private_key` takes precedence and is UTF-8 encoded; otherwise the file
        at `private_key_path` is read as bytes.

        Raises:
            ValueError: If neither `private_key` nor `private_key_path` is set
                to a truthy value.
        """
        if self.private_key:
            return str(self.private_key).encode("utf-8")
        if self.private_key_path:
            return Path(self.private_key_path).read_bytes()
        raise ValueError("No private key provided")

    def _get_private_key_bytes(self) -> bytes:
        """Return the private key as unencrypted DER-encoded PKCS8 bytes.

        The key content is loaded as PEM, using `private_key_passphrase`
        (UTF-8 encoded) as the password when it is set.
        """
        private_key_content = self._get_private_key_content()

        passphrase = None
        if self.private_key_passphrase:
            passphrase = str(self.private_key_passphrase).encode("utf-8")

        private_key = serialization.load_pem_private_key(
            private_key_content,
            password=passphrase,
            backend=default_backend(),
        )

        return private_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

    @overrides
    def get_sql_alchemy_connect_args(self) -> dict[str, Any]:
        """Return the SQL Alchemy connect_args.

        Returns an empty dict when neither `private_key` nor `private_key_path`
        is set; otherwise a dict with the DER-encoded key under "private_key".
        """
        if self.private_key is None and self.private_key_path is None:
            return {}
        return {"private_key": self._get_private_key_bytes()}

    @overrides
    def get_create_table_extra_clauses(self) -> list[str]:
        """Return a list of clauses to append on CREATE TABLE statements."""
        clauses = []

        if self.data_retention_time_in_days is not None:
            clauses.append(f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}")

        return clauses

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database."""
        return self.database

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        Raises:
            ValueError: If the authentication configuration is invalid.
        """
        self._validate_authentication_config()

        config = {
            "account": self.account,
            "user": self.username,
            "database": self.database,
            "warehouse": self.warehouse,
            "schema": self.schema_name,
            "role": self.role,
        }
        # password is absent when using key pair authentication
        if self.password:
            config["password"] = self.password
        return SecretString(URL(**config))

    def get_vendor_client(self) -> object:
        """Return the Snowflake connection object.

        Raises:
            ValueError: If the authentication configuration is invalid, or if
                no password, key path, or private key is available.
        """
        self._validate_authentication_config()

        connection_config: dict[str, Any] = {
            "user": self.username,
            "account": self.account,
            "warehouse": self.warehouse,
            "database": self.database,
            "schema": self.schema_name,
            "role": self.role,
        }

        # Credential precedence: password, then key file path, then inline key.
        if self.password:
            connection_config["password"] = self.password
        elif self.private_key_path:
            connection_config["private_key_file"] = self.private_key_path
            if self.private_key_passphrase:
                connection_config["private_key_file_pwd"] = self.private_key_passphrase
            connection_config["authenticator"] = "SNOWFLAKE_JWT"
        else:
            # Reached when neither a password nor a key path is set;
            # `_get_private_key_bytes` raises if `private_key` is missing too.
            connection_config["private_key"] = self._get_private_key_bytes()
            connection_config["authenticator"] = "SNOWFLAKE_JWT"

        return connector.connect(**connection_config)
Configuration for the Snowflake cache.
password: airbyte.secrets.SecretString | None
private_key: airbyte.secrets.SecretString | None
private_key_passphrase: airbyte.secrets.SecretString | None
@overrides
def
get_sql_alchemy_connect_args(self) -> dict[str, typing.Any]:
@overrides
def get_sql_alchemy_connect_args(self) -> dict[str, Any]:
    """Build the `connect_args` mapping handed to SQLAlchemy.

    The mapping is empty unless `private_key` or `private_key_path` is set, in
    which case it carries the key bytes under the "private_key" entry.
    """
    uses_key_pair = self.private_key is not None or self.private_key_path is not None
    if uses_key_pair:
        return {"private_key": self._get_private_key_bytes()}
    return {}
Return the SQL Alchemy connect_args.
@overrides
def
get_create_table_extra_clauses(self) -> list[str]:
@overrides
def get_create_table_extra_clauses(self) -> list[str]:
    """List the extra clauses to append to CREATE TABLE statements.

    A single DATA_RETENTION_TIME_IN_DAYS clause is produced when
    `data_retention_time_in_days` is set; otherwise the list is empty.
    """
    retention_days = self.data_retention_time_in_days
    if retention_days is None:
        return []
    return [f"DATA_RETENTION_TIME_IN_DAYS = {retention_days}"]
Return a list of clauses to append on CREATE TABLE statements.
@overrides
def
get_database_name(self) -> str:
@overrides
def get_database_name(self) -> str:
    """Give the database name, taken from the `database` field."""
    database_name = self.database
    return database_name
Return the name of the database.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Assemble the SQLAlchemy URL for this configuration.

    The authentication settings are validated first. The password is included
    in the URL only when one is set.
    """
    self._validate_authentication_config()

    url_params = {
        "account": self.account,
        "user": self.username,
        "database": self.database,
        "warehouse": self.warehouse,
        "schema": self.schema_name,
        "role": self.role,
    }
    # Key pair authentication carries no password, so it is added only if set.
    password_params = {"password": self.password} if self.password else {}
    return SecretString(URL(**url_params, **password_params))
Return the SQLAlchemy URL to use.
def
get_vendor_client(self) -> object:
def get_vendor_client(self) -> object:
    """Open and return a Snowflake connection for this configuration.

    The authentication settings are validated first. Credentials are chosen in
    this order: password, then private key file path, then inline private key.
    """
    self._validate_authentication_config()

    base_params: dict[str, Any] = {
        "user": self.username,
        "account": self.account,
        "warehouse": self.warehouse,
        "database": self.database,
        "schema": self.schema_name,
        "role": self.role,
    }

    credentials: dict[str, Any]
    if self.password:
        credentials = {"password": self.password}
    elif self.private_key_path:
        credentials = {"private_key_file": self.private_key_path}
        if self.private_key_passphrase:
            credentials["private_key_file_pwd"] = self.private_key_passphrase
        credentials["authenticator"] = "SNOWFLAKE_JWT"
    else:
        credentials = {
            "private_key": self._get_private_key_bytes(),
            "authenticator": "SNOWFLAKE_JWT",
        }

    return connector.connect(**base_params, **credentials)
Return the Snowflake connection object.
model_config: ClassVar[pydantic.config.ConfigDict] =
{}
Configuration for the model, should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- config_hash
- get_sql_engine