airbyte.caches.postgres
A Postgres implementation of the PyAirbyte cache.
Usage Example
import airbyte as ab
from airbyte.caches import PostgresCache
cache = PostgresCache(
host="myhost",
port=5432,
username="myusername",
password=ab.get_secret("POSTGRES_PASSWORD"),
database="mydatabase",
)
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""A Postgres implementation of the PyAirbyte cache.

## Usage Example

```python
import airbyte as ab
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="myhost",
    port=5432,
    username="myusername",
    password=ab.get_secret("POSTGRES_PASSWORD"),
    database="mydatabase",
)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar

from airbyte_api.models import DestinationPostgres

from airbyte._processors.sql.postgres import PostgresConfig, PostgresSqlProcessor
from airbyte.caches.base import CacheBase
from airbyte.destinations._translate_cache_to_dest import (
    postgres_cache_to_destination_configuration,
)


if TYPE_CHECKING:
    from airbyte.shared.sql_processor import SqlProcessorBase


class PostgresCache(PostgresConfig, CacheBase):
    """Postgres-backed PyAirbyte cache.

    Combines the connection settings declared on `PostgresConfig` with the cache
    behavior inherited from `CacheBase`.
    """

    # SQL processor implementation used by this cache.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

    # Connector name and config class of the destination paired with this cache.
    # The name must match the Postgres destination, in line with the config class below.
    paired_destination_name: ClassVar[str | None] = "destination-postgres"
    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return the destination configuration paired with this cache.

        Delegates to `postgres_cache_to_destination_configuration`, passing this cache.
        """
        return postgres_cache_to_destination_configuration(cache=self)

    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a `DestinationPostgres` built from this cache's connection settings.

        Copies `host`, `port`, `username`, `password` and `database` from this cache.
        """
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )


# Expose the Cache class and also the Config class.
__all__ = [
    "PostgresCache",
    "PostgresConfig",
]
class PostgresCache(PostgresConfig, CacheBase):
    """Postgres-backed PyAirbyte cache.

    Combines the connection settings declared on `PostgresConfig` with the cache
    behavior inherited from `CacheBase`.
    """

    # SQL processor implementation used by this cache.
    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

    # Connector name and config class of the destination paired with this cache.
    # The name must match the Postgres destination, in line with the config class below.
    paired_destination_name: ClassVar[str | None] = "destination-postgres"
    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return the destination configuration paired with this cache.

        Delegates to `postgres_cache_to_destination_configuration`, passing this cache.
        """
        return postgres_cache_to_destination_configuration(cache=self)

    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a `DestinationPostgres` built from this cache's connection settings.

        Copies `host`, `port`, `username`, `password` and `database` from this cache.
        """
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
@property
def paired_destination_config(self) -> DestinationPostgres:
    """Build and return the destination configuration paired with this cache.

    The translation is delegated to `postgres_cache_to_destination_configuration`.
    """
    destination_config = postgres_cache_to_destination_configuration(cache=self)
    return destination_config
Return the paired destination configuration as a `DestinationPostgres` object.
def clone_as_cloud_destination_config(self) -> DestinationPostgres:
    """Create a `DestinationPostgres` mirroring this cache's connection settings.

    The host, port, username, password and database are copied over unchanged.
    """
    connection_settings = {
        "host": self.host,
        "port": self.port,
        "username": self.username,
        "password": self.password,
        "database": self.database,
    }
    return DestinationPostgres(**connection_settings)
Return a DestinationPostgres instance with the same configuration.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """Initialize private attributes, then run the user-defined `model_post_init`.

    Both steps are called with the same `self` and `context` arguments.
    """
    # Order matters: private attributes are set up before the user's hook runs.
    for post_init_step in (init_private_attributes, original_model_post_init):
        post_init_step(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
- name
class PostgresConfig(SqlConfig):
    """Connection settings for a Postgres database.

    Extends `SqlConfig` with the host, port, database name and credentials that are
    used to build the SQLAlchemy connection URL.
    """

    host: str
    port: int
    database: str
    username: str
    password: SecretString | str

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        The username and password are percent-encoded so that reserved URL characters
        in the credentials (for example `@`, `:` or `/`) cannot corrupt the structure
        of the URL. The result is wrapped in `SecretString` because it embeds the
        password.
        """
        # Imported locally so this method does not depend on a module-level import.
        from urllib.parse import quote

        # `safe=""` also escapes "/", which `quote` leaves untouched by default.
        encoded_username = quote(self.username, safe="")
        encoded_password = quote(self.password, safe="")
        return SecretString(
            f"postgresql://{encoded_username}:{encoded_password}"
            f"@{self.host}:{self.port}/{self.database}"
        )

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database."""
        return self.database
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Return the SQLAlchemy URL to use.

    The username and password are percent-encoded so that reserved URL characters
    in the credentials (for example `@`, `:` or `/`) cannot corrupt the structure
    of the URL. The result is wrapped in `SecretString` because it embeds the
    password.
    """
    # Imported locally so this method does not depend on a module-level import.
    from urllib.parse import quote

    # `safe=""` also escapes "/", which `quote` leaves untouched by default.
    encoded_username = quote(self.username, safe="")
    encoded_password = quote(self.password, safe="")
    return SecretString(
        f"postgresql://{encoded_username}:{encoded_password}"
        f"@{self.host}:{self.port}/{self.database}"
    )
Return the SQLAlchemy URL to use.
@overrides
def get_database_name(self) -> str:
    """Return the database name configured on this object."""
    database_name = self.database
    return database_name
Return the name of the database.
Configuration for the model; should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
- airbyte.shared.sql_processor.SqlConfig
- schema_name
- table_prefix
- config_hash
- get_create_table_extra_clauses
- get_sql_engine
- get_vendor_client