airbyte.caches
Base module for all caches.
```python
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base module for all caches."""

from __future__ import annotations

from airbyte.caches import base, bigquery, duckdb, motherduck, postgres, snowflake, util
from airbyte.caches.base import CacheBase
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.motherduck import MotherDuckCache
from airbyte.caches.postgres import PostgresCache
from airbyte.caches.snowflake import SnowflakeCache
from airbyte.caches.util import get_default_cache, new_local_cache


# We export these classes for easy access: `airbyte.caches...`
__all__ = [
    # Factories
    "get_default_cache",
    "new_local_cache",
    # Classes
    "BigQueryCache",
    "CacheBase",
    "DuckDBCache",
    "MotherDuckCache",
    "PostgresCache",
    "SnowflakeCache",
    # Submodules,
    "util",
    "bigquery",
    "duckdb",
    "motherduck",
    "postgres",
    "snowflake",
    "base",
]
```
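For orientation, a minimal sketch of how these exports are typically used together; the `source-faker` connector and its `count` option are illustrative assumptions, not part of this module:

```python
import airbyte as ab
from airbyte.caches import get_default_cache

# Any exported cache class or factory can supply the `cache` argument to `Source.read()`.
cache = get_default_cache()  # DuckDB cache under ./.cache/default_cache/
source = ab.get_source("source-faker", config={"count": 1_000})  # illustrative connector/config
source.select_all_streams()
result = source.read(cache=cache)
```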
```python
def get_default_cache() -> DuckDBCache:
    """Get a local cache for storing data, using the default database path.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    cache_dir = Path("./.cache/default_cache")
    return DuckDBCache(
        db_path=cache_dir / "default_cache.duckdb",
        cache_dir=cache_dir,
    )
```
Get a local cache for storing data, using the default database path.
Cache files are stored in the `.cache` directory, relative to the current working directory.
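A minimal usage sketch; the printed paths follow from the defaults shown in the source above:

```python
from airbyte.caches.util import get_default_cache

cache = get_default_cache()
print(cache.db_path)    # ./.cache/default_cache/default_cache.duckdb
print(cache.cache_dir)  # ./.cache/default_cache
```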
```python
def new_local_cache(
    cache_name: str | None = None,
    cache_dir: str | Path | None = None,
    *,
    cleanup: bool = True,
) -> DuckDBCache:
    """Get a local cache for storing data, using a name string to seed the path.

    Args:
        cache_name: Name to use for the cache. Defaults to None.
        cache_dir: Root directory to store the cache in. Defaults to None.
        cleanup: Whether to clean up temporary files. Defaults to True.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    if cache_name:
        if " " in cache_name:
            raise exc.PyAirbyteInputError(
                message="Cache name cannot contain spaces.",
                input_value=cache_name,
            )

        if not cache_name.replace("_", "").isalnum():
            raise exc.PyAirbyteInputError(
                message="Cache name can only contain alphanumeric characters and underscores.",
                input_value=cache_name,
            )

    cache_name = cache_name or str(ulid.ULID())
    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)

    return DuckDBCache(
        db_path=cache_dir / f"db_{cache_name}.duckdb",
        cache_dir=cache_dir,
        cleanup=cleanup,
    )
```
Get a local cache for storing data, using a name string to seed the path.
Arguments:
- cache_name: Name to use for the cache. Defaults to None.
- cache_dir: Root directory to store the cache in. Defaults to None.
- cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the `.cache` directory, relative to the current working directory.
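A short sketch of naming behavior and validation, following the source above; the cache name is a placeholder:

```python
from airbyte.caches.util import new_local_cache

# Named cache: files land under ./.cache/my_pipeline/db_my_pipeline.duckdb
cache = new_local_cache(cache_name="my_pipeline", cleanup=False)

# Names with spaces or other non-alphanumeric characters raise PyAirbyteInputError:
# new_local_cache(cache_name="my pipeline")  # raises
```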
```python
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )
```
The BigQuery cache implementation.
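A hedged construction sketch; the project, dataset, and credentials values are placeholders, not defaults of this class:

```python
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="my-gcp-project",                       # placeholder GCP project
    dataset_name="pyairbyte_cache",                      # placeholder dataset
    credentials_path="/path/to/service_account.json",    # placeholder service account key
)
```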
```python
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )
```
Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
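Since `get_arrow_dataset()` raises here, one possible workaround sketch is to read through `get_pandas_dataframe()` and convert with `pyarrow` yourself; the `cache` object and the "users" stream name are assumptions for illustration:

```python
import pyarrow as pa

# `cache` is a configured BigQueryCache; "users" is a placeholder stream name.
df = cache.get_pandas_dataframe("users")   # supported read path
table = pa.Table.from_pandas(df)           # manual conversion if Arrow is needed
```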
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.bigquery.BigQueryConfig
- database_name
- schema_name
- credentials_path
- project_name
- dataset_name
- get_sql_alchemy_url
- get_database_name
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
```python
class CacheBase(SqlConfig, AirbyteWriterInterface):
    """Base configuration for a cache.

    Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings
    and basic connectivity to the SQL database.

    The cache is responsible for managing the state of the data synced to the cache, including the
    stream catalog and stream state. The cache also provides the mechanism to read and write data
    to the SQL backend specified in the `SqlConfig` class.
    """

    cache_dir: Path = Field(default=Path(constants.DEFAULT_CACHE_ROOT))
    """The directory to store the cache in."""

    cleanup: bool = TEMP_FILE_CLEANUP
    """Whether to clean up the cache after use."""

    _name: str = PrivateAttr()

    _deployed_api_root: str | None = PrivateAttr(default=None)
    _deployed_workspace_id: str | None = PrivateAttr(default=None)
    _deployed_destination_id: str | None = PrivateAttr(default=None)

    _sql_processor_class: type[SqlProcessorBase] = PrivateAttr()
    _read_processor: SqlProcessorBase = PrivateAttr()

    _catalog_backend: CatalogBackendBase = PrivateAttr()
    _state_backend: StateBackendBase = PrivateAttr()

    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash

    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))

    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor

    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )

    # Read methods:

    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)

    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow Table
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)

    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result

    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )

    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )

    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )

    def __getitem__(self, stream: str) -> DatasetBase:
        """Return a dataset by stream name."""
        return self.streams[stream]

    def __contains__(self, stream: str) -> bool:
        """Return whether a stream is in the cache."""
        return stream in (self._catalog_backend.stream_names)

    def __iter__(  # type: ignore [override]  # Overriding Pydantic model method
        self,
    ) -> Iterator[tuple[str, Any]]:
        """Iterate over the streams in the cache."""
        return ((name, dataset) for name, dataset in self.streams.items())

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        cache_processor = self.get_record_processor(
            source_name=self.name,
            catalog_provider=catalog_provider,
            state_writer=state_writer,
        )
        cache_processor.process_airbyte_messages(
            messages=stdin,
            write_strategy=write_strategy,
            progress_tracker=progress_tracker,
        )
        progress_tracker.log_cache_processing_complete()
```
Base configuration for a cache.
Caches inherit from the matching `SqlConfig` class, which provides the SQL config settings and basic connectivity to the SQL database.

The cache is responsible for managing the state of the data synced to the cache, including the stream catalog and stream state. The cache also provides the mechanism to read and write data to the SQL backend specified in the `SqlConfig` class.
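As a rough sketch of this read/write split, any concrete cache can receive synced data and then be queried directly; the `source-faker` connector and its config are illustrative assumptions:

```python
import airbyte as ab

cache = ab.new_local_cache("demo")  # a DuckDBCache, which is a CacheBase subclass
source = ab.get_source("source-faker", config={"count": 100})  # illustrative connector
source.select_all_streams()
source.read(cache=cache)

# Afterwards the cache acts as a read interface over the synced streams:
for name, dataset in cache.streams.items():
    print(name, len(list(dataset)))
```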
```python
    def __init__(self, **data: Any) -> None:  # noqa: ANN401
        """Initialize the cache and backends."""
        super().__init__(**data)

        # Create a temporary processor to do the work of ensuring the schema exists
        temp_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=CatalogProvider(ConfiguredAirbyteCatalog(streams=[])),
            state_writer=StdOutStateWriter(),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
        temp_processor._ensure_schema_exists()  # noqa: SLF001  # Accessing non-public member

        # Initialize the catalog and state backends
        self._catalog_backend = SqlCatalogBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )
        self._state_backend = SqlStateBackend(
            engine=self.get_sql_engine(),
            table_prefix=self.table_prefix or "",
        )

        # Now we can create the SQL read processor
        self._read_processor = self._sql_processor_class(
            sql_config=self,
            catalog_provider=self._catalog_backend.get_full_catalog_provider(),
            state_writer=StdOutStateWriter(),  # Shouldn't be needed for the read-only processor
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
```
Initialize the cache and backends.
```python
    @property
    def config_hash(self) -> str | None:
        """Return a hash of the cache configuration.

        This is the same as the SQLConfig hash from the superclass.
        """
        return super(SqlConfig, self).config_hash
```
Return a hash of the cache configuration.
This is the same as the SQLConfig hash from the superclass.
```python
    def execute_sql(self, sql: str | list[str]) -> None:
        """Execute one or more SQL statements against the cache's SQL backend.

        If multiple SQL statements are given, they are executed in order,
        within the same transaction.

        This method is useful for creating tables, indexes, and other
        schema objects in the cache. It does not return any results and it
        automatically closes the connection after executing all statements.

        This method is not intended for querying data. For that, use the `get_records`
        method - or for a low-level interface, use the `get_sql_engine` method.

        If any of the statements fail, the transaction is canceled and an exception
        is raised. Most databases will rollback the transaction in this case.
        """
        if isinstance(sql, str):
            # Coerce to a list if a single string is given
            sql = [sql]

        with self.processor.get_sql_connection() as connection:
            for sql_statement in sql:
                connection.execute(text(sql_statement))
```
Execute one or more SQL statements against the cache's SQL backend.
If multiple SQL statements are given, they are executed in order, within the same transaction.
This method is useful for creating tables, indexes, and other schema objects in the cache. It does not return any results and it automatically closes the connection after executing all statements.
This method is not intended for querying data. For that, use the `get_records` method - or for a low-level interface, use the `get_sql_engine` method.
If any of the statements fail, the transaction is canceled and an exception is raised. Most databases will rollback the transaction in this case.
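A brief sketch of single- and multi-statement usage; the table, column, and index names are illustrative:

```python
# Single statement:
cache.execute_sql("CREATE INDEX IF NOT EXISTS idx_users_id ON users (id)")

# Multiple statements run in order within one transaction:
cache.execute_sql([
    "CREATE TABLE IF NOT EXISTS audit_log (id INTEGER, note VARCHAR)",
    "INSERT INTO audit_log VALUES (1, 'initial load complete')",
])
```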
```python
    @final
    @property
    def processor(self) -> SqlProcessorBase:
        """Return the SQL processor instance."""
        return self._read_processor
```
Return the SQL processor instance.
```python
    def get_record_processor(
        self,
        source_name: str,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase | None = None,
    ) -> SqlProcessorBase:
        """Return a record processor for the specified source name and catalog.

        We first register the source and its catalog with the catalog manager. Then we create a new
        SQL processor instance with (only) the given input catalog.

        For the state writer, we use a state writer which stores state in an internal SQL table.
        """
        # First register the source and catalog into durable storage. This is necessary to ensure
        # that we can later retrieve the catalog information.
        self.register_source(
            source_name=source_name,
            incoming_source_catalog=catalog_provider.configured_catalog,
            stream_names=set(catalog_provider.stream_names),
        )

        # Next create a new SQL processor instance with the given catalog - and a state writer
        # that writes state to the internal SQL table and associates with the given source name.
        return self._sql_processor_class(
            sql_config=self,
            catalog_provider=catalog_provider,
            state_writer=state_writer or self.get_state_writer(source_name=source_name),
            temp_dir=self.cache_dir,
            temp_file_cleanup=self.cleanup,
        )
```
Return a record processor for the specified source name and catalog.
We first register the source and its catalog with the catalog manager. Then we create a new SQL processor instance with (only) the given input catalog.
For the state writer, we use a state writer which stores state in an internal SQL table.
```python
    def get_records(
        self,
        stream_name: str,
    ) -> CachedDataset:
        """Uses SQLAlchemy to select all rows from the table."""
        return CachedDataset(self, stream_name)
```
Uses SQLAlchemy to select all rows from the table.
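A short usage sketch; the "users" stream name is a placeholder for a stream already synced to the cache:

```python
dataset = cache.get_records("users")  # returns a CachedDataset

# The dataset is iterable, yielding one record (row) at a time:
for record in dataset:
    print(record)
```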
```python
    def get_pandas_dataframe(
        self,
        stream_name: str,
    ) -> pd.DataFrame:
        """Return a Pandas data frame with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()
        return pd.read_sql_table(table_name, engine, schema=self.schema_name)
```
Return a Pandas data frame with the stream's data.
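A minimal sketch; "users" is again a placeholder stream name:

```python
df = cache.get_pandas_dataframe("users")
print(df.head())
```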
```python
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> ds.Dataset:
        """Return an Arrow Dataset with the stream's data."""
        table_name = self._read_processor.get_sql_table_name(stream_name)
        engine = self.get_sql_engine()

        # Read the table in chunks to handle large tables which do not fit in memory
        pandas_chunks = pd.read_sql_table(
            table_name=table_name,
            con=engine,
            schema=self.schema_name,
            chunksize=max_chunk_size,
        )

        arrow_batches_list = []
        arrow_schema = None

        for pandas_chunk in pandas_chunks:
            if arrow_schema is None:
                # Initialize the schema with the first chunk
                arrow_schema = pa.Schema.from_pandas(pandas_chunk)

            # Convert each pandas chunk to an Arrow Table
            arrow_table = pa.RecordBatch.from_pandas(pandas_chunk, schema=arrow_schema)
            arrow_batches_list.append(arrow_table)

        return ds.dataset(arrow_batches_list)
```
Return an Arrow Dataset with the stream's data.
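A brief sketch using standard `pyarrow` dataset calls; the stream name and chunk size are illustrative:

```python
dataset = cache.get_arrow_dataset("users", max_chunk_size=50_000)

table = dataset.to_table()  # materialize as a pyarrow.Table
df = table.to_pandas()      # or convert onward to pandas if preferred
```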
```python
    @final
    @property
    def streams(self) -> dict[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        result = {}
        stream_names = set(self._catalog_backend.stream_names)

        for stream_name in stream_names:
            result[stream_name] = CachedDataset(self, stream_name)

        return result
```
Return a mapping of stream names to `CachedDataset` objects, one for each stream in the cache.
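A short access sketch using this mapping together with the `__getitem__` and `__contains__` helpers shown in the class source above; "users" is a placeholder stream name:

```python
print(list(cache.streams))   # stream names synced to this cache
users = cache["users"]       # __getitem__ returns a CachedDataset
print("users" in cache)      # __contains__ checks the registered stream names
```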
```python
    def get_state_provider(
        self,
        source_name: str,
        *,
        refresh: bool = True,
        destination_name: str | None = None,
    ) -> StateProviderBase:
        """Return a state provider for the specified source name."""
        return self._state_backend.get_state_provider(
            source_name=source_name,
            table_prefix=self.table_prefix or "",
            refresh=refresh,
            destination_name=destination_name,
        )
```
Return a state provider for the specified source name.
```python
    def get_state_writer(
        self,
        source_name: str,
        destination_name: str | None = None,
    ) -> StateWriterBase:
        """Return a state writer for the specified source name.

        If syncing to the cache, `destination_name` should be `None`.
        If syncing to a destination, `destination_name` should be the destination name.
        """
        return self._state_backend.get_state_writer(
            source_name=source_name,
            destination_name=destination_name,
        )
```
Return a state writer for the specified source name.
If syncing to the cache, `destination_name` should be `None`. If syncing to a destination, `destination_name` should be the destination name.
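A minimal sketch of both signatures; the source and destination names are placeholders:

```python
state_provider = cache.get_state_provider(source_name="source-faker")
state_writer = cache.get_state_writer(source_name="source-faker")

# When syncing onward to a destination, pass its name so state is tracked separately:
dest_state_writer = cache.get_state_writer(
    source_name="source-faker",
    destination_name="destination-bigquery",
)
```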
```python
    def register_source(
        self,
        source_name: str,
        incoming_source_catalog: ConfiguredAirbyteCatalog,
        stream_names: set[str],
    ) -> None:
        """Register the source name and catalog."""
        self._catalog_backend.register_source(
            source_name=source_name,
            incoming_source_catalog=incoming_source_catalog,
            incoming_stream_names=stream_names,
        )
```
Register the source name and catalog.
Inherited Members
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_sql_alchemy_url
- get_database_name
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
```python
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache."""

    _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)
```
A DuckDB cache.
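A hedged construction sketch; the database path and schema name are placeholders:

```python
from airbyte.caches import DuckDBCache

cache = DuckDBCache(
    db_path="./my_cache.duckdb",  # file path for the DuckDB database
    schema_name="main",           # optional schema name
)
```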
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.duckdb.DuckDBConfig
- db_path
- schema_name
- get_sql_alchemy_url
- get_database_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
```python
class MotherDuckCache(MotherDuckConfig, DuckDBCache):
    """Cache that uses MotherDuck for external persistent storage."""

    _sql_processor_class: type[MotherDuckSqlProcessor] = PrivateAttr(default=MotherDuckSqlProcessor)
```
Cache that uses MotherDuck for external persistent storage.
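A hedged construction sketch; the database name, token, and schema values are placeholders:

```python
from airbyte.caches import MotherDuckCache

cache = MotherDuckCache(
    database="my_database",          # MotherDuck database name
    api_key="<motherduck-token>",    # MotherDuck API token; prefer a secret manager in real use
    schema_name="pyairbyte_cache",   # optional schema name
)
```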
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte.caches.motherduck.MotherDuckConfig
- database
- api_key
- db_path
- get_sql_alchemy_url
- get_database_name
- airbyte._processors.sql.duckdb.DuckDBConfig
- schema_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
```python
class PostgresCache(PostgresConfig, CacheBase):
    """Configuration for the Postgres cache.

    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
    """

    _sql_processor_class = PrivateAttr(default=PostgresSqlProcessor)
```
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
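A hedged construction sketch; all connection values below are placeholders:

```python
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="localhost",
    port=5432,
    database="pyairbyte_demo",
    username="postgres",
    password="********",  # placeholder; prefer a secret manager in real use
)
```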
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.postgres.PostgresConfig
- host
- port
- database
- username
- password
- get_sql_alchemy_url
- get_database_name
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
```python
class SnowflakeCache(SnowflakeConfig, CacheBase):
    """Configuration for the Snowflake cache."""

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class = PrivateAttr(default=SnowflakeSqlProcessor)
```
Configuration for the Snowflake cache.
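A hedged construction sketch; the account, credential, and object names below are placeholders:

```python
from airbyte.caches import SnowflakeCache

cache = SnowflakeCache(
    account="my-account",      # Snowflake account identifier
    username="PYAIRBYTE_USER",
    password="********",       # placeholder; prefer a secret manager in real use
    warehouse="COMPUTE_WH",
    database="PYAIRBYTE_DB",
    role="PYAIRBYTE_ROLE",
    schema_name="PYAIRBYTE_SCHEMA",
)
```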
Inherited Members
- CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.snowflake.SnowflakeConfig
- account
- username
- password
- warehouse
- database
- role
- schema_name
- data_retention_time_in_days
- get_create_table_extra_clauses
- get_database_name
- get_sql_alchemy_url
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name