airbyte.caches
Base module for all caches.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base module for all caches."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.motherduck import MotherDuckCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.caches.util import get_default_cache, new_local_cache


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util

# We export these classes for easy access: `airbyte.caches...`
__all__ = [
    # Factories
    "get_default_cache",
    "new_local_cache",
    # Classes
    "BigQueryCache",
    "CacheBase",
    "DuckDBCache",
    "MotherDuckCache",
    "PostgresCache",
    "SnowflakeCache",
    # Submodules,
    "util",
    "bigquery",
    "duckdb",
    "motherduck",
    "postgres",
    "snowflake",
    "base",
]
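For quick orientation, a minimal import sketch (assumes a standard PyAirbyte installation; it only uses the names exported above):

from airbyte.caches import DuckDBCache, get_default_cache, new_local_cache

cache = get_default_cache()  # DuckDB-backed cache under the default cache root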
def get_default_cache() -> DuckDBCache:
    """Get a local cache for storing data, using the default database path.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    cache_dir = DEFAULT_CACHE_ROOT / "default_cache"
    return DuckDBCache(
        db_path=cache_dir / "default_cache.duckdb",
        cache_dir=cache_dir,
    )
Get a local cache for storing data, using the default database path.
Cache files are stored in the .cache directory, relative to the current working directory.
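A short usage sketch (the SQL text is an arbitrary placeholder query):

from airbyte.caches import get_default_cache

cache = get_default_cache()
print(cache.cache_dir)                        # .cache/default_cache under the working directory
rows = cache.run_sql_query("SELECT 1 AS one")
cache.close()                                 # release the DuckDB file lock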
def new_local_cache(
    cache_name: str | None = None,
    cache_dir: str | Path | None = None,
    *,
    cleanup: bool = True,
) -> DuckDBCache:
    """Get a local cache for storing data, using a name string to seed the path.

    Args:
        cache_name: Name to use for the cache. Defaults to None.
        cache_dir: Root directory to store the cache in. Defaults to None.
        cleanup: Whether to clean up temporary files. Defaults to True.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    if cache_name:
        if " " in cache_name:
            raise exc.PyAirbyteInputError(
                message="Cache name cannot contain spaces.",
                input_value=cache_name,
            )

        if not cache_name.replace("_", "").isalnum():
            raise exc.PyAirbyteInputError(
                message="Cache name can only contain alphanumeric characters and underscores.",
                input_value=cache_name,
            )

    cache_name = cache_name or str(ulid.ULID())
    cache_dir = cache_dir or (DEFAULT_CACHE_ROOT / cache_name)
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)

    return DuckDBCache(
        db_path=cache_dir / f"db_{cache_name}.duckdb",
        cache_dir=cache_dir,
        cleanup=cleanup,
    )
Get a local cache for storing data, using a name string to seed the path.
Arguments:
- cache_name: Name to use for the cache. Defaults to None.
- cache_dir: Root directory to store the cache in. Defaults to None.
- cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the .cache directory, relative to the current working directory.
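A hedged sketch of the naming rules (the cache name and paths are placeholders):

from airbyte.caches import new_local_cache

cache = new_local_cache(
    cache_name="my_pipeline",    # alphanumerics and underscores only
    cache_dir="./my_caches",     # files land in ./my_caches/db_my_pipeline.duckdb
    cleanup=False,               # keep temporary files for debugging
)

# new_local_cache(cache_name="bad name")  # would raise exc.PyAirbyteInputError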
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow"
            "Please consider using a different cache implementation for these functionalities."
        )
The BigQuery cache implementation.
    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow"
            "Please consider using a different cache implementation for these functionalities."
        )
Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table. See: https://github.com/airbytehq/PyAirbyte/issues/165
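A construction sketch (all field values are placeholders; since Arrow datasets are not supported here, pandas is used for reads):

from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="my-gcp-project",
    dataset_name="pyairbyte_cache",
    credentials_path="/path/to/service_account.json",
)
df = cache.get_pandas_dataframe(stream_name="users")  # "users" is a placeholder stream name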
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- close
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.bigquery.BigQueryConfig
- database_name
- schema_name
- credentials_path
- dataset_location
- project_name
- dataset_name
- get_sql_alchemy_url
- get_database_name
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_sql_engine
- dispose_engine
- pydantic.main.BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
class CacheBase(SqlConfig, AirbyteWriterInterface):  # noqa: PLR0904
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _sql_processor_class: ClassVar[type[SqlProcessorBase]]
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    paired_destination_name: ClassVar[str | None] = None
    paired_destination_config_class: ClassVar[type | None] = None

    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    def close(self) -> None:
        """Close all database connections and dispose of connection pools.

        This method ensures that all SQLAlchemy engines created by this cache
        and its processors are properly disposed, releasing all database connections.
        This is especially important for file-based databases like DuckDB, which
        lock the database file until all connections are closed.

        This method is idempotent and can be called multiple times safely.

        Raises:
            Exception: If any engine disposal fails, the exception will propagate
                to the caller. This ensures callers are aware of cleanup failures.
        """
        if self._read_processor is not None:
            self._read_processor.sql_config.dispose_engine()

        if self._catalog_backend is not None:
            self._catalog_backend._sql_config.dispose_engine()  # noqa: SLF001

        if self._state_backend is not None:
            self._state_backend._sql_config.dispose_engine()  # noqa: SLF001

        self.dispose_engine()

    def __enter__(self) -> Self:
        """Enter context manager."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager and clean up resources."""
        self.close()

    def __del__(self) -> None:
        """Clean up resources when cache is garbage collected."""
        with contextlib.suppress(Exception):
            self.close()

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def run_sql_query(
        self,
        sql_query: str,
        *,
        max_records: int | None = None,
    ) -> list[dict[str, Any]]:
        """Run a SQL query against the cache and return results as a list of dictionaries.

        This method is designed for single DML statements like SELECT, SHOW, or DESCRIBE.
        For DDL statements or multiple statements, use the processor directly.

        Args:
            sql_query: The SQL query to execute
            max_records: Maximum number of records to return. If None, returns all records.

        Returns:
            List of dictionaries representing the query results
        """
        # Execute the SQL within a connection context to ensure the connection stays open
        # while we fetch the results
        sql_text = text(sql_query) if isinstance(sql_query, str) else sql_query

        with self.processor.get_sql_connection() as conn:
            try:
                result = conn.execute(sql_text)
            except (
                sqlalchemy_exc.ProgrammingError,
                sqlalchemy_exc.SQLAlchemyError,
            ) as ex:
                msg = f"Error when executing SQL:\n{sql_query}\n{type(ex).__name__}{ex!s}"
                raise RuntimeError(msg) from ex

            # Convert the result to a list of dictionaries while connection is still open
            if result.returns_rows:
                # Get column names
                columns = list(result.keys()) if result.keys() else []

                # Fetch rows efficiently based on limit
                if max_records is not None:
                    rows = result.fetchmany(max_records)
                else:
                    rows = result.fetchall()

                return [dict(zip(columns, row, strict=True)) for row in rows]

            # For non-SELECT queries (INSERT, UPDATE, DELETE, etc.)
            return []

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which does not fits in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow Table
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a temporary table name."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    @final
    def __len__(self) -> int:
        """Gets the number of streams."""
        return len(self._catalog_backend.stream_names)

    @final
    def __bool__(self) -> bool:
        """Always True.

        This is needed so that caches with zero streams are not falsey (None-like).
        """
        return True

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def create_source_tables(
        self,
        source: Source,
        streams: Literal["*"] | list[str] | None = None,
    ) -> None:
        """Create tables in the cache for the provided source if they do not exist already.

        Tables are created based upon the Source's catalog.

        Args:
            source: The source to create tables for.
            streams: Stream names to create tables for. If None, use the Source's selected_streams
                or "*" if neither is set. If "*", all available streams will be used.
        """
        if streams is None:
            streams = source.get_selected_streams() or "*"

        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))

        # Register the incoming source catalog
        self.register_source(
            source_name=source.name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Ensure schema exists
        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Create tables for each stream if they don't exist
        for stream_name in catalog_provider.stream_names:
            self.processor._ensure_final_table_exists(  # noqa: SLF001
                stream_name=stream_name,
                create_if_missing=True,
            )

    def __getitem__(self, stream: str) -> CachedDataset:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()
Base configuration for a cache.
Caches inherit from the matching SqlConfig class, which provides the SQL config settings and basic connectivity to the SQL database.
The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the SqlConfig class.
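Because close() is wired into __enter__/__exit__, any concrete cache can be used as a context manager; a brief sketch (the cache name is a placeholder):

from airbyte.caches import new_local_cache

with new_local_cache(cache_name="example") as cache:
    print(len(cache), "streams cached")
# Engines are disposed here, so the DuckDB file is no longer locked.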
    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            sql_config=self,
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
Initialize the cache and backends.
    @property
    def paired_destination_config(self) -> Any | dict[str, Any]:  # noqa: ANN401  # Allow Any return type
        """Return a dictionary of destination configuration values."""
        raise NotImplementedError(
            f"The type '{type(self).__name__}' does not define an equivalent destination "
            "configuration."
        )
Return a dictionary of destination configuration values.
    def close(self) -> None:
        """Close all database connections and dispose of connection pools.

        This method ensures that all SQLAlchemy engines created by this cache
        and its processors are properly disposed, releasing all database connections.
        This is especially important for file-based databases like DuckDB, which
        lock the database file until all connections are closed.

        This method is idempotent and can be called multiple times safely.

        Raises:
            Exception: If any engine disposal fails, the exception will propagate
                to the caller. This ensures callers are aware of cleanup failures.
        """
        if self._read_processor is not None:
            self._read_processor.sql_config.dispose_engine()

        if self._catalog_backend is not None:
            self._catalog_backend._sql_config.dispose_engine()  # noqa: SLF001

        if self._state_backend is not None:
            self._state_backend._sql_config.dispose_engine()  # noqa: SLF001

        self.dispose_engine()
Close all database connections and dispose of connection pools.
This method ensures that all SQLAlchemy engines created by this cache and its processors are properly disposed, releasing all database connections. This is especially important for file-based databases like DuckDB, which lock the database file until all connections are closed.
This method is idempotent and can be called multiple times safely.
Raises:
- Exception: If any engine disposal fails, the exception will propagate to the caller. This ensures callers are aware of cleanup failures.
    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash
Return a hash of the cache configuration.
This is the same as the SQLConfig hash from the superclass.
    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))
Execute one or more SQL statements against the cache's SQL backend.
If multiple SQL statements are given, they are executed in order, within the same transaction.
This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.
This method is not intended for querying data. For that, use the get_records method, or, for a low-level interface, the get_sql_engine method.
If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
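A sketch of a typical DDL call (table, column, and index names are placeholders):

cache.execute_sql(
    [
        "CREATE INDEX IF NOT EXISTS idx_users_id ON users (id)",
        "CREATE TABLE IF NOT EXISTS audit_log (id INTEGER, note TEXT)",
    ]
)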
    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor
Return the SQL processor instance.
    def run_sql_query(
        self,
        sql_query: str,
        *,
        max_records: int | None = None,
    ) -> list[dict[str, Any]]:
        """Run a SQL query against the cache and return results as a list of dictionaries.

        This method is designed for single DML statements like SELECT, SHOW, or DESCRIBE.
        For DDL statements or multiple statements, use the processor directly.

        Args:
            sql_query: The SQL query to execute
            max_records: Maximum number of records to return. If None, returns all records.

        Returns:
            List of dictionaries representing the query results
        """
        # Execute the SQL within a connection context to ensure the connection stays open
        # while we fetch the results
        sql_text = text(sql_query) if isinstance(sql_query, str) else sql_query

        with self.processor.get_sql_connection() as conn:
            try:
                result = conn.execute(sql_text)
            except (
                sqlalchemy_exc.ProgrammingError,
                sqlalchemy_exc.SQLAlchemyError,
            ) as ex:
                msg = f"Error when executing SQL:\n{sql_query}\n{type(ex).__name__}{ex!s}"
                raise RuntimeError(msg) from ex

            # Convert the result to a list of dictionaries while connection is still open
            if result.returns_rows:
                # Get column names
                columns = list(result.keys()) if result.keys() else []

                # Fetch rows efficiently based on limit
                if max_records is not None:
                    rows = result.fetchmany(max_records)
                else:
                    rows = result.fetchall()

                return [dict(zip(columns, row, strict=True)) for row in rows]

            # For non-SELECT queries (INSERT, UPDATE, DELETE, etc.)
            return []
Run a SQL query against the cache and return results as a list of dictionaries.
This method is designed for single DML statements like SELECT, SHOW, or DESCRIBE. For DDL statements or multiple statements, use the processor directly.
Arguments:
- sql_query: The SQL query to execute
- max_records: Maximum number of records to return. If None, returns all records.
Returns:
List of dictionaries representing the query results
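A query sketch (the table and column names are placeholders):

rows = cache.run_sql_query("SELECT id, email FROM users", max_records=10)
for row in rows:
    print(row["id"], row["email"])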
    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use a state writer which stores state in an internal SQL table.
    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)
Uses SQLAlchemy to select all rows from the table.
    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
Return a Pandas data frame with the stream's data.
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which does not fits in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow Table
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)
Return an Arrow Dataset with the stream's data.
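A reading sketch (the stream name is a placeholder; to_table()/to_pandas() are standard PyArrow calls):

dataset = cache.get_arrow_dataset("users", max_chunk_size=50_000)
table = dataset.to_table()     # materialize the Dataset into a pyarrow Table
df = table.to_pandas()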
    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a temporary table name."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result
Return a mapping of stream names to CachedDataset objects, one entry per stream in the cache.
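The mapping-style helpers defined alongside streams make the cache behave like a read-only dictionary of datasets; a sketch with a placeholder stream name:

for name, dataset in cache:        # __iter__ yields (stream_name, CachedDataset) pairs
    print(name)

if "users" in cache:               # __contains__
    users = cache["users"]         # __getitem__, same as cache.streams["users"]
print(len(cache))                  # number of cached streams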
    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )
Return a state provider for the specified source name.
    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )
Return a state writer for the specified source name.
If syncing to the cache, destination_name should be None. If syncing to a destination, destination_name should be the destination name.
    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )
Register the source name and catalog.
    def create_source_tables(
        self,
        source: Source,
        streams: Literal["*"] | list[str] | None = None,
    ) -> None:
        """Create tables in the cache for the provided source if they do not exist already.

        Tables are created based upon the Source's catalog.

        Args:
            source: The source to create tables for.
            streams: Stream names to create tables for. If None, use the Source's selected_streams
                or "*" if neither is set. If "*", all available streams will be used.
        """
        if streams is None:
            streams = source.get_selected_streams() or "*"

        catalog_provider = CatalogProvider(source.get_configured_catalog(streams=streams))

        # Register the incoming source catalog
        self.register_source(
            source_name=source.name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Ensure schema exists
        self.processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Create tables for each stream if they don't exist
        for stream_name in catalog_provider.stream_names:
            self.processor._ensure_final_table_exists(  # noqa: SLF001
                stream_name=stream_name,
                create_if_missing=True,
            )
Create tables in the cache for the provided source if they do not exist already.
Tables are created based upon the Source's catalog.
Arguments:
- source: The source to create tables for.
- streams: Stream names to create tables for. If None, use the Source's selected_streams or "*" if neither is set. If "*", all available streams will be used.
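A sketch of pre-creating tables from a source's catalog (the connector name and config are placeholders; get_source and select_streams come from the top-level airbyte package):

import airbyte as ab
from airbyte.caches import new_local_cache

source = ab.get_source("source-faker", config={"count": 100})
source.select_streams("*")

cache = new_local_cache(cache_name="faker_demo")
cache.create_source_tables(source)                      # uses the source's selected streams
cache.create_source_tables(source, streams=["users"])   # or an explicit subset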
Inherited Members
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_sql_alchemy_url
- get_database_name
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_sql_engine
- dispose_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return duckdb_cache_to_destination_configuration(cache=self)
A DuckDB cache.
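A construction sketch (paths are placeholders):

from airbyte.caches import DuckDBCache

cache = DuckDBCache(
    db_path="./my_cache/data.duckdb",   # DuckDB database file to create or reuse
    schema_name="main",                 # DuckDB's default schema
)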
    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return duckdb_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- close
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.duckdb.DuckDBConfig
- db_path
- schema_name
- get_sql_alchemy_url
- get_database_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- dispose_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = MotherDuckSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb

    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return motherduck_cache_to_destination_configuration(cache=self)
Cache that uses MotherDuck for external persistent storage.
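A construction sketch (the database name and API token are placeholders):

from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="my_db",
    api_key="<motherduck-token>",
    schema_name="pyairbyte_cache",
)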
    @property
    def paired_destination_config(self) -> DestinationDuckdb:
        """Return a dictionary of destination configuration values."""
        return motherduck_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- close
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte.caches.motherduck.MotherDuckConfig
- database
- api_key
- db_path
- get_sql_alchemy_url
- get_database_name
- airbyte._processors.sql.duckdb.DuckDBConfig
- schema_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- dispose_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
class PostgresCache(PostgresConfig, CacheBase):
    """Configuration for the Postgres cache.

    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
    """

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return a dictionary of destination configuration values."""
        return postgres_cache_to_destination_configuration(cache=self)

    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a DestinationPostgres instance with the same configuration."""
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
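A construction sketch (all connection values are placeholders):

from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="localhost",
    port=5432,
    username="postgres",
    password="<password>",
    database="pyairbyte",
)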
    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return a dictionary of destination configuration values."""
        return postgres_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a DestinationPostgres instance with the same configuration."""
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Return a DestinationPostgres instance with the same configuration.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- close
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.postgres.PostgresConfig
- host
- port
- database
- username
- password
- get_sql_alchemy_url
- get_database_name
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_create_table_extra_clauses
- get_sql_alchemy_connect_args
- get_sql_engine
- dispose_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)
Configuration for the Snowflake cache.
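A construction sketch (account and credential values are placeholders; key-pair fields such as private_key_path are also available per the inherited config listed below):

from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="my_org-my_account",
    username="AIRBYTE_USER",
    password="<password>",
    warehouse="COMPUTE_WH",
    database="AIRBYTE_DB",
    role="AIRBYTE_ROLE",
    schema_name="PYAIRBYTE_CACHE",
)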
    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- close
- config_hash
- execute_sql
- processor
- run_sql_query
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- create_source_tables
- airbyte._processors.sql.snowflake.SnowflakeConfig
- account
- username
- password
- private_key
- private_key_path
- private_key_passphrase
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_sql_alchemy_connect_args
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- dispose_engine
- pydantic.main.BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name