airbyte.caches.base

SQL Cache implementation.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""SQL Cache implementation."""
  3
  4from __future__ import annotations
  5
  6from pathlib import Path
  7from typing import IO, TYPE_CHECKING, Any, ClassVar, Literal, final
  8
  9import pandas as pd
 10import pyarrow as pa
 11import pyarrow.dataset as ds
 12from pydantic import Field, PrivateAttr
 13from sqlalchemy import text
 14
 15from airbyte_protocol.models import ConfiguredAirbyteCatalog
 16
 17from airbyte import constants
 18from airbyte._writers.base import AirbyteWriterInterface
 19from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
 20from airbyte.caches._state_backend import SqlStateBackend
 21from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
 22from airbyte.datasets._sql import CachedDataset
 23from airbyte.shared.catalog_providers import CatalogProvider
 24from airbyte.shared.sql_processor import SqlConfig
 25from airbyte.shared.state_writers import StdOutStateWriter
 26
 27
 28if TYPE_CHECKING:
 29    from collections.abc import Iterator
 30
 31    from airbyte._message_iterators import AirbyteMessageIterator
 32    from airbyte.caches._state_backend_base import StateBackendBase
 33    from airbyte.progress import ProgressTracker
 34    from airbyte.shared.sql_processor import SqlProcessorBase
 35    from airbyte.shared.state_providers import StateProviderBase
 36    from airbyte.shared.state_writers import StateWriterBase
 37    from airbyte.sources.base import Source
 38    from airbyte.strategies import WriteStrategy
 39
 40
 41class CacheBase(SqlConfig, AirbyteWriterInterface):
 42    """Base configuration for a cache.
 43
 44    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
 45    and basic connectivity to the SQL database.
 46
 47    The cache is responsible for managing the state of the data synced to the cache, including the
 48    stream catalog and stream state. The cache also provides the mechanism to read and write data
 49    to the SQL backend specified in the `SqlConfig` class.
 50    """
 51
 52    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
 53    """The directory to store the cache in."""
 54
 55    cleanup: bool = TEMP_FILE_CLEANUP
 56    """Whether to clean up the cache after use."""
 57
 58    _name: str = PrivateAttr()
 59
 60    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
 61    _read_processor: SqlProcessorBase = PrivateAttr()
 62
 63    _catalog_backend: CatalogBackendBase = PrivateAttr()
 64    _state_backend: StateBackendBase = PrivateAttr()
 65
 66    paired_destination_name: ClassVar[str | None] = None
 67    paired_destination_config_class: ClassVar[type | None] = None
 68
 69    @property
 70    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
 71        """Return a dictionary of destination configuration values."""
 72        raise NotImplementedError(
 73            f"The type '{type(self).__name__}' does not define an equivalent destination "
 74            "configuration."
 75        )
 76
 77    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 78        """Initialize the cache and backends."""
 79        super().__init__(**data)
 80
 81        # Create a temporary processor to do the work of ensuring the schema exists
 82        temp_processor = self._sql_processor_class(
 83            sql_config=self,
 84            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 85            state_writer=StdOutStateWriter(),
 86            temp_dir=self.cache_dir,
 87            temp_file_cleanup=self.cleanup,
 88        )
 89        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 90
 91        # Initialize the catalog and state backends
 92        self._catalog_backend = SqlCatalogBackend(
 93            engine=self.get_sql_engine(),
 94            table_prefix=self.table_prefix or "",
 95        )
 96        self._state_backend = SqlStateBackend(
 97            engine=self.get_sql_engine(),
 98            table_prefix=self.table_prefix or "",
 99        )
100
101        # Now we can create the SQL read processor
102        self._read_processor = self._sql_processor_class(
103            sql_config=self,
104            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
105            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
106            temp_dir=self.cache_dir,
107            temp_file_cleanup=self.cleanup,
108        )
109
110    @property
111    def config_hash(self) -> str | None:
112        """Return a hash of the cache configuration.
113
114        This is the same as the SQLConfig hash from the superclass.
115        """
116        return super(SqlConfig, self).config_hash
117
118    def execute_sql(self, sql: str | list[str]) -> None:
119        """Execute one or more SQL statements against the cache's SQL backend.
120
121        If multiple SQL statements are given, they are executed in order,
122        within the same transaction.
123
124        This method is useful for creating tables, indexes, and other
125        schema objects in the cache. It does not return any results and it
126        automatically closes the connection after executing all statements.
127
128        This method is not intended for querying data. For that, use the `get_records`
129        method - or for a low-level interface, use the `get_sql_engine` method.
130
131        If any of the statements fail, the transaction is canceled and an exception
132        is raised. Most databases will roll back the transaction in this case.
133        """
134        if isinstance(sql, str):
135            # Coerce to a list if a single string is given
136            sql = [sql]
137
138        with self.processor.get_sql_connection() as connection:
139            for sql_statement in sql:
140                connection.execute(text(sql_statement))
141
142    @final
143    @property
144    def processor(self) -> SqlProcessorBase:
145        """Return the SQL processor instance."""
146        return self._read_processor
147
148    def get_record_processor(
149        self,
150        source_name: str,
151        catalog_provider: CatalogProvider,
152        state_writer: StateWriterBase | None = None,
153    ) -> SqlProcessorBase:
154        """Return a record processor for the specified source name and catalog.
155
156        We first register the source and its catalog with the catalog manager. Then we create a new
157        SQL processor instance with (only) the given input catalog.
158
159        For the state writer, we use a state writer which stores state in an internal SQL table.
160        """
161        # First register the source and catalog into durable storage. This is necessary to ensure
162        # that we can later retrieve the catalog information.
163        self.register_source(
164            source_name=source_name,
165            incoming_source_catalog=catalog_provider.configured_catalog,
166            stream_names=set(catalog_provider.stream_names),
167        )
168
169        # Next create a new SQL processor instance with the given catalog - and a state writer
170        # that writes state to the internal SQL table and associates with the given source name.
171        return self._sql_processor_class(
172            sql_config=self,
173            catalog_provider=catalog_provider,
174            state_writer=state_writer or self.get_state_writer(source_name=source_name),
175            temp_dir=self.cache_dir,
176            temp_file_cleanup=self.cleanup,
177        )
178
179    # Read methods:
180
181    def get_records(
182        self,
183        stream_name: str,
184    ) -> CachedDataset:
185        """Uses SQLAlchemy to select all rows from the table."""
186        return CachedDataset(self, stream_name)
187
188    def get_pandas_dataframe(
189        self,
190        stream_name: str,
191    ) -> pd.DataFrame:
192        """Return a Pandas data frame with the stream's data."""
193        table_name = self._read_processor.get_sql_table_name(stream_name)
194        engine = self.get_sql_engine()
195        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
196
197    def get_arrow_dataset(
198        self,
199        stream_name: str,
200        *,
201        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
202    ) -> ds.Dataset:
203        """Return an Arrow Dataset with the stream's data."""
204        table_name = self._read_processor.get_sql_table_name(stream_name)
205        engine = self.get_sql_engine()
206
207        # Read the table in chunks to handle large tables that do not fit in memory
208        pandas_chunks = pd.read_sql_table(
209            table_name=table_name,
210            con=engine,
211            schema=self.schema_name,
212            chunksize=max_chunk_size,
213        )
214
215        arrow_batches_list = []
216        arrow_schema = None
217
218        for pandas_chunk in pandas_chunks:
219            if arrow_schema is None:
220                # Initialize the schema with the first chunk
221                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
222
223            # Convert each pandas chunk to an Arrow record batch
224            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
225            arrow_batches_list.append(arrow_table)
226
227        return ds.dataset(arrow_batches_list)
228
229    @final
230    @property
231    def streams(self) -> dict[str, CachedDataset]:
232        """Return a temporary table name."""
233        result = {}
234        stream_names = set(self._catalog_backend.stream_names)
235
236        for stream_name in stream_names:
237            result[stream_name] = CachedDataset(self, stream_name)
238
239        return result
240
241    @final
242    def __len__(self) -> int:
243        """Gets the number of streams."""
244        return len(self._catalog_backend.stream_names)
245
246    @final
247    def __bool__(self) -> bool:
248        """Always True.
249
250        This is needed so that caches with zero streams are not falsy (None-like).
251        """
252        return True
253
254    def get_state_provider(
255        self,
256        source_name: str,
257        *,
258        refresh: bool = True,
259        destination_name: str | None = None,
260    ) -> StateProviderBase:
261        """Return a state provider for the specified source name."""
262        return self._state_backend.get_state_provider(
263            source_name=source_name,
264            table_prefix=self.table_prefix or "",
265            refresh=refresh,
266            destination_name=destination_name,
267        )
268
269    def get_state_writer(
270        self,
271        source_name: str,
272        destination_name: str | None = None,
273    ) -> StateWriterBase:
274        """Return a state writer for the specified source name.
275
276        If syncing to the cache, `destination_name` should be `None`.
277        If syncing to a destination, `destination_name` should be the destination name.
278        """
279        return self._state_backend.get_state_writer(
280            source_name=source_name,
281            destination_name=destination_name,
282        )
283
284    def register_source(
285        self,
286        source_name: str,
287        incoming_source_catalog: ConfiguredAirbyteCatalog,
288        stream_names: set[str],
289    ) -> None:
290        """Register the source name and catalog."""
291        self._catalog_backend.register_source(
292            source_name=source_name,
293            incoming_source_catalog=incoming_source_catalog,
294            incoming_stream_names=stream_names,
295        )
296
297    def create_source_tables(
298        self,
299        source: Source,
300        streams: Literal["*"] | list[str] | None = None,
301    ) -> None:
302        """Create tables in the cache for the provided source if they do not exist already.
303
304        Tables are created based upon the Source's catalog.
305
306        Args:
307            source: The source to create tables for.
308            streams: Stream names to create tables for. If None, use the Source's selected_streams
309                or "*" if neither is set. If "*", all available streams will be used.
310        """
311        if streams is None:
312            streams = source.get_selected_streams() or "*"
313
314        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))
315
316        # Register the incoming source catalog
317        self.register_source(
318            source_name=source.name,
319            incoming_source_catalog=catalog_provider.configured_catalog,
320            stream_names=set(catalog_provider.stream_names),
321        )
322
323        # Ensure schema exists
324        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
325
326        # Create tables for each stream if they don't exist
327        for stream_name in catalog_provider.stream_names:
328            self.processor._ensure_final_table_exists(  # noqa: SLF001
329                stream_name=stream_name,
330                create_if_missing=True,
331            )
332
333    def __getitem__(self, stream: str) -> CachedDataset:
334        """Return a dataset by stream name."""
335        return self.streams[stream]
336
337    def __contains__(self, stream: str) -> bool:
338        """Return whether a stream is in the cache."""
339        return stream in (self._catalog_backend.stream_names)
340
341    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
342        self,
343    ) -> Iterator[tuple[str, Any]]:
344        """Iterate over the streams in the cache."""
345        return ((name, dataset) for name, dataset in self.streams.items())
346
347    def _write_airbyte_message_stream(
348        self,
349        stdin: IO[str] | AirbyteMessageIterator,
350        *,
351        catalog_provider: CatalogProvider,
352        write_strategy: WriteStrategy,
353        state_writer: StateWriterBase | None = None,
354        progress_tracker: ProgressTracker,
355    ) -> None:
356        """Read from the connector and write to the cache."""
357        cache_processor = self.get_record_processor(
358            source_name=self.name,
359            catalog_provider=catalog_provider,
360            state_writer=state_writer,
361        )
362        cache_processor.process_airbyte_messages(
363            messages=stdin,
364            write_strategy=write_strategy,
365            progress_tracker=progress_tracker,
366        )
367        progress_tracker.log_cache_processing_complete()
class CacheBase(airbyte.shared.sql_processor.SqlConfig, airbyte._writers.base.AirbyteWriterInterface):

Base configuration for a cache.

Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.
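
For orientation, the sketch below shows how a concrete cache is typically used end to end. It is a minimal illustration, assuming the default DuckDB-backed cache and the `source-faker` connector are available; neither is required by `CacheBase` itself.

    import airbyte as ab

    # Obtain a concrete cache (a DuckDB-backed cache under the default cache directory).
    cache = ab.get_default_cache()

    # Configure a source and sync its streams into the cache's SQL backend.
    source = ab.get_source(
        "source-faker",
        config={"count": 100},   # illustrative `source-faker` config values
        install_if_missing=True,
    )
    source.select_all_streams()
    source.read(cache=cache)

    # The cache now exposes the synced streams as SQL-backed datasets.
    print(list(cache.streams))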

CacheBase(**data: Any)
 78    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 79        """Initialize the cache and backends."""
 80        super().__init__(**data)
 81
 82        # Create a temporary processor to do the work of ensuring the schema exists
 83        temp_processor = self._sql_processor_class(
 84            sql_config=self,
 85            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 86            state_writer=StdOutStateWriter(),
 87            temp_dir=self.cache_dir,
 88            temp_file_cleanup=self.cleanup,
 89        )
 90        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 91
 92        # Initialize the catalog and state backends
 93        self._catalog_backend = SqlCatalogBackend(
 94            engine=self.get_sql_engine(),
 95            table_prefix=self.table_prefix or "",
 96        )
 97        self._state_backend = SqlStateBackend(
 98            engine=self.get_sql_engine(),
 99            table_prefix=self.table_prefix or "",
100        )
101
102        # Now we can create the SQL read processor
103        self._read_processor = self._sql_processor_class(
104            sql_config=self,
105            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
106            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
107            temp_dir=self.cache_dir,
108            temp_file_cleanup=self.cleanup,
109        )

Initialize the cache and backends.

cache_dir: pathlib.Path

The directory to store the cache in.

cleanup: bool

Whether to clean up the cache after use.

paired_destination_name: ClassVar[str | None] = None
paired_destination_config_class: ClassVar[type | None] = None
paired_destination_config: Union[Any, dict[str, Any]]
70    @property
71    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
72        """Return a dictionary of destination configuration values."""
73        raise NotImplementedError(
74            f"The type '{type(self).__name__}' does not define an equivalent destination "
75            "configuration."
76        )

Return a dictionary of destination configuration values.

config_hash: str | None
111    @property
112    def config_hash(self) -> str | None:
113        """Return a hash of the cache configuration.
114
115        This is the same as the SQLConfig hash from the superclass.
116        """
117        return super(SqlConfig, self).config_hash

Return a hash of the cache configuration.

This is the same as the SQLConfig hash from the superclass.

def execute_sql(self, sql: str | list[str]) -> None:
119    def execute_sql(self, sql: str | list[str]) -> None:
120        """Execute one or more SQL statements against the cache's SQL backend.
121
122        If multiple SQL statements are given, they are executed in order,
123        within the same transaction.
124
125        This method is useful for creating tables, indexes, and other
126        schema objects in the cache. It does not return any results and it
127        automatically closes the connection after executing all statements.
128
129        This method is not intended for querying data. For that, use the `get_records`
130        method - or for a low-level interface, use the `get_sql_engine` method.
131
132        If any of the statements fail, the transaction is canceled and an exception
132        is raised. Most databases will roll back the transaction in this case.
134        """
135        if isinstance(sql, str):
136            # Coerce to a list if a single string is given
137            sql = [sql]
138
139        with self.processor.get_sql_connection() as connection:
140            for sql_statement in sql:
141                connection.execute(text(sql_statement))

Execute one or more SQL statements against the cache's SQL backend.

If multiple SQL statements are given, they are executed in order, within the same transaction.

This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.

This method is not intended for querying data. For that, use the get_records method - or for a low-level interface, use the get_sql_engine method.

If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will roll back the transaction in this case.
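
A minimal sketch of the list form, assuming a DuckDB-backed default cache; the table and values are illustrative only:

    import airbyte as ab

    cache = ab.get_default_cache()

    # Both statements run in order within a single transaction.
    cache.execute_sql(
        [
            "CREATE TABLE IF NOT EXISTS my_notes (id INTEGER, note VARCHAR)",
            "INSERT INTO my_notes VALUES (1, 'created via execute_sql')",
        ]
    )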

processor: airbyte.shared.sql_processor.SqlProcessorBase
143    @final
144    @property
145    def processor(self) -> SqlProcessorBase:
146        """Return the SQL processor instance."""
147        return self._read_processor

Return the SQL processor instance.

def get_record_processor( self, source_name: str, catalog_provider: airbyte.shared.catalog_providers.CatalogProvider, state_writer: airbyte.shared.state_writers.StateWriterBase | None = None) -> airbyte.shared.sql_processor.SqlProcessorBase:
149    def get_record_processor(
150        self,
151        source_name: str,
152        catalog_provider: CatalogProvider,
153        state_writer: StateWriterBase | None = None,
154    ) -> SqlProcessorBase:
155        """Return a record processor for the specified source name and catalog.
156
157        We first register the source and its catalog with the catalog manager. Then we create a new
158        SQL processor instance with (only) the given input catalog.
159
160        For the state writer, we use a state writer which stores state in an internal SQL table.
161        """
162        # First register the source and catalog into durable storage. This is necessary to ensure
163        # that we can later retrieve the catalog information.
164        self.register_source(
165            source_name=source_name,
166            incoming_source_catalog=catalog_provider.configured_catalog,
167            stream_names=set(catalog_provider.stream_names),
168        )
169
170        # Next create a new SQL processor instance with the given catalog - and a state writer
171        # that writes state to the internal SQL table and associates with the given source name.
172        return self._sql_processor_class(
173            sql_config=self,
174            catalog_provider=catalog_provider,
175            state_writer=state_writer or self.get_state_writer(source_name=source_name),
176            temp_dir=self.cache_dir,
177            temp_file_cleanup=self.cleanup,
178        )

Return a record processor for the specified source name and catalog.

We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.

For the state writer, we use a state writer which stores state in an internal SQL table.
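
This method is mostly used internally during reads and writes, but the rough sketch below shows how it is wired up. It assumes `source` is an already-configured `Source` and `cache` is a concrete cache instance:

    from airbyte.shared.catalog_providers import CatalogProvider

    # Build a catalog provider for the streams to be written (mirrors create_source_tables).
    catalog_provider = CatalogProvider(source.get_configured_catalog(streams="*"))

    # Returns a SQL processor bound to this catalog; by default its state writer
    # persists stream state to the cache's internal SQL state table.
    processor = cache.get_record_processor(
        source_name=source.name,
        catalog_provider=catalog_provider,
    )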

def get_records(self, stream_name: str) -> airbyte.CachedDataset:
182    def get_records(
183        self,
184        stream_name: str,
185    ) -> CachedDataset:
186        """Uses SQLAlchemy to select all rows from the table."""
187        return CachedDataset(self, stream_name)

Uses SQLAlchemy to select all rows from the table.
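
The returned `CachedDataset` is lazy and SQL-backed; it can be iterated or converted. A small sketch, assuming a "users" stream has already been synced into the cache:

    users = cache.get_records("users")

    # Iterate over records (each row behaves like a mapping of column names to values) ...
    for record in users:
        print(record)
        break

    # ... or materialize the whole stream as a pandas DataFrame.
    users_df = users.to_pandas()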

def get_pandas_dataframe(self, stream_name: str) -> pandas.core.frame.DataFrame:
189    def get_pandas_dataframe(
190        self,
191        stream_name: str,
192    ) -> pd.DataFrame:
193        """Return a Pandas data frame with the stream's data."""
194        table_name = self._read_processor.get_sql_table_name(stream_name)
195        engine = self.get_sql_engine()
196        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

Return a Pandas data frame with the stream's data.
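
Unlike `get_arrow_dataset`, which reads in chunks, this loads the entire table into memory as a single DataFrame. A short usage sketch, with an illustrative stream name:

    df = cache.get_pandas_dataframe("users")
    print(len(df), list(df.columns))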

def get_arrow_dataset( self, stream_name: str, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
198    def get_arrow_dataset(
199        self,
200        stream_name: str,
201        *,
202        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
203    ) -> ds.Dataset:
204        """Return an Arrow Dataset with the stream's data."""
205        table_name = self._read_processor.get_sql_table_name(stream_name)
206        engine = self.get_sql_engine()
207
208        # Read the table in chunks to handle large tables that do not fit in memory
209        pandas_chunks = pd.read_sql_table(
210            table_name=table_name,
211            con=engine,
212            schema=self.schema_name,
213            chunksize=max_chunk_size,
214        )
215
216        arrow_batches_list = []
217        arrow_schema = None
218
219        for pandas_chunk in pandas_chunks:
220            if arrow_schema is None:
221                # Initialize the schema with the first chunk
222                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
223
224            # Convert each pandas chunk to an Arrow record batch
225            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
226            arrow_batches_list.append(arrow_table)
227
228        return ds.dataset(arrow_batches_list)

Return an Arrow Dataset with the stream's data.
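
Because the table is read in chunks of `max_chunk_size` rows, this is the preferred path for tables that do not fit comfortably in memory as a single DataFrame. A brief sketch, with an illustrative stream name:

    dataset = cache.get_arrow_dataset("purchases", max_chunk_size=50_000)

    # Work with the dataset lazily, or materialize it into a single Arrow table.
    table = dataset.to_table()
    print(table.num_rows)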

streams: dict[str, airbyte.CachedDataset]
230    @final
231    @property
232    def streams(self) -> dict[str, CachedDataset]:
233        """Return a temporary table name."""
234        result = {}
235        stream_names = set(self._catalog_backend.stream_names)
236
237        for stream_name in stream_names:
238            result[stream_name] = CachedDataset(self, stream_name)
239
240        return result

Return a mapping of stream names to cached datasets.
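
The same mapping backs the cache's container protocol (`__getitem__`, `__contains__`, `__len__`, and `__iter__` in the source above). A brief sketch, with illustrative stream names:

    for name, dataset in cache.streams.items():
        print(name)

    print(len(cache))            # number of streams in the cache
    if "users" in cache:         # membership check by stream name
        users_df = cache["users"].to_pandas()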

def get_state_provider( self, source_name: str, *, refresh: bool = True, destination_name: str | None = None) -> airbyte.shared.state_providers.StateProviderBase:
255    def get_state_provider(
256        self,
257        source_name: str,
258        *,
259        refresh: bool = True,
260        destination_name: str | None = None,
261    ) -> StateProviderBase:
262        """Return a state provider for the specified source name."""
263        return self._state_backend.get_state_provider(
264            source_name=source_name,
265            table_prefix=self.table_prefix or "",
266            refresh=refresh,
267            destination_name=destination_name,
268        )

Return a state provider for the specified source name.
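
State providers are consumed mostly by the incremental-sync machinery, but they can also be inspected directly. A minimal sketch; the `known_stream_names` attribute is an assumption here and should be checked against the installed PyAirbyte version:

    provider = cache.get_state_provider(source_name="source-faker")

    # Assumed attribute: streams for which state has been stored.
    print(provider.known_stream_names)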

def get_state_writer( self, source_name: str, destination_name: str | None = None) -> airbyte.shared.state_writers.StateWriterBase:
270    def get_state_writer(
271        self,
272        source_name: str,
273        destination_name: str | None = None,
274    ) -> StateWriterBase:
275        """Return a state writer for the specified source name.
276
277        If syncing to the cache, `destination_name` should be `None`.
278        If syncing to a destination, `destination_name` should be the destination name.
279        """
280        return self._state_backend.get_state_writer(
281            source_name=source_name,
282            destination_name=destination_name,
283        )

Return a state writer for the specified source name.

If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.

def register_source( self, source_name: str, incoming_source_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog, stream_names: set[str]) -> None:
285    def register_source(
286        self,
287        source_name: str,
288        incoming_source_catalog: ConfiguredAirbyteCatalog,
289        stream_names: set[str],
290    ) -> None:
291        """Register the source name and catalog."""
292        self._catalog_backend.register_source(
293            source_name=source_name,
294            incoming_source_catalog=incoming_source_catalog,
295            incoming_stream_names=stream_names,
296        )

Register the source name and catalog.
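
This is normally called for you (for example by `get_record_processor` and `create_source_tables`), but it can be invoked directly with a configured catalog. A sketch assuming `source` is an already-configured `Source`:

    from airbyte.shared.catalog_providers import CatalogProvider

    provider = CatalogProvider(source.get_configured_catalog(streams="*"))
    cache.register_source(
        source_name=source.name,
        incoming_source_catalog=provider.configured_catalog,
        stream_names=set(provider.stream_names),
    )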

def create_source_tables( self, source: airbyte.Source, streams: Union[list[str], Literal['*'], NoneType] = None) -> None:
298    def create_source_tables(
299        self,
300        source: Source,
301        streams: Literal["*"] | list[str] | None = None,
302    ) -> None:
303        """Create tables in the cache for the provided source if they do not exist already.
304
305        Tables are created based upon the Source's catalog.
306
307        Args:
308            source: The source to create tables for.
309            streams: Stream names to create tables for. If None, use the Source's selected_streams
310                or "*" if neither is set. If "*", all available streams will be used.
311        """
312        if streams is None:
313            streams = source.get_selected_streams() or "*"
314
315        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))
316
317        # Register the incoming source catalog
318        self.register_source(
319            source_name=source.name,
320            incoming_source_catalog=catalog_provider.configured_catalog,
321            stream_names=set(catalog_provider.stream_names),
322        )
323
324        # Ensure schema exists
325        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
326
327        # Create tables for each stream if they don't exist
328        for stream_name in catalog_provider.stream_names:
329            self.processor._ensure_final_table_exists(  # noqa: SLF001
330                stream_name=stream_name,
331                create_if_missing=True,
332            )

Create tables in the cache for the provided source if they do not exist already.

Tables are created based upon the Source's catalog.

Arguments:
  • source: The source to create tables for.
  • streams: Stream names to create tables for. If None, use the Source's selected_streams or "*" if neither is set. If "*", all available streams will be used.
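
This is useful when the final tables should exist (for example, for schema inspection) before any records are written. A brief sketch, assuming `source` is already configured:

    # Create tables for every available stream in the source's catalog.
    cache.create_source_tables(source, streams="*")
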
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
328def init_private_attributes(self: BaseModel, context: Any, /) -> None:
329    """This function is meant to behave like a BaseModel method to initialise private attributes.
330
331    It takes context as an argument since that's what pydantic-core passes when calling it.
332
333    Args:
334        self: The BaseModel instance.
335        context: The context.
336    """
337    if getattr(self, '__pydantic_private__', None) is None:
338        pydantic_private = {}
339        for name, private_attr in self.__private_attributes__.items():
340            default = private_attr.get_default()
341            if default is not PydanticUndefined:
342                pydantic_private[name] = default
343        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
get_sql_alchemy_url
get_database_name
get_create_table_extra_clauses
get_sql_engine
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name