airbyte.caches.snowflake
A Snowflake implementation of the PyAirbyte cache.
Usage Example
Password connection:
import airbyte as ab
from airbyte.caches import SnowflakeCache
cache = SnowflakeCache(
account="myaccount",
username="myusername",
password=ab.get_secret("SNOWFLAKE_PASSWORD"), # optional
warehouse="mywarehouse",
database="mydatabase",
role="myrole",
schema_name="myschema",
)
Private key connection:
import airbyte as ab
from airbyte.caches import SnowflakeCache
cache = SnowflakeCache(
account="myaccount",
username="myusername",
private_key=ab.get_secret("SNOWFLAKE_PRIVATE_KEY"),
private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional
warehouse="mywarehouse",
database="mydatabase",
role="myrole",
schema_name="myschema",
)
Private key path connection:
import airbyte as ab
from airbyte.caches import SnowflakeCache
cache = SnowflakeCache(
account="myaccount",
username="myusername",
private_key_path="path/to/my/private_key.pem",
private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional
warehouse="mywarehouse",
database="mydatabase",
role="myrole",
schema_name="myschema",
)
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""A Snowflake implementation of the PyAirbyte cache. 3 4## Usage Example 5 6# Password connection: 7 8```python 9from airbyte as ab 10from airbyte.caches import SnowflakeCache 11 12cache = SnowflakeCache( 13 account="myaccount", 14 username="myusername", 15 password=ab.get_secret("SNOWFLAKE_PASSWORD"), # optional 16 warehouse="mywarehouse", 17 database="mydatabase", 18 role="myrole", 19 schema_name="myschema", 20) 21``` 22 23# Private key connection: 24 25```python 26from airbyte as ab 27from airbyte.caches import SnowflakeCache 28 29cache = SnowflakeCache( 30 account="myaccount", 31 username="myusername", 32 private_key=ab.get_secret("SNOWFLAKE_PRIVATE_KEY"), 33 private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional 34 warehouse="mywarehouse", 35 database="mydatabase", 36 role="myrole", 37 schema_name="myschema", 38) 39``` 40 41# Private key path connection: 42 43```python 44from airbyte as ab 45from airbyte.caches import SnowflakeCache 46 47cache = SnowflakeCache( 48 account="myaccount", 49 username="myusername", 50 private_key_path="path/to/my/private_key.pem", 51 private_key_passphrase=ab.get_secret("SNOWFLAKE_PRIVATE_KEY_PASSPHRASE"), # optional 52 warehouse="mywarehouse", 53 database="mydatabase", 54 role="myrole", 55 schema_name="myschema", 56) 57``` 58""" 59 60from __future__ import annotations 61 62from typing import ClassVar 63 64from airbyte_api.models import DestinationSnowflake 65 66from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor 67from airbyte.caches.base import CacheBase 68from airbyte.destinations._translate_cache_to_dest import ( 69 snowflake_cache_to_destination_configuration, 70) 71from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase 72 73 74class SnowflakeCache(SnowflakeConfig, CacheBase): 75 """Configuration for the Snowflake cache.""" 76 77 dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND 
78 79 _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor 80 81 paired_destination_name: ClassVar[str | None] = "destination-bigquery" 82 paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake 83 84 @property 85 def paired_destination_config(self) -> DestinationSnowflake: 86 """Return a dictionary of destination configuration values.""" 87 return snowflake_cache_to_destination_configuration(cache=self) 88 89 90# Expose the Cache class and also the Config class. 91__all__ = [ 92 "SnowflakeCache", 93 "SnowflakeConfig", 94]
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache.

    Combines the Snowflake connection settings inherited from `SnowflakeConfig`
    with the generic cache behavior inherited from `CacheBase`.
    """

    # Default record dedupe mode used by this cache.
    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    # SQL processor implementation used to write into Snowflake.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # The paired destination name must agree with `paired_destination_config_class`
    # (a Snowflake destination config), so it names the Snowflake connector.
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the `DestinationSnowflake` configuration derived from this cache."""
        return snowflake_cache_to_destination_configuration(cache=self)
Configuration for the Snowflake cache.
@property
def paired_destination_config(self) -> DestinationSnowflake:
    """Return the `DestinationSnowflake` configuration derived from this cache.

    The conversion is delegated to `snowflake_cache_to_destination_configuration`,
    which receives this cache instance.
    """
    return snowflake_cache_to_destination_configuration(cache=self)
