
SQL Cache implementation.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""SQL Cache implementation."""
  4from __future__ import annotations
  6from pathlib import Path
  7from typing import IO, TYPE_CHECKING, Any, ClassVar, final
  9import pandas as pd
 10import pyarrow as pa
 11import pyarrow.dataset as ds
 12from pydantic import Field, PrivateAttr
 13from sqlalchemy import text
 15from airbyte_protocol.models import ConfiguredAirbyteCatalog
 17from airbyte import constants
 18from airbyte._writers.base import AirbyteWriterInterface
 19from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
 20from airbyte.caches._state_backend import SqlStateBackend
 21from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
 22from airbyte.datasets._sql import CachedDataset
 23from airbyte.shared.catalog_providers import CatalogProvider
 24from airbyte.shared.sql_processor import SqlConfig
 25from airbyte.shared.state_writers import StdOutStateWriter
 29    from import Iterator
 31    from airbyte._message_iterators import AirbyteMessageIterator
 32    from airbyte.caches._state_backend_base import StateBackendBase
 33    from airbyte.progress import ProgressTracker
 34    from airbyte.shared.sql_processor import SqlProcessorBase
 35    from airbyte.shared.state_providers import StateProviderBase
 36    from airbyte.shared.state_writers import StateWriterBase
 37    from airbyte.strategies import WriteStrategy
 40class CacheBase(SqlConfig, AirbyteWriterInterface):
 41    """Base configuration for a cache.
 43    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
 44    and basic connectivity to the SQL database.
 46    The cache is responsible for managing the state of the data synced to the cache, including the
 47    stream catalog and stream state. The cache also provides the mechanism to read and write data
 48    to the SQL backend specified in the `SqlConfig` class.
 49    """
 51    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
 52    """The directory to store the cache in."""
 54    cleanup: bool = TEMP_FILE_CLEANUP
 55    """Whether to clean up the cache after use."""
 57    _name: str = PrivateAttr()
 59    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
 60    _read_processor: SqlProcessorBase = PrivateAttr()
 62    _catalog_backend: CatalogBackendBase = PrivateAttr()
 63    _state_backend: StateBackendBase = PrivateAttr()
 65    paired_destination_name: ClassVar[str | None] = None
 66    paired_destination_config_class: ClassVar[type | None] = None
 68    @property
 69    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
 70        """Return a dictionary of destination configuration values."""
 71        raise NotImplementedError(
 72            f"The type '{type(self).__name__}' does not define an equivalent destination "
 73            "configuration."
 74        )
 76    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 77        """Initialize the cache and backends."""
 78        super().__init__(**data)
 80        # Create a temporary processor to do the work of ensuring the schema exists
 81        temp_processor = self._sql_processor_class(
 82            sql_config=self,
 83            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 84            state_writer=StdOutStateWriter(),
 85            temp_dir=self.cache_dir,
 86            temp_file_cleanup=self.cleanup,
 87        )
 88        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 90        # Initialize the catalog and state backends
 91        self._catalog_backend = SqlCatalogBackend(
 92            engine=self.get_sql_engine(),
 93            table_prefix=self.table_prefix or "",
 94        )
 95        self._state_backend = SqlStateBackend(
 96            engine=self.get_sql_engine(),
 97            table_prefix=self.table_prefix or "",
 98        )
100        # Now we can create the SQL read processor
101        self._read_processor = self._sql_processor_class(
102            sql_config=self,
103            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
104            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
105            temp_dir=self.cache_dir,
106            temp_file_cleanup=self.cleanup,
107        )
109    @property
110    def config_hash(self) -> str | None:
111        """Return a hash of the cache configuration.
113        This is the same as the SQLConfig hash from the superclass.
114        """
115        return super(SqlConfig, self).config_hash
117    def execute_sql(self, sql: str | list[str]) -> None:
118        """Execute one or more SQL statements against the cache's SQL backend.
120        If multiple SQL statements are given, they are executed in order,
121        within the same transaction.
123        This method is useful for creating tables, indexes, and other
124        schema objects in the cache. It does not return any results and it
125        automatically closes the connection after executing all statements.
127        This method is not intended for querying data. For that, use the `get_records`
128        method - or for a low-level interface, use the `get_sql_engine` method.
130        If any of the statements fail, the transaction is canceled and an exception
131        is raised. Most databases will rollback the transaction in this case.
132        """
133        if isinstance(sql, str):
134            # Coerce to a list if a single string is given
135            sql = [sql]
137        with self.processor.get_sql_connection() as connection:
138            for sql_statement in sql:
139                connection.execute(text(sql_statement))
141    @final
142    @property
143    def processor(self) -> SqlProcessorBase:
144        """Return the SQL processor instance."""
145        return self._read_processor
147    def get_record_processor(
148        self,
149        source_name: str,
150        catalog_provider: CatalogProvider,
151        state_writer: StateWriterBase | None = None,
152    ) -> SqlProcessorBase:
153        """Return a record processor for the specified source name and catalog.
155        We first register the source and its catalog with the catalog manager. Then we create a new
156        SQL processor instance with (only) the given input catalog.
158        For the state writer, we use a state writer which stores state in an internal SQL table.
159        """
160        # First register the source and catalog into durable storage. This is necessary to ensure
161        # that we can later retrieve the catalog information.
162        self.register_source(
163            source_name=source_name,
164            incoming_source_catalog=catalog_provider.configured_catalog,
165            stream_names=set(catalog_provider.stream_names),
166        )
168        # Next create a new SQL processor instance with the given catalog - and a state writer
169        # that writes state to the internal SQL table and associates with the given source name.
170        return self._sql_processor_class(
171            sql_config=self,
172            catalog_provider=catalog_provider,
173            state_writer=state_writer or self.get_state_writer(source_name=source_name),
174            temp_dir=self.cache_dir,
175            temp_file_cleanup=self.cleanup,
176        )
178    # Read methods:
180    def get_records(
181        self,
182        stream_name: str,
183    ) -> CachedDataset:
184        """Uses SQLAlchemy to select all rows from the table."""
185        return CachedDataset(self, stream_name)
187    def get_pandas_dataframe(
188        self,
189        stream_name: str,
190    ) -> pd.DataFrame:
191        """Return a Pandas data frame with the stream's data."""
192        table_name = self._read_processor.get_sql_table_name(stream_name)
193        engine = self.get_sql_engine()
194        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
196    def get_arrow_dataset(
197        self,
198        stream_name: str,
199        *,
200        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
201    ) -> ds.Dataset:
202        """Return an Arrow Dataset with the stream's data."""
203        table_name = self._read_processor.get_sql_table_name(stream_name)
204        engine = self.get_sql_engine()
206        # Read the table in chunks to handle large tables which does not fits in memory
207        pandas_chunks = pd.read_sql_table(
208            table_name=table_name,
209            con=engine,
210            schema=self.schema_name,
211            chunksize=max_chunk_size,
212        )
214        arrow_batches_list = []
215        arrow_schema = None
217        for pandas_chunk in pandas_chunks:
218            if arrow_schema is None:
219                # Initialize the schema with the first chunk
220                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
222            # Convert each pandas chunk to an Arrow Table
223            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
224            arrow_batches_list.append(arrow_table)
226        return ds.dataset(arrow_batches_list)
228    @final
229    @property
230    def streams(self) -> dict[str, CachedDataset]:
231        """Return a temporary table name."""
232        result = {}
233        stream_names = set(self._catalog_backend.stream_names)
235        for stream_name in stream_names:
236            result[stream_name] = CachedDataset(self, stream_name)
238        return result
240    @final
241    def __len__(self) -> int:
242        """Gets the number of streams."""
243        return len(self._catalog_backend.stream_names)
245    @final
246    def __bool__(self) -> bool:
247        """Always True.
249        This is needed so that caches with zero streams are not falsey (None-like).
250        """
251        return True
253    def get_state_provider(
254        self,
255        source_name: str,
256        *,
257        refresh: bool = True,
258        destination_name: str | None = None,
259    ) -> StateProviderBase:
260        """Return a state provider for the specified source name."""
261        return self._state_backend.get_state_provider(
262            source_name=source_name,
263            table_prefix=self.table_prefix or "",
264            refresh=refresh,
265            destination_name=destination_name,
266        )
268    def get_state_writer(
269        self,
270        source_name: str,
271        destination_name: str | None = None,
272    ) -> StateWriterBase:
273        """Return a state writer for the specified source name.
275        If syncing to the cache, `destination_name` should be `None`.
276        If syncing to a destination, `destination_name` should be the destination name.
277        """
278        return self._state_backend.get_state_writer(
279            source_name=source_name,
280            destination_name=destination_name,
281        )
283    def register_source(
284        self,
285        source_name: str,
286        incoming_source_catalog: ConfiguredAirbyteCatalog,
287        stream_names: set[str],
288    ) -> None:
289        """Register the source name and catalog."""
290        self._catalog_backend.register_source(
291            source_name=source_name,
292            incoming_source_catalog=incoming_source_catalog,
293            incoming_stream_names=stream_names,
294        )
296    def __getitem__(self, stream: str) -> CachedDataset:
297        """Return a dataset by stream name."""
298        return self.streams[stream]
300    def __contains__(self, stream: str) -> bool:
301        """Return whether a stream is in the cache."""
302        return stream in (self._catalog_backend.stream_names)
304    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
305        self,
306    ) -> Iterator[tuple[str, Any]]:
307        """Iterate over the streams in the cache."""
308        return ((name, dataset) for name, dataset in self.streams.items())
310    def _write_airbyte_message_stream(
311        self,
312        stdin: IO[str] | AirbyteMessageIterator,
313        *,
314        catalog_provider: CatalogProvider,
315        write_strategy: WriteStrategy,
316        state_writer: StateWriterBase | None = None,
317        progress_tracker: ProgressTracker,
318    ) -> None:
319        """Read from the connector and write to the cache."""
320        cache_processor = self.get_record_processor(
321  ,
322            catalog_provider=catalog_provider,
323            state_writer=state_writer,
324        )
325        cache_processor.process_airbyte_messages(
326            messages=stdin,
327            write_strategy=write_strategy,
328            progress_tracker=progress_tracker,
329        )
330        progress_tracker.log_cache_processing_complete()
class CacheBase(airbyte.shared.sql_processor.SqlConfig, airbyte._writers.base.AirbyteWriterInterface):
 41class CacheBase(SqlConfig, AirbyteWriterInterface):
 42    """Base configuration for a cache.
 44    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
 45    and basic connectivity to the SQL database.
 47    The cache is responsible for managing the state of the data synced to the cache, including the
 48    stream catalog and stream state. The cache also provides the mechanism to read and write data
 49    to the SQL backend specified in the `SqlConfig` class.
 50    """
 52    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
 53    """The directory to store the cache in."""
 55    cleanup: bool = TEMP_FILE_CLEANUP
 56    """Whether to clean up the cache after use."""
 58    _name: str = PrivateAttr()
 60    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
 61    _read_processor: SqlProcessorBase = PrivateAttr()
 63    _catalog_backend: CatalogBackendBase = PrivateAttr()
 64    _state_backend: StateBackendBase = PrivateAttr()
 66    paired_destination_name: ClassVar[str | None] = None
 67    paired_destination_config_class: ClassVar[type | None] = None
 69    @property
 70    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
 71        """Return a dictionary of destination configuration values."""
 72        raise NotImplementedError(
 73            f"The type '{type(self).__name__}' does not define an equivalent destination "
 74            "configuration."
 75        )
 77    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 78        """Initialize the cache and backends."""
 79        super().__init__(**data)
 81        # Create a temporary processor to do the work of ensuring the schema exists
 82        temp_processor = self._sql_processor_class(
 83            sql_config=self,
 84            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 85            state_writer=StdOutStateWriter(),
 86            temp_dir=self.cache_dir,
 87            temp_file_cleanup=self.cleanup,
 88        )
 89        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 91        # Initialize the catalog and state backends
 92        self._catalog_backend = SqlCatalogBackend(
 93            engine=self.get_sql_engine(),
 94            table_prefix=self.table_prefix or "",
 95        )
 96        self._state_backend = SqlStateBackend(
 97            engine=self.get_sql_engine(),
 98            table_prefix=self.table_prefix or "",
 99        )
