airbyte.caches

Base module for all caches.

 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2"""Base module for all caches."""
 3
 4from __future__ import annotations
 5
 6from typing import TYPE_CHECKING
 7
 8from airbyte.caches.base import CacheBase
 9from airbyte.caches.bigquery import BigQueryCache
10from airbyte.caches.duckdb import DuckDBCache
11from airbyte.caches.motherduck import MotherDuckCache
12from airbyte.caches.postgres import PostgresCache
13from airbyte.caches.snowflake import SnowflakeCache
14from airbyte.caches.util import get_default_cache, new_local_cache
15
16
17# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
18if TYPE_CHECKING:
19    # ruff: noqa: TC004
20    from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util
21
22# We export these classes for easy access: `airbyte.caches...`
23__all__ = [
24    # Factories
25    "get_default_cache",
26    "new_local_cache",
27    # Classes
28    "BigQueryCache",
29    "CacheBase",
30    "DuckDBCache",
31    "MotherDuckCache",
32    "PostgresCache",
33    "SnowflakeCache",
34    # Submodules,
35    "util",
36    "bigquery",
37    "duckdb",
38    "motherduck",
39    "postgres",
40    "snowflake",
41    "base",
42]
def get_default_cache() -> DuckDBCache:
27def get_default_cache() -> DuckDBCache:
28    """Get a local cache for storing data, using the default database path.
29
30    Cache files are stored in the `.cache` directory, relative to the current
31    working directory.
32    """
33    cache_dir = Path("./.cache/default_cache")
34    return DuckDBCache(
35        db_path=cache_dir / "default_cache.duckdb",
36        cache_dir=cache_dir,
37    )

Get a local cache for storing data, using the default database path.

Cache files are stored in the .cache directory, relative to the current working directory.
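
For illustration, a minimal sketch of reading a source into the default cache. The connector name and config below are placeholders; `get_default_cache` is imported from `airbyte.caches` as exported above.

    import airbyte as ab
    from airbyte.caches import get_default_cache

    cache = get_default_cache()  # DuckDB file under ./.cache/default_cache/

    # "source-faker" and its config are placeholders; any configured source works.
    source = ab.get_source("source-faker", config={"count": 1_000})
    source.select_all_streams()
    source.read(cache=cache)

    print(list(cache.streams))  # stream names now materialized in the cache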

def new_local_cache( cache_name: str | None = None, cache_dir: str | pathlib.Path | None = None, *, cleanup: bool = True) -> DuckDBCache:
40def new_local_cache(
41    cache_name: str | None = None,
42    cache_dir: str | Path | None = None,
43    *,
44    cleanup: bool = True,
45) -> DuckDBCache:
46    """Get a local cache for storing data, using a name string to seed the path.
47
48    Args:
49        cache_name: Name to use for the cache. Defaults to None.
50        cache_dir: Root directory to store the cache in. Defaults to None.
51        cleanup: Whether to clean up temporary files. Defaults to True.
52
53    Cache files are stored in the `.cache` directory, relative to the current
54    working directory.
55    """
56    if cache_name:
57        if " " in cache_name:
58            raise exc.PyAirbyteInputError(
59                message="Cache name cannot contain spaces.",
60                input_value=cache_name,
61            )
62
63        if not cache_name.replace("_", "").isalnum():
64            raise exc.PyAirbyteInputError(
65                message="Cache name can only contain alphanumeric characters and underscores.",
66                input_value=cache_name,
67            )
68
69    cache_name = cache_name or str(ulid.ULID())
70    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
71    if not isinstance(cache_dir, Path):
72        cache_dir = Path(cache_dir)
73
74    return DuckDBCache(
75        db_path=cache_dir / f"db_{cache_name}.duckdb",
76        cache_dir=cache_dir,
77        cleanup=cleanup,
78    )

Get a local cache for storing data, using a name string to seed the path.

Arguments:
  • cache_name: Name to use for the cache. Defaults to None.
  • cache_dir: Root directory to store the cache in. Defaults to None.
  • cleanup: Whether to clean up temporary files. Defaults to True.

Cache files are stored in the .cache directory, relative to the current working directory.
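
A short sketch of the name-seeded variant. The cache name below is a placeholder; per the validation above, names may only contain alphanumeric characters and underscores.

    from airbyte.caches import new_local_cache

    # Creates ./.cache/analytics_run/db_analytics_run.duckdb
    cache = new_local_cache(cache_name="analytics_run", cleanup=False)

    # Spaces or other punctuation in the name raise PyAirbyteInputError:
    # new_local_cache(cache_name="analytics run")  # would fail validation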

class BigQueryCache(airbyte._processors.sql.bigquery.BigQueryConfig, airbyte.caches.CacheBase):
39class BigQueryCache(BigQueryConfig, CacheBase):
40    """The BigQuery cache implementation."""
41
42    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor
43
44    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
45    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery
46
47    @property
48    def paired_destination_config(self) -> DestinationBigquery:
49        """Return a dictionary of destination configuration values."""
50        return bigquery_cache_to_destination_configuration(cache=self)
51
52    def get_arrow_dataset(
53        self,
54        stream_name: str,
55        *,
56        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
57    ) -> NoReturn:
58        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
59
60        See: https://github.com/airbytehq/PyAirbyte/issues/165
61        """
62        raise NotImplementedError(
63            "BigQuery doesn't currently support to_arrow. "
64            "Please consider using a different cache implementation for these functionalities."
65        )

The BigQuery cache implementation.
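
A construction sketch; the project, dataset, and credentials values are placeholders, and the field names come from the inherited `BigQueryConfig` (listed under Inherited Members below).

    from airbyte.caches import BigQueryCache

    cache = BigQueryCache(
        project_name="my-gcp-project",            # placeholder
        dataset_name="pyairbyte_cache",           # placeholder
        credentials_path="/path/to/sa-key.json",  # placeholder service-account key
    )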

paired_destination_name: ClassVar[str | None] = 'destination-bigquery'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_bigquery.DestinationBigquery'>
paired_destination_config: airbyte_api.models.destination_bigquery.DestinationBigquery
47    @property
48    def paired_destination_config(self) -> DestinationBigquery:
49        """Return a dictionary of destination configuration values."""
50        return bigquery_cache_to_destination_configuration(cache=self)

Return a dictionary of destination configuration values.

def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = 100000) -> NoReturn:
52    def get_arrow_dataset(
53        self,
54        stream_name: str,
55        *,
56        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
57    ) -> NoReturn:
58        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
59
60        See: https://github.com/airbytehq/PyAirbyte/issues/165
61        """
62        raise NotImplementedError(
63            "BigQuery doesn't currently support to_arrow. "
64            "Please consider using a different cache implementation for these functionalities."
65        )

Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.

See: https://github.com/airbytehq/PyAirbyte/issues/165

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.bigquery.BigQueryConfig
database_name
schema_name
credentials_path
project_name
dataset_name
get_sql_alchemy_url
get_database_name
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class CacheBase(airbyte.shared.sql_processor.SqlConfig, airbyte._writers.base.AirbyteWriterInterface):
 41class CacheBase(SqlConfig, AirbyteWriterInterface):
 42    """Base configuration for a cache.
 43
 44    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
 45    and basic connectivity to the SQL database.
 46
 47    The cache is responsible for managing the state of the data synced to the cache, including the
 48    stream catalog and stream state. The cache also provides the mechanism to read and write data
 49    to the SQL backend specified in the `SqlConfig` class.
 50    """
 51
 52    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
 53    """The directory to store the cache in."""
 54
 55    cleanup: bool = TEMP_FILE_CLEANUP
 56    """Whether to clean up the cache after use."""
 57
 58    _name: str = PrivateAttr()
 59
 60    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
 61    _read_processor: SqlProcessorBase = PrivateAttr()
 62
 63    _catalog_backend: CatalogBackendBase = PrivateAttr()
 64    _state_backend: StateBackendBase = PrivateAttr()
 65
 66    paired_destination_name: ClassVar[str | None] = None
 67    paired_destination_config_class: ClassVar[type | None] = None
 68
 69    @property
 70    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
 71        """Return a dictionary of destination configuration values."""
 72        raise NotImplementedError(
 73            f"The type '{type(self).__name__}' does not define an equivalent destination "
 74            "configuration."
 75        )
 76
 77    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 78        """Initialize the cache and backends."""
 79        super().__init__(**data)
 80
 81        # Create a temporary processor to do the work of ensuring the schema exists
 82        temp_processor = self._sql_processor_class(
 83            sql_config=self,
 84            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 85            state_writer=StdOutStateWriter(),
 86            temp_dir=self.cache_dir,
 87            temp_file_cleanup=self.cleanup,
 88        )
 89        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 90
 91        # Initialize the catalog and state backends
 92        self._catalog_backend = SqlCatalogBackend(
 93            engine=self.get_sql_engine(),
 94            table_prefix=self.table_prefix or "",
 95        )
 96        self._state_backend = SqlStateBackend(
 97            engine=self.get_sql_engine(),
 98            table_prefix=self.table_prefix or "",
 99        )
100
101        # Now we can create the SQL read processor
102        self._read_processor = self._sql_processor_class(
103            sql_config=self,
104            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
105            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
106            temp_dir=self.cache_dir,
107            temp_file_cleanup=self.cleanup,
108        )
109
110    @property
111    def config_hash(self) -> str | None:
112        """Return a hash of the cache configuration.
113
114        This is the same as the SQLConfig hash from the superclass.
115        """
116        return super(SqlConfig, self).config_hash
117
118    def execute_sql(self, sql: str | list[str]) -> None:
119        """Execute one or more SQL statements against the cache's SQL backend.
120
121        If multiple SQL statements are given, they are executed in order,
122        within the same transaction.
123
124        This method is useful for creating tables, indexes, and other
125        schema objects in the cache. It does not return any results and it
126        automatically closes the connection after executing all statements.
127
128        This method is not intended for querying data. For that, use the `get_records`
129        method - or for a low-level interface, use the `get_sql_engine` method.
130
131        If any of the statements fail, the transaction is canceled and an exception
132        is raised. Most databases will rollback the transaction in this case.
133        """
134        if isinstance(sql, str):
135            # Coerce to a list if a single string is given
136            sql = [sql]
137
138        with self.processor.get_sql_connection() as connection:
139            for sql_statement in sql:
140                connection.execute(text(sql_statement))
141
142    @final
143    @property
144    def processor(self) -> SqlProcessorBase:
145        """Return the SQL processor instance."""
146        return self._read_processor
147
148    def get_record_processor(
149        self,
150        source_name: str,
151        catalog_provider: CatalogProvider,
152        state_writer: StateWriterBase | None = None,
153    ) -> SqlProcessorBase:
154        """Return a record processor for the specified source name and catalog.
155
156        We first register the source and its catalog with the catalog manager. Then we create a new
157        SQL processor instance with (only) the given input catalog.
158
159        For the state writer, we use a state writer which stores state in an internal SQL table.
160        """
161        # First register the source and catalog into durable storage. This is necessary to ensure
162        # that we can later retrieve the catalog information.
163        self.register_source(
164            source_name=source_name,
165            incoming_source_catalog=catalog_provider.configured_catalog,
166            stream_names=set(catalog_provider.stream_names),
167        )
168
169        # Next create a new SQL processor instance with the given catalog - and a state writer
170        # that writes state to the internal SQL table and associates with the given source name.
171        return self._sql_processor_class(
172            sql_config=self,
173            catalog_provider=catalog_provider,
174            state_writer=state_writer or self.get_state_writer(source_name=source_name),
175            temp_dir=self.cache_dir,
176            temp_file_cleanup=self.cleanup,
177        )
178
179    # Read methods:
180
181    def get_records(
182        self,
183        stream_name: str,
184    ) -> CachedDataset:
185        """Uses SQLAlchemy to select all rows from the table."""
186        return CachedDataset(self, stream_name)
187
188    def get_pandas_dataframe(
189        self,
190        stream_name: str,
191    ) -> pd.DataFrame:
192        """Return a Pandas data frame with the stream's data."""
193        table_name = self._read_processor.get_sql_table_name(stream_name)
194        engine = self.get_sql_engine()
195        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
196
197    def get_arrow_dataset(
198        self,
199        stream_name: str,
200        *,
201        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
202    ) -> ds.Dataset:
203        """Return an Arrow Dataset with the stream's data."""
204        table_name = self._read_processor.get_sql_table_name(stream_name)
205        engine = self.get_sql_engine()
206
207        # Read the table in chunks to handle large tables which do not fit in memory
208        pandas_chunks = pd.read_sql_table(
209            table_name=table_name,
210            con=engine,
211            schema=self.schema_name,
212            chunksize=max_chunk_size,
213        )
214
215        arrow_batches_list = []
216        arrow_schema = None
217
218        for pandas_chunk in pandas_chunks:
219            if arrow_schema is None:
220                # Initialize the schema with the first chunk
221                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
222
223            # Convert each pandas chunk to an Arrow RecordBatch
224            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
225            arrow_batches_list.append(arrow_table)
226
227        return ds.dataset(arrow_batches_list)
228
229    @final
230    @property
231    def streams(self) -> dict[str, CachedDataset]:
232        """Return a mapping of stream names to cached datasets."""
233        result = {}
234        stream_names = set(self._catalog_backend.stream_names)
235
236        for stream_name in stream_names:
237            result[stream_name] = CachedDataset(self, stream_name)
238
239        return result
240
241    @final
242    def __len__(self) -> int:
243        """Gets the number of streams."""
244        return len(self._catalog_backend.stream_names)
245
246    @final
247    def __bool__(self) -> bool:
248        """Always True.
249
250        This is needed so that caches with zero streams are not falsey (None-like).
251        """
252        return True
253
254    def get_state_provider(
255        self,
256        source_name: str,
257        *,
258        refresh: bool = True,
259        destination_name: str | None = None,
260    ) -> StateProviderBase:
261        """Return a state provider for the specified source name."""
262        return self._state_backend.get_state_provider(
263            source_name=source_name,
264            table_prefix=self.table_prefix or "",
265            refresh=refresh,
266            destination_name=destination_name,
267        )
268
269    def get_state_writer(
270        self,
271        source_name: str,
272        destination_name: str | None = None,
273    ) -> StateWriterBase:
274        """Return a state writer for the specified source name.
275
276        If syncing to the cache, `destination_name` should be `None`.
277        If syncing to a destination, `destination_name` should be the destination name.
278        """
279        return self._state_backend.get_state_writer(
280            source_name=source_name,
281            destination_name=destination_name,
282        )
283
284    def register_source(
285        self,
286        source_name: str,
287        incoming_source_catalog: ConfiguredAirbyteCatalog,
288        stream_names: set[str],
289    ) -> None:
290        """Register the source name and catalog."""
291        self._catalog_backend.register_source(
292            source_name=source_name,
293            incoming_source_catalog=incoming_source_catalog,
294            incoming_stream_names=stream_names,
295        )
296
297    def __getitem__(self, stream: str) -> CachedDataset:
298        """Return a dataset by stream name."""
299        return self.streams[stream]
300
301    def __contains__(self, stream: str) -> bool:
302        """Return whether a stream is in the cache."""
303        return stream in (self._catalog_backend.stream_names)
304
305    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
306        self,
307    ) -> Iterator[tuple[str, Any]]:
308        """Iterate over the streams in the cache."""
309        return ((name, dataset) for name, dataset in self.streams.items())
310
311    def _write_airbyte_message_stream(
312        self,
313        stdin: IO[str] | AirbyteMessageIterator,
314        *,
315        catalog_provider: CatalogProvider,
316        write_strategy: WriteStrategy,
317        state_writer: StateWriterBase | None = None,
318        progress_tracker: ProgressTracker,
319    ) -> None:
320        """Read from the connector and write to the cache."""
321        cache_processor = self.get_record_processor(
322            source_name=self.name,
323            catalog_provider=catalog_provider,
324            state_writer=state_writer,
325        )
326        cache_processor.process_airbyte_messages(
327            messages=stdin,
328            write_strategy=write_strategy,
329            progress_tracker=progress_tracker,
330        )
331        progress_tracker.log_cache_processing_complete()

Base configuration for a cache.

Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.

CacheBase(**data: Any)
 77    def __init__(self, **data: Any) -> None:  # noqa: ANN401
 78        """Initialize the cache and backends."""
 79        super().__init__(**data)
 80
 81        # Create a temporary processor to do the work of ensuring the schema exists
 82        temp_processor = self._sql_processor_class(
 83            sql_config=self,
 84            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
 85            state_writer=StdOutStateWriter(),
 86            temp_dir=self.cache_dir,
 87            temp_file_cleanup=self.cleanup,
 88        )
 89        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member
 90
 91        # Initialize the catalog and state backends
 92        self._catalog_backend = SqlCatalogBackend(
 93            engine=self.get_sql_engine(),
 94            table_prefix=self.table_prefix or "",
 95        )
 96        self._state_backend = SqlStateBackend(
 97            engine=self.get_sql_engine(),
 98            table_prefix=self.table_prefix or "",
 99        )
100
101        # Now we can create the SQL read processor
102        self._read_processor = self._sql_processor_class(
103            sql_config=self,
104            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
105            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
106            temp_dir=self.cache_dir,
107            temp_file_cleanup=self.cleanup,
108        )

Initialize the cache and backends.

cache_dir: pathlib.Path

The directory to store the cache in.

cleanup: bool

Whether to clean up the cache after use.

paired_destination_name: ClassVar[str | None] = None
paired_destination_config_class: ClassVar[type | None] = None
paired_destination_config: Union[Any, dict[str, Any]]
69    @property
70    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
71        """Return a dictionary of destination configuration values."""
72        raise NotImplementedError(
73            f"The type '{type(self).__name__}' does not define an equivalent destination "
74            "configuration."
75        )

Return a dictionary of destination configuration values.

config_hash: str | None
110    @property
111    def config_hash(self) -> str | None:
112        """Return a hash of the cache configuration.
113
114        This is the same as the SQLConfig hash from the superclass.
115        """
116        return super(SqlConfig, self).config_hash

Return a hash of the cache configuration.

This is the same as the SQLConfig hash from the superclass.

def execute_sql(self, sql: str | list[str]) -> None:
118    def execute_sql(self, sql: str | list[str]) -> None:
119        """Execute one or more SQL statements against the cache's SQL backend.
120
121        If multiple SQL statements are given, they are executed in order,
122        within the same transaction.
123
124        This method is useful for creating tables, indexes, and other
125        schema objects in the cache. It does not return any results and it
126        automatically closes the connection after executing all statements.
127
128        This method is not intended for querying data. For that, use the `get_records`
129        method - or for a low-level interface, use the `get_sql_engine` method.
130
131        If any of the statements fail, the transaction is canceled and an exception
132        is raised. Most databases will rollback the transaction in this case.
133        """
134        if isinstance(sql, str):
135            # Coerce to a list if a single string is given
136            sql = [sql]
137
138        with self.processor.get_sql_connection() as connection:
139            for sql_statement in sql:
140                connection.execute(text(sql_statement))

Execute one or more SQL statements against the cache's SQL backend.

If multiple SQL statements are given, they are executed in order, within the same transaction.

This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.

This method is not intended for querying data. For that, use the get_records method - or for a low-level interface, use the get_sql_engine method.

If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
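
A small sketch of running DDL through the cache. The table and index names are placeholders, and the exact SQL dialect depends on the backing database.

    cache.execute_sql(
        [
            "CREATE TABLE IF NOT EXISTS my_notes (id INTEGER, body TEXT)",
            "CREATE INDEX IF NOT EXISTS idx_my_notes_id ON my_notes (id)",
        ]
    )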

processor: airbyte.shared.sql_processor.SqlProcessorBase
142    @final
143    @property
144    def processor(self) -> SqlProcessorBase:
145        """Return the SQL processor instance."""
146        return self._read_processor

Return the SQL processor instance.

def get_record_processor( self, source_name: str, catalog_provider: airbyte.shared.catalog_providers.CatalogProvider, state_writer: airbyte.shared.state_writers.StateWriterBase | None = None) -> airbyte.shared.sql_processor.SqlProcessorBase:
148    def get_record_processor(
149        self,
150        source_name: str,
151        catalog_provider: CatalogProvider,
152        state_writer: StateWriterBase | None = None,
153    ) -> SqlProcessorBase:
154        """Return a record processor for the specified source name and catalog.
155
156        We first register the source and its catalog with the catalog manager. Then we create a new
157        SQL processor instance with (only) the given input catalog.
158
159        For the state writer, we use a state writer which stores state in an internal SQL table.
160        """
161        # First register the source and catalog into durable storage. This is necessary to ensure
162        # that we can later retrieve the catalog information.
163        self.register_source(
164            source_name=source_name,
165            incoming_source_catalog=catalog_provider.configured_catalog,
166            stream_names=set(catalog_provider.stream_names),
167        )
168
169        # Next create a new SQL processor instance with the given catalog - and a state writer
170        # that writes state to the internal SQL table and associates with the given source name.
171        return self._sql_processor_class(
172            sql_config=self,
173            catalog_provider=catalog_provider,
174            state_writer=state_writer or self.get_state_writer(source_name=source_name),
175            temp_dir=self.cache_dir,
176            temp_file_cleanup=self.cleanup,
177        )

Return a record processor for the specified source name and catalog.

We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.

For the state writer, we use a state writer which stores state in an internal SQL table.

def get_records(self, stream_name: str) -> airbyte.CachedDataset:
181    def get_records(
182        self,
183        stream_name: str,
184    ) -> CachedDataset:
185        """Uses SQLAlchemy to select all rows from the table."""
186        return CachedDataset(self, stream_name)

Uses SQLAlchemy to select all rows from the table.

def get_pandas_dataframe(self, stream_name: str) -> pandas.core.frame.DataFrame:
188    def get_pandas_dataframe(
189        self,
190        stream_name: str,
191    ) -> pd.DataFrame:
192        """Return a Pandas data frame with the stream's data."""
193        table_name = self._read_processor.get_sql_table_name(stream_name)
194        engine = self.get_sql_engine()
195        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

Return a Pandas data frame with the stream's data.
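
For example (the stream name is a placeholder):

    df = cache.get_pandas_dataframe("users")
    print(df.shape)
    print(df.head())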

def get_arrow_dataset( self, stream_name: str, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
197    def get_arrow_dataset(
198        self,
199        stream_name: str,
200        *,
201        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
202    ) -> ds.Dataset:
203        """Return an Arrow Dataset with the stream's data."""
204        table_name = self._read_processor.get_sql_table_name(stream_name)
205        engine = self.get_sql_engine()
206
207        # Read the table in chunks to handle large tables which do not fit in memory
208        pandas_chunks = pd.read_sql_table(
209            table_name=table_name,
210            con=engine,
211            schema=self.schema_name,
212            chunksize=max_chunk_size,
213        )
214
215        arrow_batches_list = []
216        arrow_schema = None
217
218        for pandas_chunk in pandas_chunks:
219            if arrow_schema is None:
220                # Initialize the schema with the first chunk
221                arrow_schema = pa.Schema.from_pandas(pandas_chunk)
222
223            # Convert each pandas chunk to an Arrow RecordBatch
224            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
225            arrow_batches_list.append(arrow_table)
226
227        return ds.dataset(arrow_batches_list)

Return an Arrow Dataset with the stream's data.
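
A usage sketch; "users" is a placeholder stream name, and the returned object is a regular `pyarrow.dataset.Dataset`.

    dataset = cache.get_arrow_dataset("users", max_chunk_size=50_000)
    table = dataset.to_table()  # materialize as a pyarrow.Table
    print(table.num_rows, table.schema.names)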

streams: dict[str, airbyte.CachedDataset]
229    @final
230    @property
231    def streams(self) -> dict[str, CachedDataset]:
232        """Return a mapping of stream names to cached datasets."""
233        result = {}
234        stream_names = set(self._catalog_backend.stream_names)
235
236        for stream_name in stream_names:
237            result[stream_name] = CachedDataset(self, stream_name)
238
239        return result

Return a mapping of stream names to cached datasets.
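
This mapping backs the dict-style helpers (`__getitem__`, `__contains__`, `__len__`) shown in the class source above; the stream name below is a placeholder.

    for stream_name, dataset in cache.streams.items():
        print(stream_name, len(list(dataset)))  # datasets are iterable record sets

    if "users" in cache:        # __contains__
        users = cache["users"]  # __getitem__ returns a CachedDataset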

def get_state_provider( self, source_name: str, *, refresh: bool = True, destination_name: str | None = None) -> airbyte.shared.state_providers.StateProviderBase:
254    def get_state_provider(
255        self,
256        source_name: str,
257        *,
258        refresh: bool = True,
259        destination_name: str | None = None,
260    ) -> StateProviderBase:
261        """Return a state provider for the specified source name."""
262        return self._state_backend.get_state_provider(
263            source_name=source_name,
264            table_prefix=self.table_prefix or "",
265            refresh=refresh,
266            destination_name=destination_name,
267        )

Return a state provider for the specified source name.

def get_state_writer( self, source_name: str, destination_name: str | None = None) -> airbyte.shared.state_writers.StateWriterBase:
269    def get_state_writer(
270        self,
271        source_name: str,
272        destination_name: str | None = None,
273    ) -> StateWriterBase:
274        """Return a state writer for the specified source name.
275
276        If syncing to the cache, `destination_name` should be `None`.
277        If syncing to a destination, `destination_name` should be the destination name.
278        """
279        return self._state_backend.get_state_writer(
280            source_name=source_name,
281            destination_name=destination_name,
282        )

Return a state writer for the specified source name.

If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.
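
A sketch of both modes; the source and destination names are placeholders.

    # Writing state for records synced into the cache itself:
    cache_state_writer = cache.get_state_writer(source_name="source-faker")

    # Writing state on behalf of a downstream destination:
    dest_state_writer = cache.get_state_writer(
        source_name="source-faker",
        destination_name="destination-postgres",
    )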

def register_source( self, source_name: str, incoming_source_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog, stream_names: set[str]) -> None:
284    def register_source(
285        self,
286        source_name: str,
287        incoming_source_catalog: ConfiguredAirbyteCatalog,
288        stream_names: set[str],
289    ) -> None:
290        """Register the source name and catalog."""
291        self._catalog_backend.register_source(
292            source_name=source_name,
293            incoming_source_catalog=incoming_source_catalog,
294            incoming_stream_names=stream_names,
295        )

Register the source name and catalog.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
384def init_private_attributes(self: BaseModel, context: Any, /) -> None:
385    """This function is meant to behave like a BaseModel method to initialise private attributes.
386
387    It takes context as an argument since that's what pydantic-core passes when calling it.
388
389    Args:
390        self: The BaseModel instance.
391        context: The context.
392    """
393    if getattr(self, '__pydantic_private__', None) is None:
394        pydantic_private = {}
395        for name, private_attr in self.__private_attributes__.items():
396            default = private_attr.get_default()
397            if default is not PydanticUndefined:
398                pydantic_private[name] = default
399        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
get_sql_alchemy_url
get_database_name
get_create_table_extra_clauses
get_sql_engine
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class DuckDBCache(airbyte._processors.sql.duckdb.DuckDBConfig, airbyte.caches.CacheBase):
44class DuckDBCache(DuckDBConfig, CacheBase):
45    """A DuckDB cache."""
46
47    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor
48
49    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
50    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb
51
52    @property
53    def paired_destination_config(self) -> DestinationDuckdb:
54        """Return a dictionary of destination configuration values."""
55        return duckdb_cache_to_destination_configuration(cache=self)

A DuckDB cache.
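
A construction sketch; the path is a placeholder, and `db_path`/`schema_name` come from the inherited `DuckDBConfig` (see Inherited Members below).

    from airbyte.caches import DuckDBCache

    cache = DuckDBCache(
        db_path="/tmp/pyairbyte/my_cache.duckdb",  # placeholder path
        schema_name="main",
    )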

paired_destination_name: ClassVar[str | None] = 'destination-duckdb'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_duckdb.DestinationDuckdb'>
paired_destination_config: airbyte_api.models.destination_duckdb.DestinationDuckdb
52    @property
53    def paired_destination_config(self) -> DestinationDuckdb:
54        """Return a dictionary of destination configuration values."""
55        return duckdb_cache_to_destination_configuration(cache=self)

Return a dictionary of destination configuration values.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.duckdb.DuckDBConfig
db_path
schema_name
get_sql_alchemy_url
get_database_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class MotherDuckCache(airbyte.caches.motherduck.MotherDuckConfig, airbyte.caches.DuckDBCache):
 72class MotherDuckCache(MotherDuckConfig, DuckDBCache):
73    """Cache that uses MotherDuck for external persistent storage."""
74
75    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor
76
 77    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
78    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb
79
80    @property
81    def paired_destination_config(self) -> DestinationDuckdb:
82        """Return a dictionary of destination configuration values."""
83        return motherduck_cache_to_destination_configuration(cache=self)

Cache that uses MotherDuck for external persistent storage.
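
A construction sketch; the database name and token are placeholders, and the fields come from the inherited `MotherDuckConfig` (see Inherited Members below).

    from airbyte.caches import MotherDuckCache

    cache = MotherDuckCache(
        database="my_motherduck_db",  # placeholder
        api_key="md_token_...",       # placeholder MotherDuck token
        schema_name="pyairbyte",
    )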

paired_destination_name: ClassVar[str | None] = 'destination-duckdb'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_duckdb.DestinationDuckdb'>
paired_destination_config: airbyte_api.models.destination_duckdb.DestinationDuckdb
80    @property
81    def paired_destination_config(self) -> DestinationDuckdb:
82        """Return a dictionary of destination configuration values."""
83        return motherduck_cache_to_destination_configuration(cache=self)

Return a dictionary of destination configuration values.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte.caches.motherduck.MotherDuckConfig
database
api_key
db_path
get_sql_alchemy_url
get_database_name
airbyte._processors.sql.duckdb.DuckDBConfig
schema_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class PostgresCache(airbyte._processors.sql.postgres.PostgresConfig, airbyte.caches.CacheBase):
38class PostgresCache(PostgresConfig, CacheBase):
39    """Configuration for the Postgres cache.
40
41    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
42    """
43
44    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor
45
46    paired_destination_name: ClassVar[str | None] = "destination-postgres"
47    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres
48
49    @property
50    def paired_destination_config(self) -> DestinationPostgres:
51        """Return a dictionary of destination configuration values."""
52        return postgres_cache_to_destination_configuration(cache=self)
53
54    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
55        """Return a DestinationPostgres instance with the same configuration."""
56        return DestinationPostgres(
57            host=self.host,
58            port=self.port,
59            username=self.username,
60            password=self.password,
61            database=self.database,
62        )

Configuration for the Postgres cache.

Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
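
A construction sketch; all connection values are placeholders, and the fields come from the inherited `PostgresConfig` (see Inherited Members below).

    from airbyte.caches import PostgresCache

    cache = PostgresCache(
        host="localhost",
        port=5432,
        username="postgres",
        password="example-password",  # placeholder credentials
        database="pyairbyte_cache",
    )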

paired_destination_name: ClassVar[str | None] = 'destination-postgres'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_postgres.DestinationPostgres'>
paired_destination_config: airbyte_api.models.destination_postgres.DestinationPostgres
49    @property
50    def paired_destination_config(self) -> DestinationPostgres:
51        """Return a dictionary of destination configuration values."""
52        return postgres_cache_to_destination_configuration(cache=self)

Return a dictionary of destination configuration values.

def clone_as_cloud_destination_config(self) -> airbyte_api.models.destination_postgres.DestinationPostgres:
54    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
55        """Return a DestinationPostgres instance with the same configuration."""
56        return DestinationPostgres(
57            host=self.host,
58            port=self.port,
59            username=self.username,
60            password=self.password,
61            database=self.database,
62        )

Return a DestinationPostgres instance with the same configuration.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.postgres.PostgresConfig
host
port
database
username
password
get_sql_alchemy_url
get_database_name
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
get_create_table_extra_clauses
get_sql_engine
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class SnowflakeCache(airbyte._processors.sql.snowflake.SnowflakeConfig, airbyte.caches.CacheBase):
37class SnowflakeCache(SnowflakeConfig, CacheBase):
38    """Configuration for the Snowflake cache."""
39
40    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND
41
42    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor
43
44    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
45    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake
46
47    @property
48    def paired_destination_config(self) -> DestinationSnowflake:
49        """Return a dictionary of destination configuration values."""
50        return snowflake_cache_to_destination_configuration(cache=self)

Configuration for the Snowflake cache.
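
A construction sketch; all values are placeholders, and the fields come from the inherited `SnowflakeConfig` (see Inherited Members below).

    from airbyte.caches import SnowflakeCache

    cache = SnowflakeCache(
        account="my-account",  # placeholder Snowflake account identifier
        username="MY_USER",
        password="example-password",
        warehouse="COMPUTE_WH",
        database="PYAIRBYTE_DB",
        role="PYAIRBYTE_ROLE",
        schema_name="PYAIRBYTE_SCHEMA",
    )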

dedupe_mode: airbyte.shared.sql_processor.RecordDedupeMode
paired_destination_name: ClassVar[str | None] = 'destination-snowflake'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_snowflake.DestinationSnowflake'>
paired_destination_config: airbyte_api.models.destination_snowflake.DestinationSnowflake
47    @property
48    def paired_destination_config(self) -> DestinationSnowflake:
49        """Return a dictionary of destination configuration values."""
50        return snowflake_cache_to_destination_configuration(cache=self)

Return a dictionary of destination configuration values.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.snowflake.SnowflakeConfig
account
username
password
warehouse
database
role
schema_name
data_retention_time_in_days
get_create_table_extra_clauses
get_database_name
get_sql_alchemy_url
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name