airbyte.caches.motherduck

A MotherDuck implementation of the PyAirbyte cache, built on DuckDB.

Usage Example

```python
import airbyte as ab
from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="mydatabase",
    schema_name="myschema",
    api_key=ab.get_secret("MOTHERDUCK_API_KEY"),
)
```

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""A MotherDuck implementation of the PyAirbyte cache, built on DuckDB.
 3
 4## Usage Example
 5
 6```python
 7import airbyte as ab
 8from airbyte.caches import MotherDuckCache
 9
10cache = MotherDuckCache(
11    database="mydatabase",
12    schema_name="myschema",
13    api_key=ab.get_secret("MOTHERDUCK_API_KEY"),
14)
15"""
16
17from __future__ import annotations
18
19import warnings
20from typing import TYPE_CHECKING, ClassVar
21
22from airbyte_api.models import DestinationDuckdb
23from duckdb_engine import DuckDBEngineWarning
24from overrides import overrides
25from pydantic import Field
26
27from airbyte._processors.sql.duckdb import DuckDBConfig
28from airbyte._processors.sql.motherduck import MotherDuckSqlProcessor
29from airbyte.caches.duckdb import DuckDBCache
30from airbyte.destinations._translate_cache_to_dest import (
31    motherduck_cache_to_destination_configuration,
32)
33from airbyte.secrets import SecretString
34
35
36if TYPE_CHECKING:
37    from airbyte.shared.sql_processor import SqlProcessorBase
38
39
40class MotherDuckConfig(DuckDBConfig):
41    """Configuration for the MotherDuck cache."""
42
43    database: str = Field()
44    api_key: SecretString = Field()
45    db_path: str = Field(default="md:")
46    _paired_destination_name: str = "destination-motherduck"
47
48    @overrides
49    def get_sql_alchemy_url(self) -> SecretString:
50        """Return the SQLAlchemy URL to use."""
51        # Suppress warnings from DuckDB about reflection on indices.
52        # https://github.com/Mause/duckdb_engine/issues/905
53        warnings.filterwarnings(
54            "ignore",
55            message="duckdb-engine doesn't yet support reflection on indices",
56            category=DuckDBEngineWarning,
57        )
58
59        return SecretString(
60            f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
61            # Not sure why this doesn't work. We have to override later in the flow.
62            # f"&schema={self.schema_name}"
63        )
64
65    @overrides
66    def get_database_name(self) -> str:
67        """Return the name of the database."""
68        return self.database
69
70
71class MotherDuckCache(MotherDuckConfig, DuckDBCache):
72    """Cache that uses MotherDuck for external persistent storage."""
73
74    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor
75
76    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
77    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb
78
79    @property
80    def paired_destination_config(self) -> DestinationDuckdb:
81        """Return the paired destination configuration as a `DestinationDuckdb` object."""
82        return motherduck_cache_to_destination_configuration(cache=self)
83
84
85# Expose the Cache class and also the Config class.
86__all__ = [
87    "MotherDuckCache",
88    "MotherDuckConfig",
89]
class MotherDuckCache(MotherDuckConfig, airbyte.caches.duckdb.DuckDBCache):
72class MotherDuckCache(MotherDuckConfig, DuckDBCache):
73    """Cache that uses MotherDuck for external persistent storage."""
74
75    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor
76
77    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
78    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb
79
80    @property
81    def paired_destination_config(self) -> DestinationDuckdb:
82        """Return the paired destination configuration as a `DestinationDuckdb` object."""
83        return motherduck_cache_to_destination_configuration(cache=self)

Cache that uses MotherDuck for external persistent storage.

paired_destination_name: ClassVar[str | None] = 'destination-bigquery'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_duckdb.DestinationDuckdb'>
paired_destination_config: airbyte_api.models.destination_duckdb.DestinationDuckdb
80    @property
81    def paired_destination_config(self) -> DestinationDuckdb:
82        """Return the paired destination configuration as a `DestinationDuckdb` object."""
83        return motherduck_cache_to_destination_configuration(cache=self)

Return the paired destination configuration as a DestinationDuckdb object.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
MotherDuckConfig
database
api_key
db_path
get_sql_alchemy_url
get_database_name
airbyte._processors.sql.duckdb.DuckDBConfig
schema_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class MotherDuckConfig(airbyte._processors.sql.duckdb.DuckDBConfig):
41class MotherDuckConfig(DuckDBConfig):
42    """Configuration for the MotherDuck cache."""
43
44    database: str = Field()
45    api_key: SecretString = Field()
46    db_path: str = Field(default="md:")
47    _paired_destination_name: str = "destination-motherduck"
48
49    @overrides
50    def get_sql_alchemy_url(self) -> SecretString:
51        """Return the SQLAlchemy URL to use."""
52        # Suppress warnings from DuckDB about reflection on indices.
53        # https://github.com/Mause/duckdb_engine/issues/905
54        warnings.filterwarnings(
55            "ignore",
56            message="duckdb-engine doesn't yet support reflection on indices",
57            category=DuckDBEngineWarning,
58        )
59
60        return SecretString(
61            f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
62            # Not sure why this doesn't work. We have to override later in the flow.
63            # f"&schema={self.schema_name}"
64        )
65
66    @overrides
67    def get_database_name(self) -> str:
68        """Return the name of the database."""
69        return self.database

Configuration for the MotherDuck cache.

database: str
db_path: str

Normally db_path is a Path object.

The database name will be inferred from the file name. For example, given a db_path of /path/to/my/my_db.duckdb, the database name is my_db.

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
49    @overrides
50    def get_sql_alchemy_url(self) -> SecretString:
51        """Return the SQLAlchemy URL to use."""
52        # Suppress warnings from DuckDB about reflection on indices.
53        # https://github.com/Mause/duckdb_engine/issues/905
54        warnings.filterwarnings(
55            "ignore",
56            message="duckdb-engine doesn't yet support reflection on indices",
57            category=DuckDBEngineWarning,
58        )
59
60        return SecretString(
61            f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
62            # Not sure why this doesn't work. We have to override later in the flow.
63            # f"&schema={self.schema_name}"
64        )

Return the SQLAlchemy URL to use.

@overrides
def get_database_name(self) -> str:
66    @overrides
67    def get_database_name(self) -> str:
68        """Return the name of the database."""
69        return self.database

Return the name of the database.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
384def init_private_attributes(self: BaseModel, context: Any, /) -> None:
385    """This function is meant to behave like a BaseModel method to initialise private attributes.
386
387    It takes context as an argument since that's what pydantic-core passes when calling it.
388
389    Args:
390        self: The BaseModel instance.
391        context: The context.
392    """
393    if getattr(self, '__pydantic_private__', None) is None:
394        pydantic_private = {}
395        for name, private_attr in self.__private_attributes__.items():
396            default = private_attr.get_default()
397            if default is not PydanticUndefined:
398                pydantic_private[name] = default
399        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._processors.sql.duckdb.DuckDBConfig
schema_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_create_table_extra_clauses
get_vendor_client