airbyte.caches
Base module for all caches.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base module for all caches."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.motherduck import MotherDuckCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.caches.util import get_default_cache, new_local_cache


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util

# We export these classes for easy access: `airbyte.caches...`
__all__ = [
    # Factories
    "get_default_cache",
    "new_local_cache",
    # Classes
    "BigQueryCache",
    "CacheBase",
    "DuckDBCache",
    "MotherDuckCache",
    "PostgresCache",
    "SnowflakeCache",
    # Submodules
    "util",
    "bigquery",
    "duckdb",
    "motherduck",
    "postgres",
    "snowflake",
    "base",
]
def get_default_cache() -> DuckDBCache:
    """Get a local cache for storing data, using the default database path.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    cache_dir = Path("./.cache/default_cache")
    return DuckDBCache(
        db_path=cache_dir / "default_cache.duckdb",
        cache_dir=cache_dir,
    )
Get a local cache for storing data, using the default database path.
Cache files are stored in the `.cache` directory, relative to the current working directory.
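For instance, a minimal end-to-end sketch using the default cache; the `source-faker` connector and its `count` option are illustrative assumptions and not part of this module:

import airbyte as ab

from airbyte.caches import get_default_cache

cache = get_default_cache()  # DuckDB file under ./.cache/default_cache/

# Hypothetical source setup; any PyAirbyte source works the same way.
source = ab.get_source("source-faker", config={"count": 1000})
source.select_all_streams()
source.read(cache=cache)  # synced records land in the cache's SQL tables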
def new_local_cache(
    cache_name: str | None = None,
    cache_dir: str | Path | None = None,
    *,
    cleanup: bool = True,
) -> DuckDBCache:
    """Get a local cache for storing data, using a name string to seed the path.

    Args:
        cache_name: Name to use for the cache. Defaults to None.
        cache_dir: Root directory to store the cache in. Defaults to None.
        cleanup: Whether to clean up temporary files. Defaults to True.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    if cache_name:
        if " " in cache_name:
            raise exc.PyAirbyteInputError(
                message="Cache name cannot contain spaces.",
                input_value=cache_name,
            )

        if not cache_name.replace("_", "").isalnum():
            raise exc.PyAirbyteInputError(
                message="Cache name can only contain alphanumeric characters and underscores.",
                input_value=cache_name,
            )

    cache_name = cache_name or str(ulid.ULID())
    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)

    return DuckDBCache(
        db_path=cache_dir / f"db_{cache_name}.duckdb",
        cache_dir=cache_dir,
        cleanup=cleanup,
    )
Get a local cache for storing data, using a name string to seed the path.
Arguments:
- cache_name: Name to use for the cache. Defaults to None.
- cache_dir: Root directory to store the cache in. Defaults to None.
- cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the `.cache` directory, relative to the current working directory.
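A short sketch of seeding a named cache. Per the validation above, the name may contain only alphanumeric characters and underscores, so a name with spaces raises `PyAirbyteInputError`:

from airbyte.caches import new_local_cache

# Creates ./.cache/my_pipeline/db_my_pipeline.duckdb and keeps temp files around.
cache = new_local_cache(cache_name="my_pipeline", cleanup=False)

# new_local_cache(cache_name="my pipeline")  # would raise PyAirbyteInputError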
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )
The BigQuery cache implementation.
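A construction sketch, assuming a service-account key file on disk; the project, dataset, and path values below are placeholders rather than defaults of this class:

from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="my-gcp-project",                      # placeholder GCP project
    dataset_name="pyairbyte_cache",                     # placeholder BigQuery dataset
    credentials_path="/path/to/service_account.json",   # placeholder key file
)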
    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )
Raises `NotImplementedError`; BigQuery doesn't support `pd.read_sql_table`.
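Since Arrow datasets are unavailable here, the usual workaround is a pandas read via the inherited `get_pandas_dataframe`; a sketch assuming `cache` is a `BigQueryCache` that already holds a `users` stream:

df = cache.get_pandas_dataframe("users")  # pandas.DataFrame of the stream's table
print(df.head())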
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.bigquery.BigQueryConfig
- database_name
- schema_name
- credentials_path
- dataset_location
- project_name
- dataset_name
- get_sql_alchemy_url
- get_database_name
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class CacheBase(SqlConfig, AirbyteWriterInterface):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    paired_destination_name: ClassVar[str | None] = None
    paired_destination_config_class: ClassVar[type | None] = None

    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def run_sql_query(
        self,
        sql_query: str,
        *,
        max_records: int | None = None,
    ) -> list[dict[str, Any]]:
        """Run a SQL query against the cache and return results as a list of dictionaries.

        This method is designed for single DML statements like SELECT, SHOW, or DESCRIBE.
        For DDL statements or multiple statements, use the processor directly.

        Args:
            sql_query: The SQL query to execute
            max_records: Maximum number of records to return. If None, returns all records.

        Returns:
            List of dictionaries representing the query results
        """
        # Execute the SQL within a connection context to ensure the connection stays open
        # while we fetch the results
        sql_text = text(sql_query) if isinstance(sql_query, str) else sql_query

        with self.processor.get_sql_connection() as conn:
            try:
                result = conn.execute(sql_text)
            except (
                sqlalchemy_exc.ProgrammingError,
                sqlalchemy_exc.SQLAlchemyError,
            ) as ex:
                msg = f"Error when executing SQL:\n{sql_query}\n{type(ex).__name__}{ex!s}"
                raise RuntimeError(msg) from ex

            # Convert the result to a list of dictionaries while connection is still open
            if result.returns_rows:
                # Get column names
                columns = list(result.keys()) if result.keys() else []

                # Fetch rows efficiently based on limit
                if max_records is not None:
                    rows = result.fetchmany(max_records)
                else:
                    rows = result.fetchall()

                return [dict(zip(columns, row, strict=True)) for row in rows]

            # For non-SELECT queries (INSERT, UPDATE, DELETE, etc.)
            return []

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow RecordBatch
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    @final
    def __len__(self) -> int:
        """Gets the number of streams."""
        return len(self._catalog_backend.stream_names)

    @final
    def __bool__(self) -> bool:
        """Always True.

        This is needed so that caches with zero streams are not falsey (None-like).
        """
        return True

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def create_source_tables(
        self,
        source: Source,
        streams: Literal["*"] | list[str] | None = None,
    ) -> None:
        """Create tables in the cache for the provided source if they do not exist already.

        Tables are created based upon the Source's catalog.

        Args:
            source: The source to create tables for.
            streams: Stream names to create tables for. If None, use the Source's selected_streams
                or "*" if neither is set. If "*", all available streams will be used.
        """
        if streams is None:
            streams = source.get_selected_streams() or "*"

        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))

        # Register the incoming source catalog
        self.register_source(
            source_name=source.name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Ensure schema exists
        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Create tables for each stream if they don't exist
        for stream_name in catalog_provider.stream_names:
            self.processor._ensure_final_table_exists(  # noqa: SLF001
                stream_name=stream_name,
                create_if_missing=True,
            )

    def __getitem__(self, stream: str) -> CachedDataset:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()
Base configuration for a cache.
Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings and basic connectivity to the SQL database.
The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the `SqlConfig` class.
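Once populated, a cache behaves like a read-only container of streams, as the dunder methods in the listing above show; a sketch assuming a cache that already holds a `users` stream:

print(len(cache))        # number of streams in the cache
print("users" in cache)  # True if the stream was synced

users = cache["users"]          # CachedDataset; same as cache.streams["users"]
for name, dataset in cache:     # iterates (stream_name, dataset) pairs
    print(name, type(dataset))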
    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
Initialize the cache and backends.
    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )
Return a dictionary of destination configuration values.
    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash
Return a hash of the cache configuration.
This is the same as the SQLConfig hash from the superclass.
    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))
Execute one or more SQL statements against the cache's SQL backend.
If multiple SQL statements are given, they are executed in order, within the same transaction.
This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.
This method is not intended for querying data. For that, use the `get_records` method - or for a low-level interface, use the `get_sql_engine` method.
If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
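For example, a sketch of applying two DDL statements in a single transaction; the table and index names are illustrative only:

cache.execute_sql(
    [
        "CREATE TABLE IF NOT EXISTS my_notes (id INTEGER, note VARCHAR)",
        "CREATE INDEX IF NOT EXISTS idx_my_notes_id ON my_notes (id)",
    ]
)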
    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor
Return the SQL processor instance.
    def run_sql_query(
        self,
        sql_query: str,
        *,
        max_records: int | None = None,
    ) -> list[dict[str, Any]]:
        """Run a SQL query against the cache and return results as a list of dictionaries.

        This method is designed for single DML statements like SELECT, SHOW, or DESCRIBE.
        For DDL statements or multiple statements, use the processor directly.

        Args:
            sql_query: The SQL query to execute
            max_records: Maximum number of records to return. If None, returns all records.

        Returns:
            List of dictionaries representing the query results
        """
        # Execute the SQL within a connection context to ensure the connection stays open
        # while we fetch the results
        sql_text = text(sql_query) if isinstance(sql_query, str) else sql_query

        with self.processor.get_sql_connection() as conn:
            try:
                result = conn.execute(sql_text)
            except (
                sqlalchemy_exc.ProgrammingError,
                sqlalchemy_exc.SQLAlchemyError,
            ) as ex:
                msg = f"Error when executing SQL:\n{sql_query}\n{type(ex).__name__}{ex!s}"
                raise RuntimeError(msg) from ex

            # Convert the result to a list of dictionaries while connection is still open
            if result.returns_rows:
                # Get column names
                columns = list(result.keys()) if result.keys() else []

                # Fetch rows efficiently based on limit
                if max_records is not None:
                    rows = result.fetchmany(max_records)
                else:
                    rows = result.fetchall()

                return [dict(zip(columns, row, strict=True)) for row in rows]

            # For non-SELECT queries (INSERT, UPDATE, DELETE, etc.)
            return []
Run a SQL query against the cache and return results as a list of dictionaries.
This method is designed for single DML statements like SELECT, SHOW, or DESCRIBE. For DDL statements or multiple statements, use the processor directly.
Arguments:
- sql_query: The SQL query to execute
- max_records: Maximum number of records to return. If None, returns all records.
Returns:
List of dictionaries representing the query results
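A sketch of a bounded query, assuming the cache already contains a `users` table with `id` and `name` columns:

rows = cache.run_sql_query("SELECT id, name FROM users", max_records=10)
for row in rows:
    print(row["id"], row["name"])  # each row is a plain dict keyed by column name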
    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use a state writer which stores state in an internal SQL table.
    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)
Uses SQLAlchemy to select all rows from the table.
    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
Return a Pandas data frame with the stream's data.
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow RecordBatch
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)
Return an Arrow Dataset with the stream's data.
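The return value is a `pyarrow.dataset.Dataset` assembled from record batches; a sketch of materializing it, assuming a `users` stream and enough memory to collect the full table:

arrow_ds = cache.get_arrow_dataset("users", max_chunk_size=50_000)
table = arrow_ds.to_table()  # pyarrow.Table collected from the record batches
df = table.to_pandas()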
    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result
Return a mapping of stream names to cached datasets.
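A sketch of walking every cached stream; treating each `CachedDataset` as an iterable of record mappings is an assumption about the dataset API, not something defined in this class:

for stream_name, dataset in cache.streams.items():
    first_record = next(iter(dataset), None)  # assumes CachedDataset is iterable
    print(stream_name, first_record)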
    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )
Return a state provider for the specified source name.
    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )
Return a state writer for the specified source name.
If syncing to the cache, `destination_name` should be `None`.
If syncing to a destination, `destination_name` should be the destination name.
    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )
Register the source name and catalog.
    def create_source_tables(
        self,
        source: Source,
        streams: Literal["*"] | list[str] | None = None,
    ) -> None:
        """Create tables in the cache for the provided source if they do not exist already.

        Tables are created based upon the Source's catalog.

        Args:
            source: The source to create tables for.
            streams: Stream names to create tables for. If None, use the Source's selected_streams
                or "*" if neither is set. If "*", all available streams will be used.
        """
        if streams is None:
            streams = source.get_selected_streams() or "*"

        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))

        # Register the incoming source catalog
        self.register_source(
            source_name=source.name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Ensure schema exists
        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Create tables for each stream if they don't exist
        for stream_name in catalog_provider.stream_names:
            self.processor._ensure_final_table_exists(  # noqa: SLF001
                stream_name=stream_name,
                create_if_missing=True,
            )
Create tables in the cache for the provided source if they do not exist already.
Tables are created based upon the Source's catalog.
Arguments:
- source: The source to create tables for.
- streams: Stream names to create tables for. If None, use the Source's selected_streams or "*" if neither is set. If "*", all available streams will be used.
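A sketch of pre-creating the final tables before any records are read; the `source-faker` connector, its stream names, and `select_streams` belong to the Source API and are assumptions here:

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.select_streams(["users", "products"])

cache.create_source_tables(source)                # tables for the selected streams only
# cache.create_source_tables(source, streams="*") # or: all available streams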
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_sql_alchemy_url
- get_database_name
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return duckdb_cache_to_destination_configuration(cache=self)
A DuckDB cache.
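A construction sketch using the `DuckDBConfig` fields listed below; the path and schema values are placeholders:

from airbyte.caches import DuckDBCache

cache = DuckDBCache(
    db_path="./my_project.duckdb",  # database file, created on first use
    schema_name="main",             # placeholder schema for the stream tables
)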
    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return duckdb_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.duckdb.DuckDBConfig
- db_path
- schema_name
- get_sql_alchemy_url
- get_database_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return motherduck_cache_to_destination_configuration(cache=self)
Cache that uses MotherDuck for external persistent storage.
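A sketch using the `MotherDuckConfig` fields listed below; the database name and the environment variable holding the token are placeholders:

import os

from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="my_md_database",                 # placeholder MotherDuck database
    api_key=os.environ["MOTHERDUCK_API_KEY"],  # placeholder env var for the token
)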
    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return motherduck_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte.caches.motherduck.MotherDuckConfig
- database
- api_key
- db_path
- get_sql_alchemy_url
- get_database_name
- airbyte._processors.sql.duckdb.DuckDBConfig
- schema_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class PostgresCache(PostgresConfig, CacheBase):
    """Configuration for the Postgres cache.

    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
    """

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return a dictionary of destination configuration values."""
        return postgres_cache_to_destination_configuration(cache=self)

    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a DestinationPostgres instance with the same configuration."""
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
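A construction sketch using the `PostgresConfig` fields listed below; all connection values are placeholders, and a real deployment would pull the password from a secret store:

from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="localhost",
    port=5432,
    username="postgres",
    password="postgres",        # placeholder; do not hard-code real credentials
    database="pyairbyte_demo",
)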
    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return a dictionary of destination configuration values."""
        return postgres_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a DestinationPostgres instance with the same configuration."""
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Return a DestinationPostgres instance with the same configuration.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.postgres.PostgresConfig
- host
- port
- database
- username
- password
- get_sql_alchemy_url
- get_database_name
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)
Configuration for the Snowflake cache.
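A construction sketch using the `SnowflakeConfig` fields listed below; every identifier is a placeholder, and password auth is only one of the listed credential options (private keys are also supported):

from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="my-account",      # placeholder Snowflake account identifier
    username="PYAIRBYTE_USER",
    password="********",       # placeholder; private_key / private_key_path also listed
    warehouse="COMPUTE_WH",
    database="PYAIRBYTE_DB",
    role="PYAIRBYTE_ROLE",
)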
    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.snowflake.SnowflakeConfig
- account
- username
- password
- private_key
- private_key_path
- private_key_passphrase
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_sql_alchemy_connect_args
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name