airbyte.caches.bigquery

A BigQuery implementation of the cache.

Usage Example

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```
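
Once configured, the cache can be passed to a `source.read()` call. The sketch below assumes the `source-faker` connector and its `count` option purely for illustration; `ab.get_source()`, `select_all_streams()`, and `read(cache=...)` are standard PyAirbyte calls.

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

# Same cache configuration as above (values are placeholders).
cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)

# Illustrative source connector; any PyAirbyte source works the same way.
source = ab.get_source("source-faker", config={"count": 1000}, install_if_missing=True)
source.select_all_streams()

# Records are written to tables in the configured BigQuery dataset.
result = source.read(cache=cache)
```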
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""A BigQuery implementation of the cache.

## Usage Example

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```
"""

from __future__ import annotations

from typing import NoReturn

from pydantic import PrivateAttr

from airbyte._processors.sql.bigquery import BigQueryConfig, BigQuerySqlProcessor
from airbyte.caches.base import (
    CacheBase,
)
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE


class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )


# Expose the Cache class and also the Config class.
__all__ = [
    "BigQueryCache",
    "BigQueryConfig",
]
class BigQueryCache(airbyte._processors.sql.bigquery.BigQueryConfig, airbyte.caches.base.CacheBase):
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )

The BigQuery cache implementation.

def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = 100000) -> NoReturn:
    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )

Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.

See: https://github.com/airbytehq/PyAirbyte/issues/165
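
Since Arrow datasets are not supported here, one hedged workaround is to read the stream through pandas instead; `get_pandas_dataframe` is an inherited `CacheBase` method (see Inherited Members below), and the stream name is a placeholder.

```python
import pyarrow as pa

# Placeholder stream name; use any stream previously written to this cache.
df = cache.get_pandas_dataframe(stream_name="users")

# Optional: convert the pandas DataFrame to an in-memory Arrow table.
table = pa.Table.from_pandas(df)
```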

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw', alias='dataset_name', alias_priority=2), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'database_name': FieldInfo(annotation=str, required=True, alias='project_name', alias_priority=2), 'credentials_path': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, context)
    original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
BigQueryConfig
database_name
schema_name
credentials_path
project_name
dataset_name
get_sql_alchemy_url
get_database_name
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class BigQueryConfig(airbyte.shared.sql_processor.SqlConfig):
class BigQueryConfig(SqlConfig):
    """Configuration for BigQuery."""

    database_name: str = Field(alias="project_name")
    """The name of the project to use. In BigQuery, this is equivalent to the database name."""

    schema_name: str = Field(alias="dataset_name", default=DEFAULT_CACHE_SCHEMA_NAME)
    """The name of the dataset to use. In BigQuery, this is equivalent to the schema name."""

    credentials_path: str | None = None
    """The path to the credentials file to use.
    If not passed, falls back to the default inferred from the environment."""

    @property
    def project_name(self) -> str:
        """Return the project name (alias of self.database_name)."""
        return self.database_name

    @property
    def dataset_name(self) -> str:
        """Return the dataset name (alias of self.schema_name)."""
        return self.schema_name

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        We suppress warnings about unrecognized JSON type. More info on that here:
        - https://github.com/airbytehq/PyAirbyte/issues/254
        """
        warnings.filterwarnings(
            "ignore",
            message="Did not recognize type 'JSON' of column",
            category=sqlalchemy.exc.SAWarning,
        )

        url: URL = make_url(f"bigquery://{self.project_name!s}")
        if self.credentials_path:
            url = url.update_query_dict({"credentials_path": self.credentials_path})

        return SecretString(url)

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database. For BigQuery, this is the project name."""
        return self.project_name

    def get_vendor_client(self) -> bigquery.Client:
        """Return a BigQuery python client."""
        if self.credentials_path:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        else:
            credentials, _ = google.auth.default()

        return bigquery.Client(credentials=credentials)

Configuration for BigQuery.

database_name: str

The name of the project to use. In BigQuery, this is equivalent to the database name.

schema_name: str

The name of the dataset to use. In BigQuery, this is equivalent to the schema name.

credentials_path: str | None

The path to the credentials file to use. If not passed, falls back to the default inferred from the environment.
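
As a usage note, omitting `credentials_path` defers to Application Default Credentials (for example the `GOOGLE_APPLICATION_CREDENTIALS` environment variable or `gcloud auth application-default login`), which is what `google.auth.default()` resolves in `get_vendor_client()` below. A minimal sketch with placeholder values:

```python
# No credentials_path: google.auth.default() picks up Application Default
# Credentials from the environment (env var, gcloud ADC, or metadata server).
cache = BigQueryCache(
    project_name="myproject",  # placeholder project
    dataset_name="mydataset",  # placeholder dataset
)
```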

project_name: str
    @property
    def project_name(self) -> str:
        """Return the project name (alias of self.database_name)."""
        return self.database_name

Return the project name (alias of self.database_name).

dataset_name: str
    @property
    def dataset_name(self) -> str:
        """Return the dataset name (alias of self.schema_name)."""
        return self.schema_name

Return the dataset name (alias of self.schema_name).
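
A small illustration of the alias pairs above (placeholder values): `project_name` reads through to `database_name`, and `dataset_name` to `schema_name`.

```python
config = BigQueryConfig(project_name="myproject", dataset_name="mydataset")

assert config.project_name == config.database_name == "myproject"
assert config.dataset_name == config.schema_name == "mydataset"
```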

@overrides
def get_sql_alchemy_url(self) -> airbyte.secrets.SecretString:
    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        We suppress warnings about unrecognized JSON type. More info on that here:
        - https://github.com/airbytehq/PyAirbyte/issues/254
        """
        warnings.filterwarnings(
            "ignore",
            message="Did not recognize type 'JSON' of column",
            category=sqlalchemy.exc.SAWarning,
        )

        url: URL = make_url(f"bigquery://{self.project_name!s}")
        if self.credentials_path:
            url = url.update_query_dict({"credentials_path": self.credentials_path})

        return SecretString(url)

Return the SQLAlchemy URL to use.

We suppress warnings about unrecognized JSON type. More info on that here: https://github.com/airbytehq/PyAirbyte/issues/254
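
As an illustrative sketch, the returned URL can be handed straight to SQLAlchemy; the inherited `get_sql_engine()` helper (see Inherited Members below) does essentially this for you. Running the engine requires the BigQuery SQLAlchemy dialect to be installed.

```python
import sqlalchemy

# Build a SQLAlchemy engine from the URL returned above.
engine = sqlalchemy.create_engine(config.get_sql_alchemy_url())

with engine.connect() as conn:
    print(conn.execute(sqlalchemy.text("SELECT 1")).scalar())
```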

@overrides
def get_database_name(self) -> str:
    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database. For BigQuery, this is the project name."""
        return self.project_name

Return the name of the database. For BigQuery, this is the project name.

def get_vendor_client(self) -> google.cloud.bigquery.client.Client:
    def get_vendor_client(self) -> bigquery.Client:
        """Return a BigQuery python client."""
        if self.credentials_path:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        else:
            credentials, _ = google.auth.default()

        return bigquery.Client(credentials=credentials)

Return a BigQuery python client.
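
A short example of using the returned client with the official google-cloud-bigquery API; the query text is a placeholder.

```python
client = cache.get_vendor_client()

# List datasets in the configured project.
for dataset in client.list_datasets():
    print(dataset.dataset_id)

# Run an arbitrary query through the native client.
for row in client.query("SELECT 1 AS one").result():
    print(row.one)
```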

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw', alias='dataset_name', alias_priority=2), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'database_name': FieldInfo(annotation=str, required=True, alias='project_name', alias_priority=2), 'credentials_path': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte.shared.sql_processor.SqlConfig
table_prefix
config_hash
get_create_table_extra_clauses
get_sql_engine