airbyte.caches.bigquery
A BigQuery implementation of the cache.
Usage Example
```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```
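Once constructed, the cache plugs into the standard PyAirbyte read flow. Below is a minimal sketch continuing from the `cache` above; the `source-faker` connector, its `count` option, and the `users` stream are used purely for illustration.

```python
import airbyte as ab

# Any PyAirbyte source works; source-faker is only a stand-in here.
source = ab.get_source(
    "source-faker",
    config={"count": 1_000},
    install_if_missing=True,
)
source.select_all_streams()

# Sync all selected streams into the BigQuery dataset backing the cache.
result = source.read(cache=cache)

# Read a synced stream back as a pandas DataFrame (stream name is illustrative).
print(cache.get_pandas_dataframe("users").head())
```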
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""A BigQuery implementation of the cache.

## Usage Example

```python
import airbyte as ab
from airbyte.caches import BigQueryCache

cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)
```
"""

from __future__ import annotations

from typing import NoReturn

from pydantic import PrivateAttr

from airbyte._processors.sql.bigquery import BigQueryConfig, BigQuerySqlProcessor
from airbyte.caches.base import (
    CacheBase,
)
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE


class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow"
            "Please consider using a different cache implementation for these functionalities."
        )


# Expose the Cache class and also the Config class.
__all__ = [
    "BigQueryCache",
    "BigQueryConfig",
]
````
```python
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow"
            "Please consider using a different cache implementation for these functionalities."
        )
```
The BigQuery cache implementation.
```python
def get_arrow_dataset(
    self,
    stream_name: str,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
) -> NoReturn:
    """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

    See: https://github.com/airbytehq/PyAirbyte/issues/165
    """
    raise NotImplementedError(
        "BigQuery doesn't currently support to_arrow"
        "Please consider using a different cache implementation for these functionalities."
    )
```
Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

See: https://github.com/airbytehq/PyAirbyte/issues/165
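If you still need Arrow-formatted data from a BigQuery cache, one possible workaround (not part of the PyAirbyte API, shown here only as a sketch) is to pull the stream through the inherited `get_pandas_dataframe` method and convert the result with `pyarrow`:

```python
import pyarrow as pa

# `cache` is a configured BigQueryCache; "users" is assumed to be a synced stream.
df = cache.get_pandas_dataframe("users")

# Convert the pandas DataFrame to an Arrow table client-side.
# Unlike get_arrow_dataset() on other caches, this loads the whole
# stream into memory rather than iterating in chunks.
table = pa.Table.from_pandas(df)
print(table.schema)
```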
- `model_config`: Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- `model_fields`: Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects. This replaces `Model.__fields__` from Pydantic V1.
- `model_computed_fields`: A dictionary of computed field names and their corresponding `ComputedFieldInfo` objects.
```python
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, context)
    original_model_post_init(self, context)
```
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- BigQueryConfig
- database_name
- schema_name
- credentials_path
- project_name
- dataset_name
- get_sql_alchemy_url
- get_database_name
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
```python
class BigQueryConfig(SqlConfig):
    """Configuration for BigQuery."""

    database_name: str = Field(alias="project_name")
    """The name of the project to use. In BigQuery, this is equivalent to the database name."""

    schema_name: str = Field(alias="dataset_name", default=DEFAULT_CACHE_SCHEMA_NAME)
    """The name of the dataset to use. In BigQuery, this is equivalent to the schema name."""

    credentials_path: str | None = None
    """The path to the credentials file to use.
    If not passed, falls back to the default inferred from the environment."""

    @property
    def project_name(self) -> str:
        """Return the project name (alias of self.database_name)."""
        return self.database_name

    @property
    def dataset_name(self) -> str:
        """Return the dataset name (alias of self.schema_name)."""
        return self.schema_name

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        """Return the SQLAlchemy URL to use.

        We suppress warnings about unrecognized JSON type. More info on that here:
        - https://github.com/airbytehq/PyAirbyte/issues/254
        """
        warnings.filterwarnings(
            "ignore",
            message="Did not recognize type 'JSON' of column",
            category=sqlalchemy.exc.SAWarning,
        )

        url: URL = make_url(f"bigquery://{self.project_name!s}")
        if self.credentials_path:
            url = url.update_query_dict({"credentials_path": self.credentials_path})

        return SecretString(url)

    @overrides
    def get_database_name(self) -> str:
        """Return the name of the database. For BigQuery, this is the project name."""
        return self.project_name

    def get_vendor_client(self) -> bigquery.Client:
        """Return a BigQuery python client."""
        if self.credentials_path:
            credentials = service_account.Credentials.from_service_account_file(
                self.credentials_path
            )
        else:
            credentials, _ = google.auth.default()

        return bigquery.Client(credentials=credentials)
```
Configuration for BigQuery.
The name of the project to use. In BigQuery, this is equivalent to the database name.
The name of the dataset to use. In BigQuery, this is equivalent to the schema name.
The path to the credentials file to use. If not passed, falls back to the default inferred from the environment.
```python
@property
def project_name(self) -> str:
    """Return the project name (alias of self.database_name)."""
    return self.database_name
```
Return the project name (alias of self.database_name).
```python
@property
def dataset_name(self) -> str:
    """Return the dataset name (alias of self.schema_name)."""
    return self.schema_name
```
Return the dataset name (alias of self.schema_name).
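In practice this means a configured cache exposes the same values under both the generic SQL names and the BigQuery-specific aliases. A small sketch with placeholder values:

```python
from airbyte.caches import BigQueryCache

# Placeholder project/dataset names for illustration.
cache = BigQueryCache(project_name="myproject", dataset_name="mydataset")

# database_name/schema_name are the stored fields; project_name/dataset_name
# are the read-only aliases defined above.
assert cache.project_name == cache.database_name == "myproject"
assert cache.dataset_name == cache.schema_name == "mydataset"
```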
```python
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Return the SQLAlchemy URL to use.

    We suppress warnings about unrecognized JSON type. More info on that here:
    - https://github.com/airbytehq/PyAirbyte/issues/254
    """
    warnings.filterwarnings(
        "ignore",
        message="Did not recognize type 'JSON' of column",
        category=sqlalchemy.exc.SAWarning,
    )

    url: URL = make_url(f"bigquery://{self.project_name!s}")
    if self.credentials_path:
        url = url.update_query_dict({"credentials_path": self.credentials_path})

    return SecretString(url)
```
Return the SQLAlchemy URL to use.
We suppress warnings about unrecognized JSON type. More info on that here: https://github.com/airbytehq/PyAirbyte/issues/254
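Concretely, the resulting URL follows the `bigquery://<project>?credentials_path=<path>` pattern. A sketch with placeholder values (the return value is wrapped in `SecretString`):

```python
cache = BigQueryCache(
    project_name="myproject",
    dataset_name="mydataset",
    credentials_path="path/to/credentials.json",
)

# Expected shape (placeholder values):
#   bigquery://myproject?credentials_path=path/to/credentials.json
url = cache.get_sql_alchemy_url()
print(str(url))
```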
```python
@overrides
def get_database_name(self) -> str:
    """Return the name of the database. For BigQuery, this is the project name."""
    return self.project_name
```
Return the name of the database. For BigQuery, this is the project name.
```python
def get_vendor_client(self) -> bigquery.Client:
    """Return a BigQuery python client."""
    if self.credentials_path:
        credentials = service_account.Credentials.from_service_account_file(
            self.credentials_path
        )
    else:
        credentials, _ = google.auth.default()

    return bigquery.Client(credentials=credentials)
```
Return a BigQuery python client.
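Because this returns a plain `google.cloud.bigquery.Client`, you can drop down to the BigQuery SDK for anything the cache interface doesn't cover. A sketch with placeholder project, dataset, and table names:

```python
# `cache` is a configured BigQueryCache.
client = cache.get_vendor_client()

# Run an ad-hoc query against the cache's dataset (names are placeholders).
query = "SELECT COUNT(*) AS row_count FROM `myproject.mydataset.users`"
for row in client.query(query).result():
    print(row["row_count"])
```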
- `model_config`: Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- `model_fields`: Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects. This replaces `Model.__fields__` from Pydantic V1.
- `model_computed_fields`: A dictionary of computed field names and their corresponding `ComputedFieldInfo` objects.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- config_hash
- get_create_table_extra_clauses
- get_sql_engine