airbyte.caches
Base module for all caches.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base module for all caches."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.motherduck import MotherDuckCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.caches.util import get_default_cache, new_local_cache


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util

# We export these classes for easy access: `airbyte.caches...`
__all__ = [
    # Factories
    "get_default_cache",
    "new_local_cache",
    # Classes
    "BigQueryCache",
    "CacheBase",
    "DuckDBCache",
    "MotherDuckCache",
    "PostgresCache",
    "SnowflakeCache",
    # Submodules
    "util",
    "bigquery",
    "duckdb",
    "motherduck",
    "postgres",
    "snowflake",
    "base",
]
def get_default_cache() -> DuckDBCache:
    """Get a local cache for storing data, using the default database path.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    cache_dir = Path("./.cache/default_cache")
    return DuckDBCache(
        db_path=cache_dir / "default_cache.duckdb",
        cache_dir=cache_dir,
    )
Get a local cache for storing data, using the default database path.
Cache files are stored in the .cache directory, relative to the current working directory.
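For illustration, a minimal sketch of obtaining the default cache and inspecting where it stores its files; the attributes shown (cache_dir, db_path) are the ones documented on this page:

```python
from airbyte.caches import get_default_cache

cache = get_default_cache()

# The default cache is a DuckDBCache rooted under ./.cache/default_cache
print(cache.cache_dir)  # .cache/default_cache
print(cache.db_path)    # .cache/default_cache/default_cache.duckdb
```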
def new_local_cache(
    cache_name: str | None = None,
    cache_dir: str | Path | None = None,
    *,
    cleanup: bool = True,
) -> DuckDBCache:
    """Get a local cache for storing data, using a name string to seed the path.

    Args:
        cache_name: Name to use for the cache. Defaults to None.
        cache_dir: Root directory to store the cache in. Defaults to None.
        cleanup: Whether to clean up temporary files. Defaults to True.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    if cache_name:
        if " " in cache_name:
            raise exc.PyAirbyteInputError(
                message="Cache name cannot contain spaces.",
                input_value=cache_name,
            )

        if not cache_name.replace("_", "").isalnum():
            raise exc.PyAirbyteInputError(
                message="Cache name can only contain alphanumeric characters and underscores.",
                input_value=cache_name,
            )

    cache_name = cache_name or str(ulid.ULID())
    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)

    return DuckDBCache(
        db_path=cache_dir / f"db_{cache_name}.duckdb",
        cache_dir=cache_dir,
        cleanup=cleanup,
    )
Get a local cache for storing data, using a name string to seed the path.
Arguments:
- cache_name: Name to use for the cache. Defaults to None.
- cache_dir: Root directory to store the cache in. Defaults to None.
- cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the .cache directory, relative to the current working directory.
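As a sketch of the naming rules enforced above (no spaces; alphanumeric characters and underscores only), with an illustrative cache name:

```python
from airbyte.caches import new_local_cache

# Valid: letters, digits, and underscores only.
cache = new_local_cache(cache_name="my_pipeline_cache", cleanup=False)
print(cache.db_path)  # .cache/my_pipeline_cache/db_my_pipeline_cache.duckdb

# Invalid names raise PyAirbyteInputError, e.g.:
#   new_local_cache(cache_name="my pipeline")   # contains a space
#   new_local_cache(cache_name="my-pipeline!")  # not alphanumeric/underscore
```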
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow"
            "Please consider using a different cache implementation for these functionalities."
        )
The BigQuery cache implementation.
    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow"
            "Please consider using a different cache implementation for these functionalities."
        )
Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.
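A hedged sketch of working around this limitation: since BigQueryCache inherits get_pandas_dataframe from CacheBase (see the inherited members below), a pandas read can stand in for the unsupported Arrow path. The cache instance and stream name here are illustrative:

```python
# `bq_cache` is an already-configured BigQueryCache; "users" is a stream synced to it.
try:
    dataset = bq_cache.get_arrow_dataset("users")
except NotImplementedError:
    # Fall back to the pandas-based reader inherited from CacheBase.
    df = bq_cache.get_pandas_dataframe("users")
```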
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.bigquery.BigQueryConfig
- database_name
- schema_name
- credentials_path
- project_name
- dataset_name
- get_sql_alchemy_url
- get_database_name
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class CacheBase(SqlConfig, AirbyteWriterInterface):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    paired_destination_name: ClassVar[str | None] = None
    paired_destination_config_class: ClassVar[type | None] = None

    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which does not fits in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow Table
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a temporary table name."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    @final
    def __len__(self) -> int:
        """Gets the number of streams."""
        return len(self._catalog_backend.stream_names)

    @final
    def __bool__(self) -> bool:
        """Always True.

        This is needed so that caches with zero streams are not falsey (None-like).
        """
        return True

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def __getitem__(self, stream: str) -> CachedDataset:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()
Base configuration for a cache.
Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.
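The class source above also defines a small mapping-style interface (__len__, __contains__, __getitem__, __iter__) over the synced streams. A sketch, assuming `cache` is any concrete cache instance and "users" is a stream that has already been synced:

```python
print(len(cache))              # number of streams registered in the cache

if "users" in cache:           # __contains__ checks the catalog backend
    users = cache["users"]     # __getitem__ returns a CachedDataset

for name, dataset in cache:    # __iter__ yields (stream_name, dataset) pairs
    print(name)
```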
    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
Initialize the cache and backends.
    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )
Return a dictionary of destination configuration values.
    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash
Return a hash of the cache configuration.
This is the same as the SQLConfig hash from the superclass.
    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))
Execute one or more SQL statements against the cache's SQL backend.
If multiple SQL statements are given, they are executed in order, within the same transaction.
This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.
This method is not intended for querying data. For that, use the get_records method - or for a low-level interface, use the get_sql_engine method.
If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
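A sketch of schema-management usage, assuming a backend that accepts this DDL dialect (statement syntax varies by database); the table and index names are illustrative:

```python
cache.execute_sql(
    [
        "CREATE TABLE IF NOT EXISTS audit_notes (id INTEGER, note VARCHAR)",
        "CREATE INDEX IF NOT EXISTS idx_audit_notes_id ON audit_notes (id)",
    ]
)
```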
    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor
Return the SQL processor instance.
    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use a state writer which stores state in an internal SQL table.
    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)
Uses SQLAlchemy to select all rows from the table.
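A sketch, assuming the returned CachedDataset can be iterated record by record (the stream name is illustrative):

```python
dataset = cache.get_records("users")
for record in dataset:
    print(record)
```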
    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
Return a Pandas data frame with the stream's data.
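For example, loading a synced stream for ad-hoc analysis (the stream name is illustrative):

```python
df = cache.get_pandas_dataframe("users")
print(df.head())
```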
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which does not fits in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow Table
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)
Return an Arrow Dataset with the stream's data.
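A sketch of materializing the chunked Arrow dataset into a single in-memory table via standard pyarrow APIs (the stream name and chunk size are illustrative):

```python
arrow_ds = cache.get_arrow_dataset("users", max_chunk_size=50_000)
table = arrow_ds.to_table()  # pyarrow.dataset.Dataset -> pyarrow.Table
print(table.num_rows)
```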
    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a temporary table name."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result
Return a temporary table name.
    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )
Return a state provider for the specified source name.
    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )
Return a state writer for the specified source name.
If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.
    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )
Register the source name and catalog.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_sql_alchemy_url
- get_database_name
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return duckdb_cache_to_destination_configuration(cache=self)
A DuckDB cache.
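A minimal sketch of constructing a file-backed DuckDB cache; `db_path` and `schema_name` come from the inherited DuckDBConfig fields listed below, and the paths shown are illustrative:

```python
from pathlib import Path

from airbyte.caches import DuckDBCache

cache = DuckDBCache(
    db_path=Path("./.cache/analytics/analytics.duckdb"),
    schema_name="main",
)
```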
    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return duckdb_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.duckdb.DuckDBConfig
- db_path
- schema_name
- get_sql_alchemy_url
- get_database_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return motherduck_cache_to_destination_configuration(cache=self)
Cache that uses MotherDuck for external persistent storage.
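A minimal sketch of configuring the cache; `database` and `api_key` come from the inherited MotherDuckConfig fields listed below, and both values here are placeholders:

```python
from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="my_motherduck_db",      # placeholder database name
    api_key="<MOTHERDUCK_API_KEY>",   # placeholder token
)
```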
    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return motherduck_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte.caches.motherduck.MotherDuckConfig
- database
- api_key
- db_path
- get_sql_alchemy_url
- get_database_name
- airbyte._processors.sql.duckdb.DuckDBConfig
- schema_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class PostgresCache(PostgresConfig, CacheBase):
    """Configuration for the Postgres cache.

    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
    """

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return a dictionary of destination configuration values."""
        return postgres_cache_to_destination_configuration(cache=self)

    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a DestinationPostgres instance with the same configuration."""
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
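A minimal sketch of configuring the cache; the connection fields come from the inherited PostgresConfig listed below, and all values shown are placeholders:

```python
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="localhost",
    port=5432,
    database="airbyte_cache",
    username="postgres",
    password="<POSTGRES_PASSWORD>",
)
```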
    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return a dictionary of destination configuration values."""
        return postgres_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a DestinationPostgres instance with the same configuration."""
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Return a DestinationPostgres instance with the same configuration.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.postgres.PostgresConfig
- host
- port
- database
- username
- password
- get_sql_alchemy_url
- get_database_name
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)
Configuration for the Snowflake cache.
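A minimal sketch of configuring the cache; the fields come from the inherited SnowflakeConfig listed below, and all values shown are placeholders:

```python
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="my_account",
    username="my_user",
    password="<SNOWFLAKE_PASSWORD>",
    warehouse="COMPUTE_WH",
    database="AIRBYTE_CACHE",
    role="AIRBYTE_ROLE",
    schema_name="PUBLIC",
)
```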
    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
        """We need to both initialize private attributes and call the user-defined model_post_init
        method.
        """
        init_private_attributes(self, context)
        original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.snowflake.SnowflakeConfig
- account
- username
- password
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name