airbyte.caches.postgres

A Postgres implementation of the PyAirbyte cache.

Usage Example

from airbyte as ab
from airbyte.caches import PostgresCache

cache = PostgresCache(
    host="myhost",
    port=5432,
    username="myusername",
    password=ab.get_secret("POSTGRES_PASSWORD"),
    database="mydatabase",
)
 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2"""A Postgres implementation of the PyAirbyte cache.
 3
 4## Usage Example
 5
 6```python
 7from airbyte as ab
 8from airbyte.caches import PostgresCache
 9
10cache = PostgresCache(
11    host="myhost",
12    port=5432,
13    username="myusername",
14    password=ab.get_secret("POSTGRES_PASSWORD"),
15    database="mydatabase",
16)
17```
18"""
19
20from __future__ import annotations
21
22from pydantic import PrivateAttr
23
24from airbyte._processors.sql.postgres import PostgresConfig, PostgresSqlProcessor
25from airbyte.caches.base import CacheBase
26
27
28class PostgresCache(PostgresConfig, CacheBase):
29    """Configuration for the Postgres cache.
30
31    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
32    """
33
34    _sql_processor_class = PrivateAttr(default=PostgresSqlProcessor)
35
36
37# Expose the Cache class and also the Config class.
38__all__ = [
39    "PostgresCache",
40    "PostgresConfig",
41]
29class PostgresCache(PostgresConfig, CacheBase):
30    """Configuration for the Postgres cache.
31
32    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
33    """
34
35    _sql_processor_class = PrivateAttr(default=PostgresSqlProcessor)

Configuration for the Postgres cache.

Also inherits config from the JsonlWriter, which is responsible for writing files to disk.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'host': FieldInfo(annotation=str, required=True), 'port': FieldInfo(annotation=int, required=True), 'database': FieldInfo(annotation=str, required=True), 'username': FieldInfo(annotation=str, required=True), 'password': FieldInfo(annotation=Union[SecretString, str], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
124                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
125                        """We need to both initialize private attributes and call the user-defined model_post_init
126                        method.
127                        """
128                        init_private_attributes(self, context)
129                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
PostgresConfig
host
port
database
username
password
get_sql_alchemy_url
get_database_name
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
get_create_table_extra_clauses
get_sql_engine
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class PostgresConfig(airbyte.shared.sql_processor.SqlConfig):
17class PostgresConfig(SqlConfig):
18    """Configuration for the Postgres cache.
19
20    Also inherits config from the JsonlWriter, which is responsible for writing files to disk.
21    """
22
23    host: str
24    port: int
25    database: str
26    username: str
27    password: SecretString | str
28
29    @overrides
30    def get_sql_alchemy_url(self) -> SecretString:
31        """Return the SQLAlchemy URL to use."""
32        return SecretString(
33            f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
34        )
35
36    @overrides
37    def get_database_name(self) -> str:
38        """Return the name of the database."""
39        return self.database

Configuration for the Postgres cache.

Also inherits config from the JsonlWriter, which is responsible for writing files to disk.

host: str
port: int
database: str
username: str
@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
29    @overrides
30    def get_sql_alchemy_url(self) -> SecretString:
31        """Return the SQLAlchemy URL to use."""
32        return SecretString(
33            f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
34        )

Return the SQLAlchemy URL to use.

@overrides
def get_database_name(self) -> str:
36    @overrides
37    def get_database_name(self) -> str:
38        """Return the name of the database."""
39        return self.database

Return the name of the database.

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'host': FieldInfo(annotation=str, required=True), 'port': FieldInfo(annotation=int, required=True), 'database': FieldInfo(annotation=str, required=True), 'username': FieldInfo(annotation=str, required=True), 'password': FieldInfo(annotation=Union[SecretString, str], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte.shared.sql_processor.SqlConfig
schema_name
table_prefix
config_hash
get_create_table_extra_clauses
get_sql_engine
get_vendor_client