airbyte.caches.snowflake
A Snowflake implementation of the PyAirbyte cache.
Usage Example
import airbyte as ab
from airbyte.caches import SnowflakeCache
cache = SnowflakeCache(
account="myaccount",
username="myusername",
password=ab.get_secret("SNOWFLAKE_PASSWORD"),
warehouse="mywarehouse",
database="mydatabase",
role="myrole",
schema_name="myschema",
)
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""A Snowflake implementation of the PyAirbyte cache. 3 4## Usage Example 5 6```python 7from airbyte as ab 8from airbyte.caches import SnowflakeCache 9 10cache = SnowflakeCache( 11 account="myaccount", 12 username="myusername", 13 password=ab.get_secret("SNOWFLAKE_PASSWORD"), 14 warehouse="mywarehouse", 15 database="mydatabase", 16 role="myrole", 17 schema_name="myschema", 18) 19``` 20""" 21 22from __future__ import annotations 23 24from typing import ClassVar 25 26from airbyte_api.models import DestinationSnowflake 27 28from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor 29from airbyte.caches.base import CacheBase 30from airbyte.destinations._translate_cache_to_dest import ( 31 snowflake_cache_to_destination_configuration, 32) 33from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase 34 35 36class SnowflakeCache(SnowflakeConfig, CacheBase): 37 """Configuration for the Snowflake cache.""" 38 39 dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND 40 41 _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor 42 43 paired_destination_name: ClassVar[str | None] = "destination-bigquery" 44 paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake 45 46 @property 47 def paired_destination_config(self) -> DestinationSnowflake: 48 """Return a dictionary of destination configuration values.""" 49 return snowflake_cache_to_destination_configuration(cache=self) 50 51 52# Expose the Cache class and also the Config class. 53__all__ = [ 54 "SnowflakeCache", 55 "SnowflakeConfig", 56]
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache.

    Combines the Snowflake connection settings declared on `SnowflakeConfig`
    with the cache interface inherited from `CacheBase`.
    """

    # Record de-duplication mode; defaults to APPEND.
    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    # SQL processor implementation associated with this cache type.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    # Name and config class of the destination paired with this cache. The name
    # must refer to the Snowflake destination so that it agrees with
    # `paired_destination_config_class` (it previously said "destination-bigquery").
    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the paired destination configuration as a `DestinationSnowflake` object.

        The object is produced by `snowflake_cache_to_destination_configuration`
        from this cache instance.
        """
        return snowflake_cache_to_destination_configuration(cache=self)
Configuration for the Snowflake cache.
paired_destination_config_class: ClassVar[type | None] =
<class 'airbyte_api.models.destination_snowflake.DestinationSnowflake'>
paired_destination_config: airbyte_api.models.destination_snowflake.DestinationSnowflake
@property
def paired_destination_config(self) -> DestinationSnowflake:
    """Return the paired destination configuration as a `DestinationSnowflake` object.

    The object is produced by `snowflake_cache_to_destination_configuration`
    from this cache instance; it is not a plain dictionary.
    """
    return snowflake_cache_to_destination_configuration(cache=self)
Return the paired destination configuration as a `DestinationSnowflake` object.
model_config: ClassVar[pydantic.config.ConfigDict] =
{}
Configuration for the model; should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def
model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """Run private-attribute initialization followed by the user-defined `model_post_init`.

    Both steps receive the same `self` and `context` arguments.
    """
    # Private attributes are initialized before the user-defined hook is invoked.
    init_private_attributes(self, context)
    original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- SnowflakeConfig
- account
- username
- password
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class
SnowflakeConfig(airbyte.shared.sql_processor.SqlConfig):
class SnowflakeConfig(SqlConfig):
    """Connection and table settings for the Snowflake cache."""

    account: str
    username: str
    password: SecretString
    warehouse: str
    database: str
    role: str
    schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME)
    data_retention_time_in_days: int | None = None

    @overrides
    def get_create_table_extra_clauses(self) -> list[str]:
        """Return the extra clauses to append to CREATE TABLE statements.

        The list is empty unless `data_retention_time_in_days` is set, in which
        case it holds a single `DATA_RETENTION_TIME_IN_DAYS = <n>` clause.
        """
        if self.data_retention_time_in_days is None:
            return []

        return [f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}"]

    @overrides
    def get_database_name(self) -> str:
        """Return the configured `database` value as the database name."""
        database_name = self.database
        return database_name

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Build the SQLAlchemy URL from this configuration, wrapped in a `SecretString`."""
        connection_url = URL(
            account=self.account,
            user=self.username,
            password=self.password,
            database=self.database,
            warehouse=self.warehouse,
            schema=self.schema_name,
            role=self.role,
        )
        return SecretString(connection_url)

    def get_vendor_client(self) -> object:
        """Return the object produced by `connector.connect` for this configuration."""
        connection_kwargs = {
            "user": self.username,
            "password": self.password,
            "account": self.account,
            "warehouse": self.warehouse,
            "database": self.database,
            "schema": self.schema_name,
            "role": self.role,
        }
        return connector.connect(**connection_kwargs)
Configuration for the Snowflake cache.
password: airbyte.secrets.SecretString
@overrides
def
get_create_table_extra_clauses(self) -> list[str]:
@overrides
def get_create_table_extra_clauses(self) -> list[str]:
    """Return the extra clauses to append to CREATE TABLE statements.

    The list is empty unless `data_retention_time_in_days` is set, in which
    case it holds a single `DATA_RETENTION_TIME_IN_DAYS = <n>` clause.
    """
    if self.data_retention_time_in_days is None:
        return []

    return [f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}"]
Return a list of clauses to append on CREATE TABLE statements.
@overrides
def
get_database_name(self) -> str:
@overrides
def get_database_name(self) -> str:
    """Return the configured `database` value as the database name."""
    database_name = self.database
    return database_name
Return the name of the database.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Build the SQLAlchemy URL from this configuration, wrapped in a `SecretString`."""
    connection_url = URL(
        account=self.account,
        user=self.username,
        password=self.password,
        database=self.database,
        warehouse=self.warehouse,
        schema=self.schema_name,
        role=self.role,
    )
    return SecretString(connection_url)
Return the SQLAlchemy URL to use.
def
get_vendor_client(self) -> object:
def get_vendor_client(self) -> object:
    """Return the object produced by `connector.connect` for this configuration."""
    connection_kwargs = {
        "user": self.username,
        "password": self.password,
        "account": self.account,
        "warehouse": self.warehouse,
        "database": self.database,
        "schema": self.schema_name,
        "role": self.role,
    }
    return connector.connect(**connection_kwargs)
Return the Snowflake connection object.
model_config: ClassVar[pydantic.config.ConfigDict] =
{}
Configuration for the model; should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- config_hash
- get_sql_engine