airbyte.caches.base

SQL Cache implementation.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""SQL Cache implementation."""
  3
  4from __future__ import annotations
  5
  6import contextlib
  7from pathlib import Path
  8from typing import IO, TYPE_CHECKING, Any, ClassVar, Literal, final
  9
 10import pandas as pd
 11import pyarrow as pa
 12import pyarrow.dataset as ds
 13from pydantic import Field, PrivateAttr
 14from sqlalchemy import exc as sqlalchemy_exc
 15from sqlalchemy import text
 16from typing_extensions import Self
 17
 18from airbyte_protocol.models import ConfiguredAirbyteCatalog
 19
 20from airbyte import constants
 21from airbyte._writers.base import AirbyteWriterInterface
 22from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
 23from airbyte.caches._state_backend import SqlStateBackend
 24from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
 25from airbyte.datasets._sql import CachedDataset
 26from airbyte.shared.catalog_providers import CatalogProvider
 27from airbyte.shared.sql_processor import SqlConfig
 28from airbyte.shared.state_writers import StdOutStateWriter
 29
 30
 31if TYPE_CHECKING:
 32    from collections.abc import Iterator
 33    from types import TracebackType
 34
 35    from airbyte._message_iterators import AirbyteMessageIterator
 36    from airbyte.caches._state_backend_base import StateBackendBase
 37    from airbyte.progress import ProgressTracker
 38    from airbyte.shared.sql_processor import SqlProcessorBase
 39    from airbyte.shared.state_providers import StateProviderBase
 40    from airbyte.shared.state_writers import StateWriterBase
 41    from airbyte.sources.base import Source
 42    from airbyte.strategies import WriteStrategy
 43
 44
 45class CacheBase(SqlConfig, AirbyteWriterInterface):  # noqa: PLR0904
 46    """Base configuration for a cache.
 47
 48    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
 49    and basic connectivity to the SQL database.
 50
 51    The cache is responsible for managing the state of the data synced to the cache, including the
 52    stream catalog and stream state. The cache also provides the mechanism to read and write data
 53    to the SQL backend specified in the `SqlConfig` class.
 54    """
 55
 56    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
 57    """The directory to store the cache in."""
 58
 59    cleanup: bool = TEMP_FILE_CLEANUP
 60    """Whether to clean up the cache after use."""
 61
 62    _name: str = PrivateAttr()
 63
 64    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
 65    _read_processor: SqlProcessorBase = PrivateAttr()
 66
 67    _catalog_backend: CatalogBackendBase = PrivateAttr()
 68    _state_backend: StateBackendBase = PrivateAttr()
 69
 70    paired_destination_name: ClassVar[str | None] = None
 71    paired_destination_config_class: ClassVar[type | None] = None
 72
 73    @property
 74    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
 75        """Return a dictionary of destination configuration values."""
 76        raise NotImplementedError(
 77            f"The type '{type(self).__name__}' does not define an equivalent destination "
 78            "configuration."
 79        )
 80
 81    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 82        """Initialize the cache and backends."""
 83        super().__init__(**data)
 84
 85        # Create a temporary processor to do the work of ensuring the schema exists
 86        temp_processor = self._sql_processor_class(
 87            sql_config=self,
 88            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 89            state_writer=StdOutStateWriter(),
 90            temp_dir=self.cache_dir,
 91            temp_file_cleanup=self.cleanup,
 92        )
 93        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 94
 95        # Initialize the catalog and state backends
 96        self._catalog_backend = SqlCatalogBackend(
 97            sql_config=self,
 98            table_prefix=self.table_prefix or "",
 99        )
100        self._state_backend = SqlStateBackend(
101            sql_config=self,
102            table_prefix=self.table_prefix or "",
103        )
104
105        # Now we can create the SQL read processor
106        self._read_processor = self._sql_processor_class(
107            sql_config=self,
108            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
109            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
110            temp_dir=self.cache_dir,
111            temp_file_cleanup=self.cleanup,
112        )
113
114    def close(self) -> None:
115        """Close all database connections and dispose of connection pools.
116
117        This method ensures that all SQLAlchemy engines created by this cache
118        and its processors are properly disposed, releasing all database connections.
119        This is especially important for file-based databases like DuckDB, which
120        lock the database file until all connections are closed.
121
122        This method is idempotent and can be called multiple times safely.
123
124        Raises:
125            Exception: If any engine disposal fails, the exception will propagate
126                to the caller. This ensures callers are aware of cleanup failures.
127        """
128        if self._read_processor is not None:
129            self._read_processor.sql_config.dispose_engine()
130
131        if self._catalog_backend is not None:
132            self._catalog_backend._sql_config.dispose_engine()  # noqa: SLF001
133
134        if self._state_backend is not None:
135            self._state_backend._sql_config.dispose_engine()  # noqa: SLF001
136
137        self.dispose_engine()
138
139    def __enter__(self) -> Self:
140        """Enter context manager."""
141        return self
142
143    def __exit__(
144        self,
145        exc_type: type[BaseException] | None,
146        exc_val: BaseException | None,
147        exc_tb: TracebackType | None,
148    ) -> None:
149        """Exit context manager and clean up resources."""
150        self.close()
151
152    def __del__(self) -> None:
153        """Clean up resources when cache is garbage collected."""
154        with contextlib.suppress(Exception):
155            self.close()
156
157    @property
158    def config_hash(self) -> str | None:
159        """Return a hash of the cache configuration.
160
161        This is the same as the SqlConfig hash from the superclass.
162        """
163        return super(SqlConfig, self).config_hash
164
165    def execute_sql(self, sql: str | list[str]) -> None:
166        """Execute one or more SQL statements against the cache's SQL backend.
167
168        If multiple SQL statements are given, they are executed in order,
169        within the same transaction.
170
171        This method is useful for creating tables, indexes, and other
172        schema objects in the cache. It does not return any results and it
173        automatically closes the connection after executing all statements.
174
175        This method is not intended for querying data. For that, use the `get_records`
176        method - or for a low-level interface, use the `get_sql_engine` method.
177
178        If any of the statements fail, the transaction is canceled and an exception
179        is raised. Most databases will roll back the transaction in this case.
180        """
181        if isinstance(sql, str):
182            # Coerce to a list if a single string is given
183            sql = [sql]
184
185        with self.processor.get_sql_connection() as connection:
186            for sql_statement in sql:
187                connection.execute(text(sql_statement))
188
189    @final
190    @property
191    def processor(self) -> SqlProcessorBase:
192        """Return the SQL processor instance."""
193        return self._read_processor
194
195    def run_sql_query(
196        self,
197        sql_query: str,
198        *,
199        max_records: int | None = None,
200    ) -> list[dict[str, Any]]:
201        """Run a SQL query against the cache and return results as a list of dictionaries.
202
203        This method is designed for single query statements like SELECT, SHOW, or DESCRIBE.
204        For DDL statements or multiple statements, use the processor directly.
205
206        Args:
207            sql_query: The SQL query to execute
208            max_records: Maximum number of records to return. If None, returns all records.
209
210        Returns:
211            List of dictionaries representing the query results
212        """
213        # Execute the SQL within a connection context to ensure the connection stays open
214        # while we fetch the results
215        sql_text = text(sql_query) if isinstance(sql_query, str) else sql_query
216
217        with self.processor.get_sql_connection() as conn:
218            try:
219                result = conn.execute(sql_text)
220            except (
221                sqlalchemy_exc.ProgrammingError,
222                sqlalchemy_exc.SQLAlchemyError,
223            ) as ex:
224                msg = f"Error when executing SQL:\n{sql_query}\n{type(ex).__name__}{ex!s}"
225                raise RuntimeError(msg) from ex
226
227            # Convert the result to a list of dictionaries while connection is still open
228            if result.returns_rows:
229                # Get column names
230                columns = list(result.keys()) if result.keys() else []
231
232                # Fetch rows efficiently based on limit
233                if max_records is not None:
234                    rows = result.fetchmany(max_records)
235                else:
236                    rows = result.fetchall()
237
238                return [dict(zip(columns, row, strict=True)) for row in rows]
239
240            # For non-SELECT queries (INSERT, UPDATE, DELETE, etc.)
241            return []
242
243    def get_record_processor(
244        self,
245        source_name: str,
246        catalog_provider: CatalogProvider,
247        state_writer: StateWriterBase | None = None,
248    ) -> SqlProcessorBase:
249        """Return a record processor for the specified source name and catalog.
250
251        We first register the source and its catalog with the catalog manager. Then we create a new
252        SQL processor instance with (only) the given input catalog.
253
254        For the state writer, we use a state writer which stores state in an internal SQL table.
255        """
256        # First register the source and catalog into durable storage. This is necessary to ensure
257        # that we can later retrieve the catalog information.
258        self.register_source(
259            source_name=source_name,
260            incoming_source_catalog=catalog_provider.configured_catalog,
261            stream_names=set(catalog_provider.stream_names),
262        )
263
264        # Next create a new SQL processor instance with the given catalog - and a state writer
265        # that writes state to the internal SQL table and associates with the given source name.
266        return self._sql_processor_class(
267            sql_config=self,
268            catalog_provider=catalog_provider,
269            state_writer=state_writer or self.get_state_writer(source_name=source_name),
270            temp_dir=self.cache_dir,
271            temp_file_cleanup=self.cleanup,
272        )
273
274    # Read methods:
275
276    def get_records(
277        self,
278        stream_name: str,
279    ) -> CachedDataset:
280        """Uses SQLAlchemy to select all rows from the table."""
281        return CachedDataset(self, stream_name)
282
283    def get_pandas_dataframe(
284        self,
285        stream_name: str,
286    ) -> pd.DataFrame:
287        """Return a Pandas data frame with the stream's data."""
288        table_name = self._read_processor.get_sql_table_name(stream_name)
289        engine = self.get_sql_engine()
290        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
291
292    def get_arrow_dataset(
293        self,
294        stream_name: str,
295        *,
296        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
297    ) -> ds.Dataset:
298        """Return an Arrow Dataset with the stream's data."""
299        table_name = self._read_processor.get_sql_table_name(stream_name)
300        engine = self.get_sql_engine()
301
302        # Read the table in chunks to handle large tables that do not fit in memory
303        pandas_chunks = pd.read_sql_table(
304            table_name=table_name,
305            con=engine,
306            schema=self.schema_name,
307            chunksize=max_chunk_size,
308        )
309
310        arrow_batches_list = []
311        arrow_schema = None
312
313        for pandas_chunk in pandas_chunks:
314            if arrow_schema is None:
315                # Initialize the schema with the first chunk
316                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
317
318            # Convert each pandas chunk to an Arrow Table
319            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
320            arrow_batches_list.append(arrow_table)
321
322        return ds.dataset(arrow_batches_list)
323
324    @final
325    @property
326    def streams(self) -> dict[str, CachedDataset]:
327        """Return a temporary table name."""
328        result = {}
329        stream_names = set(self._catalog_backend.stream_names)
330
331        for stream_name in stream_names:
332            result[stream_name] = CachedDataset(self, stream_name)
333
334        return result
335
336    @final
337    def __len__(self) -> int:
338        """Gets the number of streams."""
339        return len(self._catalog_backend.stream_names)
340
341    @final
342    def __bool__(self) -> bool:
343        """Always True.
344
345        This is needed so that caches with zero streams are not falsy (None-like).
346        """
347        return True
348
349    def get_state_provider(
350        self,
351        source_name: str,
352        *,
353        refresh: bool = True,
354        destination_name: str | None = None,
355    ) -> StateProviderBase:
356        """Return a state provider for the specified source name."""
357        return self._state_backend.get_state_provider(
358            source_name=source_name,
359            table_prefix=self.table_prefix or "",
360            refresh=refresh,
361            destination_name=destination_name,
362        )
363
364    def get_state_writer(
365        self,
366        source_name: str,
367        destination_name: str | None = None,
368    ) -> StateWriterBase:
369        """Return a state writer for the specified source name.
370
371        If syncing to the cache, `destination_name` should be `None`.
372        If syncing to a destination, `destination_name` should be the destination name.
373        """
374        return self._state_backend.get_state_writer(
375            source_name=source_name,
376            destination_name=destination_name,
377        )
378
379    def register_source(
380        self,
381        source_name: str,
382        incoming_source_catalog: ConfiguredAirbyteCatalog,
383        stream_names: set[str],
384    ) -> None:
385        """Register the source name and catalog."""
386        self._catalog_backend.register_source(
387            source_name=source_name,
388            incoming_source_catalog=incoming_source_catalog,
389            incoming_stream_names=stream_names,
390        )
391
392    def create_source_tables(
393        self,
394        source: Source,
395        streams: Literal["*"] | list[str] | None = None,
396    ) -> None:
397        """Create tables in the cache for the provided source if they do not exist already.
398
399        Tables are created based upon the Source's catalog.
400
401        Args:
402            source: The source to create tables for.
403            streams: Stream names to create tables for. If None, use the Source's selected_streams
404                or "*" if neither is set. If "*", all available streams will be used.
405        """
406        if streams is None:
407            streams = source.get_selected_streams() or "*"
408
409        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))
410
411        # Register the incoming source catalog
412        self.register_source(
413            source_name=source.name,
414            incoming_source_catalog=catalog_provider.configured_catalog,
415            stream_names=set(catalog_provider.stream_names),
416        )
417
418        # Ensure schema exists
419        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
420
421        # Create tables for each stream if they don't exist
422        for stream_name in catalog_provider.stream_names:
423            self.processor._ensure_final_table_exists(  # noqa: SLF001
424                stream_name=stream_name,
425                create_if_missing=True,
426            )
427
428    def __getitem__(self, stream: str) -> CachedDataset:
429        """Return a dataset by stream name."""
430        return self.streams[stream]
431
432    def __contains__(self, stream: str) -> bool:
433        """Return whether a stream is in the cache."""
434        return stream in (self._catalog_backend.stream_names)
435
436    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
437        self,
438    ) -> Iterator[tuple[str, Any]]:
439        """Iterate over the streams in the cache."""
440        return ((name, dataset) for name, dataset in self.streams.items())
441
442    def _write_airbyte_message_stream(
443        self,
444        stdin: IO[str] | AirbyteMessageIterator,
445        *,
446        catalog_provider: CatalogProvider,
447        write_strategy: WriteStrategy,
448        state_writer: StateWriterBase | None = None,
449        progress_tracker: ProgressTracker,
450    ) -> None:
451        """Read from the connector and write to the cache."""
452        cache_processor = self.get_record_processor(
453            source_name=self.name,
454            catalog_provider=catalog_provider,
455            state_writer=state_writer,
456        )
457        cache_processor.process_airbyte_messages(
458            messages=stdin,
459            write_strategy=write_strategy,
460            progress_tracker=progress_tracker,
461        )
462        progress_tracker.log_cache_processing_complete()
class CacheBase(airbyte.shared.sql_processor.SqlConfig, airbyte._writers.base.AirbyteWriterInterface):

Base configuration for a cache.

Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.
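
For orientation, here is a minimal usage sketch with a concrete subclass. It assumes the `DuckDBCache` subclass and the `source-faker` connector are available; the file path and config values are illustrative and not part of this module.

    import airbyte as ab
    from airbyte.caches import DuckDBCache  # concrete CacheBase subclass (assumed available)

    # Configure a cache backed by a local DuckDB file (path is an example).
    cache = DuckDBCache(db_path="./pyairbyte_cache.duckdb")

    # Read a source into the cache; CacheBase handles catalog and state bookkeeping.
    source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
    source.select_all_streams()
    source.read(cache=cache)

    # Each synced stream is exposed as a CachedDataset.
    print(list(cache.streams))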

CacheBase(**data: Any)
 82    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 83        """Initialize the cache and backends."""
 84        super().__init__(**data)
 85
 86        # Create a temporary processor to do the work of ensuring the schema exists
 87        temp_processor = self._sql_processor_class(
 88            sql_config=self,
 89            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 90            state_writer=StdOutStateWriter(),
 91            temp_dir=self.cache_dir,
 92            temp_file_cleanup=self.cleanup,
 93        )
 94        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 95
 96        # Initialize the catalog and state backends
 97        self._catalog_backend = SqlCatalogBackend(
 98            sql_config=self,
 99            table_prefix=self.table_prefix or "",
100        )
101        self._state_backend = SqlStateBackend(
102            sql_config=self,
103            table_prefix=self.table_prefix or "",
104        )
105
106        # Now we can create the SQL read processor
107        self._read_processor = self._sql_processor_class(
108            sql_config=self,
109            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
110            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
111            temp_dir=self.cache_dir,
112            temp_file_cleanup=self.cleanup,
113        )

Initialize the cache and backends.

cache_dir: pathlib.Path

The directory to store the cache in.

cleanup: bool

Whether to clean up the cache after use.

paired_destination_name: ClassVar[str | None] = None
paired_destination_config_class: ClassVar[type | None] = None
paired_destination_config: Union[Any, dict[str, Any]]
74    @property
75    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
76        """Return a dictionary of destination configuration values."""
77        raise NotImplementedError(
78            f"The type '{type(self).__name__}' does not define an equivalent destination "
79            "configuration."
80        )

Return a dictionary of destination configuration values.

def close(self) -> None:
115    def close(self) -> None:
116        """Close all database connections and dispose of connection pools.
117
118        This method ensures that all SQLAlchemy engines created by this cache
119        and its processors are properly disposed, releasing all database connections.
120        This is especially important for file-based databases like DuckDB, which
121        lock the database file until all connections are closed.
122
123        This method is idempotent and can be called multiple times safely.
124
125        Raises:
126            Exception: If any engine disposal fails, the exception will propagate
127                to the caller. This ensures callers are aware of cleanup failures.
128        """
129        if self._read_processor is not None:
130            self._read_processor.sql_config.dispose_engine()
131
132        if self._catalog_backend is not None:
133            self._catalog_backend._sql_config.dispose_engine()  # noqa: SLF001
134
135        if self._state_backend is not None:
136            self._state_backend._sql_config.dispose_engine()  # noqa: SLF001
137
138        self.dispose_engine()

Close all database connections and dispose of connection pools.

This method ensures that all SQLAlchemy engines created by this cache and its processors are properly disposed, releasing all database connections. This is especially important for file-based databases like DuckDB, which lock the database file until all connections are closed.

This method is idempotent and can be called multiple times safely.

Raises:
  • Exception: If any engine disposal fails, the exception will propagate to the caller. This ensures callers are aware of cleanup failures.
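
As a sketch, a cache can be closed explicitly or used as a context manager; the `DuckDBCache` subclass and file path below are assumptions for illustration.

    from airbyte.caches import DuckDBCache  # assumed concrete CacheBase subclass

    # Context-manager style: __exit__ calls close() automatically.
    with DuckDBCache(db_path="./example.duckdb") as cache:
        print(len(cache))  # number of registered streams

    # Explicit style; close() is idempotent, so repeated calls are safe.
    cache = DuckDBCache(db_path="./example.duckdb")
    cache.close()
    cache.close()
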
config_hash: str | None
158    @property
159    def config_hash(self) -> str | None:
160        """Return a hash of the cache configuration.
161
162        This is the same as the SqlConfig hash from the superclass.
163        """
164        return super(SqlConfig, self).config_hash

Return a hash of the cache configuration.

This is the same as the SqlConfig hash from the superclass.

def execute_sql(self, sql: str | list[str]) -> None:
166    def execute_sql(self, sql: str | list[str]) -> None:
167        """Execute one or more SQL statements against the cache's SQL backend.
168
169        If multiple SQL statements are given, they are executed in order,
170        within the same transaction.
171
172        This method is useful for creating tables, indexes, and other
173        schema objects in the cache. It does not return any results and it
174        automatically closes the connection after executing all statements.
175
176        This method is not intended for querying data. For that, use the `get_records`
177        method - or for a low-level interface, use the `get_sql_engine` method.
178
179        If any of the statements fail, the transaction is canceled and an exception
180        is raised. Most databases will roll back the transaction in this case.
181        """
182        if isinstance(sql, str):
183            # Coerce to a list if a single string is given
184            sql = [sql]
185
186        with self.processor.get_sql_connection() as connection:
187            for sql_statement in sql:
188                connection.execute(text(sql_statement))

Execute one or more SQL statements against the cache's SQL backend.

If multiple SQL statements are given, they are executed in order, within the same transaction.

This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.

This method is not intended for querying data. For that, use the get_records method - or for a low-level interface, use the get_sql_engine method.

If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will roll back the transaction in this case.
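
A short sketch, where `cache` is any constructed `CacheBase` subclass (see the class-level example above) and the table and index names are illustrative:

    # Single DDL statement:
    cache.execute_sql("CREATE TABLE IF NOT EXISTS audit_log (id INTEGER, note VARCHAR)")

    # Several statements, executed in order within one transaction:
    cache.execute_sql([
        "CREATE TABLE IF NOT EXISTS audit_log (id INTEGER, note VARCHAR)",
        "CREATE INDEX IF NOT EXISTS idx_audit_log_id ON audit_log (id)",
    ])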

processor: airbyte.shared.sql_processor.SqlProcessorBase
190    @final
191    @property
192    def processor(self) -> SqlProcessorBase:
193        """Return the SQL processor instance."""
194        return self._read_processor

Return the SQL processor instance.

def run_sql_query( self, sql_query: str, *, max_records: int | None = None) -> list[dict[str, typing.Any]]:
196    def run_sql_query(
197        self,
198        sql_query: str,
199        *,
200        max_records: int | None = None,
201    ) -> list[dict[str, Any]]:
202        """Run a SQL query against the cache and return results as a list of dictionaries.
203
204        This method is designed for single query statements like SELECT, SHOW, or DESCRIBE.
205        For DDL statements or multiple statements, use the processor directly.
206
207        Args:
208            sql_query: The SQL query to execute
209            max_records: Maximum number of records to return. If None, returns all records.
210
211        Returns:
212            List of dictionaries representing the query results
213        """
214        # Execute the SQL within a connection context to ensure the connection stays open
215        # while we fetch the results
216        sql_text = text(sql_query) if isinstance(sql_query, str) else sql_query
217
218        with self.processor.get_sql_connection() as conn:
219            try:
220                result = conn.execute(sql_text)
221            except (
222                sqlalchemy_exc.ProgrammingError,
223                sqlalchemy_exc.SQLAlchemyError,
224            ) as ex:
225                msg = f"Error when executing SQL:\n{sql_query}\n{type(ex).__name__}{ex!s}"
226                raise RuntimeError(msg) from ex
227
228            # Convert the result to a list of dictionaries while connection is still open
229            if result.returns_rows:
230                # Get column names
231                columns = list(result.keys()) if result.keys() else []
232
233                # Fetch rows efficiently based on limit
234                if max_records is not None:
235                    rows = result.fetchmany(max_records)
236                else:
237                    rows = result.fetchall()
238
239                return [dict(zip(columns, row, strict=True)) for row in rows]
240
241            # For non-SELECT queries (INSERT, UPDATE, DELETE, etc.)
242            return []

Run a SQL query against the cache and return results as a list of dictionaries.

This method is designed for single query statements like SELECT, SHOW, or DESCRIBE. For DDL statements or multiple statements, use the processor directly.

Arguments:
  • sql_query: The SQL query to execute
  • max_records: Maximum number of records to return. If None, returns all records.
Returns:

List of dictionaries representing the query results
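
A usage sketch, where `cache` is any constructed `CacheBase` subclass and the `users` table is an illustrative stream table:

    rows = cache.run_sql_query(
        "SELECT id, name FROM users",
        max_records=10,
    )
    for row in rows:  # each row is a plain dict keyed by column name
        print(row["id"], row["name"])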

def get_record_processor( self, source_name: str, catalog_provider: airbyte.shared.catalog_providers.CatalogProvider, state_writer: airbyte.shared.state_writers.StateWriterBase | None = None) -> airbyte.shared.sql_processor.SqlProcessorBase:
244    def get_record_processor(
245        self,
246        source_name: str,
247        catalog_provider: CatalogProvider,
248        state_writer: StateWriterBase | None = None,
249    ) -> SqlProcessorBase:
250        """Return a record processor for the specified source name and catalog.
251
252        We first register the source and its catalog with the catalog manager. Then we create a new
253        SQL processor instance with (only) the given input catalog.
254
255        For the state writer, we use a state writer which stores state in an internal SQL table.
256        """
257        # First register the source and catalog into durable storage. This is necessary to ensure
258        # that we can later retrieve the catalog information.
259        self.register_source(
260            source_name=source_name,
261            incoming_source_catalog=catalog_provider.configured_catalog,
262            stream_names=set(catalog_provider.stream_names),
263        )
264
265        # Next create a new SQL processor instance with the given catalog - and a state writer
266        # that writes state to the internal SQL table and associates with the given source name.
267        return self._sql_processor_class(
268            sql_config=self,
269            catalog_provider=catalog_provider,
270            state_writer=state_writer or self.get_state_writer(source_name=source_name),
271            temp_dir=self.cache_dir,
272            temp_file_cleanup=self.cleanup,
273        )

Return a record processor for the specified source name and catalog.

We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.

For the state writer, we use a state writer which stores state in an internal SQL table.

def get_records(self, stream_name: str) -> airbyte.CachedDataset:
277    def get_records(
278        self,
279        stream_name: str,
280    ) -> CachedDataset:
281        """Uses SQLAlchemy to select all rows from the table."""
282        return CachedDataset(self, stream_name)

Uses SQLAlchemy to select all rows from the table.
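
A brief sketch, where `cache` is any constructed `CacheBase` subclass and "users" is an illustrative stream name:

    dataset = cache.get_records("users")  # a CachedDataset view over the stream's table
    for record in dataset:
        print(record)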

def get_pandas_dataframe(self, stream_name: str) -> pandas.core.frame.DataFrame:
284    def get_pandas_dataframe(
285        self,
286        stream_name: str,
287    ) -> pd.DataFrame:
288        """Return a Pandas data frame with the stream's data."""
289        table_name = self._read_processor.get_sql_table_name(stream_name)
290        engine = self.get_sql_engine()
291        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

Return a Pandas data frame with the stream's data.
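
For example, assuming a stream named "users" has been synced to `cache`:

    df = cache.get_pandas_dataframe("users")
    print(df.shape)
    print(df.head())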

def get_arrow_dataset( self, stream_name: str, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
293    def get_arrow_dataset(
294        self,
295        stream_name: str,
296        *,
297        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
298    ) -> ds.Dataset:
299        """Return an Arrow Dataset with the stream's data."""
300        table_name = self._read_processor.get_sql_table_name(stream_name)
301        engine = self.get_sql_engine()
302
303        # Read the table in chunks to handle large tables that do not fit in memory
304        pandas_chunks = pd.read_sql_table(
305            table_name=table_name,
306            con=engine,
307            schema=self.schema_name,
308            chunksize=max_chunk_size,
309        )
310
311        arrow_batches_list = []
312        arrow_schema = None
313
314        for pandas_chunk in pandas_chunks:
315            if arrow_schema is None:
316                # Initialize the schema with the first chunk
317                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
318
319            # Convert each pandas chunk to an Arrow Table
320            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
321            arrow_batches_list.append(arrow_table)
322
323        return ds.dataset(arrow_batches_list)

Return an Arrow Dataset with the stream's data.
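
The table is read from SQL in pandas chunks of at most `max_chunk_size` rows and converted to Arrow record batches, so tables larger than memory can still be handled. A sketch, with an illustrative stream name:

    arrow_ds = cache.get_arrow_dataset("users", max_chunk_size=50_000)

    # Materialize into a single Arrow table, or iterate record batches lazily:
    table = arrow_ds.to_table()
    print(table.num_rows)
    for batch in arrow_ds.to_batches():
        pass  # process each pyarrow.RecordBatch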

streams: dict[str, airbyte.CachedDataset]
325    @final
326    @property
327    def streams(self) -> dict[str, CachedDataset]:
328        """Return a temporary table name."""
329        result = {}
330        stream_names = set(self._catalog_backend.stream_names)
331
332        for stream_name in stream_names:
333            result[stream_name] = CachedDataset(self, stream_name)
334
335        return result

Return a mapping of stream names to CachedDataset objects.
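
For example, the mapping can be iterated or indexed (the stream name is illustrative):

    # Dict-style access over everything registered in the cache:
    for name, dataset in cache.streams.items():
        print(name, dataset)

    # Per-stream lookup via __getitem__:
    users = cache["users"]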

def get_state_provider( self, source_name: str, *, refresh: bool = True, destination_name: str | None = None) -> airbyte.shared.state_providers.StateProviderBase:
350    def get_state_provider(
351        self,
352        source_name: str,
353        *,
354        refresh: bool = True,
355        destination_name: str | None = None,
356    ) -> StateProviderBase:
357        """Return a state provider for the specified source name."""
358        return self._state_backend.get_state_provider(
359            source_name=source_name,
360            table_prefix=self.table_prefix or "",
361            refresh=refresh,
362            destination_name=destination_name,
363        )

Return a state provider for the specified source name.

def get_state_writer( self, source_name: str, destination_name: str | None = None) -> airbyte.shared.state_writers.StateWriterBase:
365    def get_state_writer(
366        self,
367        source_name: str,
368        destination_name: str | None = None,
369    ) -> StateWriterBase:
370        """Return a state writer for the specified source name.
371
372        If syncing to the cache, `destination_name` should be `None`.
373        If syncing to a destination, `destination_name` should be the destination name.
374        """
375        return self._state_backend.get_state_writer(
376            source_name=source_name,
377            destination_name=destination_name,
378        )

Return a state writer for the specified source name.

If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.
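
A brief sketch of pairing a state provider (reads persisted state) with a state writer (persists new state); the source and destination names are illustrative:

    state_provider = cache.get_state_provider(source_name="source-faker")
    state_writer = cache.get_state_writer(source_name="source-faker")

    # Pass destination_name only when the state belongs to a destination sync:
    dest_state_writer = cache.get_state_writer(
        source_name="source-faker",
        destination_name="destination-bigquery",
    )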

def register_source( self, source_name: str, incoming_source_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog, stream_names: set[str]) -> None:
380    def register_source(
381        self,
382        source_name: str,
383        incoming_source_catalog: ConfiguredAirbyteCatalog,
384        stream_names: set[str],
385    ) -> None:
386        """Register the source name and catalog."""
387        self._catalog_backend.register_source(
388            source_name=source_name,
389            incoming_source_catalog=incoming_source_catalog,
390            incoming_stream_names=stream_names,
391        )

Register the source name and catalog.

def create_source_tables( self, source: airbyte.Source, streams: Union[list[str], Literal['*'], NoneType] = None) -> None:
393    def create_source_tables(
394        self,
395        source: Source,
396        streams: Literal["*"] | list[str] | None = None,
397    ) -> None:
398        """Create tables in the cache for the provided source if they do not exist already.
399
400        Tables are created based upon the Source's catalog.
401
402        Args:
403            source: The source to create tables for.
404            streams: Stream names to create tables for. If None, use the Source's selected_streams
405                or "*" if neither is set. If "*", all available streams will be used.
406        """
407        if streams is None:
408            streams = source.get_selected_streams() or "*"
409
410        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))
411
412        # Register the incoming source catalog
413        self.register_source(
414            source_name=source.name,
415            incoming_source_catalog=catalog_provider.configured_catalog,
416            stream_names=set(catalog_provider.stream_names),
417        )
418
419        # Ensure schema exists
420        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
421
422        # Create tables for each stream if they don't exist
423        for stream_name in catalog_provider.stream_names:
424            self.processor._ensure_final_table_exists(  # noqa: SLF001
425                stream_name=stream_name,
426                create_if_missing=True,
427            )

Create tables in the cache for the provided source if they do not exist already.

Tables are created based upon the Source's catalog.

Arguments:
  • source: The source to create tables for.
  • streams: Stream names to create tables for. If None, use the Source's selected_streams or "*" if neither is set. If "*", all available streams will be used.
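
A sketch of pre-creating tables without running a read; the connector name, config, and stream selection are illustrative, and `cache` is any constructed `CacheBase` subclass:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_streams(["users"])

    # With `streams` omitted, the source's selected streams are used:
    cache.create_source_tables(source)

    # Or create tables for every available stream:
    cache.create_source_tables(source, streams="*")
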
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
get_sql_alchemy_url
get_database_name
get_create_table_extra_clauses
get_sql_alchemy_connect_args
get_sql_engine
dispose_engine
get_vendor_client
pydantic.main.BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name