airbyte.caches.base
SQL Cache implementation.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""SQL Cache implementation."""

from __future__ import annotations

import contextlib
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, ClassVar, Literal, final

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
from pydantic import Field, PrivateAttr
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy import text
from typing_extensions import Self

from airbyte_protocol.models import ConfiguredAirbyteCatalog

from airbyte import constants
from airbyte._writers.base import AirbyteWriterInterface
from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
from airbyte.datasets._sql import CachedDataset
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.shared.sql_processor import SqlConfig
from airbyte.shared.state_writers import StdOutStateWriter


if TYPE_CHECKING:
    from collections.abc import Iterator
    from types import TracebackType

    from airbyte._message_iterators import AirbyteMessageIterator
    from airbyte.caches._state_backend_base import StateBackendBase
    from airbyte.progress import ProgressTracker
    from airbyte.shared.sql_processor import SqlProcessorBase
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase
    from airbyte.sources.base import Source
    from airbyte.strategies import WriteStrategy
class CacheBase(SqlConfig, AirbyteWriterInterface):  # noqa: PLR0904
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    paired_destination_name: ClassVar[str | None] = None
    paired_destination_config_class: ClassVar[type | None] = None

    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    def close(self) -> None:
        """Close all database connections and dispose of connection pools.

        This method ensures that all SQLAlchemy engines created by this cache
        and its processors are properly disposed, releasing all database connections.
        This is especially important for file-based databases like DuckDB, which
        lock the database file until all connections are closed.

        This method is idempotent and can be called multiple times safely.

        Raises:
            Exception: If any engine disposal fails, the exception will propagate
                to the caller. This ensures callers are aware of cleanup failures.
        """
        if self._read_processor is not None:
            self._read_processor.sql_config.dispose_engine()

        if self._catalog_backend is not None:
            self._catalog_backend._sql_config.dispose_engine()  # noqa: SLF001

        if self._state_backend is not None:
            self._state_backend._sql_config.dispose_engine()  # noqa: SLF001

        self.dispose_engine()

    def __enter__(self) -> Self:
        """Enter context manager."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager and clean up resources."""
        self.close()

    def __del__(self) -> None:
        """Clean up resources when cache is garbage collected."""
        with contextlib.suppress(Exception):
            self.close()

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def run_sql_query(
        self,
        sql_query: str,
        *,
        max_records: int | None = None,
    ) -> list[dict[str, Any]]:
        """Run a SQL query against the cache and return results as a list of dictionaries.

        This method is designed for single DML statements like SELECT, SHOW, or DESCRIBE.
        For DDL statements or multiple statements, use the processor directly.

        Args:
            sql_query: The SQL query to execute
            max_records: Maximum number of records to return. If None, returns all records.

        Returns:
            List of dictionaries representing the query results
        """
        # Execute the SQL within a connection context to ensure the connection stays open
        # while we fetch the results
        sql_text = text(sql_query) if isinstance(sql_query, str) else sql_query

        with self.processor.get_sql_connection() as conn:
            try:
                result = conn.execute(sql_text)
            except (
                sqlalchemy_exc.ProgrammingError,
                sqlalchemy_exc.SQLAlchemyError,
            ) as ex:
                msg = f"Error when executing SQL:\n{sql_query}\n{type(ex).__name__}{ex!s}"
                raise RuntimeError(msg) from ex

            # Convert the result to a list of dictionaries while connection is still open
            if result.returns_rows:
                # Get column names
                columns = list(result.keys()) if result.keys() else []

                # Fetch rows efficiently based on limit
                if max_records is not None:
                    rows = result.fetchmany(max_records)
                else:
                    rows = result.fetchall()

                return [dict(zip(columns, row, strict=True)) for row in rows]

            # For non-SELECT queries (INSERT, UPDATE, DELETE, etc.)
            return []

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which does not fits in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow Table
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a temporary table name."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    @final
    def __len__(self) -> int:
        """Gets the number of streams."""
        return len(self._catalog_backend.stream_names)

    @final
    def __bool__(self) -> bool:
        """Always True.

        This is needed so that caches with zero streams are not falsey (None-like).
        """
        return True

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def create_source_tables(
        self,
        source: Source,
        streams: Literal["*"] | list[str] | None = None,
    ) -> None:
        """Create tables in the cache for the provided source if they do not exist already.

        Tables are created based upon the Source's catalog.

        Args:
            source: The source to create tables for.
            streams: Stream names to create tables for. If None, use the Source's selected_streams
                or "*" if neither is set. If "*", all available streams will be used.
        """
        if streams is None:
            streams = source.get_selected_streams() or "*"

        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))

        # Register the incoming source catalog
        self.register_source(
            source_name=source.name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Ensure schema exists
        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Create tables for each stream if they don't exist
        for stream_name in catalog_provider.stream_names:
            self.processor._ensure_final_table_exists(  # noqa: SLF001
                stream_name=stream_name,
                create_if_missing=True,
            )

    def __getitem__(self, stream: str) -> CachedDataset:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()
Base configuration for a cache.

Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.
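For orientation, here is a minimal usage sketch. It assumes the DuckDBCache subclass and the source-faker connector are available; the file path and config values are illustrative.

import airbyte as ab
from airbyte.caches import DuckDBCache

# DuckDBCache is one concrete CacheBase subclass; db_path is an arbitrary local file.
cache = DuckDBCache(db_path="./example_cache.duckdb")

# Reading a source into the cache writes records to the SQL backend
# configured by the cache's SqlConfig settings.
source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()
source.read(cache=cache)

print(list(cache.streams))  # stream names now backed by tables in the cache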
def __init__(self, **data: Any) -> None:
Initialize the cache and backends.
@property
def paired_destination_config(self) -> Any | dict[str, Any]:
Return a dictionary of destination configuration values.
def close(self) -> None:
Close all database connections and dispose of connection pools.
This method ensures that all SQLAlchemy engines created by this cache and its processors are properly disposed, releasing all database connections. This is especially important for file-based databases like DuckDB, which lock the database file until all connections are closed.
This method is idempotent and can be called multiple times safely.
Raises:
- Exception: If any engine disposal fails, the exception will propagate to the caller. This ensures callers are aware of cleanup failures.
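Because close() is wired into __enter__ and __exit__, the usual pattern is to use the cache as a context manager so engines are disposed even if an error occurs; a brief sketch (the file path is illustrative):

from airbyte.caches import DuckDBCache

with DuckDBCache(db_path="./example_cache.duckdb") as cache:
    print(len(cache))  # number of known streams
# On exiting the block, __exit__ calls close(), releasing the DuckDB file lock.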
@property
def config_hash(self) -> str | None:
Return a hash of the cache configuration.
This is the same as the SQLConfig hash from the superclass.
def execute_sql(self, sql: str | list[str]) -> None:
Execute one or more SQL statements against the cache's SQL backend.
If multiple SQL statements are given, they are executed in order, within the same transaction.
This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.
This method is not intended for querying data. For that, use the get_records method - or for a low-level interface, use the get_sql_engine method.
If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
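For example, a list of statements can be run in one call within a single transaction; the table name and columns below are hypothetical, and cache refers to any CacheBase instance such as the one created in the earlier sketch.

cache.execute_sql(
    [
        "CREATE TABLE IF NOT EXISTS my_notes (id INTEGER, note VARCHAR)",
        "INSERT INTO my_notes VALUES (1, 'hello')",
    ]
)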
@final
@property
def processor(self) -> SqlProcessorBase:
Return the SQL processor instance.
def run_sql_query(self, sql_query: str, *, max_records: int | None = None) -> list[dict[str, Any]]:
Run a SQL query against the cache and return results as a list of dictionaries.
This method is designed for single DML statements like SELECT, SHOW, or DESCRIBE. For DDL statements or multiple statements, use the processor directly.
Arguments:
- sql_query: The SQL query to execute
- max_records: Maximum number of records to return. If None, returns all records.
Returns:
List of dictionaries representing the query results
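A short sketch, assuming a users stream has already been synced into the cache (the stream/table name is illustrative):

rows = cache.run_sql_query(
    "SELECT id, name FROM users ORDER BY id",
    max_records=10,
)
for row in rows:
    print(row["id"], row["name"])  # each row is a plain dict keyed by column name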
def get_record_processor(self, source_name: str, catalog_provider: CatalogProvider, state_writer: StateWriterBase | None = None) -> SqlProcessorBase:
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use a state writer which stores state in an internal SQL table.
def get_records(self, stream_name: str) -> CachedDataset:
Uses SQLAlchemy to select all rows from the table.
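The returned CachedDataset is evaluated against the cache's SQL backend; in a typical session it is simply iterated, for example (stream name assumed):

dataset = cache.get_records("users")

# Iterating yields one record (a mapping of column name to value) at a time.
for record in dataset:
    print(record)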
def get_pandas_dataframe(self, stream_name: str) -> pd.DataFrame:
Return a Pandas data frame with the stream's data.
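For example, assuming a users stream exists in the cache:

df = cache.get_pandas_dataframe("users")
print(df.shape)
print(df.head())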
def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE) -> ds.Dataset:
Return an Arrow Dataset with the stream's data.
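The result is a pyarrow.dataset.Dataset built from record batches, so it can be scanned lazily or materialized; a sketch (stream name assumed):

arrow_ds = cache.get_arrow_dataset("users", max_chunk_size=50_000)

print(arrow_ds.count_rows())   # scan without materializing everything
table = arrow_ds.to_table()    # materialize into a pyarrow.Table
df = table.to_pandas()         # or hand off to pandas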
@final
@property
def streams(self) -> dict[str, CachedDataset]:
Return a mapping of stream names to CachedDataset objects.
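Together with __len__, __contains__, __getitem__, and __iter__ (shown in the class source above), this property gives the cache a read-only, mapping-like interface over its streams; for example (stream name assumed):

print(len(cache))        # number of streams in the cache
print("users" in cache)  # membership test by stream name

if "users" in cache:
    users = cache["users"]  # same as cache.streams["users"]

for name, dataset in cache:  # iterate (stream_name, CachedDataset) pairs
    print(name, type(dataset).__name__)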
def get_state_provider(self, source_name: str, *, refresh: bool = True, destination_name: str | None = None) -> StateProviderBase:
Return a state provider for the specified source name.
def get_state_writer(self, source_name: str, destination_name: str | None = None) -> StateWriterBase:
Return a state writer for the specified source name.
If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.
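A sketch of how the two state interfaces pair up for cache syncs versus destination syncs; the source and destination names are placeholders.

# State tracked for records synced into the cache itself:
cache_state_reader = cache.get_state_provider(source_name="source-faker")
cache_state_writer = cache.get_state_writer(source_name="source-faker")

# State tracked separately for records forwarded to a destination:
dest_state_reader = cache.get_state_provider(
    source_name="source-faker",
    destination_name="destination-bigquery",
)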
def register_source(self, source_name: str, incoming_source_catalog: ConfiguredAirbyteCatalog, stream_names: set[str]) -> None:
Register the source name and catalog.
def create_source_tables(self, source: Source, streams: Literal["*"] | list[str] | None = None) -> None:
Create tables in the cache for the provided source if they do not exist already.
Tables are created based upon the Source's catalog.
Arguments:
- source: The source to create tables for.
- streams: Stream names to create tables for. If None, use the Source's selected_streams or "*" if neither is set. If "*", all available streams will be used.
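For example, tables can be created up front, before any records are read, so that downstream SQL objects can be defined against them; the connector name and stream list are illustrative, and cache is any CacheBase instance.

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_streams(["users", "products"])

# Create empty, correctly-typed tables for the selected streams.
cache.create_source_tables(source)

# Or create tables for every stream the source offers:
cache.create_source_tables(source, streams="*")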
model_config: Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_sql_alchemy_url
- get_database_name
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_sql_engine
- dispose_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name