airbyte.caches.duckdb

A DuckDB implementation of the PyAirbyte cache.

Usage Example

import airbyte as ab
from airbyte.caches import DuckDBCache

cache = DuckDBCache(
    db_path="/path/to/my/duckdb-file",
    schema_name="myschema",
)
 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2"""A DuckDB implementation of the PyAirbyte cache.
 3
 4## Usage Example
 5
 6```python
 7import airbyte as ab
 8from airbyte.caches import DuckDBCache
 9
10cache = DuckDBCache(
11    db_path="/path/to/my/duckdb-file",
12    schema_name="myschema",
13)
14```
15"""
16
17from __future__ import annotations
18
19import warnings
20
21from duckdb_engine import DuckDBEngineWarning
22from pydantic import PrivateAttr
23
24from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor
25from airbyte.caches.base import CacheBase
26
27
28# Registered at import time: ignore duckdb-engine's index-reflection `DuckDBEngineWarning`.
29# https://github.com/Mause/duckdb_engine/issues/905
30warnings.filterwarnings(
31    "ignore",
32    message="duckdb-engine doesn't yet support reflection on indices",
33    category=DuckDBEngineWarning,
34)
35
36
37class DuckDBCache(DuckDBConfig, CacheBase):
38    """A DuckDB cache: `DuckDBConfig` settings plus `CacheBase`, defaulting to `DuckDBSqlProcessor`."""
39
40    _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)
41
42
43# Public API: the cache class defined here plus the imported `DuckDBConfig` it is built on.
44__all__ = [
45    "DuckDBCache",
46    "DuckDBConfig",
47]
38class DuckDBCache(DuckDBConfig, CacheBase):
39    """A DuckDB cache: `DuckDBConfig` settings plus `CacheBase`, defaulting to `DuckDBSqlProcessor`."""
40
41    _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)

A DuckDB cache.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='main'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'db_path': FieldInfo(annotation=Union[Path, str], required=True)}

Metadata about the fields defined on the model: a mapping of field names to `pydantic.fields.FieldInfo` objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
124                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
125                        """Initialize private attributes, then call the user-defined model_post_init
126                        method.
127                        """
128                        init_private_attributes(self, context)
129                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
DuckDBConfig
db_path
schema_name
get_sql_alchemy_url
get_database_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class DuckDBConfig(airbyte.shared.sql_processor.SqlConfig):
28class DuckDBConfig(SqlConfig):
29    """Configuration for DuckDB: the database location (`db_path`) and target `schema_name`."""
30
31    db_path: Path | str = Field()
32    """Normally db_path is a Path object.
33
34    The database name will be inferred from the file name. For example, given a `db_path` of
35    `/path/to/my/duckdb-file`, the database name is `duckdb-file`.
36    """
37
38    schema_name: str = Field(default="main")
39    """The name of the schema to write to. Defaults to "main"."""
40
41    @overrides
42    def get_sql_alchemy_url(self) -> SecretString:
43        """Return the SQLAlchemy URL, `duckdb:///<db_path>`, wrapped in a `SecretString`."""
44        # Suppress warnings from DuckDB about reflection on indices.
45        # https://github.com/Mause/duckdb_engine/issues/905
46        warnings.filterwarnings(
47            "ignore",
48            message="duckdb-engine doesn't yet support reflection on indices",
49            category=DuckDBEngineWarning,
50        )
51        return SecretString(f"duckdb:///{self.db_path!s}")
52
53    @overrides
54    def get_database_name(self) -> str:
55        """Return "memory" for `:memory:`, else the file name up to its first "."."""
56        if self.db_path == ":memory:":
57            return "memory"
58
59        # Split on "\" if the path contains one, otherwise on "/"
60        split_on: Literal["/", "\\"] = "\\" if "\\" in str(self.db_path) else "/"
61
62        # Last path segment, cut at its first "." (so "a/b.c.d" yields "b")
63        return str(self.db_path).split(sep=split_on)[-1].split(".")[0]
64
65    def _is_file_based_db(self) -> bool:
66        """Return True for a `Path`, or a str with a path separator and no `md:`/`motherduck:`."""
67        if isinstance(self.db_path, Path):
68            return True
69
70        db_path_str = str(self.db_path)
71        return (
72            ("/" in db_path_str or "\\" in db_path_str)  # a bare name like "x.db" yields False
73            and db_path_str != ":memory:"
74            and "md:" not in db_path_str
75            and "motherduck:" not in db_path_str
76        )
77
78    @overrides
79    def get_sql_engine(self) -> Engine:
80        """Return the SQL Alchemy engine.
81
82        This method is overridden to ensure that the database parent directory is created if it
83        doesn't exist.
84        """
85        if self._is_file_based_db():
86            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)  # no error if present
87
88        return super().get_sql_engine()

Configuration for DuckDB.

db_path: pathlib.Path | str

Normally db_path is a Path object.

The database name will be inferred from the file name. For example, given a db_path of /path/to/my/duckdb-file, the database name is duckdb-file.

schema_name: str

The name of the schema to write to. Defaults to "main".

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
41    @overrides
42    def get_sql_alchemy_url(self) -> SecretString:
43        """Return the SQLAlchemy URL, `duckdb:///<db_path>`, wrapped in a `SecretString`."""
44        # Suppress warnings from DuckDB about reflection on indices.
45        # https://github.com/Mause/duckdb_engine/issues/905
46        warnings.filterwarnings(
47            "ignore",
48            message="duckdb-engine doesn't yet support reflection on indices",
49            category=DuckDBEngineWarning,
50        )
51        return SecretString(f"duckdb:///{self.db_path!s}")

Return the SQLAlchemy URL to use.

@overrides
def get_database_name(self) -> str:
53    @overrides
54    def get_database_name(self) -> str:
55        """Return "memory" for `:memory:`, else the file name up to its first "."."""
56        if self.db_path == ":memory:":
57            return "memory"
58
59        # Split on "\" if the path contains one, otherwise on "/"
60        split_on: Literal["/", "\\"] = "\\" if "\\" in str(self.db_path) else "/"
61
62        # Last path segment, cut at its first "." (so "a/b.c.d" yields "b")
63        return str(self.db_path).split(sep=split_on)[-1].split(".")[0]

Return the name of the database.

@overrides
def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
78    @overrides
79    def get_sql_engine(self) -> Engine:
80        """Return the SQL Alchemy engine.
81
82        This method is overridden to ensure that the database parent directory is created if it
83        doesn't exist.
84        """
85        if self._is_file_based_db():
86            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)  # no error if present
87
88        return super().get_sql_engine()

Return the SQL Alchemy engine.

This method is overridden to ensure that the database parent directory is created if it doesn't exist.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='main'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'db_path': FieldInfo(annotation=Union[Path, str], required=True)}

Metadata about the fields defined on the model: a mapping of field names to `pydantic.fields.FieldInfo` objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_create_table_extra_clauses
get_vendor_client