airbyte.caches.motherduck

A MotherDuck implementation of the PyAirbyte cache, built on DuckDB.

Usage Example

```python
import airbyte as ab
from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="mydatabase",
    schema_name="myschema",
    api_key=ab.get_secret("MOTHERDUCK_API_KEY"),
)
```

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""A MotherDuck implementation of the PyAirbyte cache, built on DuckDB.
 3
 4## Usage Example
 5
 6```python
 7import airbyte as ab
 8from airbyte.caches import MotherDuckCache
 9
10cache = MotherDuckCache(
11    database="mydatabase",
12    schema_name="myschema",
13    api_key=ab.get_secret("MOTHERDUCK_API_KEY"),
14)
15"""
16
17from __future__ import annotations
18
19import warnings
20
21from duckdb_engine import DuckDBEngineWarning
22from overrides import overrides
23from pydantic import Field, PrivateAttr
24
25from airbyte._processors.sql.duckdb import DuckDBConfig
26from airbyte._processors.sql.motherduck import MotherDuckSqlProcessor
27from airbyte.caches.duckdb import DuckDBCache
28from airbyte.secrets import SecretString
29
30
31class MotherDuckConfig(DuckDBConfig):
32    """Configuration for the MotherDuck cache."""
33
34    database: str = Field()
35    api_key: SecretString = Field()
36    db_path: str = Field(default="md:")
37
38    @overrides
39    def get_sql_alchemy_url(self) -> SecretString:
40        """Return the SQLAlchemy URL to use."""
41        # Suppress warnings from DuckDB about reflection on indices.
42        # https://github.com/Mause/duckdb_engine/issues/905
43        warnings.filterwarnings(
44            "ignore",
45            message="duckdb-engine doesn't yet support reflection on indices",
46            category=DuckDBEngineWarning,
47        )
48
49        return SecretString(
50            f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
51            # Not sure why this doesn't work. We have to override later in the flow.
52            # f"&schema={self.schema_name}"
53        )
54
55    @overrides
56    def get_database_name(self) -> str:
57        """Return the name of the database."""
58        return self.database
59
60
61class MotherDuckCache(MotherDuckConfig, DuckDBCache):
62    """Cache that uses MotherDuck for external persistent storage."""
63
64    _sql_processor_class: type[MotherDuckSqlProcessor] = PrivateAttr(default=MotherDuckSqlProcessor)
65
66
67# Expose the Cache class and also the Config class.
68__all__ = [
69    "MotherDuckCache",
70    "MotherDuckConfig",
71]
class MotherDuckCache(MotherDuckConfig, airbyte.caches.duckdb.DuckDBCache):
62class MotherDuckCache(MotherDuckConfig, DuckDBCache):
63    """Cache that uses MotherDuck for external persistent storage."""
64
65    _sql_processor_class: type[MotherDuckSqlProcessor] = PrivateAttr(default=MotherDuckSqlProcessor)

Cache that uses MotherDuck for external persistent storage.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model. This should be a dictionary conforming to `pydantic.config.ConfigDict`.

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='main'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'db_path': FieldInfo(annotation=str, required=False, default='md:'), 'database': FieldInfo(annotation=str, required=True), 'api_key': FieldInfo(annotation=SecretString, required=True)}

Metadata about the fields defined on the model: a mapping of field names to `pydantic.fields.FieldInfo` objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
124                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
125                        """We need to both initialize private attributes and call the user-defined model_post_init
126                        method.
127                        """
128                        init_private_attributes(self, context)
129                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
MotherDuckConfig
database
api_key
db_path
get_sql_alchemy_url
get_database_name
airbyte._processors.sql.duckdb.DuckDBConfig
schema_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class MotherDuckConfig(airbyte._processors.sql.duckdb.DuckDBConfig):
32class MotherDuckConfig(DuckDBConfig):
33    """Configuration for the MotherDuck cache."""
34
35    database: str = Field()
36    api_key: SecretString = Field()
37    db_path: str = Field(default="md:")
38
39    @overrides
40    def get_sql_alchemy_url(self) -> SecretString:
41        """Return the SQLAlchemy URL to use."""
42        # Suppress warnings from DuckDB about reflection on indices.
43        # https://github.com/Mause/duckdb_engine/issues/905
44        warnings.filterwarnings(
45            "ignore",
46            message="duckdb-engine doesn't yet support reflection on indices",
47            category=DuckDBEngineWarning,
48        )
49
50        return SecretString(
51            f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
52            # Not sure why this doesn't work. We have to override later in the flow.
53            # f"&schema={self.schema_name}"
54        )
55
56    @overrides
57    def get_database_name(self) -> str:
58        """Return the name of the database."""
59        return self.database

Configuration for the MotherDuck cache.

database: str
db_path: str

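Normally db_path is a Path object.

The database name will be inferred from the file name. For example, given a db_path of /path/to/my/my_db.duckdb, the database name is my_db. For MotherDuckConfig, db_path defaults to "md:" and get_database_name() returns the database field rather than a name derived from db_path.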

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
39    @overrides
40    def get_sql_alchemy_url(self) -> SecretString:
41        """Return the SQLAlchemy URL to use."""
42        # Suppress warnings from DuckDB about reflection on indices.
43        # https://github.com/Mause/duckdb_engine/issues/905
44        warnings.filterwarnings(
45            "ignore",
46            message="duckdb-engine doesn't yet support reflection on indices",
47            category=DuckDBEngineWarning,
48        )
49
50        return SecretString(
51            f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
52            # Not sure why this doesn't work. We have to override later in the flow.
53            # f"&schema={self.schema_name}"
54        )

Return the SQLAlchemy URL to use.

@overrides
def get_database_name(self) -> str:
56    @overrides
57    def get_database_name(self) -> str:
58        """Return the name of the database."""
59        return self.database

Return the name of the database.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model. This should be a dictionary conforming to `pydantic.config.ConfigDict`.

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='main'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'db_path': FieldInfo(annotation=str, required=False, default='md:'), 'database': FieldInfo(annotation=str, required=True), 'api_key': FieldInfo(annotation=SecretString, required=True)}

Metadata about the fields defined on the model: a mapping of field names to `pydantic.fields.FieldInfo` objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._processors.sql.duckdb.DuckDBConfig
schema_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_create_table_extra_clauses
get_vendor_client