airbyte.caches.bigquery

A BigQuery implementation of the cache.

Usage Example

import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
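
Once constructed, the cache can be passed to any source's `read()` call, and results are written to the configured BigQuery dataset. The sketch below continues the example above; the `source-faker` connector, its `count` option, and the `users` stream are illustrative choices, not part of this module.

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()
result = source.read(cache=cache)

# Cached streams can then be read back, e.g. as a pandas DataFrame.
users_df = cache.get_pandas_dataframe("users")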
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""A BigQuery implementation of the cache.

## Usage Example

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar, NoReturn

from airbyte_api.models import DestinationBigquery

from airbyte._processors.sql.bigquery import BigQueryConfig, BigQuerySqlProcessor
from airbyte.caches.base import (
    CacheBase,
)
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE
from airbyte.destinations._translate_cache_to_dest import (
    bigquery_cache_to_destination_configuration,
)


if TYPE_CHECKING:
    from airbyte.shared.sql_processor import SqlProcessorBase


class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )


# Expose the Cache class and also the Config class.
__all__ = [
    "BigQueryCache",
    "BigQueryConfig",
]
class BigQueryCache(BigQueryConfig, CacheBase):

class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )

The BigQuery cache implementation.

paired_destination_name: ClassVar[str | None] = 'destination-bigquery'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_bigquery.DestinationBigquery'>
paired_destination_config: airbyte_api.models.destination_bigquery.DestinationBigquery
    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return a dictionary of destination configuration values."""
        return bigquery_cache_to_destination_configuration(cache=self)

Return a dictionary of destination configuration values.

def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = 100000) -> NoReturn:
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )

Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.

See: https://github.com/airbytehq/PyAirbyte/issues/165
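
Because this method raises for BigQuery, one possible workaround (not part of the cache API) is to load the stream as a pandas DataFrame and convert it with `pyarrow`; the `users` stream name is hypothetical.

import pyarrow as pa

users_df = cache.get_pandas_dataframe("users")
users_table = pa.Table.from_pandas(users_df)  # builds the Arrow table fully in memory, without chunking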

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
close
config_hash
execute_sql
processor
run_sql_query
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
create_source_tables
BigQueryConfig
database_name
schema_name
credentials_path
dataset_location
project_name
dataset_name
get_sql_alchemy_url
get_database_name
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_sql_alchemy_connect_args
get_sql_engine
dispose_engine
pydantic.main.BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
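
Several of the inherited members listed above cover routine use of the cache. A brief sketch, assuming the cache already holds a `users` stream (illustrative) and that `execute_sql` accepts a SQL string:

# Names of the streams currently materialized in the cache.
print(list(cache.streams.keys()))

# Run ad-hoc SQL against the dataset backing the cache.
cache.execute_sql("SELECT 1")
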
class BigQueryConfig(airbyte.shared.sql_processor.SqlConfig):
class BigQueryConfig(SqlConfig):
    """Configuration for BigQuery."""

    database_name: str = Field(alias="project_name")
    """The name of the project to use. In BigQuery, this is equivalent to the database name."""

    schema_name: str = Field(alias="dataset_name", default=DEFAULT_CACHE_SCHEMA_NAME)
    """The name of the dataset to use. In BigQuery, this is equivalent to the schema name."""

    credentials_path: str | None = None
    """The path to the credentials file to use.
    If not passed, falls back to the default inferred from the environment."""

    dataset_location: str = "US"
    """The geographic location of the BigQuery dataset (e.g., 'US', 'EU', etc.).
    Defaults to 'US'. See: https://cloud.google.com/bigquery/docs/locations"""

    @property
    def project_name(self) -> str:
        """Return the project name (alias of self.database_name)."""
        return self.database_name

    @property
    def dataset_name(self) -> str:
        """Return the dataset name (alias of self.schema_name)."""
        return self.schema_name

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        We suppress warnings about unrecognized JSON type. More info on that here:
        - https://github.com/airbytehq/PyAirbyte/issues/254
        """
        warnings.filterwarnings(
            "ignore",
            message="Did not recognize type 'JSON' of column",
            category=sqlalchemy.exc.SAWarning,
        )

        url: URL = make_url(f"bigquery://{self.project_name!s}")
        if self.credentials_path:
            url = url.update_query_dict({"credentials_path": self.credentials_path})

        return SecretString(url)

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database. For BigQuery, this is the project name."""
        return self.project_name

    def get_vendor_client(self) -> bigquery.Client:
        """Return a BigQuery python client."""
        if self.credentials_path:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        else:
            credentials, _ = google.auth.default()

        return bigquery.Client(credentials=credentials, location=self.dataset_location)

Configuration for BigQuery.

database_name: str

The name of the project to use. In BigQuery, this is equivalent to the database name.

schema_name: str

The name of the dataset to use. In BigQuery, this is equivalent to the schema name.

credentials_path: str | None

The path to the credentials file to use. If not passed, falls back to the default inferred from the environment.

dataset_location: str

The geographic location of the BigQuery dataset (e.g., 'US', 'EU', etc.). Defaults to 'US'. See: https://cloud.google.com/bigquery/docs/locations

project_name: str
    @property
    def project_name(self) -> str:
        """Return the project name (alias of self.database_name)."""
        return self.database_name

Return the project name (alias of self.database_name).

dataset_name: str
    @property
    def dataset_name(self) -> str:
        """Return the dataset name (alias of self.schema_name)."""
        return self.schema_name

Return the dataset name (alias of self.schema_name).
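
Because `project_name` and `dataset_name` are aliases for `database_name` and `schema_name`, either spelling reads back the same value. A tiny illustration with placeholder values:

cache = BigQueryCache(project_name="myproject", dataset_name="mydataset")
assert cache.project_name == cache.database_name == "myproject"
assert cache.dataset_name == cache.schema_name == "mydataset"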

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        We suppress warnings about unrecognized JSON type. More info on that here:
        - https://github.com/airbytehq/PyAirbyte/issues/254
        """
        warnings.filterwarnings(
            "ignore",
            message="Did not recognize type 'JSON' of column",
            category=sqlalchemy.exc.SAWarning,
        )

        url: URL = make_url(f"bigquery://{self.project_name!s}")
        if self.credentials_path:
            url = url.update_query_dict({"credentials_path": self.credentials_path})

        return SecretString(url)

Return the SQLAlchemy URL to use.

We suppress warnings about unrecognized JSON type. More info on that here: https://github.com/airbytehq/PyAirbyte/issues/254
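
The URL built here feeds the SQLAlchemy engine that the cache uses internally; `get_sql_engine()` (inherited from `SqlConfig`) exposes that engine for ad-hoc queries. A minimal sketch:

import sqlalchemy

engine = cache.get_sql_engine()
with engine.connect() as conn:
    row = conn.execute(sqlalchemy.text("SELECT 1")).fetchone()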

@overrides
def get_database_name(self) -> str:
    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database. For BigQuery, this is the project name."""
        return self.project_name

Return the name of the database. For BigQuery, this is the project name.

def get_vendor_client(self) -> google.cloud.bigquery.client.Client:
    def get_vendor_client(self) -> bigquery.Client:
        """Return a BigQuery python client."""
        if self.credentials_path:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        else:
            credentials, _ = google.auth.default()

        return bigquery.Client(credentials=credentials, location=self.dataset_location)

Return a BigQuery python client.
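
The vendor client is a regular `google-cloud-bigquery` `Client`, so anything that library supports is available, for example querying the cached dataset directly. The `users` table name below is hypothetical:

client = cache.get_vendor_client()
query = f"SELECT COUNT(*) AS n FROM `{cache.project_name}.{cache.dataset_name}.users`"
for row in client.query(query).result():
    print(row["n"])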

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_create_table_extra_clauses
get_sql_alchemy_connect_args
get_sql_engine
dispose_engine