airbyte.caches.base
SQL Cache implementation.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""SQL Cache implementation."""

from __future__ import annotations

from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, final

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
from pydantic import Field, PrivateAttr
from sqlalchemy import text

from airbyte_protocol.models import ConfiguredAirbyteCatalog

from airbyte import constants
from airbyte._writers.base import AirbyteWriterInterface
from airbyte.caches._catalog_backend import CatalogBackendBase, SqlCatalogBackend
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
from airbyte.datasets._sql import CachedDataset
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.shared.sql_processor import SqlConfig
from airbyte.shared.state_writers import StdOutStateWriter


if TYPE_CHECKING:
    from collections.abc import Iterator

    from airbyte._message_iterators import AirbyteMessageIterator
    from airbyte.caches._state_backend_base import StateBackendBase
    from airbyte.datasets._base import DatasetBase
    from airbyte.progress import ProgressTracker
    from airbyte.shared.sql_processor import SqlProcessorBase
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase
    from airbyte.strategies import WriteStrategy


class CacheBase(SqlConfig, AirbyteWriterInterface):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _deployed_api_root: str | None = PrivateAttr(default=None)
    _deployed_workspace_id: str | None = PrivateAttr(default=None)
    _deployed_destination_id: str | None = PrivateAttr(default=None)

    _sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SqlConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will roll back the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables that do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow RecordBatch
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def __getitem__(self, stream: str) -> DatasetBase:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()
class CacheBase(SqlConfig, AirbyteWriterInterface):
Base configuration for a cache.

Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.
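For orientation, a minimal end-to-end sketch of how a concrete cache is typically used. It assumes PyAirbyte's top-level helpers get_default_cache() and get_source(), the source-faker connector, and its users stream; the names and config values are illustrative only and are not defined in this module.

import airbyte as ab

# get_default_cache() returns the default DuckDB-backed cache, a CacheBase subclass.
cache = ab.get_default_cache()

# Sync a demo source into the cache; any CacheBase subclass can be passed here.
source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()
source.read(cache=cache)

# The cache now exposes the synced streams for reading.
print(list(cache.streams))                  # stream names in the cache
df = cache.get_pandas_dataframe("users")    # "users" is one of source-faker's streams
print(len(df))

The later examples on this page reuse the cache object from this sketch.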
def __init__(self, **data: Any) -> None:
Initialize the cache and backends.
@property
def config_hash(self) -> str | None:
Return a hash of the cache configuration.
This is the same as the SqlConfig hash from the superclass.
def execute_sql(self, sql: str | list[str]) -> None:
Execute one or more SQL statements against the cache's SQL backend.
If multiple SQL statements are given, they are executed in order, within the same transaction.
This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.
This method is not intended for querying data. For that, use the get_records method - or for a low-level interface, use the get_sql_engine method.

If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will roll back the transaction in this case.
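As an illustration of the call shape, two DDL statements run in one transaction; the table and index names are hypothetical, and the exact DDL accepted depends on the backing database.

cache.execute_sql(
    [
        "CREATE TABLE IF NOT EXISTS scratch_notes (id INTEGER, note VARCHAR)",
        "CREATE INDEX IF NOT EXISTS scratch_notes_id_idx ON scratch_notes (id)",
    ]
)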
@final
@property
def processor(self) -> SqlProcessorBase:
Return the SQL processor instance.
def get_record_processor(self, source_name: str, catalog_provider: CatalogProvider, state_writer: StateWriterBase | None = None) -> SqlProcessorBase:
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use a state writer which stores state in an internal SQL table.
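A rough sketch of how a write path might obtain a processor, given a cache as in the sketch near the top of this page. Here configured_catalog is a hypothetical ConfiguredAirbyteCatalog taken from a source, and the concrete processor type depends on the cache's _sql_processor_class.

from airbyte.shared.catalog_providers import CatalogProvider

# `configured_catalog` is assumed to come from a source's configured catalog.
catalog_provider = CatalogProvider(configured_catalog)

processor = cache.get_record_processor(
    source_name="source-faker",
    catalog_provider=catalog_provider,
)
# The processor writes records for this catalog and persists state to the
# cache's internal state table under "source-faker".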
def get_records(self, stream_name: str) -> CachedDataset:
Uses SQLAlchemy to select all rows from the table.
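For example, with a cache populated as in the sketch near the top of this page (the stream name is hypothetical); iterating the returned CachedDataset yields one record at a time.

users = cache.get_records("users")
for record in users:
    print(record)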
def get_pandas_dataframe(self, stream_name: str) -> pd.DataFrame:
Return a Pandas data frame with the stream's data.
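For example, loading a whole stream into memory as a DataFrame (hypothetical stream name; for streams too large for memory, see get_arrow_dataset below).

df = cache.get_pandas_dataframe("users")
print(df.shape)
print(df.dtypes)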
def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE) -> ds.Dataset:
Return an Arrow Dataset with the stream's data.
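For example, reading a stream chunk-by-chunk and then materializing it; the stream name and chunk size are illustrative, and to_table() is the standard pyarrow dataset method for combining the batches.

dataset = cache.get_arrow_dataset("users", max_chunk_size=50_000)
table = dataset.to_table()    # combine the record batches into a single Arrow Table
print(table.num_rows, table.schema)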
@final
@property
def streams(self) -> dict[str, CachedDataset]:
Return a mapping of stream names to cached datasets.
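The mapping works together with the container protocol methods defined later in the source (__contains__ and __getitem__); a short sketch with a hypothetical stream name follows.

if "users" in cache:           # __contains__ checks the catalog backend
    users = cache["users"]     # __getitem__ returns the same CachedDataset as cache.streams["users"]

for name, dataset in cache.streams.items():
    print(name, dataset)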
def get_state_provider(self, source_name: str, *, refresh: bool = True, destination_name: str | None = None) -> StateProviderBase:
Return a state provider for the specified source name.
def get_state_writer(self, source_name: str, destination_name: str | None = None) -> StateWriterBase:
Return a state writer for the specified source name.
If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.
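A sketch of how the state writer and the matching state provider pair up for one source; the source name is hypothetical, and destination_name is left as None because state is kept for the cache itself.

# Writer used while syncing records into the cache:
state_writer = cache.get_state_writer(source_name="source-faker")

# Provider used later to read the stored state back, e.g. to resume an incremental sync:
state_provider = cache.get_state_provider(source_name="source-faker", refresh=True)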
def register_source(self, source_name: str, incoming_source_catalog: ConfiguredAirbyteCatalog, stream_names: set[str]) -> None:
Register the source name and catalog.
Inherited Members
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_sql_alchemy_url
- get_database_name
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name