airbyte.caches.base
SQL Cache implementation.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""SQL Cache implementation."""

from __future__ import annotations

from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, ClassVar, final

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
from pydantic import Field, PrivateAttr
from sqlalchemy import text

from airbyte_protocol.models import ConfiguredAirbyteCatalog

from airbyte import constants
from airbyte._writers.base import AirbyteWriterInterface
from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
from airbyte.datasets._sql import CachedDataset
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.shared.sql_processor import SqlConfig
from airbyte.shared.state_writers import StdOutStateWriter


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte._message_iterators import AirbyteMessageIterator
    from airbyte.caches._state_backend_base import StateBackendBase
    from airbyte.progress import ProgressTracker
    from airbyte.shared.sql_processor import SqlProcessorBase
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase
    from airbyte.strategies import WriteStrategy


class CacheBase(SqlConfig, AirbyteWriterInterface):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    paired_destination_name: ClassVar[str | None] = None
    paired_destination_config_class: ClassVar[type | None] = None

    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow RecordBatch
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    @final
    def __len__(self) -> int:
        """Gets the number of streams."""
        return len(self._catalog_backend.stream_names)

    @final
    def __bool__(self) -> bool:
        """Always True.

        This is needed so that caches with zero streams are not falsey (None-like).
        """
        return True

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def __getitem__(self, stream: str) -> CachedDataset:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()
class CacheBase(SqlConfig, AirbyteWriterInterface)
Base configuration for a cache.
Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the `SqlConfig` class.
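As a quick orientation, the sketch below shows the typical lifecycle of a concrete cache: construct it, hand it to a source's read() call, then use the mapping-style read interface. It is a minimal sketch, assuming the `DuckDBCache` subclass and the `source-faker` connector are available in your environment; the file path and record count are arbitrary examples.

import airbyte as ab
from airbyte.caches import DuckDBCache

# Construct a concrete CacheBase subclass (DuckDB is used here as an example).
cache = DuckDBCache(db_path="./example_cache.duckdb")

# Read a source into the cache; the cache stores records, catalog, and state.
source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()
source.read(cache=cache)

# The populated cache behaves like a read-only mapping of stream names to datasets.
print(list(cache.streams))  # stream names
print(len(cache))           # number of streams

Later sketches in this page reuse this `cache` variable and assume it has already been populated as shown here.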
CacheBase(**data: Any)
Initialize the cache and backends.
paired_destination_config: Any | dict[str, Any]
Return a dictionary of destination configuration values.
config_hash: str | None
Return a hash of the cache configuration.
This is the same as the SQLConfig hash from the superclass.
execute_sql(self, sql: str | list[str]) -> None
Execute one or more SQL statements against the cache's SQL backend.
If multiple SQL statements are given, they are executed in order, within the same transaction.
This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.
This method is not intended for querying data. For that, use the `get_records` method - or for a low-level interface, use the `get_sql_engine` method.
If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
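For example, a caller might add indexes after a sync completes. A minimal sketch, assuming a populated cache whose backend supports the statements shown; the table and column names ("users", "id", "email") are hypothetical placeholders.

# Run DDL statements in a single transaction against the cache's SQL backend.
cache.execute_sql(
    [
        "CREATE INDEX IF NOT EXISTS idx_users_id ON users (id)",
        "CREATE INDEX IF NOT EXISTS idx_users_email ON users (email)",
    ]
)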
processor: SqlProcessorBase
Return the SQL processor instance.
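The processor also exposes lower-level SQL access. The sketch below borrows a SQLAlchemy connection through the processor, the same mechanism execute_sql() uses internally; the queried table name is a placeholder and the cache is assumed to be populated.

from sqlalchemy import text

# Borrow a connection from the read processor for an ad-hoc query.
with cache.processor.get_sql_connection() as connection:
    row_count = connection.execute(text("SELECT COUNT(*) FROM users")).scalar()
    print(f"users table has {row_count} rows")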
get_record_processor(self, source_name: str, catalog_provider: CatalogProvider, state_writer: StateWriterBase | None = None) -> SqlProcessorBase
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use a state writer which stores state in an internal SQL table.
get_records(self, stream_name: str) -> CachedDataset
Uses SQLAlchemy to select all rows from the table.
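A brief usage sketch, assuming the cache already contains a stream named "users": the returned CachedDataset can be iterated directly, so rows are fetched from the SQL backend rather than held in the Python process up front.

# Iterate records from the cached "users" stream (the name is an example).
users = cache.get_records("users")
for record in users:
    print(record)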
get_pandas_dataframe(self, stream_name: str) -> pd.DataFrame
Return a Pandas data frame with the stream's data.
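A short sketch of loading a whole stream into pandas; unlike get_arrow_dataset, this reads the entire table into memory at once. The stream name is again a placeholder.

# Materialize the "users" stream as a pandas DataFrame (loads all rows).
df = cache.get_pandas_dataframe("users")
print(df.shape)
print(df.head())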
get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE) -> ds.Dataset
Return an Arrow Dataset with the stream's data.
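Because the table is read in chunks of max_chunk_size rows, this path is better suited to tables that do not fit comfortably in memory. A minimal sketch, with a hypothetical stream name, that converts the resulting Arrow dataset into a single table:

# Build an Arrow dataset from the cached "users" stream, reading in chunks.
arrow_dataset = cache.get_arrow_dataset("users", max_chunk_size=50_000)

# Materialize to a pyarrow.Table, or keep streaming with to_batches().
arrow_table = arrow_dataset.to_table()
print(arrow_table.schema)
print(arrow_table.num_rows)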
streams: dict[str, CachedDataset]
Return a mapping of stream names to cached datasets for all streams in the cache.
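This property backs the cache's mapping-style helpers (__getitem__, __contains__, __len__, and iteration), so the two access styles below are equivalent; the stream name is a placeholder and the cache is assumed to be populated.

# Dictionary-style access via the streams property...
for name, dataset in cache.streams.items():
    print(name, dataset)

# ...or via the cache's own mapping helpers.
if "users" in cache:
    users = cache["users"]
    print(users)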
get_state_provider(self, source_name: str, *, refresh: bool = True, destination_name: str | None = None) -> StateProviderBase
Return a state provider for the specified source name.
get_state_writer(self, source_name: str, destination_name: str | None = None) -> StateWriterBase
Return a state writer for the specified source name.
If syncing to the cache, `destination_name` should be `None`. If syncing to a destination, `destination_name` should be the destination name.
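A hedged sketch of retrieving both state interfaces for a hypothetical source and destination name; what you can do with the returned objects depends on the StateProviderBase and StateWriterBase interfaces, which are not documented on this page.

# State produced while syncing "source-faker" into the cache itself.
state_provider = cache.get_state_provider(source_name="source-faker")
cache_state_writer = cache.get_state_writer(source_name="source-faker")

# State tracked separately for a downstream destination sync (name is an example).
dest_state_writer = cache.get_state_writer(
    source_name="source-faker",
    destination_name="destination-postgres",
)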
register_source(self, source_name: str, incoming_source_catalog: ConfiguredAirbyteCatalog, stream_names: set[str]) -> None
Register the source name and catalog.
model_config: Configuration for the model, should be a dictionary conforming to `ConfigDict` (pydantic.config.ConfigDict).
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_sql_alchemy_url
- get_database_name
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name