airbyte.caches.bigquery
A BigQuery implementation of the cache.
Usage Example
```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```
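Once constructed, the cache can be passed to a source's `read()` call. A minimal end-to-end sketch; the `source-faker` connector and its `count` option are illustrative, and any installed source works the same way:

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)

# "source-faker" and its config are illustrative placeholders.
source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()

# Records land in the configured BigQuery dataset.
result = source.read(cache=cache)
```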
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""A BigQuery implementation of the cache.

## Usage Example

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar, NoReturn

from airbyte_api.models import DestinationBigquery

from airbyte._processors.sql.bigquery import BigQueryConfig, BigQuerySqlProcessor
from airbyte.caches.base import (
    CacheBase,
)
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE
from airbyte.destinations._translate_cache_to_dest import (
    bigquery_cache_to_destination_configuration,
)


if TYPE_CHECKING:
    from airbyte.shared.sql_processor import SqlProcessorBase


class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return the `DestinationBigquery` configuration derived from this cache."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )


# Expose the Cache class and also the Config class.
__all__ = [
    "BigQueryCache",
    "BigQueryConfig",
]
````
```python
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery

    @property
    def paired_destination_config(self) -> DestinationBigquery:
        """Return the `DestinationBigquery` configuration derived from this cache."""
        return bigquery_cache_to_destination_configuration(cache=self)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow. "
            "Please consider using a different cache implementation for these functionalities."
        )
```
The BigQuery cache implementation.
```python
@property
def paired_destination_config(self) -> DestinationBigquery:
    """Return the `DestinationBigquery` configuration derived from this cache."""
    return bigquery_cache_to_destination_configuration(cache=self)
```
Return the `DestinationBigquery` configuration derived from this cache's settings.
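The property returns a `DestinationBigquery` model (from `airbyte_api.models`) rather than a plain dict, built from the cache's own settings. A quick sketch of inspecting it, reusing `cache` from the usage example above:

```python
# The paired config mirrors the cache settings, so the same project,
# dataset, and credentials can be reused for a real destination.
dest_config = cache.paired_destination_config
print(type(dest_config).__name__)  # DestinationBigquery
```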
```python
def get_arrow_dataset(
    self,
    stream_name: str,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
) -> NoReturn:
    """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

    See: https://github.com/airbytehq/PyAirbyte/issues/165
    """
    raise NotImplementedError(
        "BigQuery doesn't currently support to_arrow. "
        "Please consider using a different cache implementation for these functionalities."
    )
```
Raises `NotImplementedError`; BigQuery doesn't support `pd.read_sql_table`.

See: https://github.com/airbytehq/PyAirbyte/issues/165
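As a workaround, you can fetch the stream with the inherited `get_pandas_dataframe()` and convert it to Arrow yourself. A sketch, assuming `pyarrow` is installed and using an illustrative stream name:

```python
import pyarrow as pa

# Workaround sketch: pull the stream as a pandas DataFrame, then convert.
# "users" is an illustrative stream name.
df = cache.get_pandas_dataframe(stream_name="users")
table = pa.Table.from_pandas(df)
print(table.num_rows)
```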
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
```python
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """We need to both initialize private attributes and call the user-defined
    model_post_init method.
    """
    init_private_attributes(self, context)
    original_model_post_init(self, context)
```
We need to both initialize private attributes and call the user-defined model_post_init method.
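For context, this mirrors standard pydantic v2 semantics: private attributes are initialized before the user hook runs, so the hook can rely on them. An illustrative, standalone example (not PyAirbyte code):

```python
from typing import Any

from pydantic import BaseModel, PrivateAttr


class Widget(BaseModel):
    name: str
    _label: str = PrivateAttr(default="")

    def model_post_init(self, context: Any, /) -> None:
        # Private attributes are already initialized when this hook runs,
        # so they can safely be derived from validated fields.
        self._label = f"widget:{self.name}"


w = Widget(name="demo")
print(w._label)  # widget:demo
```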
Inherited Members
- airbyte.caches.base.CacheBase
  - CacheBase
  - cache_dir
  - cleanup
  - config_hash
  - execute_sql
  - processor
  - get_record_processor
  - get_records
  - get_pandas_dataframe
  - streams
  - get_state_provider
  - get_state_writer
  - register_source
- BigQueryConfig
  - database_name
  - schema_name
  - credentials_path
  - project_name
  - dataset_name
  - get_sql_alchemy_url
  - get_database_name
  - get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
  - table_prefix
  - get_create_table_extra_clauses
  - get_sql_engine
- pydantic.main.BaseModel
  - model_extra
  - model_fields_set
  - model_construct
  - model_copy
  - model_dump
  - model_dump_json
  - model_json_schema
  - model_parametrized_name
  - model_rebuild
  - model_validate
  - model_validate_json
  - model_validate_strings
  - dict
  - json
  - parse_obj
  - parse_raw
  - parse_file
  - from_orm
  - construct
  - copy
  - schema
  - schema_json
  - validate
  - update_forward_refs
  - model_fields
  - model_computed_fields
- airbyte._writers.base.AirbyteWriterInterface
  - name
```python
class BigQueryConfig(SqlConfig):
    """Configuration for BigQuery."""

    database_name: str = Field(alias="project_name")
    """The name of the project to use. In BigQuery, this is equivalent to the database name."""

    schema_name: str = Field(alias="dataset_name", default=DEFAULT_CACHE_SCHEMA_NAME)
    """The name of the dataset to use. In BigQuery, this is equivalent to the schema name."""

    credentials_path: str | None = None
    """The path to the credentials file to use.
    If not passed, falls back to the default inferred from the environment."""

    @property
    def project_name(self) -> str:
        """Return the project name (alias of self.database_name)."""
        return self.database_name

    @property
    def dataset_name(self) -> str:
        """Return the dataset name (alias of self.schema_name)."""
        return self.schema_name

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        We suppress warnings about unrecognized JSON type. More info on that here:
        - https://github.com/airbytehq/PyAirbyte/issues/254
        """
        warnings.filterwarnings(
            "ignore",
            message="Did not recognize type 'JSON' of column",
            category=sqlalchemy.exc.SAWarning,
        )

        url: URL = make_url(f"bigquery://{self.project_name!s}")
        if self.credentials_path:
            url = url.update_query_dict({"credentials_path": self.credentials_path})

        return SecretString(url)

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database. For BigQuery, this is the project name."""
        return self.project_name

    def get_vendor_client(self) -> bigquery.Client:
        """Return a BigQuery python client."""
        if self.credentials_path:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        else:
            credentials, _ = google.auth.default()

        return bigquery.Client(credentials=credentials)
```
Configuration for BigQuery.
The name of the project to use. In BigQuery, this is equivalent to the database name.
The name of the dataset to use. In BigQuery, this is equivalent to the schema name.
The path to the credentials file to use. If not passed, falls back to the default inferred from the environment.
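When `credentials_path` is omitted, the client falls back to `google.auth.default()`, which (among other mechanisms) honors the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. A minimal sketch of that alternative:

```python
import os

from airbyte.caches import BigQueryCache

# Alternative to credentials_path: point Application Default Credentials
# at a service-account key file via the standard environment variable.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/to/credentials.json"

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
)
```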
```python
@property
def project_name(self) -> str:
    """Return the project name (alias of self.database_name)."""
    return self.database_name
```
Return the project name (alias of self.database_name).
```python
@property
def dataset_name(self) -> str:
    """Return the dataset name (alias of self.schema_name)."""
    return self.schema_name
```
Return the dataset name (alias of self.schema_name).
```python
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Return the SQLAlchemy URL to use.

    We suppress warnings about unrecognized JSON type. More info on that here:
    - https://github.com/airbytehq/PyAirbyte/issues/254
    """
    warnings.filterwarnings(
        "ignore",
        message="Did not recognize type 'JSON' of column",
        category=sqlalchemy.exc.SAWarning,
    )

    url: URL = make_url(f"bigquery://{self.project_name!s}")
    if self.credentials_path:
        url = url.update_query_dict({"credentials_path": self.credentials_path})

    return SecretString(url)
```
Return the SQLAlchemy URL to use.

We suppress warnings about unrecognized JSON type. More info on that here:

- https://github.com/airbytehq/PyAirbyte/issues/254
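For reference, the resulting URL has the shape `bigquery://<project>?credentials_path=<path>`. A standalone sketch of the same construction (actually connecting additionally requires the `sqlalchemy-bigquery` dialect):

```python
from sqlalchemy.engine import URL, make_url

# Mirrors get_sql_alchemy_url(); project and path values are illustrative.
url: URL = make_url("bigquery://myproject")
url = url.update_query_dict({"credentials_path": "path/to/credentials.json"})
print(url)  # bigquery://myproject?credentials_path=...
```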
```python
@overrides
def get_database_name(self) -> str:
    """Return the name of the database. For BigQuery, this is the project name."""
    return self.project_name
```
Return the name of the database. For BigQuery, this is the project name.
```python
def get_vendor_client(self) -> bigquery.Client:
    """Return a BigQuery python client."""
    if self.credentials_path:
        credentials = service_account.Credentials.from_service_account_file(
            self.credentials_path
        )
    else:
        credentials, _ = google.auth.default()

    return bigquery.Client(credentials=credentials)
```
Return a BigQuery python client.
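The returned client is a standard `google-cloud-bigquery` `Client`, so anything that library exposes is available. A minimal sketch, reusing `cache` from the usage example:

```python
# Run an ad-hoc query through the vendor client; the SQL is illustrative.
client = cache.get_vendor_client()
rows = client.query("SELECT 1 AS one").result()
for row in rows:
    print(row["one"])
```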
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
  - BaseModel
  - model_extra
  - model_fields_set
  - model_construct
  - model_copy
  - model_dump
  - model_dump_json
  - model_json_schema
  - model_parametrized_name
  - model_post_init
  - model_rebuild
  - model_validate
  - model_validate_json
  - model_validate_strings
  - dict
  - json
  - parse_obj
  - parse_raw
  - parse_file
  - from_orm
  - construct
  - copy
  - schema
  - schema_json
  - validate
  - update_forward_refs
  - model_fields
  - model_computed_fields
- airbyte.shared.sql_processor.SqlConfig
  - table_prefix
  - config_hash
  - get_create_table_extra_clauses
  - get_sql_engine