101        # Now we can create the SQL read processor
102        self._read_processor = self._sql_processor_class(
103            sql_config=self,
104            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
105            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
106            temp_dir=self.cache_dir,
107            temp_file_cleanup=self.cleanup,
108        )
110    @property
111    def config_hash(self) -> str | None:
112        """Return a hash of the cache configuration.
114        This is the same as the SQLConfig hash from the superclass.
115        """
116        return super(SqlConfig, self).config_hash
118    def execute_sql(self, sql: str | list[str]) -> None:
119        """Execute one or more SQL statements against the cache's SQL backend.
121        If multiple SQL statements are given, they are executed in order,
122        within the same transaction.
124        This method is useful for creating tables, indexes, and other
125        schema objects in the cache. It does not return any results and it
126        automatically closes the connection after executing all statements.
128        This method is not intended for querying data. For that, use the `get_records`
129        method - or for a low-level interface, use the `get_sql_engine` method.
131        If any of the statements fail, the transaction is canceled and an exception
132        is raised. Most databases will rollback the transaction in this case.
133        """
134        if isinstance(sql, str):
135            # Coerce to a list if a single string is given
136            sql = [sql]
138        with self.processor.get_sql_connection() as connection:
139            for sql_statement in sql:
140                connection.execute(text(sql_statement))
142    @final
143    @property
144    def processor(self) -> SqlProcessorBase:
145        """Return the SQL processor instance."""
146        return self._read_processor
148    def get_record_processor(
149        self,
150        source_name: str,
151        catalog_provider: CatalogProvider,
152        state_writer: StateWriterBase | None = None,
153    ) -> SqlProcessorBase:
154        """Return a record processor for the specified source name and catalog.
156        We first register the source and its catalog with the catalog manager. Then we create a new
157        SQL processor instance with (only) the given input catalog.
159        For the state writer, we use a state writer which stores state in an internal SQL table.
160        """
161        # First register the source and catalog into durable storage. This is necessary to ensure
162        # that we can later retrieve the catalog information.
163        self.register_source(
164            source_name=source_name,
165            incoming_source_catalog=catalog_provider.configured_catalog,
166            stream_names=set(catalog_provider.stream_names),
167        )
169        # Next create a new SQL processor instance with the given catalog - and a state writer
170        # that writes state to the internal SQL table and associates with the given source name.
171        return self._sql_processor_class(
172            sql_config=self,
173            catalog_provider=catalog_provider,
174            state_writer=state_writer or self.get_state_writer(source_name=source_name),
175            temp_dir=self.cache_dir,
176            temp_file_cleanup=self.cleanup,
177        )
179    # Read methods:
181    def get_records(
182        self,
183        stream_name: str,
184    ) -> CachedDataset:
185        """Uses SQLAlchemy to select all rows from the table."""
186        return CachedDataset(self, stream_name)
188    def get_pandas_dataframe(
189        self,
190        stream_name: str,
191    ) -> pd.DataFrame:
192        """Return a Pandas data frame with the stream's data."""
193        table_name = self._read_processor.get_sql_table_name(stream_name)
194        engine = self.get_sql_engine()
195        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
197    def get_arrow_dataset(
198        self,
199        stream_name: str,
200        *,
201        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
202    ) -> ds.Dataset:
203        """Return an Arrow Dataset with the stream's data."""
204        table_name = self._read_processor.get_sql_table_name(stream_name)
205        engine = self.get_sql_engine()
207        # Read the table in chunks to handle large tables which does not fits in memory
208        pandas_chunks = pd.read_sql_table(
209            table_name=table_name,
210            con=engine,
211            schema=self.schema_name,
212            chunksize=max_chunk_size,
213        )
215        arrow_batches_list = []
216        arrow_schema = None
218        for pandas_chunk in pandas_chunks:
219            if arrow_schema is None:
220                # Initialize the schema with the first chunk
221                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
223            # Convert each pandas chunk to an Arrow Table
224            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
225            arrow_batches_list.append(arrow_table)
227        return ds.dataset(arrow_batches_list)
229    @final
230    @property
231    def streams(self) -> dict[str, CachedDataset]:
232        """Return a temporary table name."""
233        result = {}
234        stream_names = set(self._catalog_backend.stream_names)
236        for stream_name in stream_names:
237            result[stream_name] = CachedDataset(self, stream_name)
239        return result
241    @final
242    def __len__(self) -> int:
243        """Gets the number of streams."""
244        return len(self._catalog_backend.stream_names)
246    @final
247    def __bool__(self) -> bool:
248        """Always True.
250        This is needed so that caches with zero streams are not falsey (None-like).
251        """
252        return True
254    def get_state_provider(
255        self,
256        source_name: str,
257        *,
258        refresh: bool = True,
259        destination_name: str | None = None,
260    ) -> StateProviderBase:
261        """Return a state provider for the specified source name."""
262        return self._state_backend.get_state_provider(
263            source_name=source_name,
264            table_prefix=self.table_prefix or "",
265            refresh=refresh,
266            destination_name=destination_name,
267        )
269    def get_state_writer(
270        self,
271        source_name: str,
272        destination_name: str | None = None,
273    ) -> StateWriterBase:
274        """Return a state writer for the specified source name.
276        If syncing to the cache, `destination_name` should be `None`.
277        If syncing to a destination, `destination_name` should be the destination name.
278        """
279        return self._state_backend.get_state_writer(
280            source_name=source_name,
281            destination_name=destination_name,
282        )
284    def register_source(
285        self,
286        source_name: str,
287        incoming_source_catalog: ConfiguredAirbyteCatalog,
288        stream_names: set[str],
289    ) -> None:
290        """Register the source name and catalog."""
291        self._catalog_backend.register_source(
292            source_name=source_name,
293            incoming_source_catalog=incoming_source_catalog,
294            incoming_stream_names=stream_names,
295        )
297    def __getitem__(self, stream: str) -> CachedDataset:
298        """Return a dataset by stream name."""
299        return self.streams[stream]
301    def __contains__(self, stream: str) -> bool:
302        """Return whether a stream is in the cache."""
303        return stream in (self._catalog_backend.stream_names)
305    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
306        self,
307    ) -> Iterator[tuple[str, Any]]:
308        """Iterate over the streams in the cache."""
309        return ((name, dataset) for name, dataset in self.streams.items())
311    def _write_airbyte_message_stream(
312        self,
313        stdin: IO[str] | AirbyteMessageIterator,
314        *,
315        catalog_provider: CatalogProvider,
316        write_strategy: WriteStrategy,
317        state_writer: StateWriterBase | None = None,
318        progress_tracker: ProgressTracker,
319    ) -> None:
320        """Read from the connector and write to the cache."""
321        cache_processor = self.get_record_processor(
322  ,
323            catalog_provider=catalog_provider,
324            state_writer=state_writer,
325        )
326        cache_processor.process_airbyte_messages(
327            messages=stdin,
328            write_strategy=write_strategy,
329            progress_tracker=progress_tracker,
330        )
331        progress_tracker.log_cache_processing_complete()

