airbyte.caches.motherduck
A MotherDuck implementation of the PyAirbyte cache, built on DuckDB.
Usage Example
```python
import airbyte as ab
from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="mydatabase",
    schema_name="myschema",
    api_key=ab.get_secret("MOTHERDUCK_API_KEY"),
)
```
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""A MotherDuck implementation of the PyAirbyte cache, built on DuckDB. 3 4## Usage Example 5 6```python 7from airbyte as ab 8from airbyte.caches import MotherDuckCache 9 10cache = MotherDuckCache( 11 database="mydatabase", 12 schema_name="myschema", 13 api_key=ab.get_secret("MOTHERDUCK_API_KEY"), 14) 15""" 16 17from __future__ import annotations 18 19import warnings 20from typing import TYPE_CHECKING, ClassVar 21 22from airbyte_api.models import DestinationDuckdb 23from duckdb_engine import DuckDBEngineWarning 24from overrides import overrides 25from pydantic import Field 26 27from airbyte._processors.sql.duckdb import DuckDBConfig 28from airbyte._processors.sql.motherduck import MotherDuckSqlProcessor 29from airbyte.caches.duckdb import DuckDBCache 30from airbyte.destinations._translate_cache_to_dest import ( 31 motherduck_cache_to_destination_configuration, 32) 33from airbyte.secrets import SecretString 34 35 36if TYPE_CHECKING: 37 from airbyte.shared.sql_processor import SqlProcessorBase 38 39 40class MotherDuckConfig(DuckDBConfig): 41 """Configuration for the MotherDuck cache.""" 42 43 database: str = Field() 44 api_key: SecretString = Field() 45 db_path: str = Field(default="md:") # pyrefly: ignore[bad-override] 46 _paired_destination_name: str = "destination-motherduck" 47 48 @overrides 49 def get_sql_alchemy_url(self) -> SecretString: 50 """Return the SQLAlchemy URL to use.""" 51 # Suppress warnings from DuckDB about reflection on indices. 52 # https://github.com/Mause/duckdb_engine/issues/905 53 warnings.filterwarnings( 54 "ignore", 55 message="duckdb-engine doesn't yet support reflection on indices", 56 category=DuckDBEngineWarning, 57 ) 58 59 return SecretString( 60 f"duckdb:///md:{self.database}?motherduck_token={self.api_key}" 61 # Not sure why this doesn't work. We have to override later in the flow. 
62 # f"&schema={self.schema_name}" 63 ) 64 65 @overrides 66 def get_database_name(self) -> str: 67 """Return the name of the database.""" 68 return self.database 69 70 71class MotherDuckCache(MotherDuckConfig, DuckDBCache): 72 """Cache that uses MotherDuck for external persistent storage.""" 73 74 _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor 75 76 paired_destination_name: ClassVar[str | None] = "destination-bigquery" 77 paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb 78 79 @property 80 def paired_destination_config(self) -> DestinationDuckdb: 81 """Return a dictionary of destination configuration values.""" 82 return motherduck_cache_to_destination_configuration(cache=self) 83 84 85# Expose the Cache class and also the Config class. 86__all__ = [ 87 "MotherDuckCache", 88 "MotherDuckConfig", 89]
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor

    # Fixed: this previously read "destination-bigquery", which contradicted
    # `MotherDuckConfig._paired_destination_name` ("destination-motherduck").
    paired_destination_name: ClassVar[str | None] = "destination-motherduck"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return the `DestinationDuckdb` configuration derived from this cache."""
        return motherduck_cache_to_destination_configuration(cache=self)
Cache that uses MotherDuck for external persistent storage.
paired_destination_config_class: ClassVar[type | None] =
<class 'airbyte_api.models.destination_duckdb.DestinationDuckdb'>
paired_destination_config: airbyte_api.models.destination_duckdb.DestinationDuckdb
@property
def paired_destination_config(self) -> DestinationDuckdb:
    """Return the `DestinationDuckdb` configuration derived from this cache."""
    destination_config = motherduck_cache_to_destination_configuration(cache=self)
    return destination_config
Return the destination configuration values as a DestinationDuckdb object.
Inherited Members
class
MotherDuckConfig(airbyte._processors.sql.duckdb.DuckDBConfig):
class MotherDuckConfig(DuckDBConfig):
    """Connection settings for a MotherDuck-backed cache."""

    # Name of the MotherDuck database; interpolated into the connection URL.
    database: str = Field()
    # MotherDuck API token; sent as the `motherduck_token` URL query parameter.
    api_key: SecretString = Field()
    # Overrides the inherited `db_path` with the MotherDuck `md:` prefix.
    db_path: str = Field(default="md:")  # pyrefly: ignore[bad-override]
    _paired_destination_name: str = "destination-motherduck"

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Build the SQLAlchemy connection URL for this MotherDuck database.

        Returns:
            A `SecretString` of the form
            `duckdb:///md:<database>?motherduck_token=<api_key>`.
        """
        # duckdb_engine warns that it cannot reflect indices; silence that noise.
        # See: https://github.com/Mause/duckdb_engine/issues/905
        warnings.filterwarnings(
            action="ignore",
            message="duckdb-engine doesn't yet support reflection on indices",
            category=DuckDBEngineWarning,
        )

        # The schema is deliberately left out of the URL: appending
        # `&schema={self.schema_name}` did not work (reason unknown), so the
        # schema has to be overridden later in the flow.
        connection_url = f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
        return SecretString(connection_url)

    @overrides
    def get_database_name(self) -> str:
        """Return the configured `database` value as the database name."""
        database_name = self.database
        return database_name
Configuration for the MotherDuck cache.
db_path: str =
'md:'
Normally db_path is a Path object.
The database name will be inferred from the file name. For example, given a db_path of
/path/to/my/my_db, the database name is my_db.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Build the SQLAlchemy connection URL for this MotherDuck database.

    Returns:
        A `SecretString` of the form
        `duckdb:///md:<database>?motherduck_token=<api_key>`.
    """
    # duckdb_engine warns that it cannot reflect indices; silence that noise.
    # See: https://github.com/Mause/duckdb_engine/issues/905
    warnings.filterwarnings(
        action="ignore",
        message="duckdb-engine doesn't yet support reflection on indices",
        category=DuckDBEngineWarning,
    )

    # The schema is deliberately left out of the URL: appending
    # `&schema={self.schema_name}` did not work (reason unknown), so the
    # schema has to be overridden later in the flow.
    connection_url = f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
    return SecretString(connection_url)
Return the SQLAlchemy URL to use.