airbyte.caches.base

SQL Cache implementation.

# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""SQL Cache implementation."""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, final

import pandas as pd
from pydantic import Field, PrivateAttr

from airbyte_protocol.models import ConfiguredAirbyteCatalog

from airbyte._future_cdk.catalog_providers import CatalogProvider
from airbyte._future_cdk.sql_processor import (
    SqlConfig,
    SqlProcessorBase,
)
from airbyte._future_cdk.state_writers import StdOutStateWriter
from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.datasets._sql import CachedDataset


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte._future_cdk.sql_processor import SqlProcessorBase
    from airbyte._future_cdk.state_providers import StateProviderBase
    from airbyte._future_cdk.state_writers import StateWriterBase
    from airbyte.caches._state_backend_base import StateBackendBase
    from airbyte.datasets._base import DatasetBase


class CacheBase(SqlConfig):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(".cache"))
    """The directory to store the cache in."""

    cleanup: bool = True
    """Whether to clean up the cache after use."""

    _deployed_api_root: Optional[str] = PrivateAttr(default=None)
    _deployed_workspace_id: Optional[str] = PrivateAttr(default=None)
    _deployed_destination_id: Optional[str] = PrivateAttr(default=None)

    _sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of all streams in the cache to `CachedDataset` objects."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
        )

    def get_state_writer(
        self,
        source_name: str,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name."""
        return self._state_backend.get_state_writer(source_name=source_name)

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def __getitem__(self, stream: str) -> DatasetBase:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in self._catalog_backend.stream_names

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

class CacheBase(airbyte._future_cdk.sql_processor.SqlConfig):

Base configuration for a cache.

Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.
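
As a rough end-to-end sketch (hedged: it assumes the concrete DuckDBCache subclass and the source-faker connector, neither of which is defined in this module), a cache is typically populated by a source read and then queried through the read methods documented below:

import airbyte as ab  # assumed: PyAirbyte's top-level package
from airbyte.caches import DuckDBCache  # assumed: a concrete CacheBase subclass

# Configure a cache backed by a local DuckDB file.
cache = DuckDBCache(db_path="./data.duckdb")

# Populate it from a source (connector name and config are illustrative).
source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
source.select_all_streams()
source.read(cache=cache)

# Read back through the cache's mapping-style interface.
for stream_name, dataset in cache.streams.items():
    print(stream_name, sum(1 for _ in dataset))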

CacheBase(**data: Any)

Initialize the cache and backends.

cache_dir: pathlib.Path

The directory to store the cache in.

cleanup: bool

Whether to clean up the cache after use.
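
Both fields are ordinary Pydantic fields and can be overridden at construction time. A minimal sketch, again assuming the hypothetical DuckDBCache subclass:

from pathlib import Path

from airbyte.caches import DuckDBCache  # assumed concrete subclass

cache = DuckDBCache(
    db_path="./data.duckdb",
    cache_dir=Path("/tmp/airbyte_cache"),  # where temporary files are staged
    cleanup=False,  # keep temporary files around for debugging
)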

processor: airbyte._future_cdk.sql_processor.SqlProcessorBase

Return the SQL processor instance.
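
The processor exposes the SQL-level helpers the read methods build on; for example, get_sql_table_name() (used by get_pandas_dataframe() in the source above) resolves a stream name to its backing table:

# Assumes `cache` is an initialized cache instance; stream name is illustrative.
table_name = cache.processor.get_sql_table_name("users")
print(table_name)  # resolved table name, subject to any configured table prefix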

def get_record_processor(self, source_name: str, catalog_provider: airbyte._future_cdk.catalog_providers.CatalogProvider) -> airbyte._future_cdk.sql_processor.SqlProcessorBase:

Return a record processor for the specified source name and catalog.

We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.

For the state writer, we use a state writer which stores state in an internal SQL table.
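
A hedged sketch of the write path; `configured_catalog` is an assumed, already-built ConfiguredAirbyteCatalog, and in practice this method is invoked by PyAirbyte's read machinery rather than by user code:

from airbyte._future_cdk.catalog_providers import CatalogProvider

processor = cache.get_record_processor(
    source_name="source-faker",  # illustrative source name
    catalog_provider=CatalogProvider(configured_catalog),
)
# `processor` now writes records, and persists state, scoped to "source-faker".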

def get_records(self, stream_name: str) -> airbyte.datasets._sql.CachedDataset:

Uses SQLAlchemy to select all rows from the table.
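
A minimal usage sketch (assuming `cache` is an initialized cache instance with a synced stream; the stream name is illustrative):

dataset = cache.get_records("users")
for record in dataset:
    print(record)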

def get_pandas_dataframe(self, stream_name: str) -> pandas.core.frame.DataFrame:

Return a Pandas data frame with the stream's data.
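
Unlike get_records(), this loads the whole table eagerly into memory via pandas.read_sql_table(). For example (stream name is illustrative):

df = cache.get_pandas_dataframe("users")
print(df.dtypes)
print(df.head())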

streams: dict[str, airbyte.datasets._sql.CachedDataset]

Return a mapping of all streams in the cache to CachedDataset objects.
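
Because the property returns a plain dict, it composes with the mapping-style dunders (__getitem__, __contains__, __iter__) shown in the source above:

if "users" in cache:  # __contains__ consults the catalog backend
    users = cache["users"]  # equivalent to cache.streams["users"]

for name, dataset in cache:  # __iter__ yields (name, CachedDataset) pairs
    print(name, dataset)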

def get_state_provider(self, source_name: str, *, refresh: bool = True) -> airbyte._future_cdk.state_providers.StateProviderBase:

Return a state provider for the specified source name.

def get_state_writer(self, source_name: str) -> airbyte._future_cdk.state_writers.StateWriterBase:

Return a state writer for the specified source name.
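
A sketch of the two state accessors together (the source name is illustrative): the provider reads back previously persisted state, while the writer persists new state messages under the same source name:

state_provider = cache.get_state_provider(source_name="source-faker", refresh=True)
state_writer = cache.get_state_writer(source_name="source-faker")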

def register_source(self, source_name: str, incoming_source_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog, stream_names: set[str]) -> None:

Register the source name and catalog.
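
A minimal sketch, assuming an already-built ConfiguredAirbyteCatalog named `configured_catalog`; note that get_record_processor() above calls this method automatically before constructing a processor:

cache.register_source(
    source_name="source-faker",  # illustrative
    incoming_source_catalog=configured_catalog,
    stream_names={"users", "products"},  # illustrative stream names
)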

Inherited Members

From airbyte._future_cdk.sql_processor.SqlConfig:
    schema_name, table_prefix, get_sql_alchemy_url, get_database_name, get_sql_engine, get_vendor_client

From pydantic.main.BaseModel:
    Config, dict, json, parse_obj, parse_raw, parse_file, from_orm, construct, copy, schema, schema_json, validate, update_forward_refs