airbyte.caches.motherduck
A MotherDuck implementation of the PyAirbyte cache, built on DuckDB.
Usage Example
```python
import airbyte as ab
from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="mydatabase",
    schema_name="myschema",
    api_key=ab.get_secret("MOTHERDUCK_API_KEY"),
)
```
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""A MotherDuck implementation of the PyAirbyte cache, built on DuckDB.

## Usage Example

```python
import airbyte as ab
from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="mydatabase",
    schema_name="myschema",
    api_key=ab.get_secret("MOTHERDUCK_API_KEY"),
)
```
"""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, ClassVar

from airbyte_api.models import DestinationDuckdb
from duckdb_engine import DuckDBEngineWarning
from overrides import overrides
from pydantic import Field

from airbyte._processors.sql.duckdb import DuckDBConfig
from airbyte._processors.sql.motherduck import MotherDuckSqlProcessor
from airbyte.caches.duckdb import DuckDBCache
from airbyte.destinations._translate_cache_to_dest import (
    motherduck_cache_to_destination_configuration,
)
from airbyte.secrets import SecretString


if TYPE_CHECKING:
    from airbyte.shared.sql_processor import SqlProcessorBase


class MotherDuckConfig(DuckDBConfig):
    """Configuration for the MotherDuck cache."""

    # Name of the MotherDuck database. It is embedded in the connection URL
    # and returned by `get_database_name()`.
    database: str = Field()
    # Token passed to MotherDuck as the `motherduck_token` URL parameter.
    api_key: SecretString = Field()
    # Overrides the inherited `db_path` field with the "md:" default.
    db_path: str = Field(default="md:")
    _paired_destination_name: str = "destination-motherduck"

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        The URL has the form
        `duckdb:///md:<database>?motherduck_token=<api_key>` and is wrapped in
        a `SecretString` because it embeds the API key.

        As a side effect, this registers a warnings filter that ignores the
        duckdb-engine "reflection on indices" warning.
        """
        # Suppress warnings from DuckDB about reflection on indices.
        # https://github.com/Mause/duckdb_engine/issues/905
        warnings.filterwarnings(
            "ignore",
            message="duckdb-engine doesn't yet support reflection on indices",
            category=DuckDBEngineWarning,
        )

        return SecretString(
            f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
            # NOTE: Appending `&schema={self.schema_name}` here did not work
            # (reason unknown), so the schema is overridden later in the flow.
        )

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database."""
        return self.database


class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor

    # Fixed: this previously read "destination-bigquery", which contradicted
    # `MotherDuckConfig._paired_destination_name` and the MotherDuck-specific
    # translation helper used by `paired_destination_config`.
    paired_destination_name: ClassVar[str | None] = "destination-motherduck"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return the paired destination configuration for this cache.

        The value is built by `motherduck_cache_to_destination_configuration`
        and is annotated as a `DestinationDuckdb` object (not a plain dict).
        """
        return motherduck_cache_to_destination_configuration(cache=self)


# Expose the Cache class and also the Config class.
__all__ = [
    "MotherDuckCache",
    "MotherDuckConfig",
]
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor

    # Fixed: this previously read "destination-bigquery", which contradicted
    # `MotherDuckConfig._paired_destination_name` ("destination-motherduck")
    # and the MotherDuck-specific translation helper used below.
    paired_destination_name: ClassVar[str | None] = "destination-motherduck"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return the paired destination configuration for this cache.

        The value is built by `motherduck_cache_to_destination_configuration`
        and is annotated as a `DestinationDuckdb` object (not a plain dict).
        """
        return motherduck_cache_to_destination_configuration(cache=self)
Cache that uses MotherDuck for external persistent storage.
@property
def paired_destination_config(self) -> DestinationDuckdb:
    """Build the destination configuration that corresponds to this cache.

    Returns:
        The object produced by `motherduck_cache_to_destination_configuration`
        for this cache, annotated as `DestinationDuckdb`.
    """
    destination_config = motherduck_cache_to_destination_configuration(cache=self)
    return destination_config
Return the paired destination configuration for this cache as a `DestinationDuckdb` object.
Configuration for the model, should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """Initialize private attributes, then run the user-defined `model_post_init`.

    Both steps are needed; each one receives the same `context` argument.
    """
    # Step 1: set up the private attributes.
    init_private_attributes(self, context)
    # Step 2: hand off to the user-defined hook.
    original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.duckdb.DuckDBConfig
- schema_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class MotherDuckConfig(DuckDBConfig):
    """Configuration for the MotherDuck cache."""

    # Name of the MotherDuck database. It is embedded in the connection URL
    # and returned by `get_database_name()`.
    database: str = Field()
    # Token passed to MotherDuck as the `motherduck_token` URL parameter.
    api_key: SecretString = Field()
    # Overrides the inherited `db_path` field with the "md:" default.
    db_path: str = Field(default="md:")
    _paired_destination_name: str = "destination-motherduck"

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Build the SQLAlchemy connection URL for this MotherDuck database.

        The URL embeds the API key, so it is returned as a `SecretString`.
        Calling this also registers a warnings filter that ignores the
        duckdb-engine "reflection on indices" warning.
        """
        # duckdb-engine warns about index reflection; silence that warning.
        # https://github.com/Mause/duckdb_engine/issues/905
        warnings.filterwarnings(
            action="ignore",
            message="duckdb-engine doesn't yet support reflection on indices",
            category=DuckDBEngineWarning,
        )

        # NOTE: Appending `&schema={self.schema_name}` to this URL did not
        # work (reason unknown), so the schema is overridden later in the flow.
        connection_url = f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
        return SecretString(connection_url)

    @overrides
    def get_database_name(self) -> str:
        """Return the configured database name (`self.database`)."""
        database_name: str = self.database
        return database_name
Configuration for the MotherDuck cache.
Normally db_path is a Path object.
The database name will be inferred from the file name. For example, given a `db_path` of `/path/to/my/my_db.duckdb`, the database name is `my_db`.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Build the SQLAlchemy connection URL for this MotherDuck database.

    The URL embeds the API key, so it is returned as a `SecretString`.
    Calling this also registers a warnings filter that ignores the
    duckdb-engine "reflection on indices" warning.
    """
    # duckdb-engine warns about index reflection; silence that warning.
    # https://github.com/Mause/duckdb_engine/issues/905
    warnings.filterwarnings(
        action="ignore",
        message="duckdb-engine doesn't yet support reflection on indices",
        category=DuckDBEngineWarning,
    )

    # NOTE: Appending `&schema={self.schema_name}` to this URL did not work
    # (reason unknown), so the schema is overridden later in the flow.
    connection_url = f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
    return SecretString(connection_url)
Return the SQLAlchemy URL to use.
@overrides
def get_database_name(self) -> str:
    """Return the configured database name (`self.database`)."""
    database_name: str = self.database
    return database_name
Return the name of the database.
Configuration for the model, should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Populate `__pydantic_private__` with the defaults of the private attributes.

    Written to behave like a `BaseModel` method. `context` is accepted because
    pydantic-core passes it when calling this function; it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Leave the instance untouched when the private storage already exists.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        value = attr.get_default()
        # Attributes without a default are left out of the mapping.
        if value is PydanticUndefined:
            continue
        defaults[attr_name] = value
    object_setattr(self, '__pydantic_private__', defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._processors.sql.duckdb.DuckDBConfig
- schema_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- config_hash
- get_create_table_extra_clauses
- get_vendor_client