Return the `DestinationSnowflake` configuration derived from this cache.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise a model's private attributes with their defaults.

    Intended to behave like a `BaseModel` method. `context` is accepted only because
    pydantic-core passes it when calling this function; it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context (unused).
    """
    # Nothing to do if private storage has already been set up.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    private_defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        attr_default = attr.get_default()
        # Attributes without a default are left out entirely.
        if attr_default is PydanticUndefined:
            continue
        private_defaults[attr_name] = attr_default
    object_setattr(self, '__pydantic_private__', private_defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- close
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- SnowflakeConfig
- account
- username
- password
- private_key
- private_key_path
- private_key_passphrase
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_sql_alchemy_connect_args
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- dispose_engine
- pydantic.main.BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
class SnowflakeConfig(SqlConfig):
    """Configuration for the Snowflake cache."""

    account: str
    username: str
    password: SecretString | None = None

    # Key-pair authentication: supply the key inline or as a file path, optionally encrypted.
    private_key: SecretString | None = None
    private_key_path: str | None = None
    private_key_passphrase: SecretString | None = None

    warehouse: str
    database: str
    role: str
    schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME)
    # When set, appended to CREATE TABLE statements as DATA_RETENTION_TIME_IN_DAYS.
    data_retention_time_in_days: int | None = None

    def _validate_authentication_config(self) -> None:
        """Check the combination of authentication settings.

        Having no primary method is accepted, unless a passphrase was given.

        Raises:
            ValueError: If a passphrase is given without a primary method, if more than
                one of password / private_key / private_key_path is set, or if a
                passphrase is combined with password authentication.
        """
        provided_methods = [
            method_name
            for method_name, method_value in (
                ("password", self.password),
                ("private_key", self.private_key),
                ("private_key_path", self.private_key_path),
            )
            if method_value is not None
        ]
        passphrase_given = self.private_key_passphrase is not None

        if not provided_methods:
            if passphrase_given:
                raise ValueError(
                    "You have to provide a primary authentication method "
                    "if you want to use a private key passphrase."
                )
            return

        if len(provided_methods) > 1:
            raise ValueError(
                f"Multiple primary authentication methods provided: {', '.join(provided_methods)}. "
                "Please provide only one of: 'password', 'private_key', or 'private_key_path'."
            )

        # Exactly one method remains here; a passphrase only makes sense for key-based auth.
        if passphrase_given and provided_methods == ["password"]:
            raise ValueError(
                "private_key_passphrase cannot be used with password authentication. "
                "It can only be used with 'private_key' or 'private_key_path'."
            )

    def _get_private_key_content(self) -> bytes:
        """Return the raw private key bytes, preferring the inline key over the key file.

        Raises:
            ValueError: If neither `private_key` nor `private_key_path` is set.
        """
        if not self.private_key:
            if not self.private_key_path:
                raise ValueError("No private key provided")
            return Path(self.private_key_path).read_bytes()
        return str(self.private_key).encode("utf-8")

    def _get_private_key_bytes(self) -> bytes:
        """Load the PEM private key and re-serialise it as unencrypted DER/PKCS8 bytes."""
        pem_bytes = self._get_private_key_content()
        passphrase_bytes = (
            str(self.private_key_passphrase).encode("utf-8")
            if self.private_key_passphrase
            else None
        )

        loaded_key = serialization.load_pem_private_key(
            pem_bytes,
            password=passphrase_bytes,
            backend=default_backend(),
        )
        return loaded_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

    @overrides
    def get_sql_alchemy_connect_args(self) -> dict[str, Any]:
        """Return SQLAlchemy connect_args; includes the private key only for key-pair auth."""
        key_pair_configured = self.private_key is not None or self.private_key_path is not None
        if not key_pair_configured:
            return {}
        return {"private_key": self._get_private_key_bytes()}

    @overrides
    def get_create_table_extra_clauses(self) -> list[str]:
        """Return the extra clauses to append to CREATE TABLE statements (possibly none)."""
        if self.data_retention_time_in_days is None:
            return []
        return [f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}"]

    @overrides
    def get_database_name(self) -> str:
        """Return the configured Snowflake database name."""
        database_name = self.database
        return database_name

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Validate authentication settings, then build the SQLAlchemy URL."""
        self._validate_authentication_config()

        url_params = dict(
            account=self.account,
            user=self.username,
            database=self.database,
            warehouse=self.warehouse,
            schema=self.schema_name,
            role=self.role,
        )
        # Key-pair auth passes the key via connect_args, so the URL carries no password then.
        if self.password:
            url_params["password"] = self.password
        return SecretString(URL(**url_params))

    def get_vendor_client(self) -> object:
        """Validate authentication settings and open a native Snowflake connection."""
        self._validate_authentication_config()

        connect_kwargs: dict[str, Any] = {
            "user": self.username,
            "account": self.account,
            "warehouse": self.warehouse,
            "database": self.database,
            "schema": self.schema_name,
            "role": self.role,
        }

        if self.password:
            connect_kwargs["password"] = self.password
            return connector.connect(**connect_kwargs)

        # Any non-password configuration falls through to key-pair (JWT) authentication.
        if self.private_key_path:
            connect_kwargs["private_key_file"] = self.private_key_path
            if self.private_key_passphrase:
                connect_kwargs["private_key_file_pwd"] = self.private_key_passphrase
        else:
            connect_kwargs["private_key"] = self._get_private_key_bytes()
        connect_kwargs["authenticator"] = "SNOWFLAKE_JWT"
        return connector.connect(**connect_kwargs)
