airbyte.caches
Base module for all caches.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base module for all caches."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.motherduck import MotherDuckCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.caches.util import get_default_cache, new_local_cache


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util

# We export these classes for easy access: `airbyte.caches...`
__all__ = [
    # Factories
    "get_default_cache",
    "new_local_cache",
    # Classes
    "BigQueryCache",
    "CacheBase",
    "DuckDBCache",
    "MotherDuckCache",
    "PostgresCache",
    "SnowflakeCache",
    # Submodules
    "util",
    "bigquery",
    "duckdb",
    "motherduck",
    "postgres",
    "snowflake",
    "base",
]
def get_default_cache() -> DuckDBCache:
    """Get a local cache for storing data, using the default database path.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    cache_dir = Path("./.cache/default_cache")
    return DuckDBCache(
        db_path=cache_dir / "default_cache.duckdb",
        cache_dir=cache_dir,
    )
Get a local cache for storing data, using the default database path.
Cache files are stored in the `.cache` directory, relative to the current working directory.
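A minimal end-to-end sketch, assuming the standard `airbyte.get_source` and `Source.read(cache=...)` entry points; the `source-faker` connector and the `users` stream are placeholders:

import airbyte as ab
from airbyte.caches import get_default_cache

source = ab.get_source("source-faker", config={"count": 1000}, install_if_missing=True)
source.select_all_streams()

cache = get_default_cache()  # DuckDB file under ./.cache/default_cache/
source.read(cache=cache)     # sync records into the cache

print(cache.get_pandas_dataframe("users").head())  # "users" is a placeholder stream name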
def new_local_cache(
    cache_name: str | None = None,
    cache_dir: str | Path | None = None,
    *,
    cleanup: bool = True,
) -> DuckDBCache:
    """Get a local cache for storing data, using a name string to seed the path.

    Args:
        cache_name: Name to use for the cache. Defaults to None.
        cache_dir: Root directory to store the cache in. Defaults to None.
        cleanup: Whether to clean up temporary files. Defaults to True.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    if cache_name:
        if " " in cache_name:
            raise exc.PyAirbyteInputError(
                message="Cache name cannot contain spaces.",
                input_value=cache_name,
            )

        if not cache_name.replace("_", "").isalnum():
            raise exc.PyAirbyteInputError(
                message="Cache name can only contain alphanumeric characters and underscores.",
                input_value=cache_name,
            )

    cache_name = cache_name or str(ulid.ULID())
    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)

    return DuckDBCache(
        db_path=cache_dir / f"db_{cache_name}.duckdb",
        cache_dir=cache_dir,
        cleanup=cleanup,
    )
Get a local cache for storing data, using a name string to seed the path.
Arguments:
- cache_name: Name to use for the cache. Defaults to None.
- cache_dir: Root directory to store the cache in. Defaults to None.
- cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the `.cache` directory, relative to the current working directory.
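A small usage sketch; the cache name below is a placeholder, and names may contain only alphanumeric characters and underscores:

from airbyte.caches import new_local_cache

# Creates ./.cache/my_analysis/db_my_analysis.duckdb and keeps temporary files around.
cache = new_local_cache(cache_name="my_analysis", cleanup=False)

# Invalid names raise PyAirbyteInputError:
# new_local_cache("my analysis")  # spaces are rejected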
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )
The BigQuery cache implementation.
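A configuration sketch; the field names come from the inherited BigQueryConfig options listed below, and the project, dataset, and credentials values are placeholders:

from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="my-gcp-project",                      # placeholder GCP project
    dataset_name="pyairbyte_cache",                     # placeholder BigQuery dataset
    credentials_path="/path/to/service_account.json",   # placeholder service-account key
)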
    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )
Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`. See https://github.com/airbytehq/PyAirbyte/issues/165.
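Since Arrow datasets are not supported for this cache, one workaround is to pull the stream through pandas and convert in memory, as sketched below (continuing the configuration sketch above; the stream name is a placeholder):

import pyarrow as pa

df = cache.get_pandas_dataframe("users")  # "users" is a placeholder stream name
arrow_table = pa.Table.from_pandas(df)    # convert in memory if Arrow is required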
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, context)
    original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.bigquery.BigQueryConfig
- database_name
- schema_name
- credentials_path
- project_name
- dataset_name
- get_sql_alchemy_url
- get_database_name
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class CacheBase(SqlConfig, AirbyteWriterInterface):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    paired_destination_name: ClassVar[str | None] = None
    paired_destination_config_class: ClassVar[type | None] = None

    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables that do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow record batch
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to CachedDataset objects."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    @final
    def __len__(self) -> int:
        """Gets the number of streams."""
        return len(self._catalog_backend.stream_names)

    @final
    def __bool__(self) -> bool:
        """Always True.

        This is needed so that caches with zero streams are not falsey (None-like).
        """
        return True

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def create_source_tables(
        self,
        source: Source,
        streams: Literal["*"] | list[str] | None = None,
    ) -> None:
        """Create tables in the cache for the provided source if they do not exist already.

        Tables are created based upon the Source's catalog.

        Args:
            source: The source to create tables for.
            streams: Stream names to create tables for. If None, use the Source's selected_streams
                or "*" if neither is set. If "*", all available streams will be used.
        """
        if streams is None:
            streams = source.get_selected_streams() or "*"

        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))

        # Register the incoming source catalog
        self.register_source(
            source_name=source.name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Ensure schema exists
        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Create tables for each stream if they don't exist
        for stream_name in catalog_provider.stream_names:
            self.processor._ensure_final_table_exists(  # noqa: SLF001
                stream_name=stream_name,
                create_if_missing=True,
            )

    def __getitem__(self, stream: str) -> CachedDataset:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()
Base configuration for a cache.
Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the `SqlConfig` class.
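Once a source has been read into a cache (see the factory examples above), the cache behaves like a read-only mapping of stream names to datasets. A sketch, assuming a populated cache and a placeholder stream name:

print(len(cache))        # number of cached streams
print("users" in cache)  # membership check via __contains__

dataset = cache["users"]                    # CachedDataset lookup via __getitem__
df = cache.get_pandas_dataframe("users")    # or pull the stream straight into pandas

for name, stream_dataset in cache:          # iteration yields (stream_name, dataset) pairs
    print(name)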
    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
Initialize the cache and backends.
    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )
Return a dictionary of destination configuration values.
    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash
Return a hash of the cache configuration.
This is the same as the SQLConfig hash from the superclass.
    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))
Execute one or more SQL statements against the cache's SQL backend.
If multiple SQL statements are given, they are executed in order, within the same transaction.
This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.
This method is not intended for querying data. For that, use the `get_records` method - or for a low-level interface, use the `get_sql_engine` method.
If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
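For example, a sketch that sets up an auxiliary table in the cache's SQL backend; the table name and columns are placeholders, and both statements run in order within one transaction:

cache.execute_sql(
    [
        "CREATE TABLE IF NOT EXISTS audit_log (id INTEGER, note VARCHAR)",
        "INSERT INTO audit_log VALUES (1, 'initial load complete')",
    ]
)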
    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor
Return the SQL processor instance.
    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use a state writer which stores state in an internal SQL table.
    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)
Uses SQLAlchemy to select all rows from the table.
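A usage sketch, continuing the examples above; the stream name is a placeholder, and iterating the returned CachedDataset is assumed to yield one record at a time:

dataset = cache.get_records("users")  # placeholder stream name

for record in dataset:                # each record maps column names to values
    print(record)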
    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
Return a Pandas data frame with the stream's data.
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables that do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow record batch
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)
Return an Arrow Dataset with the stream's data.
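The returned pyarrow dataset can be scanned lazily or materialized, for example (continuing the sketch above; the stream name is a placeholder):

arrow_ds = cache.get_arrow_dataset("users", max_chunk_size=100_000)

table = arrow_ds.to_table()   # materialize as a pyarrow Table
df = table.to_pandas()        # or hand off to pandas when it fits in memory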
    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to CachedDataset objects."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result
Return a mapping of stream names to CachedDataset objects.
    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )
Return a state provider for the specified source name.
    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )
Return a state writer for the specified source name.
If syncing to the cache, `destination_name` should be `None`. If syncing to a destination, `destination_name` should be the destination name.
    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )
Register the source name and catalog.
    def create_source_tables(
        self,
        source: Source,
        streams: Literal["*"] | list[str] | None = None,
    ) -> None:
        """Create tables in the cache for the provided source if they do not exist already.

        Tables are created based upon the Source's catalog.

        Args:
            source: The source to create tables for.
            streams: Stream names to create tables for. If None, use the Source's selected_streams
                or "*" if neither is set. If "*", all available streams will be used.
        """
        if streams is None:
            streams = source.get_selected_streams() or "*"

        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))

        # Register the incoming source catalog
        self.register_source(
            source_name=source.name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Ensure schema exists
        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Create tables for each stream if they don't exist
        for stream_name in catalog_provider.stream_names:
            self.processor._ensure_final_table_exists(  # noqa: SLF001
                stream_name=stream_name,
                create_if_missing=True,
            )
Create tables in the cache for the provided source if they do not exist already.
Tables are created based upon the Source's catalog.
Arguments:
- source: The source to create tables for.
- streams: Stream names to create tables for. If None, use the Source's selected_streams or "*" if neither is set. If "*", all available streams will be used. (See the sketch below.)
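A sketch of pre-creating tables before the first sync, assuming a source handle obtained from `airbyte.get_source`; the stream names are placeholders:

# Create empty, correctly typed tables for the selected streams up front,
# e.g. so that views or permissions can be set before any data lands.
source.select_streams(["users", "products"])   # placeholder stream names
cache.create_source_tables(source)

# Or create tables for every available stream:
cache.create_source_tables(source, streams="*")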
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_sql_alchemy_url
- get_database_name
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return duckdb_cache_to_destination_configuration(cache=self)
A DuckDB cache.
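A construction sketch; `db_path` and `schema_name` come from the inherited DuckDBConfig, and the file path below is a placeholder:

from airbyte.caches import DuckDBCache

cache = DuckDBCache(
    db_path="./analytics.duckdb",  # placeholder path to the DuckDB database file
    schema_name="main",            # optional; "main" is DuckDB's default schema
)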
    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return duckdb_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.duckdb.DuckDBConfig
- db_path
- schema_name
- get_sql_alchemy_url
- get_database_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return motherduck_cache_to_destination_configuration(cache=self)
Cache that uses MotherDuck for external persistent storage.
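A construction sketch; `database` and `api_key` come from the inherited MotherDuckConfig, and both values below are placeholders:

from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="my_md_database",       # placeholder MotherDuck database name
    api_key="md_token_placeholder",  # placeholder MotherDuck service token
)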
    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return motherduck_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte.caches.motherduck.MotherDuckConfig
- database
- api_key
- db_path
- get_sql_alchemy_url
- get_database_name
- airbyte._processors.sql.duckdb.DuckDBConfig
- schema_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class PostgresCache(PostgresConfig, CacheBase):
    """Configuration for the Postgres cache.

    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
    """

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-postgres"
    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return a dictionary of destination configuration values."""
        return postgres_cache_to_destination_configuration(cache=self)

    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a DestinationPostgres instance with the same configuration."""
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
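A construction sketch; the connection values are placeholders, and the field names come from the inherited PostgresConfig:

from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="localhost",
    port=5432,
    username="postgres",
    password="secret",           # placeholder; prefer a secrets manager in practice
    database="pyairbyte_cache",
)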
    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return a dictionary of destination configuration values."""
        return postgres_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a DestinationPostgres instance with the same configuration."""
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Return a DestinationPostgres instance with the same configuration.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.postgres.PostgresConfig
- host
- port
- database
- username
- password
- get_sql_alchemy_url
- get_database_name
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)
Configuration for the Snowflake cache.
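A construction sketch; every value is a placeholder, and the field names come from the inherited SnowflakeConfig:

from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="my_org-my_account",  # placeholder Snowflake account identifier
    username="PYAIRBYTE_USER",
    password="secret",            # placeholder; prefer a secrets manager in practice
    warehouse="COMPUTE_WH",
    database="PYAIRBYTE_DB",
    role="PYAIRBYTE_ROLE",
    schema_name="PYAIRBYTE_SCHEMA",
)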
    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.snowflake.SnowflakeConfig
- account
- username
- password
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name