airbyte.caches.postgres
A Postgres implementation of the PyAirbyte cache.
Usage Example
import airbyte as ab
from airbyte.caches import PostgresCache
cache = PostgresCache(
host="myhost",
port=5432,
username="myusername",
password=ab.get_secret("POSTGRES_PASSWORD"),
database="mydatabase",
)
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""A Postgres implementation of the PyAirbyte cache.

## Usage Example

```python
import airbyte as ab
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="myhost",
    port=5432,
    username="myusername",
    password=ab.get_secret("POSTGRES_PASSWORD"),
    database="mydatabase",
)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar

from airbyte_api.models import DestinationPostgres

from airbyte._processors.sql.postgres import PostgresConfig, PostgresSqlProcessor
from airbyte.caches.base import CacheBase
from airbyte.destinations._translate_cache_to_dest import (
    postgres_cache_to_destination_configuration,
)


if TYPE_CHECKING:
    from airbyte.shared.sql_processor import SqlProcessorBase


class PostgresCache(PostgresConfig, CacheBase):
    """Postgres-backed PyAirbyte cache.

    Combines the settings of `PostgresConfig` with the behavior of `CacheBase`
    and registers `PostgresSqlProcessor` as the SQL processor class.

    NOTE(review): earlier documentation stated that this class also inherits
    config from the JsonlWriter (responsible for writing files to disk); that
    is not visible from the bases listed here - confirm against `CacheBase`.
    """

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

    # Name of the destination connector paired with this cache. It must agree
    # with `paired_destination_config_class` below, which is the Postgres
    # destination model (not BigQuery).
    paired_destination_name: ClassVar[str | None] = "destination-postgres"
    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return the destination configuration that corresponds to this cache.

        The value is produced by `postgres_cache_to_destination_configuration`
        and is a `DestinationPostgres` object rather than a plain dictionary.
        """
        return postgres_cache_to_destination_configuration(cache=self)

    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a `DestinationPostgres` built from this cache's connection fields.

        Copies `host`, `port`, `username`, `password` and `database` as-is.
        """
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )


# Expose the Cache class and also the Config class.
__all__ = [
    "PostgresCache",
    "PostgresConfig",
]
class PostgresCache(PostgresConfig, CacheBase):
    """Postgres-backed PyAirbyte cache.

    Combines the settings of `PostgresConfig` with the behavior of `CacheBase`
    and registers `PostgresSqlProcessor` as the SQL processor class.

    NOTE(review): earlier documentation stated that this class also inherits
    config from the JsonlWriter (responsible for writing files to disk); that
    is not visible from the bases listed here - confirm against `CacheBase`.
    """

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor

    # Name of the destination connector paired with this cache. It must agree
    # with `paired_destination_config_class` below, which is the Postgres
    # destination model (not BigQuery).
    paired_destination_name: ClassVar[str | None] = "destination-postgres"
    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres

    @property
    def paired_destination_config(self) -> DestinationPostgres:
        """Return the destination configuration that corresponds to this cache.

        The value is produced by `postgres_cache_to_destination_configuration`
        and is a `DestinationPostgres` object rather than a plain dictionary.
        """
        return postgres_cache_to_destination_configuration(cache=self)

    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
        """Return a `DestinationPostgres` built from this cache's connection fields.

        Copies `host`, `port`, `username`, `password` and `database` as-is.
        """
        return DestinationPostgres(
            host=self.host,
            port=self.port,
            username=self.username,
            password=self.password,
            database=self.database,
        )
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
paired_destination_config_class: ClassVar[type | None] =
<class 'airbyte_api.models.destination_postgres.DestinationPostgres'>
paired_destination_config: airbyte_api.models.destination_postgres.DestinationPostgres
@property
def paired_destination_config(self) -> DestinationPostgres:
    """Build the destination configuration that mirrors this cache.

    Delegates to `postgres_cache_to_destination_configuration`, passing this
    cache instance as `cache`, and returns its result unchanged. The declared
    return type is `DestinationPostgres`.
    """
    destination_config = postgres_cache_to_destination_configuration(cache=self)
    return destination_config
Return the paired destination configuration as a DestinationPostgres object.
def
clone_as_cloud_destination_config(self) -> airbyte_api.models.destination_postgres.DestinationPostgres:
def clone_as_cloud_destination_config(self) -> DestinationPostgres:
    """Create a `DestinationPostgres` carrying this cache's connection fields.

    The `host`, `port`, `username`, `password` and `database` attributes are
    read from this instance and forwarded, unmodified, as keyword arguments.
    """
    connection_settings = {
        "host": self.host,
        "port": self.port,
        "username": self.username,
        "password": self.password,
        "database": self.database,
    }
    return DestinationPostgres(**connection_settings)
Return a DestinationPostgres instance with the same configuration.
class
PostgresConfig(airbyte.shared.sql_processor.SqlConfig):
class PostgresConfig(SqlConfig):
    """Connection settings for a Postgres database.

    Declares the host, port, database name and credentials, and derives the
    SQLAlchemy URL and the database name from them.

    NOTE(review): earlier documentation stated that this class also inherits
    config from the JsonlWriter (responsible for writing files to disk); that
    is not visible from the base listed here - confirm against `SqlConfig`.
    """

    host: str
    port: int
    database: str
    username: str
    password: SecretString | str

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        The username and password are percent-encoded before being placed in
        the URL, so that credentials containing reserved characters such as
        `@`, `:`, `/`, `?` or `#` do not corrupt the URL structure.

        Returns:
            A `SecretString` of the form
            `postgresql://<user>:<password>@<host>:<port>/<database>`.
        """
        # Imported locally because the module's import block is not part of
        # this listing.
        from urllib.parse import quote

        # `safe=""` also encodes "/", and spaces become "%20" rather than "+",
        # which decodes correctly under both `unquote` and `unquote_plus`.
        username = quote(self.username, safe="")
        password = quote(self.password, safe="")
        return SecretString(
            f"postgresql://{username}:{password}@{self.host}:{self.port}/{self.database}"
        )

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database."""
        return self.database
Configuration for the Postgres cache.
Also inherits config from the JsonlWriter, which is responsible for writing files to disk.