Base configuration for a cache.

Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.

CacheBase(**data: Any)
 77    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 78        """Initialize the cache and backends."""
 79        super().__init__(**data)
 81        # Create a temporary processor to do the work of ensuring the schema exists
 82        temp_processor = self._sql_processor_class(
 83            sql_config=self,
 84            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 85            state_writer=StdOutStateWriter(),
 86            temp_dir=self.cache_dir,
 87            temp_file_cleanup=self.cleanup,
 88        )
 89        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 91        # Initialize the catalog and state backends
 92        self._catalog_backend = SqlCatalogBackend(
 93            engine=self.get_sql_engine(),
 94            table_prefix=self.table_prefix or "",
 95        )
 96        self._state_backend = SqlStateBackend(
 97            engine=self.get_sql_engine(),
 98            table_prefix=self.table_prefix or "",
 99        )
101        # Now we can create the SQL read processor
102        self._read_processor = self._sql_processor_class(
103            sql_config=self,
104            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
105            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
106            temp_dir=self.cache_dir,
107            temp_file_cleanup=self.cleanup,
108        )

Initialize the cache and backends.

cache_dir: pathlib.Path

The directory to store the cache in.

cleanup: bool

Whether to clean up the cache after use.

paired_destination_name: ClassVar[str | None] = None
paired_destination_config_class: ClassVar[type | None] = None
paired_destination_config: Union[Any, dict[str, Any]]
69    @property
70    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
71        """Return a dictionary of destination configuration values."""
72        raise NotImplementedError(
73            f"The type '{type(self).__name__}' does not define an equivalent destination "
74            "configuration."
75        )

