airbyte.caches.duckdb

A DuckDB implementation of the PyAirbyte cache.

Usage Example

import airbyte as ab
from airbyte.caches import DuckDBCache

cache = DuckDBCache(
    db_path="/path/to/my/duckdb-file",
    schema_name="myschema",
)
 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2"""A DuckDB implementation of the PyAirbyte cache.
 3
 4## Usage Example
 5
 6```python
 7import airbyte as ab
 8from airbyte.caches import DuckDBCache
 9
10cache = DuckDBCache(
11    db_path="/path/to/my/duckdb-file",
12    schema_name="myschema",
13)
14```
15"""
16
17from __future__ import annotations
18
19import warnings
20from typing import TYPE_CHECKING, ClassVar
21
22from airbyte_api.models import DestinationDuckdb
23from duckdb_engine import DuckDBEngineWarning
24
25from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor
26from airbyte.caches.base import CacheBase
27from airbyte.destinations._translate_cache_to_dest import duckdb_cache_to_destination_configuration
28
29
30if TYPE_CHECKING:
31    from airbyte.shared.sql_processor import SqlProcessorBase
32
33
34# Suppress warnings from DuckDB about reflection on indices.
35# https://github.com/Mause/duckdb_engine/issues/905
36warnings.filterwarnings(
37    "ignore",
38    message="duckdb-engine doesn't yet support reflection on indices",
39    category=DuckDBEngineWarning,
40)
41
42
43class DuckDBCache(DuckDBConfig, CacheBase):
44    """A DuckDB cache."""
45
46    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor
47
48    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
49    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb
50
51    @property
52    def paired_destination_config(self) -> DestinationDuckdb:
53        """Return the paired `DestinationDuckdb` configuration object."""
54        return duckdb_cache_to_destination_configuration(cache=self)
55
56
57# Expose the Cache class and also the Config class.
58__all__ = [
59    "DuckDBCache",
60    "DuckDBConfig",
61]
44class DuckDBCache(DuckDBConfig, CacheBase):
45    """A DuckDB cache."""
46
47    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor
48
49    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
50    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb
51
52    @property
53    def paired_destination_config(self) -> DestinationDuckdb:
54        """Return the paired `DestinationDuckdb` configuration object."""
55        return duckdb_cache_to_destination_configuration(cache=self)

A DuckDB cache.

paired_destination_name: ClassVar[str | None] = 'destination-duckdb'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_duckdb.DestinationDuckdb'>
paired_destination_config: airbyte_api.models.destination_duckdb.DestinationDuckdb
52    @property
53    def paired_destination_config(self) -> DestinationDuckdb:
54        """Return the paired `DestinationDuckdb` configuration object."""
55        return duckdb_cache_to_destination_configuration(cache=self)

Return the paired DestinationDuckdb configuration object.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
DuckDBConfig
db_path
schema_name
get_sql_alchemy_url
get_database_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class DuckDBConfig(airbyte.shared.sql_processor.SqlConfig):
28class DuckDBConfig(SqlConfig):
29    """Configuration for DuckDB."""
30
31    db_path: Path | str = Field()
32    """Normally db_path is a Path object.
33
34    The database name will be inferred from the file name. For example, given a `db_path` of
35    `/path/to/my/duckdb-file`, the database name is `duckdb-file`.
36    """
37
38    schema_name: str = Field(default="main")
39    """The name of the schema to write to. Defaults to "main"."""
40
41    @overrides
42    def get_sql_alchemy_url(self) -> SecretString:
43        """Return the SQLAlchemy URL to use."""
44        # Suppress warnings from DuckDB about reflection on indices.
45        # https://github.com/Mause/duckdb_engine/issues/905
46        warnings.filterwarnings(
47            "ignore",
48            message="duckdb-engine doesn't yet support reflection on indices",
49            category=DuckDBEngineWarning,
50        )
51        return SecretString(f"duckdb:///{self.db_path!s}")
52
53    @overrides
54    def get_database_name(self) -> str:
55        """Return the name of the database."""
56        if self.db_path == ":memory:":
57            return "memory"
58
59        # Split the path on the appropriate separator ("/" or "\")
60        split_on: Literal["/", "\\"] = "\\" if "\\" in str(self.db_path) else "/"
61
62        # Return the file name without the extension
63        return str(self.db_path).split(sep=split_on)[-1].split(".")[0]
64
65    def _is_file_based_db(self) -> bool:
66        """Return whether the database is file-based."""
67        if isinstance(self.db_path, Path):
68            return True
69
70        db_path_str = str(self.db_path)
71        return (
72            ("/" in db_path_str or "\\" in db_path_str)
73            and db_path_str != ":memory:"
74            and "md:" not in db_path_str
75            and "motherduck:" not in db_path_str
76        )
77
78    @overrides
79    def get_sql_engine(self) -> Engine:
80        """Return the SQL Alchemy engine.
81
82        This method is overridden to ensure that the database parent directory is created if it
83        doesn't exist.
84        """
85        if self._is_file_based_db():
86            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
87
88        return super().get_sql_engine()

Configuration for DuckDB.

db_path: pathlib.Path | str

Normally db_path is a Path object.

The database name will be inferred from the file name. For example, given a db_path of /path/to/my/duckdb-file, the database name is duckdb-file.

schema_name: str

The name of the schema to write to. Defaults to "main".

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
41    @overrides
42    def get_sql_alchemy_url(self) -> SecretString:
43        """Return the SQLAlchemy URL to use."""
44        # Suppress warnings from DuckDB about reflection on indices.
45        # https://github.com/Mause/duckdb_engine/issues/905
46        warnings.filterwarnings(
47            "ignore",
48            message="duckdb-engine doesn't yet support reflection on indices",
49            category=DuckDBEngineWarning,
50        )
51        return SecretString(f"duckdb:///{self.db_path!s}")

Return the SQLAlchemy URL to use.

@overrides
def get_database_name(self) -> str:
53    @overrides
54    def get_database_name(self) -> str:
55        """Return the name of the database."""
56        if self.db_path == ":memory:":
57            return "memory"
58
59        # Split the path on the appropriate separator ("/" or "\")
60        split_on: Literal["/", "\\"] = "\\" if "\\" in str(self.db_path) else "/"
61
62        # Return the file name without the extension
63        return str(self.db_path).split(sep=split_on)[-1].split(".")[0]

Return the name of the database.

@overrides
def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
78    @overrides
79    def get_sql_engine(self) -> Engine:
80        """Return the SQL Alchemy engine.
81
82        This method is overridden to ensure that the database parent directory is created if it
83        doesn't exist.
84        """
85        if self._is_file_based_db():
86            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
87
88        return super().get_sql_engine()

Return the SQL Alchemy engine.

This method is overridden to ensure that the database parent directory is created if it doesn't exist.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_create_table_extra_clauses
get_vendor_client