airbyte.caches.duckdb
A DuckDB implementation of the PyAirbyte cache.
Usage Example
import airbyte as ab
from airbyte.caches import DuckDBCache
cache = DuckDBCache(
db_path="/path/to/my/duckdb-file",
schema_name="myschema",
)
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""A DuckDB implementation of the PyAirbyte cache. 3 4## Usage Example 5 6```python 7from airbyte as ab 8from airbyte.caches import DuckDBCache 9 10cache = DuckDBCache( 11 db_path="/path/to/my/duckdb-file", 12 schema_name="myschema", 13) 14``` 15""" 16 17from __future__ import annotations 18 19import warnings 20from typing import TYPE_CHECKING, ClassVar 21 22from airbyte_api.models import DestinationDuckdb 23from duckdb_engine import DuckDBEngineWarning 24 25from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor 26from airbyte.caches.base import CacheBase 27from airbyte.destinations._translate_cache_to_dest import duckdb_cache_to_destination_configuration 28 29 30if TYPE_CHECKING: 31 from airbyte.shared.sql_processor import SqlProcessorBase 32 33 34# Suppress warnings from DuckDB about reflection on indices. 35# https://github.com/Mause/duckdb_engine/issues/905 36warnings.filterwarnings( 37 "ignore", 38 message="duckdb-engine doesn't yet support reflection on indices", 39 category=DuckDBEngineWarning, 40) 41 42 43class DuckDBCache(DuckDBConfig, CacheBase): 44 """A DuckDB cache.""" 45 46 _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor 47 48 paired_destination_name: ClassVar[str | None] = "destination-duckdb" 49 paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb 50 51 @property 52 def paired_destination_config(self) -> DestinationDuckdb: 53 """Return a dictionary of destination configuration values.""" 54 return duckdb_cache_to_destination_configuration(cache=self) 55 56 57# Expose the Cache class and also the Config class. 58__all__ = [ 59 "DuckDBCache", 60 "DuckDBConfig", 61]
class DuckDBCache(DuckDBConfig, CacheBase):
    """Cache implementation backed by DuckDB.

    Inherits its DuckDB settings from `DuckDBConfig` and the cache interface from `CacheBase`.
    """

    # SQL processor class associated with this cache type.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor

    # Name and config class of the destination paired with this cache.
    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Build the destination configuration that corresponds to this cache.

        Returns:
            The result of `duckdb_cache_to_destination_configuration` for this cache,
            annotated as a `DestinationDuckdb` object.
        """
        destination_config = duckdb_cache_to_destination_configuration(cache=self)
        return destination_config
A DuckDB cache.
@property
def paired_destination_config(self) -> DestinationDuckdb:
    """Build the destination configuration that corresponds to this cache.

    Returns:
        The result of `duckdb_cache_to_destination_configuration` for this cache,
        annotated as a `DestinationDuckdb` object.
    """
    destination_config = duckdb_cache_to_destination_configuration(cache=self)
    return destination_config
Return the destination configuration (a `DestinationDuckdb` object) paired with this cache.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """Initialize private attributes, then call the user-defined `model_post_init`.

    Both steps are needed: this wrapper runs `init_private_attributes` and then
    delegates to the original `model_post_init` with the same arguments.

    Args:
        self: The model instance being initialized.
        context: Passed through unchanged to both calls.
    """
    # Private attributes are set up first, before the user-defined hook runs
    # (presumably so the hook can rely on them -- confirm against the enclosing scope).
    init_private_attributes(self, context)
    original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class DuckDBConfig(SqlConfig):
    """Configuration for DuckDB."""

    db_path: Path | str = Field()
    """Normally db_path is a Path object.

    The database name will be inferred from the file name. For example, given a `db_path` of
    `/path/to/my/duckdb-file`, the database name is `duckdb-file`. Anything after the first
    `.` in the file name is dropped, so `/path/to/my_db.duckdb` yields `my_db`.
    """

    schema_name: str = Field(default="main")
    """The name of the schema to write to. Defaults to "main"."""

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        The URL has the form `duckdb:///<db_path>` and is wrapped in a `SecretString`.
        """
        # Suppress warnings from DuckDB about reflection on indices.
        # https://github.com/Mause/duckdb_engine/issues/905
        warnings.filterwarnings(
            "ignore",
            message="duckdb-engine doesn't yet support reflection on indices",
            category=DuckDBEngineWarning,
        )
        return SecretString(f"duckdb:///{self.db_path!s}")

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database.

        Returns:
            `"memory"` when `db_path` is `":memory:"`; otherwise the last path segment of
            `db_path`, truncated at its first `.`.
        """
        if self.db_path == ":memory:":
            return "memory"

        # Split the path on the appropriate separator ("/" or "\")
        split_on: Literal["/", "\\"] = "\\" if "\\" in str(self.db_path) else "/"

        # Return the file name without the extension
        return str(self.db_path).split(sep=split_on)[-1].split(".")[0]

    def _is_file_based_db(self) -> bool:
        """Return whether the database is file-based.

        A `Path` instance always counts as file-based. A string counts only when it contains
        a path separator, is not `":memory:"`, and contains neither `"md:"` nor
        `"motherduck:"`.
        """
        if isinstance(self.db_path, Path):
            return True

        db_path_str = str(self.db_path)
        return (
            ("/" in db_path_str or "\\" in db_path_str)
            and db_path_str != ":memory:"
            and "md:" not in db_path_str
            and "motherduck:" not in db_path_str
        )

    @overrides
    def get_sql_engine(self) -> Engine:
        """Return the SQL Alchemy engine.

        This method is overridden to ensure that the database parent directory is created if it
        doesn't exist.
        """
        if self._is_file_based_db():
            # Create missing parent directories; an already existing directory is not an error.
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        return super().get_sql_engine()
Configuration for DuckDB.
Normally db_path is a Path object.
The database name will be inferred from the file name. For example, given a db_path of `/path/to/my/duckdb-file`, the database name is `duckdb-file`.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Build the SQLAlchemy connection URL for this DuckDB database.

    Returns:
        A `SecretString` of the form `duckdb:///<db_path>`.
    """
    # duckdb-engine warns that it cannot reflect indices; silence that warning.
    # See https://github.com/Mause/duckdb_engine/issues/905
    warnings.filterwarnings(
        action="ignore",
        message="duckdb-engine doesn't yet support reflection on indices",
        category=DuckDBEngineWarning,
    )
    connection_url = f"duckdb:///{self.db_path!s}"
    return SecretString(connection_url)