Return a dictionary of destination configuration values.

config_hash: str | None
110    @property
111    def config_hash(self) -> str | None:
112        """Return a hash of the cache configuration.
114        This is the same as the SQLConfig hash from the superclass.
115        """
116        return super(SqlConfig, self).config_hash

Return a hash of the cache configuration.

This is the same as the SQLConfig hash from the superclass.

def execute_sql(self, sql: str | list[str]) -> None:
118    def execute_sql(self, sql: str | list[str]) -> None:
119        """Execute one or more SQL statements against the cache's SQL backend.
121        If multiple SQL statements are given, they are executed in order,
122        within the same transaction.
124        This method is useful for creating tables, indexes, and other
125        schema objects in the cache. It does not return any results and it
126        automatically closes the connection after executing all statements.
128        This method is not intended for querying data. For that, use the `get_records`
129        method - or for a low-level interface, use the `get_sql_engine` method.
131        If any of the statements fail, the transaction is canceled and an exception
132        is raised. Most databases will rollback the transaction in this case.
133        """
134        if isinstance(sql, str):
135            # Coerce to a list if a single string is given
136            sql = [sql]
138        with self.processor.get_sql_connection() as connection:
139            for sql_statement in sql:
140                connection.execute(text(sql_statement))

Execute one or more SQL statements against the cache's SQL backend.

