airbyte.caches.duckdb
A DuckDB implementation of the PyAirbyte cache.
Usage Example
import airbyte as ab
from airbyte.caches import DuckDBCache
cache = DuckDBCache(
db_path="/path/to/my/duckdb-file",
schema_name="myschema",
)
1# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 2"""A DuckDB implementation of the PyAirbyte cache. 3 4## Usage Example 5 6```python 7from airbyte as ab 8from airbyte.caches import DuckDBCache 9 10cache = DuckDBCache( 11 db_path="/path/to/my/duckdb-file", 12 schema_name="myschema", 13) 14``` 15""" 16 17from __future__ import annotations 18 19import warnings 20 21from duckdb_engine import DuckDBEngineWarning 22from pydantic import PrivateAttr 23 24from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor 25from airbyte.caches.base import CacheBase 26 27 28# Suppress warnings from DuckDB about reflection on indices. 29# https://github.com/Mause/duckdb_engine/issues/905 30warnings.filterwarnings( 31 "ignore", 32 message="duckdb-engine doesn't yet support reflection on indices", 33 category=DuckDBEngineWarning, 34) 35 36 37class DuckDBCache(DuckDBConfig, CacheBase): 38 """A DuckDB cache.""" 39 40 _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor) 41 42 43# Expose the Cache class and also the Config class. 44__all__ = [ 45 "DuckDBCache", 46 "DuckDBConfig", 47]
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache.

    Inherits from both `DuckDBConfig` and `CacheBase`, and sets the private
    `_sql_processor_class` attribute to `DuckDBSqlProcessor`.
    """

    # Private attribute holding the SQL processor class; defaults to `DuckDBSqlProcessor`.
    _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)
A DuckDB cache.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
Metadata about the fields defined on the model: a mapping of field names to [`FieldInfo`][pydantic.fields.FieldInfo] objects.
This replaces `Model.__fields__` from Pydantic V1.
A dictionary of computed field names and their corresponding `ComputedFieldInfo` objects.
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """Initialize private attributes, then call the user-defined `model_post_init`.

    We need to both initialize private attributes and call the user-defined model_post_init
    method.

    Both arguments are positional-only and are forwarded unchanged to each call.
    """
    # Private attributes are initialized first; the user-defined hook runs afterwards.
    # NOTE(review): `init_private_attributes` and `original_model_post_init` are defined
    # outside this excerpt (presumably in the enclosing scope) -- confirm there.
    init_private_attributes(self, context)
    original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
class DuckDBConfig(SqlConfig):
    """Configuration for DuckDB."""

    db_path: Path | str = Field()
    """Normally db_path is a Path object.

    The database name will be inferred from the file name. For example, given a `db_path` of
    `/path/to/my/duckdb-file`, the database name is `duckdb-file`.
    """

    schema_name: str = Field(default="main")
    """The name of the schema to write to. Defaults to "main"."""

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        The URL has the form `duckdb:///<db_path>` and is wrapped in a `SecretString`.
        """
        # Suppress warnings from DuckDB about reflection on indices.
        # https://github.com/Mause/duckdb_engine/issues/905
        warnings.filterwarnings(
            "ignore",
            message="duckdb-engine doesn't yet support reflection on indices",
            category=DuckDBEngineWarning,
        )
        return SecretString(f"duckdb:///{self.db_path!s}")

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database.

        Returns "memory" when `db_path` is the string ":memory:". Otherwise returns the last
        path component truncated at its first "." (so "/data/my.db" yields "my").
        """
        if self.db_path == ":memory:":
            return "memory"

        db_path_str = str(self.db_path)

        # Split the path on the appropriate separator ("/" or "\")
        split_on: Literal["/", "\\"] = "\\" if "\\" in db_path_str else "/"

        # Return the file name without the extension
        return db_path_str.split(sep=split_on)[-1].split(".")[0]

    def _is_file_based_db(self) -> bool:
        """Return whether the database is file-based.

        A `Path` value is always treated as file-based. A string is file-based only when it
        contains a path separator, is not ":memory:", and does not contain "md:" or
        "motherduck:".
        """
        if isinstance(self.db_path, Path):
            return True

        db_path_str = str(self.db_path)
        return (
            ("/" in db_path_str or "\\" in db_path_str)
            and db_path_str != ":memory:"
            and "md:" not in db_path_str
            and "motherduck:" not in db_path_str
        )

    @overrides
    def get_sql_engine(self) -> Engine:
        """Return the SQL Alchemy engine.

        This method is overridden to ensure that the database parent directory is created if it
        doesn't exist.
        """
        if self._is_file_based_db():
            # `exist_ok=True` makes this a no-op when the directory is already present.
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        return super().get_sql_engine()
Configuration for DuckDB.
Normally db_path is a Path object.
The database name will be inferred from the file name. For example, given a `db_path` of `/path/to/my/duckdb-file`, the database name is `duckdb-file`.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Build the SQLAlchemy connection URL for the configured `db_path`.

    The result has the form `duckdb:///<db_path>` and is wrapped in a `SecretString`.
    """
    # duckdb-engine emits a warning about index reflection; silence it.
    # See https://github.com/Mause/duckdb_engine/issues/905
    ignored_message = "duckdb-engine doesn't yet support reflection on indices"
    warnings.filterwarnings("ignore", message=ignored_message, category=DuckDBEngineWarning)

    connection_url = f"duckdb:///{self.db_path!s}"
    return SecretString(connection_url)
Return the SQLAlchemy URL to use.
@overrides
def get_database_name(self) -> str:
    """Derive the database name from `db_path`.

    The string ":memory:" maps to "memory". Any other value maps to its last path
    component, truncated at the first ".".
    """
    if self.db_path == ":memory:":
        return "memory"

    raw_path = str(self.db_path)

    # Use the backslash as separator only when the path actually contains one.
    separator: Literal["/", "\\"] = "/"
    if "\\" in raw_path:
        separator = "\\"

    # Everything after the last separator is the file name ...
    file_name = raw_path.rpartition(separator)[2]
    # ... and everything before its first dot is the database name.
    return file_name.partition(".")[0]
Return the name of the database.
@overrides
def get_sql_engine(self) -> Engine:
    """Create the SQLAlchemy engine, making sure the database directory exists.

    For file-based databases the parent directory of `db_path` is created (including any
    missing ancestors) before delegating to the parent class implementation.
    """
    needs_directory = self._is_file_based_db()
    if needs_directory:
        parent_dir = Path(self.db_path).parent
        parent_dir.mkdir(exist_ok=True, parents=True)

    return super().get_sql_engine()
Return the SQL Alchemy engine.
This method is overridden to ensure that the database parent directory is created if it doesn't exist.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
Metadata about the fields defined on the model: a mapping of field names to [`FieldInfo`][pydantic.fields.FieldInfo] objects.
This replaces `Model.__fields__` from Pydantic V1.
A dictionary of computed field names and their corresponding `ComputedFieldInfo` objects.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- config_hash
- get_create_table_extra_clauses
- get_vendor_client