airbyte.caches.snowflake
A Snowflake implementation of the PyAirbyte cache.
Usage Example
from airbyte as ab
from airbyte.caches import SnowflakeCache
cache = SnowflakeCache(
account="myaccount",
username="myusername",
password=ab.get_secret("SNOWFLAKE_PASSWORD"),
warehouse="mywarehouse",
database="mydatabase",
role="myrole",
schema_name="myschema",
)
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""A Snowflake implementation of the PyAirbyte cache. 3 4## Usage Example 5 6```python 7from airbyte as ab 8from airbyte.caches import SnowflakeCache 9 10cache = SnowflakeCache( 11 account="myaccount", 12 username="myusername", 13 password=ab.get_secret("SNOWFLAKE_PASSWORD"), 14 warehouse="mywarehouse", 15 database="mydatabase", 16 role="myrole", 17 schema_name="myschema", 18) 19``` 20""" 21 22from __future__ import annotations 23 24from pydantic import PrivateAttr 25 26from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor 27from airbyte.caches.base import CacheBase 28from airbyte.shared.sql_processor import RecordDedupeMode 29 30 31class SnowflakeCache(SnowflakeConfig, CacheBase): 32 """Configuration for the Snowflake cache.""" 33 34 dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND 35 36 _sql_processor_class = PrivateAttr(default=SnowflakeSqlProcessor) 37 38 39# Expose the Cache class and also the Config class. 40__all__ = [ 41 "SnowflakeCache", 42 "SnowflakeConfig", 43]
32class SnowflakeCache(SnowflakeConfig, CacheBase): 33 """Configuration for the Snowflake cache.""" 34 35 dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND 36 37 _sql_processor_class = PrivateAttr(default=SnowflakeSqlProcessor)
Configuration for the Snowflake cache.
Configuration for the model, should be a dictionary conforming to [ConfigDict
][pydantic.config.ConfigDict].
Metadata about the fields defined on the model,
mapping of field names to [FieldInfo
][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__
from Pydantic V1.
A dictionary of computed field names and their corresponding ComputedFieldInfo
objects.
124 def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None: 125 """We need to both initialize private attributes and call the user-defined model_post_init 126 method. 127 """ 128 init_private_attributes(self, context) 129 original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- SnowflakeConfig
- account
- username
- password
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
36class SnowflakeConfig(SqlConfig): 37 """Configuration for the Snowflake cache.""" 38 39 account: str 40 username: str 41 password: SecretString 42 warehouse: str 43 database: str 44 role: str 45 schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME) 46 data_retention_time_in_days: int | None = None 47 48 @overrides 49 def get_create_table_extra_clauses(self) -> list[str]: 50 """Return a list of clauses to append on CREATE TABLE statements.""" 51 clauses = [] 52 53 if self.data_retention_time_in_days is not None: 54 clauses.append(f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}") 55 56 return clauses 57 58 @overrides 59 def get_database_name(self) -> str: 60 """Return the name of the database.""" 61 return self.database 62 63 @overrides 64 def get_sql_alchemy_url(self) -> SecretString: 65 """Return the SQLAlchemy URL to use.""" 66 return SecretString( 67 URL( 68 account=self.account, 69 user=self.username, 70 password=self.password, 71 database=self.database, 72 warehouse=self.warehouse, 73 schema=self.schema_name, 74 role=self.role, 75 ) 76 ) 77 78 def get_vendor_client(self) -> object: 79 """Return the Snowflake connection object.""" 80 return connector.connect( 81 user=self.username, 82 password=self.password, 83 account=self.account, 84 warehouse=self.warehouse, 85 database=self.database, 86 schema=self.schema_name, 87 role=self.role, 88 )
Configuration for the Snowflake cache.
48 @overrides 49 def get_create_table_extra_clauses(self) -> list[str]: 50 """Return a list of clauses to append on CREATE TABLE statements.""" 51 clauses = [] 52 53 if self.data_retention_time_in_days is not None: 54 clauses.append(f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}") 55 56 return clauses
Return a list of clauses to append on CREATE TABLE statements.
58 @overrides 59 def get_database_name(self) -> str: 60 """Return the name of the database.""" 61 return self.database
Return the name of the database.
63 @overrides 64 def get_sql_alchemy_url(self) -> SecretString: 65 """Return the SQLAlchemy URL to use.""" 66 return SecretString( 67 URL( 68 account=self.account, 69 user=self.username, 70 password=self.password, 71 database=self.database, 72 warehouse=self.warehouse, 73 schema=self.schema_name, 74 role=self.role, 75 ) 76 )
Return the SQLAlchemy URL to use.
78 def get_vendor_client(self) -> object: 79 """Return the Snowflake connection object.""" 80 return connector.connect( 81 user=self.username, 82 password=self.password, 83 account=self.account, 84 warehouse=self.warehouse, 85 database=self.database, 86 schema=self.schema_name, 87 role=self.role, 88 )
Return the Snowflake connection object.
Configuration for the model, should be a dictionary conforming to [ConfigDict
][pydantic.config.ConfigDict].
Metadata about the fields defined on the model,
mapping of field names to [FieldInfo
][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__
from Pydantic V1.
A dictionary of computed field names and their corresponding ComputedFieldInfo
objects.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- config_hash
- get_sql_engine