If multiple SQL statements are given, they are executed in order, within the same transaction.

This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.

This method is not intended for querying data. For that, use the get_records method - or for a low-level interface, use the get_sql_engine method.

If any of the statements fails, the transaction is canceled and an exception is raised. Most databases will roll back the transaction in this case.

processor: airbyte.shared.sql_processor.SqlProcessorBase
142    @final
143    @property
144    def processor(self) -> SqlProcessorBase:
145        """Return the SQL processor instance."""
146        return self._read_processor

Return the SQL processor instance.

def get_record_processor( self, source_name: str, catalog_provider: airbyte.shared.catalog_providers.CatalogProvider, state_writer: airbyte.shared.state_writers.StateWriterBase | None = None) -> airbyte.shared.sql_processor.SqlProcessorBase:
148    def get_record_processor(
149        self,
150        source_name: str,
151        catalog_provider: CatalogProvider,
152        state_writer: StateWriterBase | None = None,
153    ) -> SqlProcessorBase:
154        """Return a record processor for the specified source name and catalog.
156        We first register the source and its catalog with the catalog manager. Then we create a new
157        SQL processor instance with (only) the given input catalog.
159        For the state writer, we use a state writer which stores state in an internal SQL table.
160        """
161        # First register the source and catalog into durable storage. This is necessary to ensure
162        # that we can later retrieve the catalog information.
163        self.register_source(
164            source_name=source_name,
165            incoming_source_catalog=catalog_provider.configured_catalog,
166            stream_names=set(catalog_provider.stream_names),
167        )
169        # Next create a new SQL processor instance with the given catalog - and a state writer
170        # that writes state to the internal SQL table and associates with the given source name.
171        return self._sql_processor_class(
172            sql_config=self,
173            catalog_provider=catalog_provider,
174            state_writer=state_writer or self.get_state_writer(source_name=source_name),
175            temp_dir=self.cache_dir,
176            temp_file_cleanup=self.cleanup,
177        )

