airbyte.caches.postgres

A Postgres implementation of the PyAirbyte cache.

Usage Example

import airbyte as ab
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="myhost",
    port=5432,
    username="myusername",
    password=ab.get_secret("POSTGRES_PASSWORD"),
    database="mydatabase",
)
 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2"""A Postgres implementation of the PyAirbyte cache.
 3
 4## Usage Example
 5
 6```python
 7import airbyte as ab
 8from airbyte.caches import PostgresCache
 9
10cache = PostgresCache(
11    host="myhost",
12    port=5432,
13    username="myusername",
14    password=ab.get_secret("POSTGRES_PASSWORD"),
15    database="mydatabase",
16)
17```
18"""
19
20from __future__ import annotations
21
22from typing import TYPE_CHECKING, ClassVar
23
24from airbyte_api.models import DestinationPostgres
25
26from airbyte._processors.sql.postgres import PostgresConfig, PostgresSqlProcessor
27from airbyte.caches.base import CacheBase
28from airbyte.destinations._translate_cache_to_dest import (
29    postgres_cache_to_destination_configuration,
30)
31
32
33if TYPE_CHECKING:
34    from airbyte.shared.sql_processor import SqlProcessorBase
35
36
37class PostgresCache(PostgresConfig, CacheBase):
38    """Configuration for the Postgres cache.
39
40    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
41    """
42
43    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor
44
45    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
46    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres
47
48    @property
49    def paired_destination_config(self) -> DestinationPostgres:
50        """Return the paired `DestinationPostgres` configuration object."""
51        return postgres_cache_to_destination_configuration(cache=self)
52
53    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
54        """Return a DestinationPostgres instance with the same configuration."""
55        return DestinationPostgres(
56            host=self.host,
57            port=self.port,
58            username=self.username,
59            password=self.password,
60            database=self.database,
61        )
62
63
64# Expose the Cache class and also the Config class.
65__all__ = [
66    "PostgresCache",
67    "PostgresConfig",
68]
38class PostgresCache(PostgresConfig, CacheBase):
39    """Configuration for the Postgres cache.
40
41    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
42    """
43
44    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = PostgresSqlProcessor
45
46    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
47    paired_destination_config_class: ClassVar[type | None] = DestinationPostgres
48
49    @property
50    def paired_destination_config(self) -> DestinationPostgres:
51        """Return the paired `DestinationPostgres` configuration object."""
52        return postgres_cache_to_destination_configuration(cache=self)
53
54    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
55        """Return a DestinationPostgres instance with the same configuration."""
56        return DestinationPostgres(
57            host=self.host,
58            port=self.port,
59            username=self.username,
60            password=self.password,
61            database=self.database,
62        )

Configuration for the Postgres cache.

Also inherits config from the JsonlWriter, which is responsible for writing files to disk.

paired_destination_name: ClassVar[str | None] = 'destination-bigquery'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_postgres.DestinationPostgres'>
paired_destination_config: airbyte_api.models.destination_postgres.DestinationPostgres
49    @property
50    def paired_destination_config(self) -> DestinationPostgres:
51        """Return the paired `DestinationPostgres` configuration object."""
52        return postgres_cache_to_destination_configuration(cache=self)

Return the paired `DestinationPostgres` configuration object.

def clone_as_cloud_destination_config(self) -> airbyte_api.models.destination_postgres.DestinationPostgres:
54    def clone_as_cloud_destination_config(self) -> DestinationPostgres:
55        """Return a DestinationPostgres instance with the same configuration."""
56        return DestinationPostgres(
57            host=self.host,
58            port=self.port,
59            username=self.username,
60            password=self.password,
61            database=self.database,
62        )

Return a DestinationPostgres instance with the same configuration.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
PostgresConfig
host
port
database
username
password
get_sql_alchemy_url
get_database_name
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
get_create_table_extra_clauses
get_sql_engine
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class PostgresConfig(airbyte.shared.sql_processor.SqlConfig):
17class PostgresConfig(SqlConfig):
18    """Configuration for the Postgres cache.
19
20    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
21    """
22
23    host: str
24    port: int
25    database: str
26    username: str
27    password: SecretString | str
28
29    @overrides
30    def get_sql_alchemy_url(self) -> SecretString:
31        """Return the SQLAlchemy URL to use."""
32        return SecretString(
33            f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
34        )
35
36    @overrides
37    def get_database_name(self) -> str:
38        """Return the name of the database."""
39        return self.database

Configuration for the Postgres cache.

Also inherits config from the JsonlWriter, which is responsible for writing files to disk.

host: str
port: int
database: str
username: str
@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
29    @overrides
30    def get_sql_alchemy_url(self) -> SecretString:
31        """Return the SQLAlchemy URL to use."""
32        return SecretString(
33            f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
34        )

Return the SQLAlchemy URL to use.

@overrides
def get_database_name(self) -> str:
36    @overrides
37    def get_database_name(self) -> str:
38        """Return the name of the database."""
39        return self.database

Return the name of the database.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
config_hash
get_create_table_extra_clauses
get_sql_engine
get_vendor_client