airbyte.caches

Base module for all caches.

# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base module for all caches."""

from __future__ import annotations

from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util
from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.motherduck import MotherDuckCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.caches.util import get_default_cache, new_local_cache


# We export these classes for easy access: `airbyte.caches...`
__all__ = [
    # Factories
    "get_default_cache",
    "new_local_cache",
    # Classes
    "BigQueryCache",
    "CacheBase",
    "DuckDBCache",
    "MotherDuckCache",
    "PostgresCache",
    "SnowflakeCache",
    # Submodules
    "util",
    "bigquery",
    "duckdb",
    "motherduck",
    "postgres",
    "snowflake",
    "base",
]
def get_default_cache() -> DuckDBCache:
def get_default_cache() -> DuckDBCache:
    """Get a local cache for storing data, using the default database path.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    cache_dir = Path("./.cache/default_cache")
    return DuckDBCache(
        db_path=cache_dir / "default_cache.duckdb",
        cache_dir=cache_dir,
    )

Get a local cache for storing data, using the default database path.

Cache files are stored in the .cache directory, relative to the current working directory.
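
A minimal usage sketch follows, assuming PyAirbyte is imported as `ab` and the `source-faker` connector is available; the source name and config are illustrative, and any configured source works here.

import airbyte as ab

# Uses ./.cache/default_cache/default_cache.duckdb, relative to the working directory.
cache = ab.get_default_cache()

# "source-faker" and its config are placeholders for any configured source.
source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
source.select_all_streams()
source.read(cache=cache)

# Because the cache is file-backed, the synced streams persist across runs.
print(list(cache.streams))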

def new_local_cache( cache_name: str | None = None, cache_dir: str | pathlib.Path | None = None, *, cleanup: bool = True) -> DuckDBCache:
def new_local_cache(
    cache_name: str | None = None,
    cache_dir: str | Path | None = None,
    *,
    cleanup: bool = True,
) -> DuckDBCache:
    """Get a local cache for storing data, using a name string to seed the path.

    Args:
        cache_name: Name to use for the cache. Defaults to None.
        cache_dir: Root directory to store the cache in. Defaults to None.
        cleanup: Whether to clean up temporary files. Defaults to True.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    if cache_name:
        if " " in cache_name:
            raise exc.PyAirbyteInputError(
                message="Cache name cannot contain spaces.",
                input_value=cache_name,
            )

        if not cache_name.replace("_", "").isalnum():
            raise exc.PyAirbyteInputError(
                message="Cache name can only contain alphanumeric characters and underscores.",
                input_value=cache_name,
            )

    cache_name = cache_name or str(ulid.ULID())
    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)

    return DuckDBCache(
        db_path=cache_dir / f"db_{cache_name}.duckdb",
        cache_dir=cache_dir,
        cleanup=cleanup,
    )

Get a local cache for storing data, using a name string to seed the path.

Arguments:
  • cache_name: Name to use for the cache. Defaults to None.
  • cache_dir: Root directory to store the cache in. Defaults to None.
  • cleanup: Whether to clean up temporary files. Defaults to True.

Cache files are stored in the .cache directory, relative to the current working directory.
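
A short sketch of creating a named local cache; the cache name and directory below are placeholders, and names may only contain alphanumeric characters and underscores.

import airbyte as ab

cache = ab.new_local_cache(
    cache_name="analytics_dev",              # placeholder name; spaces are rejected
    cache_dir="./.cache/analytics_dev",      # optional; defaults to ./.cache/<cache_name>
    cleanup=True,
)

# The backing DuckDB file is ./.cache/analytics_dev/db_analytics_dev.duckdb.
print(cache.db_path)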

class BigQueryCache(airbyte._processors.sql.bigquery.BigQueryConfig, airbyte.caches.CacheBase):
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )

The BigQuery cache implementation.
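
A construction sketch, assuming a GCP project and a service-account key file; the project, dataset, and path values are placeholders.

import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="my-gcp-project",                      # placeholder GCP project
    dataset_name="airbyte_raw",                         # BigQuery dataset to write to
    credentials_path="/path/to/service_account.json",   # placeholder key file path
)

# "source-faker" is illustrative; any configured source can write to this cache.
source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
source.select_all_streams()
source.read(cache=cache)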

def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = 100000) -> NoReturn:

Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.

See: https://github.com/airbytehq/PyAirbyte/issues/165
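
Because `get_arrow_dataset` raises on BigQuery, one workaround is to load the stream through pandas and convert it. This sketch reuses the `cache` object from the construction example above, and `users` is a hypothetical stream name.

import pyarrow as pa

df = cache.get_pandas_dataframe("users")   # "users" is illustrative
arrow_table = pa.Table.from_pandas(df)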

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw', alias='dataset_name', alias_priority=2), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'database_name': FieldInfo(annotation=str, required=True, alias='project_name', alias_priority=2), 'credentials_path': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
                        """We need to both initialize private attributes and call the user-defined model_post_init
                        method.
                        """
                        init_private_attributes(self, context)
                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.bigquery.BigQueryConfig
database_name
schema_name
credentials_path
project_name
dataset_name
get_sql_alchemy_url
get_database_name
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class CacheBase(airbyte.shared.sql_processor.SqlConfig, airbyte._writers.base.AirbyteWriterInterface):
class CacheBase(SqlConfig, AirbyteWriterInterface):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _deployed_api_root: str | None = PrivateAttr(default=None)
    _deployed_workspace_id: str | None = PrivateAttr(default=None)
    _deployed_destination_id: str | None = PrivateAttr(default=None)

    _sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will roll back the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow record batch
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def __getitem__(self, stream: str) -> DatasetBase:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()

Base configuration for a cache.

Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.
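
A sketch of the dict-like read interface shared by all caches, assuming data has already been synced into the default cache and that a stream named `users` exists (both illustrative).

import airbyte as ab

cache = ab.get_default_cache()   # any CacheBase subclass behaves the same way

# Membership checks and item access are keyed by stream name.
if "users" in cache:
    users_dataset = cache["users"]          # equivalent to cache.streams["users"]
    for record in users_dataset:
        print(record)

# Iterating the cache yields (stream_name, dataset) pairs.
for stream_name, dataset in cache:
    print(stream_name)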

CacheBase(**data: Any)

Initialize the cache and backends.

cache_dir: pathlib.Path

The directory to store the cache in.

cleanup: bool

Whether to clean up the cache after use.

config_hash: str | None

Return a hash of the cache configuration.

This is the same as the SQLConfig hash from the superclass.

def execute_sql(self, sql: str | list[str]) -> None:

Execute one or more SQL statements against the cache's SQL backend.

If multiple SQL statements are given, they are executed in order, within the same transaction.

This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.

This method is not intended for querying data. For that, use the get_records method - or for a low-level interface, use the get_sql_engine method.

If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
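
For illustration, a sketch of running DDL statements in a single transaction; the table and index names are hypothetical, and `cache` can be any cache instance such as `ab.get_default_cache()`.

import airbyte as ab

cache = ab.get_default_cache()

# Statements run in order within one transaction; no results are returned.
cache.execute_sql(
    [
        "CREATE TABLE IF NOT EXISTS my_notes (id INTEGER, note VARCHAR)",
        "CREATE INDEX IF NOT EXISTS idx_my_notes_id ON my_notes (id)",
    ]
)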

processor: airbyte.shared.sql_processor.SqlProcessorBase

Return the SQL processor instance.

def get_record_processor( self, source_name: str, catalog_provider: airbyte.shared.catalog_providers.CatalogProvider, state_writer: airbyte.shared.state_writers.StateWriterBase | None = None) -> airbyte.shared.sql_processor.SqlProcessorBase:

Return a record processor for the specified source name and catalog.

We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.

For the state writer, we use a state writer which stores state in an internal SQL table.

def get_records(self, stream_name: str) -> airbyte.CachedDataset:

Uses SQLAlchemy to select all rows from the table.
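
A sketch of reading records lazily through `get_records`; the stream name `users` and the `id` column are hypothetical, and the cache is assumed to be already populated.

import airbyte as ab

cache = ab.get_default_cache()

users = cache.get_records("users")   # CachedDataset backed by a SELECT on the stream table
for record in users:
    print(record["id"])              # "id" is an assumed column name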

def get_pandas_dataframe(self, stream_name: str) -> pandas.core.frame.DataFrame:

Return a Pandas data frame with the stream's data.
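
A sketch of loading an entire stream into pandas, which is convenient for streams that fit in memory; `users` is a hypothetical stream name.

import airbyte as ab

cache = ab.get_default_cache()

df = cache.get_pandas_dataframe("users")
print(df.head())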

def get_arrow_dataset( self, stream_name: str, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:

Return an Arrow Dataset with the stream's data.
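
A sketch for larger streams, reading the table in chunks into an Arrow dataset; the stream name and chunk size are illustrative.

import airbyte as ab

cache = ab.get_default_cache()

dataset = cache.get_arrow_dataset("users", max_chunk_size=50_000)

# Materialize to an Arrow Table, or keep the dataset for lazy, batched scans.
table = dataset.to_table()
print(table.num_rows)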

streams: dict[str, airbyte.CachedDataset]

Return a mapping of stream names to cached datasets.
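
A sketch that summarizes the cache contents by combining `streams` with `get_pandas_dataframe`; assumes data has already been synced.

import airbyte as ab

cache = ab.get_default_cache()

for stream_name in cache.streams:
    df = cache.get_pandas_dataframe(stream_name)
    print(f"{stream_name}: {len(df)} records")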

def get_state_provider( self, source_name: str, *, refresh: bool = True, destination_name: str | None = None) -> airbyte.shared.state_providers.StateProviderBase:

Return a state provider for the specified source name.

def get_state_writer( self, source_name: str, destination_name: str | None = None) -> airbyte.shared.state_writers.StateWriterBase:

Return a state writer for the specified source name.

If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.

def register_source( self, source_name: str, incoming_source_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog, stream_names: set[str]) -> None:

Register the source name and catalog.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
get_sql_alchemy_url
get_database_name
get_create_table_extra_clauses
get_sql_engine
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class DuckDBCache(airbyte._processors.sql.duckdb.DuckDBConfig, airbyte.caches.CacheBase):
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache."""

    _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)

A DuckDB cache.
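
A construction sketch with an explicit database file; the path is a placeholder, and `schema_name` defaults to `main`.

from airbyte.caches import DuckDBCache

cache = DuckDBCache(
    db_path="/tmp/my_pipeline/airbyte.duckdb",   # placeholder file path
    schema_name="main",                          # optional; "main" is the default
)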

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='main'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'db_path': FieldInfo(annotation=Union[Path, str], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
                        """We need to both initialize private attributes and call the user-defined model_post_init
                        method.
                        """
                        init_private_attributes(self, context)
                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.duckdb.DuckDBConfig
db_path
schema_name
get_sql_alchemy_url
get_database_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class MotherDuckCache(airbyte.caches.motherduck.MotherDuckConfig, airbyte.caches.duckdb.DuckDBCache):
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: type[MotherDuckSqlProcessor] = PrivateAttr(default=MotherDuckSqlProcessor)

Cache that uses MotherDuck for external persistent storage.
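
A construction sketch, assuming a MotherDuck database and an API token exposed via an environment variable; both names are placeholders.

import os

from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="my_motherduck_db",               # placeholder database name
    api_key=os.environ["MOTHERDUCK_API_KEY"],  # placeholder environment variable
)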

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='main'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'db_path': FieldInfo(annotation=str, required=False, default='md:'), 'database': FieldInfo(annotation=str, required=True), 'api_key': FieldInfo(annotation=SecretString, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
                        """We need to both initialize private attributes and call the user-defined model_post_init
                        method.
                        """
                        init_private_attributes(self, context)
                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte.caches.motherduck.MotherDuckConfig
database
api_key
db_path
get_sql_alchemy_url
get_database_name
airbyte._processors.sql.duckdb.DuckDBConfig
schema_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class PostgresCache(airbyte._processors.sql.postgres.PostgresConfig, airbyte.caches.CacheBase):
class PostgresCache(PostgresConfig, CacheBase):
    """Configuration for the Postgres cache.

    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
    """

    _sql_processor_class = PrivateAttr(default=PostgresSqlProcessor)

Configuration for the Postgres cache.

Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
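
A construction sketch, assuming a reachable Postgres instance; all connection details below are placeholders.

import os

from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="localhost",
    port=5432,
    database="airbyte_cache",
    username="postgres",
    password=os.environ["POSTGRES_PASSWORD"],   # placeholder environment variable
)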

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'host': FieldInfo(annotation=str, required=True), 'port': FieldInfo(annotation=int, required=True), 'database': FieldInfo(annotation=str, required=True), 'username': FieldInfo(annotation=str, required=True), 'password': FieldInfo(annotation=Union[SecretString, str], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
                        """We need to both initialize private attributes and call the user-defined model_post_init
                        method.
                        """
                        init_private_attributes(self, context)
                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.postgres.PostgresConfig
host
port
database
username
password
get_sql_alchemy_url
get_database_name
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
get_create_table_extra_clauses
get_sql_engine
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class SnowflakeCache(airbyte._processors.sql.snowflake.SnowflakeConfig, airbyte.caches.CacheBase):
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class = PrivateAttr(default=SnowflakeSqlProcessor)

Configuration for the Snowflake cache.
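
A construction sketch, assuming an existing Snowflake account, warehouse, database, and role; every value below is a placeholder.

import os

from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="myorg-myaccount",                  # placeholder account identifier
    username="AIRBYTE_USER",
    password=os.environ["SNOWFLAKE_PASSWORD"],  # placeholder environment variable
    warehouse="COMPUTE_WH",
    database="AIRBYTE_DB",
    role="AIRBYTE_ROLE",
    schema_name="airbyte_raw",                  # optional; this is the default
)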

dedupe_mode: airbyte.shared.sql_processor.RecordDedupeMode
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'account': FieldInfo(annotation=str, required=True), 'username': FieldInfo(annotation=str, required=True), 'password': FieldInfo(annotation=SecretString, required=True), 'warehouse': FieldInfo(annotation=str, required=True), 'database': FieldInfo(annotation=str, required=True), 'role': FieldInfo(annotation=str, required=True), 'data_retention_time_in_days': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'dedupe_mode': FieldInfo(annotation=RecordDedupeMode, required=False, default=<RecordDedupeMode.APPEND: 'append'>)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
                        """We need to both initialize private attributes and call the user-defined model_post_init
                        method.
                        """
                        init_private_attributes(self, context)
                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.snowflake.SnowflakeConfig
account
username
password
warehouse
database
role
schema_name
data_retention_time_in_days
get_create_table_extra_clauses
get_database_name
get_sql_alchemy_url
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name