Return a record processor for the specified source name and catalog.

We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.

For the state writer, we use a state writer which stores state in an internal SQL table.

def get_records(self, stream_name: str) -> airbyte.CachedDataset:
181    def get_records(
182        self,
183        stream_name: str,
184    ) -> CachedDataset:
185        """Uses SQLAlchemy to select all rows from the table."""
186        return CachedDataset(self, stream_name)

Uses SQLAlchemy to select all rows from the table.

def get_pandas_dataframe(self, stream_name: str) -> pandas.core.frame.DataFrame:
188    def get_pandas_dataframe(
189        self,
190        stream_name: str,
191    ) -> pd.DataFrame:
192        """Return a Pandas data frame with the stream's data."""
193        table_name = self._read_processor.get_sql_table_name(stream_name)
194        engine = self.get_sql_engine()
195        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

Return a Pandas data frame with the stream's data.

def get_arrow_dataset( self, stream_name: str, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
197    def get_arrow_dataset(
198        self,
199        stream_name: str,
200        *,
201        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
202    ) -> ds.Dataset:
203        """Return an Arrow Dataset with the stream's data."""
204        table_name = self._read_processor.get_sql_table_name(stream_name)
205        engine = self.get_sql_engine()
207        # Read the table in chunks to handle large tables which does not fits in memory
208        pandas_chunks = pd.read_sql_table(
209            table_name=table_name,
210            con=engine,
211            schema=self.schema_name,
212            chunksize=max_chunk_size,
213        )
215        arrow_batches_list = []
216        arrow_schema = None
218        for pandas_chunk in pandas_chunks:
219            if arrow_schema is None:
220                # Initialize the schema with the first chunk
221                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
223            # Convert each pandas chunk to an Arrow Table
224            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
225            arrow_batches_list.append(arrow_table)
227        return ds.dataset(arrow_batches_list)

Return an Arrow Dataset with the stream's data.

streams: dict[str, airbyte.CachedDataset]
229    @final
230    @property
231    def streams(self) -> dict[str, CachedDataset]:
232        """Return a temporary table name."""
233        result = {}
234        stream_names = set(self._catalog_backend.stream_names)
236        for stream_name in stream_names:
237            result[stream_name] = CachedDataset(self, stream_name)
239        return result

Return a mapping of stream names to their cached datasets.

def get_state_provider( self, source_name: str, *, refresh: bool = True, destination_name: str | None = None) -> airbyte.shared.state_providers.StateProviderBase:
254    def get_state_provider(
255        self,
256        source_name: str,
257        *,
258        refresh: bool = True,
259        destination_name: str | None = None,
260    ) -> StateProviderBase:
261        """Return a state provider for the specified source name."""
262        return self._state_backend.get_state_provider(
263            source_name=source_name,
264            table_prefix=self.table_prefix or "",
265            refresh=refresh,
266            destination_name=destination_name,
267        )

Return a state provider for the specified source name.

def get_state_writer( self, source_name: str, destination_name: str | None = None) -> airbyte.shared.state_writers.StateWriterBase:
269    def get_state_writer(
270        self,
271        source_name: str,
272        destination_name: str | None = None,
273    ) -> StateWriterBase:
274        """Return a state writer for the specified source name.
276        If syncing to the cache, `destination_name` should be `None`.
277        If syncing to a destination, `destination_name` should be the destination name.
278        """
279        return self._state_backend.get_state_writer(
280            source_name=source_name,
281            destination_name=destination_name,
282        )

Return a state writer for the specified source name.

If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.

def register_source( self, source_name: str, incoming_source_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog, stream_names: set[str]) -> None:
284    def register_source(
285        self,
286        source_name: str,
287        incoming_source_catalog: ConfiguredAirbyteCatalog,
288        stream_names: set[str],
289    ) -> None:
290        """Register the source name and catalog."""
291        self._catalog_backend.register_source(
292            source_name=source_name,
293            incoming_source_catalog=incoming_source_catalog,
294            incoming_stream_names=stream_names,
295        )

Register the source name and catalog.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise a model's private attributes, acting as a BaseModel method.

    `context` is accepted because pydantic-core passes it when calling this function; it is
    not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private-attribute storage has already been set up.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    # Ask each declared private attribute for its default, in declaration order.
    resolved_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    # Attributes without a default (reported as `PydanticUndefined`) are left unset.
    private_values = {
        attr_name: value
        for attr_name, value in resolved_defaults
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

  • self: The BaseModel instance.
  • context: The context.
Inherited Members