airbyte
PyAirbyte brings the power of Airbyte to every Python developer.
Getting Started
Reading Data
You can connect to any of hundreds of sources using the `get_source` method. You can then read data from a source using the `Source.read` method.
from airbyte import get_source
source = get_source(
"source-faker",
config={},
)
read_result = source.read()
for record in read_result["users"].records:
print(record)
For more information, see the `airbyte.sources` module.
Writing to SQL Caches
Data can be written to caches using a number of SQL-based cache implementations, including Postgres, BigQuery, Snowflake, DuckDB, and MotherDuck. If you do not specify a cache, PyAirbyte automatically uses a local DuckDB cache.
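As a minimal sketch (assuming `Source.read` accepts a `cache` argument, and using an arbitrary local file path), reading into an explicit DuckDB cache looks like this:

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()

# Write to an explicit DuckDB file instead of the implicit default cache.
cache = ab.DuckDBCache(db_path="./pyairbyte_demo.duckdb")
read_result = source.read(cache=cache)

# Each stream read into the cache is available as a dataset.
print(read_result["users"].to_pandas().head())
```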
For more information, see the `airbyte.caches` module.
Writing to Destination Connectors
Data can be written to destinations using the `Destination.write` method. You can connect to destinations using the `get_destination` method. PyAirbyte supports all Airbyte destinations, but Docker is required on your machine in order to run Java-based destinations.
Note: When loading to a SQL database, we recommend using a SQL cache (where available, see above) instead of a destination connector. This is because SQL caches are Python-native and therefore more portable when run from Python environments that might not have Docker container support. Destinations in PyAirbyte are uniquely suited for loading to non-SQL platforms such as vector stores and other reverse ETL-type use cases.
For more information, see the `airbyte.destinations` module and the full list of destination connectors here.
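A rough sketch of the flow (the destination name and config keys below are placeholders, not a tested configuration):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()
read_result = source.read()  # lands in the default local DuckDB cache

# Placeholder destination: substitute a real connector name and its actual config.
destination = ab.get_destination(
    "destination-example",      # hypothetical connector name
    config={"api_key": "..."},  # hypothetical config keys
)
write_result = destination.write(read_result)
```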
PyAirbyte API
Importing as ab
Most examples in the PyAirbyte documentation use the `import airbyte as ab` convention. The `ab` alias is recommended because it makes code more concise and readable. When getting started, it also saves you from digging through submodules to find the classes and functions you need, since frequently used classes and functions are available at the top level of the `airbyte` module.
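For example, a short script using only the `ab` alias (a sketch reusing the `source-faker` example from above):

```python
import airbyte as ab

# Frequently used helpers live at the top level of the package.
print(ab.get_available_connectors()[:5])

source = ab.get_source("source-faker", config={})
source.select_all_streams()
read_result = source.read(cache=ab.get_default_cache())
```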
Navigating the API
While many PyAirbyte classes and functions are available at the top level of the `airbyte` module, you can also import classes and functions from submodules directly. For example, while you can import the `Source` class from `airbyte`, you can also import it from the `sources` submodule like this:
from airbyte.sources import Source
Whether you import from the top level or from a submodule, the classes and functions are the same. We expect that most users will import from the top level when getting started, and then import from submodules when they are deploying more complex implementations.
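Both import paths resolve to the same class, which you can confirm directly (a quick sketch):

```python
import airbyte as ab
from airbyte.sources import Source

# The top-level name and the submodule name refer to the same class object.
assert ab.Source is Source
```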
For quick reference, top-level modules are listed in the left sidebar of this page.
Other Resources
- PyAirbyte GitHub Readme
- PyAirbyte Issue Tracker
- Frequently Asked Questions
- PyAirbyte Contributors Guide
- GitHub Releases
API Reference
Below is a list of all classes, functions, and modules available in the top-level `airbyte` module. (This is a long list!) If you are just starting out, we recommend beginning by selecting a submodule to navigate to from the left sidebar or from the list below:
Each module has its own documentation and code samples showing how to use its capabilities effectively.
- `airbyte.cloud` - Working with Airbyte Cloud, including running jobs remotely.
- `airbyte.caches` - Working with caches, including how to inspect a cache and get data from it.
- `airbyte.datasets` - Working with datasets, including how to read from datasets and convert to other formats, such as Pandas, Arrow, and LLM Document formats.
- `airbyte.destinations` - Working with destinations, including how to write to Airbyte destination connectors.
- `airbyte.documents` - Working with LLM documents, including how to convert records into document formats, for instance, when working with AI libraries like LangChain.
- `airbyte.exceptions` - Definitions of all exception and warning classes used in PyAirbyte.
- `airbyte.experimental` - Experimental features and utilities that do not yet have a stable API.
- `airbyte.logs` - Logging functionality and configuration.
- `airbyte.records` - Internal record handling classes.
- `airbyte.results` - Documents the classes returned when working with results from `Source.read` and `Destination.write`.
- `airbyte.secrets` - Tools for managing secrets in PyAirbyte.
- `airbyte.sources` - Tools for creating and reading from Airbyte sources. This includes `get_source` to declare a source, `Source.read` for reading data, and `Source.get_records()` to peek at records without caching or writing them (see the sketch after this list).
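For example, `Source.get_records()` lets you peek at a stream before setting up a cache or destination; a minimal sketch, assuming the method takes the stream name as its first argument:

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})

# Iterate over a few records from the "users" stream without caching or writing them.
for i, record in enumerate(source.get_records("users")):
    print(record)
    if i >= 4:
        break
```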
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from airbyte import (
    caches,
    callbacks,
    # cli,  # Causes circular import if included
    cloud,
    constants,
    datasets,
    destinations,
    documents,
    exceptions,  # noqa: ICN001  # No 'exc' alias for top-level module
    experimental,
    logs,
    records,
    results,
    secrets,
    sources,
)
from airbyte.caches.bigquery import BigQueryCache
from airbyte.caches.duckdb import DuckDBCache
from airbyte.caches.util import get_colab_cache, get_default_cache, new_local_cache
from airbyte.datasets import CachedDataset
from airbyte.destinations.base import Destination
from airbyte.destinations.util import get_destination
from airbyte.records import StreamRecord
from airbyte.results import ReadResult, WriteResult
from airbyte.secrets import SecretSourceEnum, get_secret
from airbyte.sources import registry
from airbyte.sources.base import Source
from airbyte.sources.registry import get_available_connectors
from airbyte.sources.util import get_source


__all__ = [
    # Modules
    "caches",
    "callbacks",
    # "cli",  # Causes circular import if included
    "cloud",
    "constants",
    "datasets",
    "destinations",
    "documents",
    "exceptions",
    "experimental",
    "logs",
    "records",
    "registry",
    "results",
    "secrets",
    "sources",
    # Factories
    "get_available_connectors",
    "get_colab_cache",
    "get_default_cache",
    "get_destination",
    "get_secret",
    "get_source",
    "new_local_cache",
    # Classes
    "BigQueryCache",
    "CachedDataset",
    "Destination",
    "DuckDBCache",
    "ReadResult",
    "SecretSourceEnum",
    "Source",
    "StreamRecord",
    "WriteResult",
]

__docformat__ = "google"
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Connectors will be returned in alphabetical order, with the standard prefix "source-".
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logging.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logging.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )
Return a list of all available connectors.
Connectors will be returned in alphabetical order, with the standard prefix "source-".
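A small usage sketch; the string value passed for `install_type` is an assumption based on the `InstallType` enum values referenced in the source above:

```python
from airbyte import get_available_connectors

# All connectors runnable in the current environment (the result depends on whether
# Docker is installed, as described above).
print(len(get_available_connectors()))

# Assumed filter value: restrict to connectors installable as Python packages.
print(get_available_connectors(install_type="python")[:10])
```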
def get_colab_cache(
    cache_name: str = "default_cache",
    sub_dir: str = "Airbyte/cache",
    schema_name: str = "main",
    table_prefix: str | None = "",
    drive_name: str = _MY_DRIVE,
    mount_path: str = _GOOGLE_DRIVE_DEFAULT_MOUNT_PATH,
) -> DuckDBCache:
    """Get a local cache for storing data, using the default database path.

    Unlike the default `DuckDBCache`, this implementation will easily persist data across multiple
    Colab sessions.

    Please note that Google Colab may prompt you to authenticate with your Google account to access
    your Google Drive. When prompted, click the link and follow the instructions.

    Colab will require access to read and write files in your Google Drive, so please be sure to
    grant the necessary permissions when prompted.

    All arguments are optional and have default values that are suitable for most use cases.

    Args:
        cache_name: The name to use for the cache. Defaults to "colab_cache". Override this if you
            want to use a different database for different projects.
        sub_dir: The subdirectory to store the cache in. Defaults to "Airbyte/cache". Override this
            if you want to store the cache in a different subdirectory than the default.
        schema_name: The name of the schema to write to. Defaults to "main". Override this if you
            want to write to a different schema.
        table_prefix: The prefix to use for all tables in the cache. Defaults to "". Override this
            if you want to use a different prefix for all tables.
        drive_name: The name of the Google Drive to use. Defaults to "MyDrive". Override this if you
            want to store data in a shared drive instead of your personal drive.
        mount_path: The path to mount Google Drive to. Defaults to "/content/drive". Override this
            if you want to mount Google Drive to a different path (not recommended).

    ## Usage Examples

    The default `get_colab_cache` arguments are suitable for most use cases:

    ```python
    from airbyte.caches.colab import get_colab_cache

    colab_cache = get_colab_cache()
    ```

    Or you can call `get_colab_cache` with custom arguments:

    ```python
    custom_cache = get_colab_cache(
        cache_name="my_custom_cache",
        sub_dir="Airbyte/custom_cache",
        drive_name="My Company Drive",
    )
    ```
    """
    try:
        from google.colab import drive  # noqa: PLC0415 # type: ignore[reportMissingImports]
    except ImportError:
        drive = None
        msg = (
            "The `google.colab` interface is only available in Google Colab. "
            "Please run this code in a Google Colab notebook."
        )
        raise ImportError(msg) from None

    drive.mount(mount_path)
    drive_root = (
        Path(mount_path) / drive_name
        if drive_name == _MY_DRIVE
        else Path(mount_path) / "Shareddrives" / drive_name
    )

    cache_dir = drive_root / sub_dir
    cache_dir.mkdir(parents=True, exist_ok=True)
    db_file_path = cache_dir / f"{cache_name}.duckdb"

    print(f"Using persistent PyAirbyte cache in Google Drive: `{db_file_path}`.")
    return DuckDBCache(
        db_path=db_file_path,
        cache_dir=cache_dir,
        schema_name=schema_name,
        table_prefix=table_prefix,
    )
Get a local cache for storing data, using the default database path.
Unlike the default `DuckDBCache`, this implementation will easily persist data across multiple Colab sessions.
Please note that Google Colab may prompt you to authenticate with your Google account to access your Google Drive. When prompted, click the link and follow the instructions.
Colab will require access to read and write files in your Google Drive, so please be sure to grant the necessary permissions when prompted.
All arguments are optional and have default values that are suitable for most use cases.
Arguments:
- cache_name: The name to use for the cache. Defaults to "default_cache". Override this if you want to use a different database for different projects.
- sub_dir: The subdirectory to store the cache in. Defaults to "Airbyte/cache". Override this if you want to store the cache in a different subdirectory than the default.
- schema_name: The name of the schema to write to. Defaults to "main". Override this if you want to write to a different schema.
- table_prefix: The prefix to use for all tables in the cache. Defaults to "". Override this if you want to use a different prefix for all tables.
- drive_name: The name of the Google Drive to use. Defaults to "MyDrive". Override this if you want to store data in a shared drive instead of your personal drive.
- mount_path: The path to mount Google Drive to. Defaults to "/content/drive". Override this if you want to mount Google Drive to a different path (not recommended).
Usage Examples
The default `get_colab_cache` arguments are suitable for most use cases:
from airbyte.caches.colab import get_colab_cache
colab_cache = get_colab_cache()
Or you can call `get_colab_cache` with custom arguments:
custom_cache = get_colab_cache(
cache_name="my_custom_cache",
sub_dir="Airbyte/custom_cache",
drive_name="My Company Drive",
)
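Once created, the Colab cache can be passed to `Source.read` like any other cache (a sketch that assumes you are running inside a Google Colab notebook):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()

# Mounts Google Drive and stores the DuckDB file there, so data survives session resets.
colab_cache = ab.get_colab_cache()
read_result = source.read(cache=colab_cache)
```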
def get_default_cache() -> DuckDBCache:
    """Get a local cache for storing data, using the default database path.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    cache_dir = Path("./.cache/default_cache")
    return DuckDBCache(
        db_path=cache_dir / "default_cache.duckdb",
        cache_dir=cache_dir,
    )
Get a local cache for storing data, using the default database path.
Cache files are stored in the `.cache` directory, relative to the current working directory.
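Because the default cache is the same one `Source.read` falls back to, you can also grab it explicitly and query data after a read (a sketch; `get_pandas_dataframe` is assumed to take the stream name as its only argument):

```python
import airbyte as ab

cache = ab.get_default_cache()  # ./.cache/default_cache/default_cache.duckdb

source = ab.get_source("source-faker", config={})
source.select_all_streams()
source.read(cache=cache)

# Query the cached "users" stream directly from the cache object.
print(cache.get_pandas_dataframe("users").head())
```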
def get_destination(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: str | bool | None = None,
    use_host_network: bool = False,
    install_if_missing: bool = True,
) -> Destination:
    """Get a connector by name and version.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when local_executable is set.
    """
    return Destination(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            install_if_missing=install_if_missing,
        ),
    )
Get a connector by name and version.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
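For example, pinning a version and forcing Docker execution might look like the following sketch; the connector name and config are placeholders:

```python
from airbyte import get_destination

destination = get_destination(
    "destination-example",      # hypothetical connector name
    config={"api_key": "..."},  # hypothetical config keys
    docker_image=True,          # run via the connector's default Docker image
    version="1.2.3",            # appended as the image tag when no tag is given
)
print(destination.name)
```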
def get_secret(
    secret_name: str,
    /,
    *,
    sources: list[SecretManager | SecretSourceEnum] | None = None,
    allow_prompt: bool = True,
    **kwargs: dict[str, Any],
) -> SecretString:
    """Get a secret from the environment.

    The optional `sources` argument of enum type `SecretSourceEnum` or list of `SecretSourceEnum`
    options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum`
    entries is passed, then the sources will be checked using the provided ordering.

    If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then
    the user will be prompted to enter the secret if it is not found in any of the other sources.
    """
    if "source" in kwargs:
        warnings.warn(
            message="The `source` argument is deprecated. Use the `sources` argument instead.",
            category=DeprecationWarning,
            stacklevel=2,
        )
        sources = kwargs.pop("source")  # type: ignore [assignment]

    available_sources: dict[str, SecretManager] = {}
    for available_source in _get_secret_sources():
        # Add available sources to the dict. Order matters.
        available_sources[available_source.name] = available_source

    if sources is None:
        # If ANY is in the list, then we don't need to check any other sources.
        # This is the default behavior.
        sources = list(available_sources.values())

    elif not isinstance(sources, list):
        sources = [sources]  # type: ignore [unreachable]  # This is a 'just in case' catch.

    # Replace any SecretSourceEnum strings with the matching SecretManager object
    for source in list(sources):
        if isinstance(source, SecretSourceEnum):
            if source not in available_sources:
                raise exc.PyAirbyteInputError(
                    guidance="Invalid secret source name.",
                    input_value=source,
                    context={
                        "Available Sources": list(available_sources.keys()),
                    },
                )

            sources[sources.index(source)] = available_sources[source]

    secret_managers = cast(list[SecretManager], sources)

    if SecretSourceEnum.PROMPT in secret_managers:
        prompt_source = secret_managers.pop(
            # Mis-typed, but okay here since we have equality logic for the enum comparison:
            secret_managers.index(SecretSourceEnum.PROMPT),  # type: ignore [arg-type]
        )

        if allow_prompt:
            # Always check prompt last. Add it to the end of the list.
            secret_managers.append(prompt_source)

    for secret_mgr in secret_managers:
        val = secret_mgr.get_secret(secret_name)
        if val:
            return SecretString(val)

    raise exc.PyAirbyteSecretNotFoundError(
        secret_name=secret_name,
        sources=[str(s) for s in available_sources],
    )
Get a secret from the environment.
The optional `sources` argument accepts a `SecretSourceEnum` or a list of `SecretSourceEnum` options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum` entries is passed, then the sources will be checked using the provided ordering.

If `allow_prompt` is `True` or if `SecretSourceEnum.PROMPT` is declared in the `sources` arg, then the user will be prompted to enter the secret if it is not found in any of the other sources.
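A typical pattern is to pull connector credentials from a secret source instead of hard-coding them; `MY_API_KEY` and the connector shown are arbitrary examples:

```python
import airbyte as ab

# Checks the available secret sources in order (environment variables, etc.) and
# prompts interactively only if the secret is not found and prompting is allowed.
api_key = ab.get_secret("MY_API_KEY")

source = ab.get_source(
    "source-example",             # hypothetical connector name
    config={"api_key": api_key},  # hypothetical config key
)
```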
def get_source(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    If an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

    Otherwise, an appropriate method will be selected based on the available connector metadata:
    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
       will be downloaded and used to to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
       will be executed using Docker.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to attempt to auto-download a YAML spec,
            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
    """
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            source_manifest=source_manifest,
            install_if_missing=install_if_missing,
            install_root=install_root,
        ),
    )
Get a connector by name and version.
If an explicit install or execution method is requested (e.g. `local_executable`, `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:
- If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
- Else, if the connector is registered and has a PyPI package, it will be installed via pip.
- Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be `True` to attempt to auto-download a YAML spec, `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from the local file system, or `str` to pull the definition from a web URL.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when `local_executable` or `source_manifest` are set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
def new_local_cache(
    cache_name: str | None = None,
    cache_dir: str | Path | None = None,
    *,
    cleanup: bool = True,
) -> DuckDBCache:
    """Get a local cache for storing data, using a name string to seed the path.

    Args:
        cache_name: Name to use for the cache. Defaults to None.
        cache_dir: Root directory to store the cache in. Defaults to None.
        cleanup: Whether to clean up temporary files. Defaults to True.

    Cache files are stored in the `.cache` directory, relative to the current
    working directory.
    """
    if cache_name:
        if " " in cache_name:
            raise exc.PyAirbyteInputError(
                message="Cache name cannot contain spaces.",
                input_value=cache_name,
            )

        if not cache_name.replace("_", "").isalnum():
            raise exc.PyAirbyteInputError(
                message="Cache name can only contain alphanumeric characters and underscores.",
                input_value=cache_name,
            )

    cache_name = cache_name or str(ulid.ULID())
    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
    if not isinstance(cache_dir, Path):
        cache_dir = Path(cache_dir)

    return DuckDBCache(
        db_path=cache_dir / f"db_{cache_name}.duckdb",
        cache_dir=cache_dir,
        cleanup=cleanup,
    )
Get a local cache for storing data, using a name string to seed the path.
Arguments:
- cache_name: Name to use for the cache. Defaults to None.
- cache_dir: Root directory to store the cache in. Defaults to None.
- cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the `.cache` directory, relative to the current working directory.
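A short sketch of how the name seeds the cache path, following the validation and path logic above:

```python
from airbyte import new_local_cache

# Creates (or reuses) ./.cache/my_project/db_my_project.duckdb
cache = new_local_cache("my_project")

# new_local_cache("my project")  # would raise PyAirbyteInputError: spaces are not allowed
```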
class BigQueryCache(BigQueryConfig, CacheBase):
    """The BigQuery cache implementation."""

    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)

    def get_arrow_dataset(
        self,
        stream_name: str,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> NoReturn:
        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.

        See: https://github.com/airbytehq/PyAirbyte/issues/165
        """
        raise NotImplementedError(
            "BigQuery doesn't currently support to_arrow"
            "Please consider using a different cache implementation for these functionalities."
        )
The BigQuery cache implementation.
get_arrow_dataset(): Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
- model_config: Configuration for the model, should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- model_fields: Metadata about the fields defined on the model, mapping of field names to FieldInfo (pydantic.fields.FieldInfo) objects. This replaces `Model.__fields__` from Pydantic V1.
- model_computed_fields: A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, context)
    original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.bigquery.BigQueryConfig
- database_name
- schema_name
- credentials_path
- project_name
- dataset_name
- get_sql_alchemy_url
- get_database_name
- get_vendor_client
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_sql_engine
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
class CachedDataset(SQLDataset):
    """A dataset backed by a SQL table cache.

    Because this dataset includes all records from the underlying table, we also expose the
    underlying table as a SQLAlchemy Table object.
    """

    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        stream_configuration: ConfiguredAirbyteStream | None | Literal[False] = None,
    ) -> None:
        """We construct the query statement by selecting all columns from the table.

        This prevents the need to scan the table schema to construct the query statement.

        If stream_configuration is None, we attempt to retrieve the stream configuration from the
        cache processor. This is useful when constructing a dataset from a CachedDataset object,
        which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        table_name = cache.processor.get_sql_table_name(stream_name)
        schema_name = cache.schema_name
        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
        super().__init__(
            cache=cache,
            stream_name=stream_name,
            query_statement=query,
            stream_configuration=stream_configuration,
        )

    @overrides
    def to_pandas(self) -> DataFrame:
        """Return the underlying dataset data as a pandas DataFrame."""
        return self._cache.get_pandas_dataframe(self._stream_name)

    @overrides
    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset containing the data from the specified stream.

        Args:
            stream_name (str): Name of the stream to retrieve data from.
            max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.

        Returns:
            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
        """
        return self._cache.get_arrow_dataset(
            stream_name=self._stream_name,
            max_chunk_size=max_chunk_size,
        )

    def to_sql_table(self) -> Table:
        """Return the underlying SQL table as a SQLAlchemy Table object."""
        return self._cache.processor.get_sql_table(self.stream_name)

    def __eq__(self, value: object) -> bool:
        """Return True if the value is a CachedDataset with the same cache and stream name.

        In the case of CachedDataset objects, we can simply compare the cache and stream name.

        Note that this equality check is only supported on CachedDataset objects and not for
        the base SQLDataset implementation. This is because of the complexity and computational
        cost of comparing two arbitrary SQL queries that could be bound to different variables,
        as well as the chance that two queries can be syntactically equivalent without being
        text-wise equivalent.
        """
        if not isinstance(value, SQLDataset):
            return False

        if self._cache is not value._cache:
            return False

        return not self._stream_name != value._stream_name

    def __hash__(self) -> int:
        return hash(self._stream_name)
A dataset backed by a SQL table cache.
Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
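A `ReadResult` indexed by stream name yields a cached dataset like this, which can be handed off in several formats (a sketch reusing the `source-faker` example; assumes `ReadResult` indexing returns a `CachedDataset`):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()
read_result = source.read()

users = read_result["users"]   # a CachedDataset backed by the cache's SQL table
df = users.to_pandas()         # pandas DataFrame
table = users.to_sql_table()   # SQLAlchemy Table bound to the underlying cache
print(df.head())
```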
We construct the query statement by selecting all columns from the table.
This prevents the need to scan the table schema to construct the query statement.
If stream_configuration is None, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.
If stream_configuration is set to False, we skip the stream configuration retrieval.
to_pandas(): Return the underlying dataset data as a pandas DataFrame.
to_arrow(): Return an Arrow Dataset containing the data from the specified stream.
Arguments:
- stream_name (str): Name of the stream to retrieve data from.
- max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.
Returns:
pa.dataset.Dataset: Arrow Dataset containing the stream's data.
to_sql_table(): Return the underlying SQL table as a SQLAlchemy Table object.
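The returned SQLAlchemy `Table` can then be used for ad-hoc queries against the cache's engine (a sketch; `get_sql_engine()` is assumed to be available on the cache, as listed in the cache's inherited members):

```python
import airbyte as ab
from sqlalchemy import func, select

cache = ab.get_default_cache()
source = ab.get_source("source-faker", config={})
source.select_all_streams()
read_result = source.read(cache=cache)

users_table = read_result["users"].to_sql_table()

# Count rows in the cached table with plain SQLAlchemy.
with cache.get_sql_engine().connect() as conn:
    print(conn.execute(select(func.count()).select_from(users_table)).scalar())
```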
Inherited Members
- airbyte.datasets._sql.SQLDataset
- stream_name
- with_filter
- airbyte.datasets._base.DatasetBase
- to_documents
class Destination(ConnectorBase, AirbyteWriterInterface):
    """A class representing a destination that can be called."""

    connector_type: Literal["destination"] = "destination"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )

    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
        self,
        source_data: Source | ReadResult,
        *,
        streams: list[str] | Literal["*"] | None = None,
        cache: CacheBase | None | Literal[False] = None,
        state_cache: CacheBase | None | Literal[False] = None,
        write_strategy: WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
    ) -> WriteResult:
        """Write data from source connector or already cached source data.

        Caching is enabled by default, unless explicitly disabled.

        Args:
            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
            streams: The streams to write to the destination. If omitted or if "*" is provided,
                all streams will be written. If `source_data` is a source, then streams must be
                selected here or on the source. If both are specified, this setting will override
                the stream selection on the source.
            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
                False, the cache will be disabled. This must be `None` if `source_data` is already
                a `Cache` object.
            state_cache: A cache to use for storing incremental state. You do not need to set this
                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
                disable state management.
            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
                will decide the best strategy to use.
            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
                existing state will be ignored and all source data will be reloaded.

        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
        a known destination state, the destination-specific state will be used. If neither are
        available, a full refresh will be performed.
        """
        if not isinstance(source_data, ReadResult | Source):
            raise exc.PyAirbyteInputError(
                message="Invalid source_data type for `source_data` arg.",
                context={
                    "source_data_type_provided": type(source_data).__name__,
                },
            )

        # Resolve `source`, `read_result`, and `source_name`
        source: Source | None = source_data if isinstance(source_data, Source) else None
        read_result: ReadResult | None = (
            source_data if isinstance(source_data, ReadResult) else None
        )
        source_name: str = source.name if source else cast(ReadResult, read_result).source_name

        # State providers and writers default to no-op, unless overridden below.
        cache_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the cache's data."""
        cache_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the cache's data."""
        destination_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the destination's data, from `cache` or `state_cache`."""
        destination_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""

        # If caching not explicitly disabled
        if cache is not False:
            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
            if isinstance(source_data, ReadResult):
                cache = source_data.cache

            cache = cache or get_default_cache()
            cache_state_provider = cache.get_state_provider(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )
            cache_state_writer = cache.get_state_writer(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )

        # Resolve `state_cache`
        if state_cache is None:
            state_cache = cache or get_default_cache()

        # Resolve `destination_state_writer` and `destination_state_provider`
        if state_cache:
            destination_state_writer = state_cache.get_state_writer(
                source_name=source_name,
                destination_name=self.name,
            )
            if not force_full_refresh:
                destination_state_provider = state_cache.get_state_provider(
                    source_name=source_name,
                    destination_name=self.name,
                )
        elif state_cache is not False:
            warnings.warn(
                "No state backend or cache provided. State will not be tracked."
                "To track state, provide a cache or state backend."
                "To silence this warning, set `state_cache=False` explicitly.",
                category=exc.PyAirbyteWarning,
                stacklevel=2,
            )

        # Resolve `catalog_provider`
        if source:
            catalog_provider = CatalogProvider(
                configured_catalog=source.get_configured_catalog(
                    streams=streams,
                )
            )
        elif read_result:
            catalog_provider = CatalogProvider.from_read_result(read_result)
        else:
            raise exc.PyAirbyteInternalError(
                message="`source_data` must be a `Source` or `ReadResult` object.",
            )

        progress_tracker = ProgressTracker(
            source=source if isinstance(source_data, Source) else None,
            cache=cache or None,
            destination=self,
            expected_streams=catalog_provider.stream_names,
        )

        source_state_provider: StateProviderBase
        source_state_provider = JoinedStateProvider(
            primary=cache_state_provider,
            secondary=destination_state_provider,
        )

        if source:
            if cache is False:
                # Get message iterator for source (caching disabled)
                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001  # Non-public API
                    streams=streams,
                    state_provider=source_state_provider,
                    progress_tracker=progress_tracker,
                    force_full_refresh=force_full_refresh,
                )
            else:
                # Caching enabled and we are reading from a source.
                # Read the data to cache if caching is enabled.
                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
                    cache=cache,
                    state_provider=source_state_provider,
                    state_writer=cache_state_writer,
                    catalog_provider=catalog_provider,
                    stream_names=catalog_provider.stream_names,
                    write_strategy=write_strategy,
                    force_full_refresh=force_full_refresh,
                    skip_validation=False,
                    progress_tracker=progress_tracker,
                )
                message_iterator = AirbyteMessageIterator.from_read_result(
                    read_result=read_result,
                )
        else:  # Else we are reading from a read result
            assert read_result is not None
            message_iterator = AirbyteMessageIterator.from_read_result(
                read_result=read_result,
            )

        # Write the data to the destination
        try:
            self._write_airbyte_message_stream(
                stdin=message_iterator,
                catalog_provider=catalog_provider,
                write_strategy=write_strategy,
                state_writer=destination_state_writer,
                progress_tracker=progress_tracker,
            )
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise
        else:
            # No exceptions were raised, so log success
            progress_tracker.log_success()

        return WriteResult(
            destination=self,
            source_data=source_data,
            catalog_provider=catalog_provider,
            state_writer=destination_state_writer,
            progress_tracker=progress_tracker,
        )

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        # Run optional validation step
        if state_writer is None:
            state_writer = StdOutStateWriter()

        # Apply the write strategy to the catalog provider before sending to the destination
        catalog_provider = catalog_provider.with_write_strategy(write_strategy)

        with as_temp_files(
            files_contents=[
                self._config,
                catalog_provider.configured_catalog.model_dump_json(),
            ]
        ) as [
            config_file,
            catalog_file,
        ]:
            try:
                # We call the connector to write the data, tallying the inputs and outputs
                for destination_message in progress_tracker.tally_confirmed_writes(
                    messages=self._execute(
                        args=[
                            "write",
                            "--config",
                            config_file,
                            "--catalog",
                            catalog_file,
                        ],
                        stdin=AirbyteMessageIterator(
                            progress_tracker.tally_pending_writes(
                                stdin,
                            )
                        ),
                    )
                ):
                    if destination_message.type is Type.STATE:
                        state_writer.write_state(state_message=destination_message.state)

            except exc.AirbyteConnectorFailedError as ex:
                raise exc.AirbyteConnectorWriteError(
                    connector_name=self.name,
                    log_text=self._last_log_messages,
                    original_exception=ex,
                ) from None
A class representing a destination that can be called.
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
Write data from source connector or already cached source data.
Caching is enabled by default, unless explicitly disabled.
Arguments:
- source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
- streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If `source_data` is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
- cache: The cache to use for reading source_data. If `None` (the default), a cache is still used, falling back to the default DuckDB cache. If `False`, caching is disabled and records are streamed directly to the destination. Leave this as `None` if `source_data` is already a `ReadResult` object, since its own cache will be reused.
- state_cache: A cache to use for storing incremental state. You do not need to set this if `cache` is specified or if `source_data` is a `ReadResult` object. Set to `False` to disable state management.
- write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector will decide the best strategy to use.
- force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any existing state will be ignored and all source data will be reloaded.
For incremental syncs, `cache` or `state_cache` will be checked for matching state values. If the cache has tracked state, this will be used for the sync. Otherwise, if there is a known destination state, the destination-specific state will be used. If neither is available, a full refresh will be performed.
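For example, here is a minimal, hedged sketch of writing a source directly to a destination connector. The connector names and config keys (`source-faker`'s `count`, `destination-duckdb`'s `destination_path`) are illustrative assumptions; substitute your own connector and configuration.

import airbyte as ab

# Hypothetical connectors and config values, for illustration only.
source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()

destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/pyairbyte_demo.duckdb"},
)

# Caching is enabled by default; pass cache=False to stream records straight through,
# or state_cache=False to disable incremental state tracking entirely.
write_result = destination.write(source)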
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- set_config
- get_config
- config_hash
- validate_config
- config_spec
- print_config_spec
- docs_url
- connector_version
- check
- install
- uninstall
class DuckDBCache(DuckDBConfig, CacheBase):
    """A DuckDB cache."""

    _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)
A DuckDB cache.
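As a short, hedged sketch (the `db_path` and `schema_name` fields come from the inherited `DuckDBConfig`, listed below; the file path is illustrative):

import airbyte as ab
from airbyte.caches import DuckDBCache

# Persist the cache to a local DuckDB file instead of the default location.
cache = DuckDBCache(db_path="./pyairbyte_demo.duckdb", schema_name="main")

source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()
read_result = source.read(cache=cache)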
Configuration for the model, should be a dictionary conforming to [`ConfigDict`][pydantic.config.ConfigDict].

Metadata about the fields defined on the model, mapping of field names to [`FieldInfo`][pydantic.fields.FieldInfo] objects. This replaces `Model.__fields__` from Pydantic V1.

A dictionary of computed field names and their corresponding `ComputedFieldInfo` objects.
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, context)
    original_model_post_init(self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Inherited Members
- airbyte.caches.base.CacheBase
- CacheBase
- cache_dir
- cleanup
- config_hash
- execute_sql
- processor
- get_record_processor
- get_records
- get_pandas_dataframe
- get_arrow_dataset
- streams
- get_state_provider
- get_state_writer
- register_source
- airbyte._processors.sql.duckdb.DuckDBConfig
- db_path
- schema_name
- get_sql_alchemy_url
- get_database_name
- get_sql_engine
- airbyte.shared.sql_processor.SqlConfig
- table_prefix
- get_create_table_extra_clauses
- get_vendor_client
- pydantic.main.BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- airbyte._writers.base.AirbyteWriterInterface
- name
33class ReadResult(Mapping[str, CachedDataset]): 34 """The result of a read operation. 35 36 This class is used to return information about the read operation, such as the number of 37 records read. It should not be created directly, but instead returned by the write method 38 of a destination. 39 """ 40 41 def __init__( 42 self, 43 *, 44 source_name: str, 45 processed_streams: list[str], 46 cache: CacheBase, 47 progress_tracker: ProgressTracker, 48 ) -> None: 49 """Initialize a read result. 50 51 This class should not be created directly. Instead, it should be returned by the `read` 52 method of the `Source` class. 53 """ 54 self.source_name = source_name 55 self._progress_tracker = progress_tracker 56 self._cache = cache 57 self._processed_streams = processed_streams 58 59 def __getitem__(self, stream: str) -> CachedDataset: 60 """Return the cached dataset for a given stream name.""" 61 if stream not in self._processed_streams: 62 raise KeyError(stream) 63 64 return CachedDataset(self._cache, stream) 65 66 def __contains__(self, stream: object) -> bool: 67 """Return whether a given stream name was included in processing.""" 68 if not isinstance(stream, str): 69 return False 70 71 return stream in self._processed_streams 72 73 def __iter__(self) -> Iterator[str]: 74 """Return an iterator over the stream names that were processed.""" 75 return self._processed_streams.__iter__() 76 77 def __len__(self) -> int: 78 """Return the number of streams that were processed.""" 79 return len(self._processed_streams) 80 81 def get_sql_engine(self) -> Engine: 82 """Return the SQL engine used by the cache.""" 83 return self._cache.get_sql_engine() 84 85 @property 86 def processed_records(self) -> int: 87 """The total number of records read from the source.""" 88 return self._progress_tracker.total_records_read 89 90 @property 91 def streams(self) -> Mapping[str, CachedDataset]: 92 """Return a mapping of stream names to cached datasets.""" 93 return { 94 stream_name: CachedDataset(self._cache, stream_name) 95 for stream_name in self._processed_streams 96 } 97 98 @property 99 def cache(self) -> CacheBase: 100 """Return the cache object.""" 101 return self._cache
The result of a read operation.
This class is used to return information about the read operation, such as the number of records read. It should not be created directly; instead, it is returned by the `read` method of the `Source` class.
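A hedged usage sketch of the mapping-style interface described above (the `users` stream name comes from the `source-faker` example used throughout these docs):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()
read_result = source.read()

# ReadResult behaves like a Mapping[str, CachedDataset]:
print(list(read_result))              # names of the processed streams
users_dataset = read_result["users"]  # CachedDataset for a single stream
print(read_result.processed_records)  # total records read from the source

# The backing cache and its SQL engine remain available for direct queries.
engine = read_result.get_sql_engine()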
def __init__(
    self,
    *,
    source_name: str,
    processed_streams: list[str],
    cache: CacheBase,
    progress_tracker: ProgressTracker,
) -> None:
    """Initialize a read result.

    This class should not be created directly. Instead, it should be returned by the `read`
    method of the `Source` class.
    """
    self.source_name = source_name
    self._progress_tracker = progress_tracker
    self._cache = cache
    self._processed_streams = processed_streams
Initialize a read result.
This class should not be created directly. Instead, it should be returned by the `read` method of the `Source` class.
def get_sql_engine(self) -> Engine:
    """Return the SQL engine used by the cache."""
    return self._cache.get_sql_engine()
Return the SQL engine used by the cache.
@property
def processed_records(self) -> int:
    """The total number of records read from the source."""
    return self._progress_tracker.total_records_read
The total number of records read from the source.
@property
def streams(self) -> Mapping[str, CachedDataset]:
    """Return a mapping of stream names to cached datasets."""
    return {
        stream_name: CachedDataset(self._cache, stream_name)
        for stream_name in self._processed_streams
    }
Return a mapping of stream names to cached datasets.
@property
def cache(self) -> CacheBase:
    """Return the cache object."""
    return self._cache
Return the cache object.
Inherited Members
- collections.abc.Mapping
- get
- keys
- items
- values
class SecretSourceEnum(str, Enum):
    """Enumeration of secret sources supported by PyAirbyte."""

    ENV = "env"
    DOTENV = "dotenv"
    GOOGLE_COLAB = "google_colab"
    GOOGLE_GSM = "google_gsm"  # Not enabled by default

    PROMPT = "prompt"
Enumeration of secret sources supported by PyAirbyte.
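These values correspond to the places PyAirbyte can look for secrets: environment variables, `.env` files, Google Colab secrets, Google Secret Manager, and interactive prompts. A hedged sketch, assuming the top-level `ab.get_secret()` helper and a secret named `MY_API_KEY` defined in one of those locations:

import airbyte as ab

# Resolves the secret from env vars, .env files, etc. (MY_API_KEY is an assumption).
api_key = ab.get_secret("MY_API_KEY")

source = ab.get_source(
    "source-example",            # hypothetical connector name
    config={"api_key": api_key},
)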
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
53class Source(ConnectorBase): 54 """A class representing a source that can be called.""" 55 56 connector_type: Literal["source"] = "source" 57 58 def __init__( 59 self, 60 executor: Executor, 61 name: str, 62 config: dict[str, Any] | None = None, 63 *, 64 config_change_callback: ConfigChangeCallback | None = None, 65 streams: str | list[str] | None = None, 66 validate: bool = False, 67 ) -> None: 68 """Initialize the source. 69 70 If config is provided, it will be validated against the spec if validate is True. 71 """ 72 self._to_be_selected_streams: list[str] | str = [] 73 """Used to hold selection criteria before catalog is known.""" 74 75 super().__init__( 76 executor=executor, 77 name=name, 78 config=config, 79 config_change_callback=config_change_callback, 80 validate=validate, 81 ) 82 self._config_dict: dict[str, Any] | None = None 83 self._last_log_messages: list[str] = [] 84 self._discovered_catalog: AirbyteCatalog | None = None 85 self._selected_stream_names: list[str] = [] 86 if config is not None: 87 self.set_config(config, validate=validate) 88 if streams is not None: 89 self.select_streams(streams) 90 91 self._deployed_api_root: str | None = None 92 self._deployed_workspace_id: str | None = None 93 self._deployed_source_id: str | None = None 94 95 def set_streams(self, streams: list[str]) -> None: 96 """Deprecated. See select_streams().""" 97 warnings.warn( 98 "The 'set_streams' method is deprecated and will be removed in a future version. " 99 "Please use the 'select_streams' method instead.", 100 DeprecationWarning, 101 stacklevel=2, 102 ) 103 self.select_streams(streams) 104 105 def _log_warning_preselected_stream(self, streams: str | list[str]) -> None: 106 """Logs a warning message indicating stream selection which are not selected yet.""" 107 if streams == "*": 108 print( 109 "Warning: Config is not set yet. All streams will be selected after config is set." 110 ) 111 else: 112 print( 113 "Warning: Config is not set yet. " 114 f"Streams to be selected after config is set: {streams}" 115 ) 116 117 def select_all_streams(self) -> None: 118 """Select all streams. 119 120 This is a more streamlined equivalent to: 121 > source.select_streams(source.get_available_streams()). 122 """ 123 if self._config_dict is None: 124 self._to_be_selected_streams = "*" 125 self._log_warning_preselected_stream(self._to_be_selected_streams) 126 return 127 128 self._selected_stream_names = self.get_available_streams() 129 130 def select_streams(self, streams: str | list[str]) -> None: 131 """Select the stream names that should be read from the connector. 132 133 Args: 134 streams: A list of stream names to select. If set to "*", all streams will be selected. 135 136 Currently, if this is not set, all streams will be read. 137 """ 138 if self._config_dict is None: 139 self._to_be_selected_streams = streams 140 self._log_warning_preselected_stream(streams) 141 return 142 143 if streams == "*": 144 self.select_all_streams() 145 return 146 147 if isinstance(streams, str): 148 # If a single stream is provided, convert it to a one-item list 149 streams = [streams] 150 151 available_streams = self.get_available_streams() 152 for stream in streams: 153 if stream not in available_streams: 154 raise exc.AirbyteStreamNotFoundError( 155 stream_name=stream, 156 connector_name=self.name, 157 available_streams=available_streams, 158 ) 159 self._selected_stream_names = streams 160 161 def get_selected_streams(self) -> list[str]: 162 """Get the selected streams. 
163 164 If no streams are selected, return an empty list. 165 """ 166 return self._selected_stream_names 167 168 def set_config( 169 self, 170 config: dict[str, Any], 171 *, 172 validate: bool = True, 173 ) -> None: 174 """Set the config for the connector. 175 176 If validate is True, raise an exception if the config fails validation. 177 178 If validate is False, validation will be deferred until check() or validate_config() 179 is called. 180 """ 181 if validate: 182 self.validate_config(config) 183 184 self._config_dict = config 185 186 if self._to_be_selected_streams: 187 self.select_streams(self._to_be_selected_streams) 188 self._to_be_selected_streams = [] 189 190 def get_config(self) -> dict[str, Any]: 191 """Get the config for the connector.""" 192 return self._config 193 194 @property 195 def _config(self) -> dict[str, Any]: 196 if self._config_dict is None: 197 raise exc.AirbyteConnectorConfigurationMissingError( 198 connector_name=self.name, 199 guidance="Provide via get_source() or set_config()", 200 ) 201 return self._config_dict 202 203 def _discover(self) -> AirbyteCatalog: 204 """Call discover on the connector. 205 206 This involves the following steps: 207 - Write the config to a temporary file 208 - execute the connector with discover --config <config_file> 209 - Listen to the messages and return the first AirbyteCatalog that comes along. 210 - Make sure the subprocess is killed when the function returns. 211 """ 212 with as_temp_files([self._config]) as [config_file]: 213 for msg in self._execute(["discover", "--config", config_file]): 214 if msg.type == Type.CATALOG and msg.catalog: 215 return msg.catalog 216 raise exc.AirbyteConnectorMissingCatalogError( 217 connector_name=self.name, 218 log_text=self._last_log_messages, 219 ) 220 221 def get_available_streams(self) -> list[str]: 222 """Get the available streams from the spec.""" 223 return [s.name for s in self.discovered_catalog.streams] 224 225 def _get_incremental_stream_names(self) -> list[str]: 226 """Get the name of streams that support incremental sync.""" 227 return [ 228 stream.name 229 for stream in self.discovered_catalog.streams 230 if SyncMode.incremental in stream.supported_sync_modes 231 ] 232 233 def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: 234 """Call spec on the connector. 235 236 This involves the following steps: 237 * execute the connector with spec 238 * Listen to the messages and return the first AirbyteCatalog that comes along. 239 * Make sure the subprocess is killed when the function returns. 240 """ 241 if force_refresh or self._spec is None: 242 for msg in self._execute(["spec"]): 243 if msg.type == Type.SPEC and msg.spec: 244 self._spec = msg.spec 245 break 246 247 if self._spec: 248 return self._spec 249 250 raise exc.AirbyteConnectorMissingSpecError( 251 connector_name=self.name, 252 log_text=self._last_log_messages, 253 ) 254 255 @property 256 def config_spec(self) -> dict[str, Any]: 257 """Generate a configuration spec for this connector, as a JSON Schema definition. 258 259 This function generates a JSON Schema dictionary with configuration specs for the 260 current connector, as a dictionary. 261 262 Returns: 263 dict: The JSON Schema configuration spec as a dictionary. 
264 """ 265 return self._get_spec(force_refresh=True).connectionSpecification 266 267 def print_config_spec( 268 self, 269 format: Literal["yaml", "json"] = "yaml", # noqa: A002 270 *, 271 output_file: Path | str | None = None, 272 ) -> None: 273 """Print the configuration spec for this connector. 274 275 Args: 276 format: The format to print the spec in. Must be "yaml" or "json". 277 output_file: Optional. If set, the spec will be written to the given file path. 278 Otherwise, it will be printed to the console. 279 """ 280 if format not in {"yaml", "json"}: 281 raise exc.PyAirbyteInputError( 282 message="Invalid format. Expected 'yaml' or 'json'", 283 input_value=format, 284 ) 285 if isinstance(output_file, str): 286 output_file = Path(output_file) 287 288 if format == "yaml": 289 content = yaml.dump(self.config_spec, indent=2) 290 elif format == "json": 291 content = json.dumps(self.config_spec, indent=2) 292 293 if output_file: 294 output_file.write_text(content) 295 return 296 297 syntax_highlighted = Syntax(content, format) 298 print(syntax_highlighted) 299 300 @property 301 def _yaml_spec(self) -> str: 302 """Get the spec as a yaml string. 303 304 For now, the primary use case is for writing and debugging a valid config for a source. 305 306 This is private for now because we probably want better polish before exposing this 307 as a stable interface. This will also get easier when we have docs links with this info 308 for each connector. 309 """ 310 spec_obj: ConnectorSpecification = self._get_spec() 311 spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True) 312 # convert to a yaml string 313 return yaml.dump(spec_dict) 314 315 @property 316 def docs_url(self) -> str: 317 """Get the URL to the connector's documentation.""" 318 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 319 "source-", "" 320 ) 321 322 @property 323 def discovered_catalog(self) -> AirbyteCatalog: 324 """Get the raw catalog for the given streams. 325 326 If the catalog is not yet known, we call discover to get it. 327 """ 328 if self._discovered_catalog is None: 329 self._discovered_catalog = self._discover() 330 331 return self._discovered_catalog 332 333 @property 334 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 335 """Get the configured catalog for the given streams. 336 337 If the raw catalog is not yet known, we call discover to get it. 338 339 If no specific streams are selected, we return a catalog that syncs all available streams. 340 341 TODO: We should consider disabling by default the streams that the connector would 342 disable by default. (For instance, streams that require a premium license are sometimes 343 disabled by default within the connector.) 344 """ 345 # Ensure discovered catalog is cached before we start 346 _ = self.discovered_catalog 347 348 # Filter for selected streams if set, otherwise use all available streams: 349 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 350 return self.get_configured_catalog(streams=streams_filter) 351 352 def get_configured_catalog( 353 self, 354 streams: Literal["*"] | list[str] | None = None, 355 ) -> ConfiguredAirbyteCatalog: 356 """Get a configured catalog for the given streams. 357 358 If no streams are provided, the selected streams will be used. If no streams are selected, 359 all available streams will be used. 360 361 If '*' is provided, all available streams will be used. 
362 """ 363 selected_streams: list[str] = [] 364 if streams is None: 365 selected_streams = self._selected_stream_names or self.get_available_streams() 366 elif streams == "*": 367 selected_streams = self.get_available_streams() 368 elif isinstance(streams, list): 369 selected_streams = streams 370 else: 371 raise exc.PyAirbyteInputError( 372 message="Invalid streams argument.", 373 input_value=streams, 374 ) 375 376 return ConfiguredAirbyteCatalog( 377 streams=[ 378 ConfiguredAirbyteStream( 379 stream=stream, 380 destination_sync_mode=DestinationSyncMode.overwrite, 381 primary_key=stream.source_defined_primary_key, 382 sync_mode=SyncMode.incremental, 383 ) 384 for stream in self.discovered_catalog.streams 385 if stream.name in selected_streams 386 ], 387 ) 388 389 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 390 """Return the JSON Schema spec for the specified stream name.""" 391 catalog: AirbyteCatalog = self.discovered_catalog 392 found: list[AirbyteStream] = [ 393 stream for stream in catalog.streams if stream.name == stream_name 394 ] 395 396 if len(found) == 0: 397 raise exc.PyAirbyteInputError( 398 message="Stream name does not exist in catalog.", 399 input_value=stream_name, 400 ) 401 402 if len(found) > 1: 403 raise exc.PyAirbyteInternalError( 404 message="Duplicate streams found with the same name.", 405 context={ 406 "found_streams": found, 407 }, 408 ) 409 410 return found[0].json_schema 411 412 def get_records( 413 self, 414 stream: str, 415 *, 416 normalize_field_names: bool = False, 417 prune_undeclared_fields: bool = True, 418 ) -> LazyDataset: 419 """Read a stream from the connector. 420 421 Args: 422 stream: The name of the stream to read. 423 normalize_field_names: When `True`, field names will be normalized to lower case, with 424 special characters removed. This matches the behavior of PyAirbyte caches and most 425 Airbyte destinations. 426 prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, 427 which generally matches the behavior of PyAirbyte caches and most Airbyte 428 destinations, specifically when you expect the catalog may be stale. You can disable 429 this to keep all fields in the records. 430 431 This involves the following steps: 432 * Call discover to get the catalog 433 * Generate a configured catalog that syncs the given stream in full_refresh mode 434 * Write the configured catalog and the config to a temporary file 435 * execute the connector with read --config <config_file> --catalog <catalog_file> 436 * Listen to the messages and return the first AirbyteRecordMessages that come along. 437 * Make sure the subprocess is killed when the function returns. 
438 """ 439 discovered_catalog: AirbyteCatalog = self.discovered_catalog 440 configured_catalog = ConfiguredAirbyteCatalog( 441 streams=[ 442 ConfiguredAirbyteStream( 443 stream=s, 444 sync_mode=SyncMode.full_refresh, 445 destination_sync_mode=DestinationSyncMode.overwrite, 446 ) 447 for s in discovered_catalog.streams 448 if s.name == stream 449 ], 450 ) 451 if len(configured_catalog.streams) == 0: 452 raise exc.PyAirbyteInputError( 453 message="Requested stream does not exist.", 454 context={ 455 "stream": stream, 456 "available_streams": self.get_available_streams(), 457 "connector_name": self.name, 458 }, 459 ) from KeyError(stream) 460 461 configured_stream = configured_catalog.streams[0] 462 463 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 464 yield from records 465 466 stream_record_handler = StreamRecordHandler( 467 json_schema=self.get_stream_json_schema(stream), 468 prune_extra_fields=prune_undeclared_fields, 469 normalize_keys=normalize_field_names, 470 ) 471 472 # This method is non-blocking, so we use "PLAIN" to avoid a live progress display 473 progress_tracker = ProgressTracker( 474 ProgressStyle.PLAIN, 475 source=self, 476 cache=None, 477 destination=None, 478 expected_streams=[stream], 479 ) 480 481 iterator: Iterator[dict[str, Any]] = ( 482 StreamRecord.from_record_message( 483 record_message=record.record, 484 stream_record_handler=stream_record_handler, 485 ) 486 for record in self._read_with_catalog( 487 catalog=configured_catalog, 488 progress_tracker=progress_tracker, 489 ) 490 if record.record 491 ) 492 progress_tracker.log_success() 493 return LazyDataset( 494 iterator, 495 stream_metadata=configured_stream, 496 ) 497 498 def get_documents( 499 self, 500 stream: str, 501 title_property: str | None = None, 502 content_properties: list[str] | None = None, 503 metadata_properties: list[str] | None = None, 504 *, 505 render_metadata: bool = False, 506 ) -> Iterable[Document]: 507 """Read a stream from the connector and return the records as documents. 508 509 If metadata_properties is not set, all properties that are not content will be added to 510 the metadata. 511 512 If render_metadata is True, metadata will be rendered in the document, as well as the 513 the main content. 514 """ 515 return self.get_records(stream).to_documents( 516 title_property=title_property, 517 content_properties=content_properties, 518 metadata_properties=metadata_properties, 519 render_metadata=render_metadata, 520 ) 521 522 def _get_airbyte_message_iterator( 523 self, 524 *, 525 streams: Literal["*"] | list[str] | None = None, 526 state_provider: StateProviderBase | None = None, 527 progress_tracker: ProgressTracker, 528 force_full_refresh: bool = False, 529 ) -> AirbyteMessageIterator: 530 """Get an AirbyteMessageIterator for this source.""" 531 return AirbyteMessageIterator( 532 self._read_with_catalog( 533 catalog=self.get_configured_catalog(streams=streams), 534 state=state_provider if not force_full_refresh else None, 535 progress_tracker=progress_tracker, 536 ) 537 ) 538 539 def _read_with_catalog( 540 self, 541 catalog: ConfiguredAirbyteCatalog, 542 progress_tracker: ProgressTracker, 543 state: StateProviderBase | None = None, 544 ) -> Generator[AirbyteMessage, None, None]: 545 """Call read on the connector. 
546 547 This involves the following steps: 548 * Write the config to a temporary file 549 * execute the connector with read --config <config_file> --catalog <catalog_file> 550 * Listen to the messages and return the AirbyteRecordMessages that come along. 551 * Send out telemetry on the performed sync (with information about which source was used and 552 the type of the cache) 553 """ 554 with as_temp_files( 555 [ 556 self._config, 557 catalog.model_dump_json(), 558 state.to_state_input_file_text() if state else "[]", 559 ] 560 ) as [ 561 config_file, 562 catalog_file, 563 state_file, 564 ]: 565 message_generator = self._execute( 566 [ 567 "read", 568 "--config", 569 config_file, 570 "--catalog", 571 catalog_file, 572 "--state", 573 state_file, 574 ], 575 progress_tracker=progress_tracker, 576 ) 577 yield from progress_tracker.tally_records_read(message_generator) 578 progress_tracker.log_read_complete() 579 580 def _peek_airbyte_message( 581 self, 582 message: AirbyteMessage, 583 *, 584 raise_on_error: bool = True, 585 ) -> None: 586 """Process an Airbyte message. 587 588 This method handles reading Airbyte messages and taking action, if needed, based on the 589 message type. For instance, log messages are logged, records are tallied, and errors are 590 raised as exceptions if `raise_on_error` is True. 591 592 Raises: 593 AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted. 594 """ 595 super()._peek_airbyte_message(message, raise_on_error=raise_on_error) 596 597 def _log_incremental_streams( 598 self, 599 *, 600 incremental_streams: set[str] | None = None, 601 ) -> None: 602 """Log the streams which are using incremental sync mode.""" 603 log_message = ( 604 "The following streams are currently using incremental sync:\n" 605 f"{incremental_streams}\n" 606 "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method." 607 ) 608 print(log_message) 609 610 def read( 611 self, 612 cache: CacheBase | None = None, 613 *, 614 streams: str | list[str] | None = None, 615 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 616 force_full_refresh: bool = False, 617 skip_validation: bool = False, 618 ) -> ReadResult: 619 """Read from the connector and write to the cache. 620 621 Args: 622 cache: The cache to write to. If not set, a default cache will be used. 623 streams: Optional if already set. A list of stream names to select for reading. If set 624 to "*", all streams will be selected. 625 write_strategy: The strategy to use when writing to the cache. If a string, it must be 626 one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one 627 of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or 628 WriteStrategy.AUTO. 629 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 630 streams will be read in incremental mode if supported by the connector. This option 631 must be True when using the "replace" strategy. 632 skip_validation: If True, PyAirbyte will not pre-validate the input configuration before 633 running the connector. This can be helpful in debugging, when you want to send 634 configurations to the connector that otherwise might be rejected by JSON Schema 635 validation rules. 
636 """ 637 cache = cache or get_default_cache() 638 progress_tracker = ProgressTracker( 639 source=self, 640 cache=cache, 641 destination=None, 642 expected_streams=None, # Will be set later 643 ) 644 645 # Set up state provider if not in full refresh mode 646 if force_full_refresh: 647 state_provider: StateProviderBase | None = None 648 else: 649 state_provider = cache.get_state_provider( 650 source_name=self._name, 651 ) 652 state_writer = cache.get_state_writer(source_name=self._name) 653 654 if streams: 655 self.select_streams(streams) 656 657 if not self._selected_stream_names: 658 raise exc.PyAirbyteNoStreamsSelectedError( 659 connector_name=self.name, 660 available_streams=self.get_available_streams(), 661 ) 662 663 try: 664 result = self._read_to_cache( 665 cache=cache, 666 catalog_provider=CatalogProvider(self.configured_catalog), 667 stream_names=self._selected_stream_names, 668 state_provider=state_provider, 669 state_writer=state_writer, 670 write_strategy=write_strategy, 671 force_full_refresh=force_full_refresh, 672 skip_validation=skip_validation, 673 progress_tracker=progress_tracker, 674 ) 675 except exc.PyAirbyteInternalError as ex: 676 progress_tracker.log_failure(exception=ex) 677 raise exc.AirbyteConnectorFailedError( 678 connector_name=self.name, 679 log_text=self._last_log_messages, 680 ) from ex 681 except Exception as ex: 682 progress_tracker.log_failure(exception=ex) 683 raise 684 685 progress_tracker.log_success() 686 return result 687 688 def _read_to_cache( # noqa: PLR0913 # Too many arguments 689 self, 690 cache: CacheBase, 691 *, 692 catalog_provider: CatalogProvider, 693 stream_names: list[str], 694 state_provider: StateProviderBase | None, 695 state_writer: StateWriterBase | None, 696 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 697 force_full_refresh: bool = False, 698 skip_validation: bool = False, 699 progress_tracker: ProgressTracker, 700 ) -> ReadResult: 701 """Internal read method.""" 702 if write_strategy == WriteStrategy.REPLACE and not force_full_refresh: 703 warnings.warn( 704 message=( 705 "Using `REPLACE` strategy without also setting `full_refresh_mode=True` " 706 "could result in data loss. 
" 707 "To silence this warning, use the following: " 708 'warnings.filterwarnings("ignore", ' 709 'category="airbyte.warnings.PyAirbyteDataLossWarning")`' 710 ), 711 category=exc.PyAirbyteDataLossWarning, 712 stacklevel=1, 713 ) 714 if isinstance(write_strategy, str): 715 try: 716 write_strategy = WriteStrategy(write_strategy) 717 except ValueError: 718 raise exc.PyAirbyteInputError( 719 message="Invalid strategy", 720 context={ 721 "write_strategy": write_strategy, 722 "available_strategies": [s.value for s in WriteStrategy], 723 }, 724 ) from None 725 726 # Run optional validation step 727 if not skip_validation: 728 self.validate_config() 729 730 # Log incremental stream if incremental streams are known 731 if state_provider and state_provider.known_stream_names: 732 # Retrieve set of the known streams support which support incremental sync 733 incremental_streams = ( 734 set(self._get_incremental_stream_names()) 735 & state_provider.known_stream_names 736 & set(self.get_selected_streams()) 737 ) 738 if incremental_streams: 739 self._log_incremental_streams(incremental_streams=incremental_streams) 740 741 airbyte_message_iterator = AirbyteMessageIterator( 742 self._read_with_catalog( 743 catalog=catalog_provider.configured_catalog, 744 state=state_provider, 745 progress_tracker=progress_tracker, 746 ) 747 ) 748 cache._write_airbyte_message_stream( # noqa: SLF001 # Non-public API 749 stdin=airbyte_message_iterator, 750 catalog_provider=catalog_provider, 751 write_strategy=write_strategy, 752 state_writer=state_writer, 753 progress_tracker=progress_tracker, 754 ) 755 756 # Flush the WAL, if applicable 757 cache.processor._do_checkpoint() # noqa: SLF001 # Non-public API 758 759 return ReadResult( 760 source_name=self.name, 761 progress_tracker=progress_tracker, 762 processed_streams=stream_names, 763 cache=cache, 764 )
A class representing a source that can be called.
58 def __init__( 59 self, 60 executor: Executor, 61 name: str, 62 config: dict[str, Any] | None = None, 63 *, 64 config_change_callback: ConfigChangeCallback | None = None, 65 streams: str | list[str] | None = None, 66 validate: bool = False, 67 ) -> None: 68 """Initialize the source. 69 70 If config is provided, it will be validated against the spec if validate is True. 71 """ 72 self._to_be_selected_streams: list[str] | str = [] 73 """Used to hold selection criteria before catalog is known.""" 74 75 super().__init__( 76 executor=executor, 77 name=name, 78 config=config, 79 config_change_callback=config_change_callback, 80 validate=validate, 81 ) 82 self._config_dict: dict[str, Any] | None = None 83 self._last_log_messages: list[str] = [] 84 self._discovered_catalog: AirbyteCatalog | None = None 85 self._selected_stream_names: list[str] = [] 86 if config is not None: 87 self.set_config(config, validate=validate) 88 if streams is not None: 89 self.select_streams(streams) 90 91 self._deployed_api_root: str | None = None 92 self._deployed_workspace_id: str | None = None 93 self._deployed_source_id: str | None = None
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
def set_streams(self, streams: list[str]) -> None:
    """Deprecated. See select_streams()."""
    warnings.warn(
        "The 'set_streams' method is deprecated and will be removed in a future version. "
        "Please use the 'select_streams' method instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    self.select_streams(streams)
Deprecated. See select_streams().
def select_all_streams(self) -> None:
    """Select all streams.

    This is a more streamlined equivalent to:
    > source.select_streams(source.get_available_streams()).
    """
    if self._config_dict is None:
        self._to_be_selected_streams = "*"
        self._log_warning_preselected_stream(self._to_be_selected_streams)
        return

    self._selected_stream_names = self.get_available_streams()
Select all streams.
This is a more streamlined equivalent to `source.select_streams(source.get_available_streams())`.
def select_streams(self, streams: str | list[str]) -> None:
    """Select the stream names that should be read from the connector.

    Args:
        streams: A list of stream names to select. If set to "*", all streams will be selected.

    Currently, if this is not set, all streams will be read.
    """
    if self._config_dict is None:
        self._to_be_selected_streams = streams
        self._log_warning_preselected_stream(streams)
        return

    if streams == "*":
        self.select_all_streams()
        return

    if isinstance(streams, str):
        # If a single stream is provided, convert it to a one-item list
        streams = [streams]

    available_streams = self.get_available_streams()
    for stream in streams:
        if stream not in available_streams:
            raise exc.AirbyteStreamNotFoundError(
                stream_name=stream,
                connector_name=self.name,
                available_streams=available_streams,
            )
    self._selected_stream_names = streams
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
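A short, hedged example (the `users` and `products` stream names are assumptions based on the `source-faker` demo connector):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})

print(source.get_available_streams())      # all streams discovered from the catalog

# Select a subset by name, or pass "*" to select everything.
source.select_streams(["users", "products"])
print(source.get_selected_streams())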
def get_selected_streams(self) -> list[str]:
    """Get the selected streams.

    If no streams are selected, return an empty list.
    """
    return self._selected_stream_names
Get the selected streams.
If no streams are selected, return an empty list.
def set_config(
    self,
    config: dict[str, Any],
    *,
    validate: bool = True,
) -> None:
    """Set the config for the connector.

    If validate is True, raise an exception if the config fails validation.

    If validate is False, validation will be deferred until check() or validate_config()
    is called.
    """
    if validate:
        self.validate_config(config)

    self._config_dict = config

    if self._to_be_selected_streams:
        self.select_streams(self._to_be_selected_streams)
        self._to_be_selected_streams = []
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
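For instance, a hedged sketch of deferring validation until the config is complete (the `count` config key is an assumption for `source-faker`):

import airbyte as ab

source = ab.get_source("source-faker")

# Defer JSON Schema validation while assembling the config...
source.set_config({"count": 100}, validate=False)

# ...then validate explicitly (or let check() do it) once the config is final.
source.validate_config()
source.check()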
def get_config(self) -> dict[str, Any]:
    """Get the config for the connector."""
    return self._config
Get the config for the connector.
def get_available_streams(self) -> list[str]:
    """Get the available streams from the spec."""
    return [s.name for s in self.discovered_catalog.streams]
Get the available streams from the spec.
@property
def config_spec(self) -> dict[str, Any]:
    """Generate a configuration spec for this connector, as a JSON Schema definition.

    This function generates a JSON Schema dictionary with configuration specs for the
    current connector, as a dictionary.

    Returns:
        dict: The JSON Schema configuration spec as a dictionary.
    """
    return self._get_spec(force_refresh=True).connectionSpecification
Generate a configuration spec for this connector, as a JSON Schema definition.
This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
267 def print_config_spec( 268 self, 269 format: Literal["yaml", "json"] = "yaml", # noqa: A002 270 *, 271 output_file: Path | str | None = None, 272 ) -> None: 273 """Print the configuration spec for this connector. 274 275 Args: 276 format: The format to print the spec in. Must be "yaml" or "json". 277 output_file: Optional. If set, the spec will be written to the given file path. 278 Otherwise, it will be printed to the console. 279 """ 280 if format not in {"yaml", "json"}: 281 raise exc.PyAirbyteInputError( 282 message="Invalid format. Expected 'yaml' or 'json'", 283 input_value=format, 284 ) 285 if isinstance(output_file, str): 286 output_file = Path(output_file) 287 288 if format == "yaml": 289 content = yaml.dump(self.config_spec, indent=2) 290 elif format == "json": 291 content = json.dumps(self.config_spec, indent=2) 292 293 if output_file: 294 output_file.write_text(content) 295 return 296 297 syntax_highlighted = Syntax(content, format) 298 print(syntax_highlighted)
Print the configuration spec for this connector.
Arguments:
- format: The format to print the spec in. Must be "yaml" or "json".
- output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
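A quick, hedged sketch (the output path is illustrative):

import airbyte as ab

source = ab.get_source("source-faker")

# Print the JSON Schema spec to the console to see which config keys are accepted.
source.print_config_spec(format="json")

# Or write it out for reference.
source.print_config_spec(format="yaml", output_file="./source-faker-spec.yaml")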
@property
def docs_url(self) -> str:
    """Get the URL to the connector's documentation."""
    return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
        "source-", ""
    )
Get the URL to the connector's documentation.
@property
def discovered_catalog(self) -> AirbyteCatalog:
    """Get the raw catalog for the given streams.

    If the catalog is not yet known, we call discover to get it.
    """
    if self._discovered_catalog is None:
        self._discovered_catalog = self._discover()

    return self._discovered_catalog
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
@property
def configured_catalog(self) -> ConfiguredAirbyteCatalog:
    """Get the configured catalog for the given streams.

    If the raw catalog is not yet known, we call discover to get it.

    If no specific streams are selected, we return a catalog that syncs all available streams.

    TODO: We should consider disabling by default the streams that the connector would
    disable by default. (For instance, streams that require a premium license are sometimes
    disabled by default within the connector.)
    """
    # Ensure discovered catalog is cached before we start
    _ = self.discovered_catalog

    # Filter for selected streams if set, otherwise use all available streams:
    streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
    return self.get_configured_catalog(streams=streams_filter)
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
352 def get_configured_catalog( 353 self, 354 streams: Literal["*"] | list[str] | None = None, 355 ) -> ConfiguredAirbyteCatalog: 356 """Get a configured catalog for the given streams. 357 358 If no streams are provided, the selected streams will be used. If no streams are selected, 359 all available streams will be used. 360 361 If '*' is provided, all available streams will be used. 362 """ 363 selected_streams: list[str] = [] 364 if streams is None: 365 selected_streams = self._selected_stream_names or self.get_available_streams() 366 elif streams == "*": 367 selected_streams = self.get_available_streams() 368 elif isinstance(streams, list): 369 selected_streams = streams 370 else: 371 raise exc.PyAirbyteInputError( 372 message="Invalid streams argument.", 373 input_value=streams, 374 ) 375 376 return ConfiguredAirbyteCatalog( 377 streams=[ 378 ConfiguredAirbyteStream( 379 stream=stream, 380 destination_sync_mode=DestinationSyncMode.overwrite, 381 primary_key=stream.source_defined_primary_key, 382 sync_mode=SyncMode.incremental, 383 ) 384 for stream in self.discovered_catalog.streams 385 if stream.name in selected_streams 386 ], 387 )
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
    """Return the JSON Schema spec for the specified stream name."""
    catalog: AirbyteCatalog = self.discovered_catalog
    found: list[AirbyteStream] = [
        stream for stream in catalog.streams if stream.name == stream_name
    ]

    if len(found) == 0:
        raise exc.PyAirbyteInputError(
            message="Stream name does not exist in catalog.",
            input_value=stream_name,
        )

    if len(found) > 1:
        raise exc.PyAirbyteInternalError(
            message="Duplicate streams found with the same name.",
            context={
                "found_streams": found,
            },
        )

    return found[0].json_schema
Return the JSON Schema spec for the specified stream name.
412 def get_records( 413 self, 414 stream: str, 415 *, 416 normalize_field_names: bool = False, 417 prune_undeclared_fields: bool = True, 418 ) -> LazyDataset: 419 """Read a stream from the connector. 420 421 Args: 422 stream: The name of the stream to read. 423 normalize_field_names: When `True`, field names will be normalized to lower case, with 424 special characters removed. This matches the behavior of PyAirbyte caches and most 425 Airbyte destinations. 426 prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, 427 which generally matches the behavior of PyAirbyte caches and most Airbyte 428 destinations, specifically when you expect the catalog may be stale. You can disable 429 this to keep all fields in the records. 430 431 This involves the following steps: 432 * Call discover to get the catalog 433 * Generate a configured catalog that syncs the given stream in full_refresh mode 434 * Write the configured catalog and the config to a temporary file 435 * execute the connector with read --config <config_file> --catalog <catalog_file> 436 * Listen to the messages and return the first AirbyteRecordMessages that come along. 437 * Make sure the subprocess is killed when the function returns. 438 """ 439 discovered_catalog: AirbyteCatalog = self.discovered_catalog 440 configured_catalog = ConfiguredAirbyteCatalog( 441 streams=[ 442 ConfiguredAirbyteStream( 443 stream=s, 444 sync_mode=SyncMode.full_refresh, 445 destination_sync_mode=DestinationSyncMode.overwrite, 446 ) 447 for s in discovered_catalog.streams 448 if s.name == stream 449 ], 450 ) 451 if len(configured_catalog.streams) == 0: 452 raise exc.PyAirbyteInputError( 453 message="Requested stream does not exist.", 454 context={ 455 "stream": stream, 456 "available_streams": self.get_available_streams(), 457 "connector_name": self.name, 458 }, 459 ) from KeyError(stream) 460 461 configured_stream = configured_catalog.streams[0] 462 463 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 464 yield from records 465 466 stream_record_handler = StreamRecordHandler( 467 json_schema=self.get_stream_json_schema(stream), 468 prune_extra_fields=prune_undeclared_fields, 469 normalize_keys=normalize_field_names, 470 ) 471 472 # This method is non-blocking, so we use "PLAIN" to avoid a live progress display 473 progress_tracker = ProgressTracker( 474 ProgressStyle.PLAIN, 475 source=self, 476 cache=None, 477 destination=None, 478 expected_streams=[stream], 479 ) 480 481 iterator: Iterator[dict[str, Any]] = ( 482 StreamRecord.from_record_message( 483 record_message=record.record, 484 stream_record_handler=stream_record_handler, 485 ) 486 for record in self._read_with_catalog( 487 catalog=configured_catalog, 488 progress_tracker=progress_tracker, 489 ) 490 if record.record 491 ) 492 progress_tracker.log_success() 493 return LazyDataset( 494 iterator, 495 stream_metadata=configured_stream, 496 )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- normalize_field_names: When `True`, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
- prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- execute the connector with read --config <config_file> --catalog <catalog_file>
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
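A hedged usage sketch (the `users` stream and its field names are assumptions based on the `source-faker` demo connector):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})

# get_records() returns a LazyDataset; records stream in as the connector runs.
for record in source.get_records("users"):
    print(record["id"], record["name"])
    break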
def get_documents(
    self,
    stream: str,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Read a stream from the connector and return the records as documents.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as the
    main content.
    """
    return self.get_records(stream).to_documents(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content.
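A hedged sketch of turning a stream into documents for an LLM or vector-store pipeline (the `products` stream and its property names are assumptions about the `source-faker` schema):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 50})

docs = source.get_documents(
    stream="products",
    title_property="make",
    content_properties=["model", "year"],
    render_metadata=True,
)
for doc in docs:
    print(doc)
    break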
610 def read( 611 self, 612 cache: CacheBase | None = None, 613 *, 614 streams: str | list[str] | None = None, 615 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 616 force_full_refresh: bool = False, 617 skip_validation: bool = False, 618 ) -> ReadResult: 619 """Read from the connector and write to the cache. 620 621 Args: 622 cache: The cache to write to. If not set, a default cache will be used. 623 streams: Optional if already set. A list of stream names to select for reading. If set 624 to "*", all streams will be selected. 625 write_strategy: The strategy to use when writing to the cache. If a string, it must be 626 one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one 627 of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or 628 WriteStrategy.AUTO. 629 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 630 streams will be read in incremental mode if supported by the connector. This option 631 must be True when using the "replace" strategy. 632 skip_validation: If True, PyAirbyte will not pre-validate the input configuration before 633 running the connector. This can be helpful in debugging, when you want to send 634 configurations to the connector that otherwise might be rejected by JSON Schema 635 validation rules. 636 """ 637 cache = cache or get_default_cache() 638 progress_tracker = ProgressTracker( 639 source=self, 640 cache=cache, 641 destination=None, 642 expected_streams=None, # Will be set later 643 ) 644 645 # Set up state provider if not in full refresh mode 646 if force_full_refresh: 647 state_provider: StateProviderBase | None = None 648 else: 649 state_provider = cache.get_state_provider( 650 source_name=self._name, 651 ) 652 state_writer = cache.get_state_writer(source_name=self._name) 653 654 if streams: 655 self.select_streams(streams) 656 657 if not self._selected_stream_names: 658 raise exc.PyAirbyteNoStreamsSelectedError( 659 connector_name=self.name, 660 available_streams=self.get_available_streams(), 661 ) 662 663 try: 664 result = self._read_to_cache( 665 cache=cache, 666 catalog_provider=CatalogProvider(self.configured_catalog), 667 stream_names=self._selected_stream_names, 668 state_provider=state_provider, 669 state_writer=state_writer, 670 write_strategy=write_strategy, 671 force_full_refresh=force_full_refresh, 672 skip_validation=skip_validation, 673 progress_tracker=progress_tracker, 674 ) 675 except exc.PyAirbyteInternalError as ex: 676 progress_tracker.log_failure(exception=ex) 677 raise exc.AirbyteConnectorFailedError( 678 connector_name=self.name, 679 log_text=self._last_log_messages, 680 ) from ex 681 except Exception as ex: 682 progress_tracker.log_failure(exception=ex) 683 raise 684 685 progress_tracker.log_success() 686 return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
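Putting it together, a hedged sketch of an incremental read into an explicit DuckDB cache (paths and config values are illustrative):

import airbyte as ab
from airbyte.caches import DuckDBCache

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()

cache = DuckDBCache(db_path="./faker.duckdb")

# "auto" lets the connector pick the best per-stream strategy;
# set force_full_refresh=True to ignore saved state and reload everything.
read_result = source.read(cache=cache, write_strategy="auto")
print(read_result.processed_records)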
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- config_hash
- validate_config
- connector_version
- check
- install
- uninstall
class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
    dictionary will be initialized with the given keys. If a key is not found in the input data, it
    will be initialized with a value of None. When provided, the 'expected_keys' input will also
    determine the original case of the keys.
    """

    def __init__(
        self,
        from_dict: dict,
        *,
        stream_record_handler: StreamRecordHandler,
        with_internal_columns: bool = True,
        extracted_at: datetime | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            stream_record_handler: The StreamRecordHandler to use for processing the record.
            with_internal_columns: If `True`, the internal columns will be added to the record.
            extracted_at: The time the record was extracted. If not provided, the current time will
                be used.
        """
        self._stream_handler: StreamRecordHandler = stream_record_handler

        # Start by initializing all values to None
        self.update(dict.fromkeys(stream_record_handler.index_keys))

        # Update the dictionary with the given data
        if self._stream_handler.prune_extra_fields:
            self.update(
                {
                    self._stream_handler.to_index_case(k): v
                    for k, v in from_dict.items()
                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
                }
            )
        else:
            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})

        if with_internal_columns:
            self.update(
                {
                    AB_RAW_ID_COLUMN: uuid7str(),
                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(pytz.utc),
                    AB_META_COLUMN: {},
                }
            )

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        stream_record_handler: StreamRecordHandler,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        return cls(
            from_dict=data_dict,
            stream_record_handler=stream_record_handler,
            with_internal_columns=True,
            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
        )

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        """Return the item with the given key."""
        try:
            return super().__getitem__(key)
        except KeyError:
            return super().__getitem__(self._stream_handler.to_index_case(key))

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        """Set the item with the given key to the given value."""
        index_case_key = self._stream_handler.to_index_case(key)
        if (
            self._stream_handler.prune_extra_fields
            and index_case_key not in self._stream_handler.index_keys
        ):
            return

        super().__setitem__(index_case_key, value)

    def __delitem__(self, key: str) -> None:
        """Delete the item with the given key."""
        try:
            super().__delitem__(key)
        except KeyError:
            index_case_key = self._stream_handler.to_index_case(key)
            if super().__contains__(index_case_key):
                super().__delitem__(index_case_key)
                return
        else:
            # No failure. Key was deleted.
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """Return whether the dictionary contains the given key."""
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(
            self._stream_handler.to_index_case(key)
        )

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the keys of the dictionary."""
        return iter(super().__iter__())

    def __len__(self) -> int:
        """Return the number of items in the dictionary."""
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override] # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))
The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
It has these behaviors:
- When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
- The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
- Because it is subclassed from dict, the StreamRecord class can be passed as a normal Python dictionary.
- In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta.
This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.
There are two ways this class can store keys internally:
- If normalize_keys is True, the keys are normalized using the given normalizer.
- If normalize_keys is False, the original case of the keys is stored.
In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the dictionary will be initialized with the given keys. If a key is not found in the input data, it will be initialized with a value of None. When provided, the 'expected_keys' input will also determine the original case of the keys.
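StreamRecord objects are created for you as records flow through PyAirbyte, so you normally interact with them as plain dictionaries. A small sketch of the case-insensitive behavior described above, assuming source-faker's users stream exposes an id field and that the records yielded by get_records() behave as described here:

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 10}, install_if_missing=True)

for record in source.get_records("users"):
    # Key lookups are case-insensitive, so these refer to the same value.
    assert record["id"] == record["ID"]
    # Airbyte metadata columns are also present on each record.
    print(record["_airbyte_extracted_at"])
    break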
    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        stream_record_handler: StreamRecordHandler,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        return cls(
            from_dict=data_dict,
            stream_record_handler=stream_record_handler,
            with_internal_columns=True,
            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
        )
Return a StreamRecord from a RecordMessage.
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class WriteResult:
    """The result of a write operation.

    This class is used to return information about the write operation, such as the number of
    records written. It should not be created directly, but instead returned by the write method
    of a destination.
    """

    def __init__(
        self,
        *,
        destination: AirbyteWriterInterface | Destination,
        source_data: Source | ReadResult,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Initialize a write result.

        This class should not be created directly. Instead, it should be returned by the `write`
        method of the `Destination` class.
        """
        self._destination: AirbyteWriterInterface | Destination = destination
        self._source_data: Source | ReadResult = source_data
        self._catalog_provider: CatalogProvider = catalog_provider
        self._state_writer: StateWriterBase = state_writer
        self._progress_tracker: ProgressTracker = progress_tracker

    @property
    def processed_records(self) -> int:
        """The total number of records written to the destination."""
        return self._progress_tracker.total_destination_records_delivered

    def get_state_provider(self) -> StateProviderBase:
        """Return the state writer as a state provider.

        As a public interface, we only expose the state writer as a state provider. This is because
        the state writer itself is only intended for internal use. As a state provider, the state
        writer can be used to read the state artifacts that were written. This can be useful for
        testing or debugging.
        """
        return self._state_writer
The result of a write operation.
This class is used to return information about the write operation, such as the number of records written. It should not be created directly, but instead returned by the write method of a destination.
    def __init__(
        self,
        *,
        destination: AirbyteWriterInterface | Destination,
        source_data: Source | ReadResult,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Initialize a write result.

        This class should not be created directly. Instead, it should be returned by the `write`
        method of the `Destination` class.
        """
        self._destination: AirbyteWriterInterface | Destination = destination
        self._source_data: Source | ReadResult = source_data
        self._catalog_provider: CatalogProvider = catalog_provider
        self._state_writer: StateWriterBase = state_writer
        self._progress_tracker: ProgressTracker = progress_tracker
Initialize a write result.
This class should not be created directly. Instead, it should be returned by the write method of the Destination class.
    @property
    def processed_records(self) -> int:
        """The total number of records written to the destination."""
        return self._progress_tracker.total_destination_records_delivered
The total number of records written to the destination.
    def get_state_provider(self) -> StateProviderBase:
        """Return the state writer as a state provider.

        As a public interface, we only expose the state writer as a state provider. This is because
        the state writer itself is only intended for internal use. As a state provider, the state
        writer can be used to read the state artifacts that were written. This can be useful for
        testing or debugging.
        """
        return self._state_writer
Return the state writer as a state provider.
As a public interface, we only expose the state writer as a state provider. This is because the state writer itself is only intended for internal use. As a state provider, the state writer can be used to read the state artifacts that were written. This can be useful for testing or debugging.
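In practice, you obtain a WriteResult as the return value of a destination's write method. A minimal sketch, assuming the destination-duckdb connector and a destination_path config key (adjust for the destination you actually use):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100}, install_if_missing=True)
source.select_all_streams()

destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/destination.duckdb"},  # assumed config shape for this example
)
write_result = destination.write(source)

print(write_result.processed_records)                # total records delivered to the destination
state_provider = write_result.get_state_provider()   # read back the state artifacts that were written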