Configuration for the Snowflake cache.
@overrides
def get_sql_alchemy_connect_args(self) -> dict[str, Any]:
    """Return SQLAlchemy connect_args; includes the private key only for key-pair auth."""
    key_pair_configured = self.private_key is not None or self.private_key_path is not None
    if not key_pair_configured:
        return {}
    return {"private_key": self._get_private_key_bytes()}
Return the SQL Alchemy connect_args.
@overrides
def get_create_table_extra_clauses(self) -> list[str]:
    """Return the extra clauses to append to CREATE TABLE statements (possibly none)."""
    if self.data_retention_time_in_days is None:
        return []
    return [f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}"]
Return a list of clauses to append on CREATE TABLE statements.
@overrides
def get_database_name(self) -> str:
    """Return the configured Snowflake database name."""
    database_name = self.database
    return database_name
Return the name of the database.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Validate authentication settings, then build the SQLAlchemy URL."""
    self._validate_authentication_config()

    url_params = dict(
        account=self.account,
        user=self.username,
        database=self.database,
        warehouse=self.warehouse,
        schema=self.schema_name,
        role=self.role,
    )
    # With key-pair auth no password is set, so the URL carries none.
    if self.password:
        url_params["password"] = self.password
    return SecretString(URL(**url_params))
Return the SQLAlchemy URL to use.
def get_vendor_client(self) -> object:
    """Validate authentication settings and open a native Snowflake connection."""
    self._validate_authentication_config()

    connect_kwargs: dict[str, Any] = {
        "user": self.username,
        "account": self.account,
        "warehouse": self.warehouse,
        "database": self.database,
        "schema": self.schema_name,
        "role": self.role,
    }

    if self.password:
        connect_kwargs["password"] = self.password
        return connector.connect(**connect_kwargs)

    # Any non-password configuration falls through to key-pair (JWT) authentication.
    if self.private_key_path:
        connect_kwargs["private_key_file"] = self.private_key_path
        if self.private_key_passphrase:
            connect_kwargs["private_key_file_pwd"] = self.private_key_passphrase
    else:
        connect_kwargs["private_key"] = self._get_private_key_bytes()
    connect_kwargs["authenticator"] = "SNOWFLAKE_JWT"
    return connector.connect(**connect_kwargs)
Return the Snowflake connection object.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise a model's private attributes with their defaults.

    Intended to behave like a `BaseModel` method. `context` is accepted only because
    pydantic-core passes it when calling this function; it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context (unused).
    """
    # Nothing to do if private storage has already been set up.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    private_defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        attr_default = attr.get_default()
        # Attributes without a default are left out entirely.
        if attr_default is PydanticUndefined:
            continue
        private_defaults[attr_name] = attr_default
    object_setattr(self, '__pydantic_private__', private_defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- config_hash
- get_sql_engine
- dispose_engine