Return the SQLAlchemy URL to use.
@overrides
def get_database_name(self) -> str:
    """Derive the database name from `db_path`.

    Returns:
        `"memory"` when `db_path` is `":memory:"`; otherwise the last path segment of
        `db_path`, truncated at its first `.`.
    """
    if self.db_path == ":memory:":
        return "memory"

    path_text = str(self.db_path)

    # Windows-style paths contain backslashes; everything else is split on "/".
    separator: Literal["/", "\\"] = "\\" if "\\" in path_text else "/"

    # Keep only the final path segment, then drop everything from the first dot onward.
    file_name = path_text.rpartition(separator)[2]
    return file_name.partition(".")[0]
Return the name of the database.
@overrides
def get_sql_engine(self) -> Engine:
    """Create the SQL Alchemy engine, making sure the database directory exists.

    For file-based databases the parent directory of `db_path` is created first (including
    any missing ancestors); the engine itself comes from the parent class.
    """
    if self._is_file_based_db():
        parent_dir = Path(self.db_path).parent
        parent_dir.mkdir(parents=True, exist_ok=True)

    return super().get_sql_engine()
Return the SQL Alchemy engine.
This method is overridden to ensure that the database parent directory is created if it doesn't exist.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- config_hash
- get_create_table_extra_clauses
- get_vendor_client