airbyte.caches.bigquery

A BigQuery implementation of the cache.

Usage Example

import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
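
Once configured, the cache can be handed to any PyAirbyte source's read() call, and streams can then be read back from BigQuery. The following is a hedged sketch; the source-faker connector, its count option, and the users stream are illustrative assumptions rather than part of this module:

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)

# "source-faker" and its "count" option are illustrative; any PyAirbyte source works.
source = ab.get_source("source-faker", config={"count": 1000})
source.select_all_streams()
source.read(cache=cache)  # Sync records into the BigQuery dataset.

# Read a stream back as a pandas DataFrame (get_pandas_dataframe is inherited from CacheBase).
df = cache.get_pandas_dataframe(stream_name="users")
```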
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""A BigQuery implementation of the cache.

## Usage Example

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar, NoReturn

from airbyte_api.models import DestinationBigquery

from airbyte._processors.sql.bigquery import BigQueryConfig, BigQuerySqlProcessor
from airbyte.caches.base import (
    CacheBase,
)
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE
from airbyte.destinations._translate_cache_to_dest import (
    bigquery_cache_to_destination_configuration,
)


if TYPE_CHECKING:
    from airbyte.shared.sql_processor import SqlProcessorBase


class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return the paired destination configuration as a `DestinationBigquery` object."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )


# Expose the Cache class and also the Config class.
__all__ = [
    "BigQueryCache",
    "BigQueryConfig",
]
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return the paired destination configuration as a `DestinationBigquery` object."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )

The BigQuery cache implementation.

paired_destination_name: ClassVar[str | None] = 'destination-bigquery'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_bigquery.DestinationBigquery'>
paired_destination_config: airbyte_api.models.destination_bigquery.DestinationBigquery
    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return the paired destination configuration as a `DestinationBigquery` object."""
        return bigquery_cache_to_destination_configuration(cache=self)

Return the paired destination configuration as a DestinationBigquery object.

def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = 100000) -> NoReturn:
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )

Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.

See: https://github.com/airbytehq/PyAirbyte/issues/165
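
Because this method always raises, callers that need the data in memory can fall back to the pandas accessor inherited from CacheBase. A minimal sketch, assuming a previously populated cache and an illustrative stream name:

```python
from airbyte.caches import BigQueryCache

cache = BigQueryCache(project_name="myproject", dataset_name="mydataset")

# get_arrow_dataset() raises NotImplementedError for BigQuery (see issue #165),
# so use the pandas-based accessor instead. "users" is just an example stream.
df = cache.get_pandas_dataframe(stream_name="users")
```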

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
                        """We need to both initialize private attributes and call the user-defined model_post_init
                        method.
                        """
                        init_private_attributes(self, context)
                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
BigQueryConfig
database_name
schema_name
credentials_path
project_name
dataset_name
get_sql_alchemy_url
get_database_name
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class BigQueryConfig(airbyte.shared.sql_processor.SqlConfig):
class BigQueryConfig(SqlConfig):
    """Configuration for BigQuery."""

    database_name: str = Field(alias="project_name")
    """The name of the project to use. In BigQuery, this is equivalent to the database name."""

    schema_name: str = Field(alias="dataset_name", default=DEFAULT_CACHE_SCHEMA_NAME)
    """The name of the dataset to use. In BigQuery, this is equivalent to the schema name."""

    credentials_path: str | None = None
    """The path to the credentials file to use.
    If not passed, falls back to the default inferred from the environment."""

    @property
    def project_name(self) -> str:
        """Return the project name (alias of self.database_name)."""
        return self.database_name

    @property
    def dataset_name(self) -> str:
        """Return the dataset name (alias of self.schema_name)."""
        return self.schema_name

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        We suppress warnings about unrecognized JSON type. More info on that here:
        - https://github.com/airbytehq/PyAirbyte/issues/254
        """
        warnings.filterwarnings(
            "ignore",
            message="Did not recognize type 'JSON' of column",
            category=sqlalchemy.exc.SAWarning,
        )

        url: URL = make_url(f"bigquery://{self.project_name!s}")
        if self.credentials_path:
            url = url.update_query_dict({"credentials_path": self.credentials_path})

        return SecretString(url)

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database. For BigQuery, this is the project name."""
        return self.project_name

    def get_vendor_client(self) -> bigquery.Client:
        """Return a BigQuery python client."""
        if self.credentials_path:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        else:
            credentials, _ = google.auth.default()

        return bigquery.Client(credentials=credentials)

Configuration for BigQuery.

database_name: str

The name of the project to use. In BigQuery, this is equivalent to the database name.

schema_name: str

The name of the dataset to use. In BigQuery, this is equivalent to the schema name.

credentials_path: str | None

The path to the credentials file to use. If not passed, falls back to the default inferred from the environment.
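
If credentials_path is omitted, the underlying client falls back to Google Application Default Credentials (via google.auth.default()), which typically means the GOOGLE_APPLICATION_CREDENTIALS environment variable or the ambient service account. A sketch of both options, with illustrative paths:

```python
import os

from airbyte.caches import BigQueryCache

# Option 1: point at a service-account key file explicitly.
explicit_cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)

# Option 2: rely on Application Default Credentials instead.
# (Usually this variable is set in the shell; setting it here is only for illustration.)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/to/credentials.json"
adc_cache = BigQueryCache(project_name="myproject", dataset_name="mydataset")
```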

project_name: str
    @property
    def project_name(self) -> str:
        """Return the project name (alias of self.database_name)."""
        return self.database_name

Return the project name (alias of self.database_name).

dataset_name: str
    @property
    def dataset_name(self) -> str:
        """Return the dataset name (alias of self.schema_name)."""
        return self.schema_name

Return the dataset name (alias of self.schema_name).

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        We suppress warnings about unrecognized JSON type. More info on that here:
        - https://github.com/airbytehq/PyAirbyte/issues/254
        """
        warnings.filterwarnings(
            "ignore",
            message="Did not recognize type 'JSON' of column",
            category=sqlalchemy.exc.SAWarning,
        )

        url: URL = make_url(f"bigquery://{self.project_name!s}")
        if self.credentials_path:
            url = url.update_query_dict({"credentials_path": self.credentials_path})

        return SecretString(url)

Return the SQLAlchemy URL to use.

We suppress warnings about unrecognized JSON type. More info on that here: https://github.com/airbytehq/PyAirbyte/issues/254
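
As a rough illustration of what this produces: the URL uses the bigquery://<project_name> scheme, with a credentials_path query parameter appended when one is configured, and the inherited get_sql_engine() builds the SQLAlchemy engine from it. A minimal sketch, assuming the example project names from above:

```python
from airbyte.caches import BigQueryCache

cache = BigQueryCache(project_name="myproject", dataset_name="mydataset")

# Returns a SecretString wrapping something like "bigquery://myproject".
url = cache.get_sql_alchemy_url()

# The engine itself is normally obtained via the inherited get_sql_engine().
engine = cache.get_sql_engine()
```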

@overrides
def get_database_name(self) -> str:
    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database. For BigQuery, this is the project name."""
        return self.project_name

Return the name of the database. For BigQuery, this is the project name.

def get_vendor_client(self) -> google.cloud.bigquery.client.Client:
    def get_vendor_client(self) -> bigquery.Client:
        """Return a BigQuery python client."""
        if self.credentials_path:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        else:
            credentials, _ = google.auth.default()

        return bigquery.Client(credentials=credentials)

Return a BigQuery python client.
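
The vendor client exposes the full google-cloud-bigquery API for anything the SQL layer doesn't cover. A minimal sketch using standard client calls (the query and the printed fields are illustrative):

```python
from airbyte.caches import BigQueryCache

cache = BigQueryCache(project_name="myproject", dataset_name="mydataset")
client = cache.get_vendor_client()  # google.cloud.bigquery.Client

# List datasets in the project.
for dataset in client.list_datasets():
    print(dataset.dataset_id)

# Run an ad-hoc query and iterate over the result rows.
for row in client.query("SELECT 1 AS one").result():
    print(row.one)
```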

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_create_table_extra_clauses
get_sql_engine