airbyte.caches.base
SQL Cache implementation.
```python
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""SQL Cache implementation."""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, final

import pandas as pd
from pydantic import Field, PrivateAttr

from airbyte_protocol.models import ConfiguredAirbyteCatalog

from airbyte._future_cdk.catalog_providers import CatalogProvider
from airbyte._future_cdk.sql_processor import (
    SqlConfig,
    SqlProcessorBase,
)
from airbyte._future_cdk.state_writers import StdOutStateWriter
from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.datasets._sql import CachedDataset


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte._future_cdk.sql_processor import SqlProcessorBase
    from airbyte._future_cdk.state_providers import StateProviderBase
    from airbyte._future_cdk.state_writers import StateWriterBase
    from airbyte.caches._state_backend_base import StateBackendBase
    from airbyte.datasets._base import DatasetBase


class CacheBase(SqlConfig):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(".cache"))
    """The directory to store the cache in."""

    cleanup: bool = True
    """Whether to clean up the cache after use."""

    _deployed_api_root: Optional[str] = PrivateAttr(default=None)
    _deployed_workspace_id: Optional[str] = PrivateAttr(default=None)
    _deployed_destination_id: Optional[str] = PrivateAttr(default=None)

    _sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use one that stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Return a `CachedDataset` that selects all rows from the stream's table via SQLAlchemy."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to `CachedDataset` instances."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
        )

    def get_state_writer(
        self,
        source_name: str,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name."""
        return self._state_backend.get_state_writer(source_name=source_name)

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def __getitem__(self, stream: str) -> DatasetBase:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in self._catalog_backend.stream_names

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())
```
```python
class CacheBase(SqlConfig):
```
Base configuration for a cache.

Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the `SqlConfig` class.
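A minimal usage sketch, assuming the concrete `DuckDBCache` subclass that PyAirbyte ships (the `db_path` value and stream contents are illustrative, not part of this module):

```python
from airbyte.caches import DuckDBCache  # a concrete CacheBase subclass

# SqlConfig fields such as `schema_name` are inherited by every cache.
cache = DuckDBCache(db_path="./my_cache.duckdb", schema_name="main")

# After a sync has populated the cache, each stream is exposed as a dataset.
for stream_name, dataset in cache.streams.items():
    print(stream_name, type(dataset))
```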
```python
def __init__(self, **data: Any) -> None:
```
Initialize the cache and backends.
```python
@final
@property
def processor(self) -> SqlProcessorBase:
```
Return the SQL processor instance.
```python
def get_record_processor(
    self,
    source_name: str,
    catalog_provider: CatalogProvider,
) -> SqlProcessorBase:
```
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use one that stores state in an internal SQL table.
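A hedged sketch of this write path; the `users` stream, `my-source` name, and the `cache` instance are illustrative, and the protocol models come from `airbyte_protocol.models`:

```python
from airbyte_protocol.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

from airbyte._future_cdk.catalog_providers import CatalogProvider

# Build a one-stream configured catalog for illustration.
catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="users",
                json_schema={"type": "object"},
                supported_sync_modes=[SyncMode.full_refresh],
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
    ]
)

# Registers the catalog under "my-source" and returns a processor whose
# state writer persists to the cache's internal state table.
processor = cache.get_record_processor(
    source_name="my-source",
    catalog_provider=CatalogProvider(catalog),
)
```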
```python
def get_records(
    self,
    stream_name: str,
) -> CachedDataset:
```
Return a `CachedDataset` that selects all rows from the stream's table via SQLAlchemy.
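For example (assuming a `users` stream exists in the cache, and that `CachedDataset` is iterable over records, as the `DatasetBase` interface suggests):

```python
dataset = cache.get_records("users")

# Rows are fetched from the stream's SQL table when iterated.
for record in dataset:
    print(record)
```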
```python
def get_pandas_dataframe(
    self,
    stream_name: str,
) -> pd.DataFrame:
```
Return a Pandas data frame with the stream's data.
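For example, to pull a stream into pandas for ad-hoc analysis (the stream name is illustrative):

```python
df = cache.get_pandas_dataframe("users")

print(df.shape)   # (row_count, column_count)
print(df.head())  # first few synced records
```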
```python
@final
@property
def streams(self) -> dict[str, CachedDataset]:
```
Return a mapping of stream names to `CachedDataset` instances.
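The cache also supports mapping-style access over the same streams, via the `__getitem__`, `__contains__`, and `__iter__` methods shown in the full source above:

```python
if "users" in cache:          # __contains__ consults the catalog backend
    users = cache["users"]    # __getitem__ returns the stream's CachedDataset

for name, dataset in cache:   # __iter__ yields (stream_name, dataset) pairs
    print(name)
```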
```python
def get_state_provider(
    self,
    source_name: str,
    *,
    refresh: bool = True,
) -> StateProviderBase:
```
Return a state provider for the specified source name.
```python
def get_state_writer(
    self,
    source_name: str,
) -> StateWriterBase:
```
Return a state writer for the specified source name.
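A short sketch of pairing the two (the source name is illustrative; what you then do with the provider and writer depends on the `StateProviderBase` and `StateWriterBase` APIs, which live outside this module):

```python
# Read back the most recently persisted state for a source,
# e.g. when resuming an incremental sync.
state_provider = cache.get_state_provider(source_name="my-source")

# Obtain a writer that persists state messages to the internal SQL state table.
state_writer = cache.get_state_writer(source_name="my-source")
```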
```python
def register_source(
    self,
    source_name: str,
    incoming_source_catalog: ConfiguredAirbyteCatalog,
    stream_names: set[str],
) -> None:
```
Register the source name and catalog.
Inherited Members
- `airbyte._future_cdk.sql_processor.SqlConfig`
  - schema_name
  - table_prefix
  - get_sql_alchemy_url
  - get_database_name
  - get_sql_engine
  - get_vendor_client
- `pydantic.main.BaseModel`
  - Config
  - dict
  - json
  - parse_obj
  - parse_raw
  - parse_file
  - from_orm
  - construct
  - copy
  - schema
  - schema_json
  - validate
  - update_forward_refs