airbyte.caches.duckdb
A DuckDB implementation of the PyAirbyte cache.
Usage Example
import airbyte as ab
from airbyte.caches import DuckDBCache
cache = DuckDBCache(
db_path="/path/to/my/duckdb-file",
schema_name="myschema",
)
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""A DuckDB implementation of the PyAirbyte cache. 3 4## Usage Example 5 6```python 7from airbyte as ab 8from airbyte.caches import DuckDBCache 9 10cache = DuckDBCache( 11 db_path="/path/to/my/duckdb-file", 12 schema_name="myschema", 13) 14``` 15""" 16 17from __future__ import annotations 18 19import warnings 20from typing import TYPE_CHECKING, ClassVar 21 22from airbyte_api.models import DestinationDuckdb 23from duckdb_engine import DuckDBEngineWarning 24 25from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor 26from airbyte.caches.base import CacheBase 27from airbyte.destinations._translate_cache_to_dest import duckdb_cache_to_destination_configuration 28 29 30if TYPE_CHECKING: 31 from airbyte.shared.sql_processor import SqlProcessorBase 32 33 34# Suppress warnings from DuckDB about reflection on indices. 35# https://github.com/Mause/duckdb_engine/issues/905 36warnings.filterwarnings( 37 "ignore", 38 message="duckdb-engine doesn't yet support reflection on indices", 39 category=DuckDBEngineWarning, 40) 41 42 43class DuckDBCache(DuckDBConfig, CacheBase): 44 """A DuckDB cache.""" 45 46 _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor 47 48 paired_destination_name: ClassVar[str | None] = "destination-duckdb" 49 paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb 50 51 @property 52 def paired_destination_config(self) -> DestinationDuckdb: 53 """Return a dictionary of destination configuration values.""" 54 return duckdb_cache_to_destination_configuration(cache=self) 55 56 57# Expose the Cache class and also the Config class. 58__all__ = [ 59 "DuckDBCache", 60 "DuckDBConfig", 61]
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache.

    Inherits its DuckDB settings (`db_path`, `schema_name`, ...) from `DuckDBConfig` and its
    cache behavior from `CacheBase`.
    """

    # SQL processor implementation used for this cache type.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor

    # Name and configuration model of the destination connector paired with this cache.
    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return the `DestinationDuckdb` configuration equivalent to this cache.

        The conversion is delegated to `duckdb_cache_to_destination_configuration()`.
        """
        return duckdb_cache_to_destination_configuration(cache=self)
A DuckDB cache.
paired_destination_config_class: ClassVar[type | None] =
<class 'airbyte_api.models.destination_duckdb.DestinationDuckdb'>
paired_destination_config: airbyte_api.models.destination_duckdb.DestinationDuckdb
@property
def paired_destination_config(self) -> DestinationDuckdb:
    """Return the `DestinationDuckdb` configuration equivalent to this cache.

    The conversion is delegated to `duckdb_cache_to_destination_configuration()`.
    """
    return duckdb_cache_to_destination_configuration(cache=self)
Return the DestinationDuckdb configuration equivalent to this cache.
class
DuckDBConfig(airbyte.shared.sql_processor.SqlConfig):
class DuckDBConfig(SqlConfig):
    """Configuration for DuckDB."""

    db_path: Path | str = Field()
    """Path to the DuckDB database file, usually a `Path` object (a string is also accepted).

    The database name is inferred from the file name. For example, given a `db_path` of
    `/path/to/my/duckdb-file`, the database name is `duckdb-file`.
    """

    schema_name: str = Field(default="main")
    """The name of the schema to write to. Defaults to "main"."""

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use."""
        # Suppress warnings from DuckDB about reflection on indices.
        # https://github.com/Mause/duckdb_engine/issues/905
        warnings.filterwarnings(
            "ignore",
            message="duckdb-engine doesn't yet support reflection on indices",
            category=DuckDBEngineWarning,
        )
        return SecretString(f"duckdb:///{self.db_path!s}")

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database.

        Returns `"memory"` when `db_path` is `":memory:"`. Otherwise returns the last path
        component with everything from its first "." onward removed, e.g.
        `/path/to/my/cache.duckdb` -> `cache`.
        """
        if self.db_path == ":memory:":
            return "memory"

        # Treat both "/" and "\" as separators, so that a mixed-separator path such as
        # C:\Users\me/cache.duckdb yields "cache" rather than "me/cache".
        file_name = str(self.db_path).replace("\\", "/").rsplit("/", maxsplit=1)[-1]

        # Return the file name without the extension.
        return file_name.split(".")[0]

    def _is_file_based_db(self) -> bool:
        """Return whether the database is file-based."""
        if isinstance(self.db_path, Path):
            return True

        db_path_str = str(self.db_path)
        return (
            ("/" in db_path_str or "\\" in db_path_str)
            and db_path_str != ":memory:"
            and "md:" not in db_path_str
            and "motherduck:" not in db_path_str
        )

    @overrides
    def get_sql_engine(self) -> Engine:
        """Return the SQL Alchemy engine.

        This method is overridden to ensure that the database parent directory is created if it
        doesn't exist.
        """
        if self._is_file_based_db():
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        return super().get_sql_engine()
Configuration for DuckDB.
db_path: pathlib.Path | str =
PydanticUndefined
Path to the DuckDB database file, usually a Path object (a string is also accepted).
The database name is inferred from the file name. For example, given a db_path of
/path/to/my/duckdb-file, the database name is duckdb-file.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Build the SQLAlchemy connection URL for this DuckDB database.

    Before returning, installs a filter that ignores duckdb-engine's warning that index
    reflection is not yet supported (https://github.com/Mause/duckdb_engine/issues/905).
    """
    warnings.filterwarnings(
        "ignore",
        category=DuckDBEngineWarning,
        message="duckdb-engine doesn't yet support reflection on indices",
    )
    url = f"duckdb:///{self.db_path!s}"
    return SecretString(url)
Return the SQLAlchemy URL to use.
@overrides
def
get_database_name(self) -> str:
@overrides
def get_database_name(self) -> str:
    """Return the name of the database.

    Returns `"memory"` when `db_path` is `":memory:"`. Otherwise returns the last path
    component with everything from its first "." onward removed, e.g.
    `/path/to/my/cache.duckdb` -> `cache`.
    """
    if self.db_path == ":memory:":
        return "memory"

    # Treat both "/" and "\" as separators, so that a mixed-separator path such as
    # C:\Users\me/cache.duckdb yields "cache" rather than "me/cache".
    file_name = str(self.db_path).replace("\\", "/").rsplit("/", maxsplit=1)[-1]

    # Return the file name without the extension.
    return file_name.split(".")[0]
Return the name of the database.
@overrides
def
get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
@overrides
def get_sql_engine(self) -> Engine:
    """Return the SQLAlchemy engine for this database.

    For file-based databases, this first makes sure the parent directory of `db_path`
    exists, creating it (and any missing ancestors) when necessary.
    """
    if not self._is_file_based_db():
        return super().get_sql_engine()

    parent_dir = Path(self.db_path).parent
    parent_dir.mkdir(exist_ok=True, parents=True)
    return super().get_sql_engine()
Return the SQL Alchemy engine.
This method is overridden to ensure that the database parent directory is created if it doesn't exist.