airbyte
PyAirbyte brings the power of Airbyte to every Python developer.
Getting Started
Reading Data
You can connect to any of hundreds of sources
using the get_source method. You can then read data from sources using the Source.read method.
from airbyte import get_source
source = get_source(
"source-faker",
config={},
)
read_result = source.read()
for record in read_result["users"].records:
print(record)
For more information, see the airbyte.sources module.
Writing to SQL Caches
Data can be written to caches using a number of SQL-based cache implementations, including Postgres, BigQuery, Snowflake, DuckDB, and MotherDuck. If you do not specify a cache, PyAirbyte will automatically use a local DuckDB cache by default.
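For example, a minimal sketch of reading into an explicit local DuckDB cache instead of the default cache (the connector name and file path here are illustrative):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()

# Write to an explicit DuckDB file instead of the default cache location.
cache = ab.DuckDBCache(db_path="./pyairbyte_demo.duckdb")
read_result = source.read(cache=cache)
```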
For more information, see the airbyte.caches module.
Writing to Destination Connectors
Data can be written to destinations using the Destination.write method. You can connect to
destinations using the get_destination method. PyAirbyte supports all Airbyte destinations, but
Docker is required on your machine in order to run Java-based destinations.
Note: When loading to a SQL database, we recommend using a SQL cache (where available, see above) instead of a destination connector. This is because SQL caches are Python-native and therefore more portable when run from different Python-based environments that might not have Docker container support. Destinations in PyAirbyte are uniquely suited for loading to non-SQL platforms such as vector stores and other reverse ETL-type use cases.
For more information, see the airbyte.destinations module and the full list of destination
connectors here.
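As a minimal sketch (the destination name and config keys below are illustrative; substitute any installed Airbyte destination connector):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()

# "destination-duckdb" and its config are placeholders for your own destination.
destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/airbyte_demo.duckdb"},
)
write_result = destination.write(source)
```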
PyAirbyte API
Importing as ab
Most examples in the PyAirbyte documentation use the import airbyte as ab convention. The ab
alias is recommended because it makes code more concise and readable. When getting started, this
also saves you from digging in submodules to find the classes and functions you need, since
frequently-used classes and functions are available at the top level of the airbyte module.
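For example:

```python
import airbyte as ab

# Frequently-used factories and classes are available directly on `ab`.
source = ab.get_source("source-faker", config={})
cache = ab.get_default_cache()
```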
Navigating the API
While many PyAirbyte classes and functions are available at the top level of the airbyte module,
you can also import classes and functions from submodules directly. For example, while you can
import the Source class from airbyte, you can also import it from the sources submodule like
this:
from airbyte.sources import Source
Whether you import from the top level or from a submodule, the classes and functions are the same. We expect that most users will import from the top level when getting started, and then import from submodules when they are deploying more complex implementations.
For quick reference, top-level modules are listed in the left sidebar of this page.
Other Resources
- PyAirbyte GitHub Readme
- PyAirbyte Issue Tracker
- Frequently Asked Questions
- PyAirbyte Contributors Guide
- GitHub Releases
API Reference
Below is a list of all classes, functions, and modules available in the top-level airbyte
module. (This is a long list!) If you are just starting out, we recommend beginning by selecting a
submodule to navigate to from the left sidebar or from the list below.
Each module has its own documentation and code samples showing how to use its capabilities effectively.
- airbyte.cloud - Working with Airbyte Cloud, including running jobs remotely.
- airbyte.caches - Working with caches, including how to inspect a cache and get data from it.
- airbyte.datasets - Working with datasets, including how to read from datasets and convert to other formats, such as Pandas, Arrow, and LLM Document formats.
- airbyte.destinations - Working with destinations, including how to write to Airbyte destination connectors.
- airbyte.documents - Working with LLM documents, including how to convert records into document formats, for instance, when working with AI libraries like LangChain.
- airbyte.exceptions - Definitions of all exception and warning classes used in PyAirbyte.
- airbyte.experimental - Experimental features and utilities that do not yet have a stable API.
- airbyte.logs - Logging functionality and configuration.
- airbyte.records - Internal record handling classes.
- airbyte.results - Documents the classes returned when working with results from Source.read and Destination.write.
- airbyte.secrets - Tools for managing secrets in PyAirbyte.
- airbyte.sources - Tools for creating and reading from Airbyte sources. This includes get_source to declare a source, Source.read for reading data, and Source.get_records() to peek at records without caching or writing them directly.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""***PyAirbyte brings the power of Airbyte to every Python developer.*** 3 4[](https://badge.fury.io/py/airbyte) 5[](https://pypi.org/project/airbyte/) 6[](https://pypi.org/project/airbyte/) 7[](https://github.com/airbytehq/pyairbyte) 8 9# Getting Started 10 11## Reading Data 12 13You can connect to any of [hundreds of sources](https://docs.airbyte.com/integrations/sources/) 14using the `get_source` method. You can then read data from sources using `Source.read` method. 15 16```python 17from airbyte import get_source 18 19source = get_source( 20 "source-faker", 21 config={}, 22) 23read_result = source.read() 24 25for record in read_result["users"].records: 26 print(record) 27``` 28 29For more information, see the `airbyte.sources` module. 30 31## Writing to SQL Caches 32 33Data can be written to caches using a number of SQL-based cache implementations, including 34Postgres, BigQuery, Snowflake, DuckDB, and MotherDuck. If you do not specify a cache, PyAirbyte 35will automatically use a local DuckDB cache by default. 36 37For more information, see the `airbyte.caches` module. 38 39## Writing to Destination Connectors 40 41Data can be written to destinations using the `Destination.write` method. You can connect to 42destinations using the `get_destination` method. PyAirbyte supports all Airbyte destinations, but 43Docker is required on your machine in order to run Java-based destinations. 44 45**Note:** When loading to a SQL database, we recommend using SQL cache (where available, 46[see above](#writing-to-sql-caches)) instead of a destination connector. This is because SQL caches 47are Python-native and therefor more portable when run from different Python-based environments which 48might not have Docker container support. Destinations in PyAirbyte are uniquely suited for loading 49to non-SQL platforms such as vector stores and other reverse ETL-type use cases. 50 51For more information, see the `airbyte.destinations` module and the full list of destination 52connectors [here](https://docs.airbyte.com/integrations/destinations/). 53 54# PyAirbyte API 55 56## Importing as `ab` 57 58Most examples in the PyAirbyte documentation use the `import airbyte as ab` convention. The `ab` 59alias is recommended, making code more concise and readable. When getting started, this 60also saves you from digging in submodules to find the classes and functions you need, since 61frequently-used classes and functions are available at the top level of the `airbyte` module. 62 63## Navigating the API 64 65While many PyAirbyte classes and functions are available at the top level of the `airbyte` module, 66you can also import classes and functions from submodules directly. For example, while you can 67import the `Source` class from `airbyte`, you can also import it from the `sources` submodule like 68this: 69 70```python 71from airbyte.sources import Source 72``` 73 74Whether you import from the top level or from a submodule, the classes and functions are the same. 75We expect that most users will import from the top level when getting started, and then import from 76submodules when they are deploying more complex implementations. 77 78For quick reference, top-Level modules are listed in the left sidebar of this page. 
79 80# Other Resources 81 82- [PyAirbyte GitHub Readme](https://github.com/airbytehq/pyairbyte) 83- [PyAirbyte Issue Tracker](https://github.com/airbytehq/pyairbyte/issues) 84- [Frequently Asked Questions](https://github.com/airbytehq/PyAirbyte/blob/main/docs/faq.md) 85- [PyAirbyte Contributors Guide](https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md) 86- [GitHub Releases](https://github.com/airbytehq/PyAirbyte/releases) 87 88---------------------- 89 90# API Reference 91 92Below is a list of all classes, functions, and modules available in the top-level `airbyte` 93module. (This is a long list!) If you are just starting out, we recommend beginning by selecting a 94submodule to navigate to from the left sidebar or from the list below: 95 96Each module 97has its own documentation and code samples related to effectively using the related capabilities. 98 99- **`airbyte.cloud`** - Working with Airbyte Cloud, including running jobs remotely. 100- **`airbyte.caches`** - Working with caches, including how to inspect a cache and get data from it. 101- **`airbyte.datasets`** - Working with datasets, including how to read from datasets and convert to 102 other formats, such as Pandas, Arrow, and LLM Document formats. 103- **`airbyte.destinations`** - Working with destinations, including how to write to Airbyte 104 destinations connectors. 105- **`airbyte.documents`** - Working with LLM documents, including how to convert records into 106 document formats, for instance, when working with AI libraries like LangChain. 107- **`airbyte.exceptions`** - Definitions of all exception and warning classes used in PyAirbyte. 108- **`airbyte.experimental`** - Experimental features and utilities that do not yet have a stable 109 API. 110- **`airbyte.logs`** - Logging functionality and configuration. 111- **`airbyte.records`** - Internal record handling classes. 112- **`airbyte.results`** - Documents the classes returned when working with results from 113 `Source.read` and `Destination.write` 114- **`airbyte.secrets`** - Tools for managing secrets in PyAirbyte. 115- **`airbyte.sources`** - Tools for creating and reading from Airbyte sources. This includes 116 `airbyte.source.get_source` to declare a source, `airbyte.source.Source.read` for reading data, 117 and `airbyte.source.Source.get_records()` to peek at records without caching or writing them 118 directly. 
119 120---------------------- 121 122""" # noqa: D415 123 124from __future__ import annotations 125 126from typing import TYPE_CHECKING 127 128from airbyte import registry 129from airbyte.caches.bigquery import BigQueryCache 130from airbyte.caches.duckdb import DuckDBCache 131from airbyte.caches.util import get_colab_cache, get_default_cache, new_local_cache 132from airbyte.datasets import CachedDataset 133from airbyte.destinations.base import Destination 134from airbyte.destinations.util import get_destination 135from airbyte.records import StreamRecord 136from airbyte.registry import get_available_connectors 137from airbyte.results import ReadResult, WriteResult 138from airbyte.secrets import SecretSourceEnum, get_secret 139from airbyte.sources.base import Source 140from airbyte.sources.util import get_source 141 142 143# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757 144if TYPE_CHECKING: 145 # ruff: noqa: TC004 # imports used for more than type checking 146 from airbyte import ( 147 caches, 148 callbacks, 149 cli, 150 cloud, 151 constants, 152 datasets, 153 destinations, 154 documents, 155 exceptions, # noqa: ICN001 # No 'exc' alias for top-level module 156 experimental, 157 logs, 158 mcp, 159 records, 160 results, 161 secrets, 162 sources, 163 ) 164 165 166__all__ = [ 167 # Modules 168 "caches", 169 "callbacks", 170 "cli", 171 "cloud", 172 "constants", 173 "datasets", 174 "destinations", 175 "documents", 176 "exceptions", 177 "experimental", 178 "logs", 179 "mcp", 180 "records", 181 "registry", 182 "results", 183 "secrets", 184 "sources", 185 # Factories 186 "get_available_connectors", 187 "get_colab_cache", 188 "get_default_cache", 189 "get_destination", 190 "get_secret", 191 "get_source", 192 "new_local_cache", 193 # Classes 194 "BigQueryCache", 195 "CachedDataset", 196 "Destination", 197 "DuckDBCache", 198 "ReadResult", 199 "SecretSourceEnum", 200 "Source", 201 "StreamRecord", 202 "WriteResult", 203] 204 205__docformat__ = "google"
251def get_available_connectors( 252 install_type: InstallType | str | None = InstallType.INSTALLABLE, 253) -> list[str]: 254 """Return a list of all available connectors. 255 256 Connectors will be returned in alphabetical order, with the standard prefix "source-". 257 258 Args: 259 install_type: The type of installation for the connector. 260 Defaults to `InstallType.INSTALLABLE`. 261 """ 262 if install_type is None or install_type == InstallType.INSTALLABLE: 263 # Filter for installable connectors (default behavior). 264 if is_docker_installed(): 265 logger.info("Docker is detected. Returning all connectors.") 266 return sorted(_get_registry_cache().keys()) 267 268 logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.") 269 return sorted( 270 [ 271 connector_name 272 for connector_name, conn_info in _get_registry_cache().items() 273 if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY} 274 ] 275 ) 276 277 if not isinstance(install_type, InstallType): 278 install_type = InstallType(install_type) 279 280 if install_type == InstallType.PYTHON: 281 return sorted( 282 connector_name 283 for connector_name, conn_info in _get_registry_cache().items() 284 if conn_info.pypi_package_name is not None 285 ) 286 287 if install_type == InstallType.JAVA: 288 warnings.warn( 289 message="Java connectors are not yet supported.", 290 stacklevel=2, 291 ) 292 return sorted( 293 connector_name 294 for connector_name, conn_info in _get_registry_cache().items() 295 if conn_info.language == Language.JAVA 296 ) 297 298 if install_type in {InstallType.DOCKER, InstallType.ANY}: 299 return sorted(_get_registry_cache().keys()) 300 301 if install_type == InstallType.YAML: 302 return sorted( 303 conn.name 304 for conn in _get_registry_cache().values() 305 if InstallType.YAML in conn.install_types 306 ) 307 308 # pragma: no cover # Should never be reached. 309 raise exc.PyAirbyteInputError( 310 message="Invalid install type.", 311 context={ 312 "install_type": install_type, 313 }, 314 )
Return a list of all available connectors.
Connectors will be returned in alphabetical order, with the standard prefix "source-".
Arguments:
- install_type: The type of installation for the connector. Defaults to InstallType.INSTALLABLE.
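For example, a quick sketch of listing the connectors that can be installed in the current environment:

```python
import airbyte as ab

connectors = ab.get_available_connectors()
print(f"{len(connectors)} connectors available")
print(connectors[:5])  # e.g. a handful of "source-*" names
```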
85def get_colab_cache( 86 cache_name: str = "default_cache", 87 sub_dir: str = "Airbyte/cache", 88 schema_name: str = "main", 89 table_prefix: str | None = "", 90 drive_name: str = _MY_DRIVE, 91 mount_path: str = _GOOGLE_DRIVE_DEFAULT_MOUNT_PATH, 92) -> DuckDBCache: 93 """Get a local cache for storing data, using the default database path. 94 95 Unlike the default `DuckDBCache`, this implementation will easily persist data across multiple 96 Colab sessions. 97 98 Please note that Google Colab may prompt you to authenticate with your Google account to access 99 your Google Drive. When prompted, click the link and follow the instructions. 100 101 Colab will require access to read and write files in your Google Drive, so please be sure to 102 grant the necessary permissions when prompted. 103 104 All arguments are optional and have default values that are suitable for most use cases. 105 106 Args: 107 cache_name: The name to use for the cache. Defaults to "colab_cache". Override this if you 108 want to use a different database for different projects. 109 sub_dir: The subdirectory to store the cache in. Defaults to "Airbyte/cache". Override this 110 if you want to store the cache in a different subdirectory than the default. 111 schema_name: The name of the schema to write to. Defaults to "main". Override this if you 112 want to write to a different schema. 113 table_prefix: The prefix to use for all tables in the cache. Defaults to "". Override this 114 if you want to use a different prefix for all tables. 115 drive_name: The name of the Google Drive to use. Defaults to "MyDrive". Override this if you 116 want to store data in a shared drive instead of your personal drive. 117 mount_path: The path to mount Google Drive to. Defaults to "/content/drive". Override this 118 if you want to mount Google Drive to a different path (not recommended). 119 120 ## Usage Examples 121 122 The default `get_colab_cache` arguments are suitable for most use cases: 123 124 ```python 125 from airbyte.caches.colab import get_colab_cache 126 127 colab_cache = get_colab_cache() 128 ``` 129 130 Or you can call `get_colab_cache` with custom arguments: 131 132 ```python 133 custom_cache = get_colab_cache( 134 cache_name="my_custom_cache", 135 sub_dir="Airbyte/custom_cache", 136 drive_name="My Company Drive", 137 ) 138 ``` 139 """ 140 try: 141 from google.colab import drive # noqa: PLC0415 # type: ignore[reportMissingImports] 142 except ImportError: 143 drive = None 144 msg = ( 145 "The `google.colab` interface is only available in Google Colab. " 146 "Please run this code in a Google Colab notebook." 147 ) 148 raise ImportError(msg) from None 149 150 drive.mount(mount_path) 151 drive_root = ( 152 Path(mount_path) / drive_name 153 if drive_name == _MY_DRIVE 154 else Path(mount_path) / "Shareddrives" / drive_name 155 ) 156 157 cache_dir = drive_root / sub_dir 158 cache_dir.mkdir(parents=True, exist_ok=True) 159 db_file_path = cache_dir / f"{cache_name}.duckdb" 160 161 print(f"Using persistent PyAirbyte cache in Google Drive: `{db_file_path}`.") 162 return DuckDBCache( 163 db_path=db_file_path, 164 cache_dir=cache_dir, 165 schema_name=schema_name, 166 table_prefix=table_prefix, 167 )
Get a local cache for storing data, using the default database path.
Unlike the default DuckDBCache, this implementation will easily persist data across multiple
Colab sessions.
Please note that Google Colab may prompt you to authenticate with your Google account to access your Google Drive. When prompted, click the link and follow the instructions.
Colab will require access to read and write files in your Google Drive, so please be sure to grant the necessary permissions when prompted.
All arguments are optional and have default values that are suitable for most use cases.
Arguments:
- cache_name: The name to use for the cache. Defaults to "default_cache". Override this if you want to use a different database for different projects.
- sub_dir: The subdirectory to store the cache in. Defaults to "Airbyte/cache". Override this if you want to store the cache in a different subdirectory than the default.
- schema_name: The name of the schema to write to. Defaults to "main". Override this if you want to write to a different schema.
- table_prefix: The prefix to use for all tables in the cache. Defaults to "". Override this if you want to use a different prefix for all tables.
- drive_name: The name of the Google Drive to use. Defaults to "MyDrive". Override this if you want to store data in a shared drive instead of your personal drive.
- mount_path: The path to mount Google Drive to. Defaults to "/content/drive". Override this if you want to mount Google Drive to a different path (not recommended).
Usage Examples
The default get_colab_cache arguments are suitable for most use cases:
from airbyte.caches.colab import get_colab_cache
colab_cache = get_colab_cache()
Or you can call get_colab_cache with custom arguments:
custom_cache = get_colab_cache(
cache_name="my_custom_cache",
sub_dir="Airbyte/custom_cache",
drive_name="My Company Drive",
)
31def get_default_cache() -> DuckDBCache: 32 """Get a local cache for storing data, using the default database path. 33 34 Cache files are stored in the `.cache` directory, relative to the current 35 working directory. 36 """ 37 cache_dir = DEFAULT_CACHE_ROOT / "default_cache" 38 return DuckDBCache( 39 db_path=cache_dir / "default_cache.duckdb", 40 cache_dir=cache_dir, 41 )
Get a local cache for storing data, using the default database path.
Cache files are stored in the .cache directory, relative to the current
working directory.
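A minimal sketch of reading into the default cache and then inspecting a cached stream (the "users" stream name assumes the source-faker connector):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()

cache = ab.get_default_cache()
result = source.read(cache=cache)  # equivalent to source.read() with no cache argument

# Cached streams can be read back by name after the sync completes.
users_df = result["users"].to_pandas()
```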
22def get_destination( # noqa: PLR0913 # Too many arguments 23 name: str, 24 config: dict[str, Any] | None = None, 25 *, 26 config_change_callback: ConfigChangeCallback | None = None, 27 version: str | None = None, 28 use_python: bool | Path | str | None = None, 29 pip_url: str | None = None, 30 local_executable: Path | str | None = None, 31 docker_image: str | bool | None = None, 32 use_host_network: bool = False, 33 install_if_missing: bool = True, 34 install_root: Path | None = None, 35 no_executor: bool = False, 36) -> Destination: 37 """Get a connector by name and version. 38 39 Args: 40 name: connector name 41 config: connector config - if not provided, you need to set it later via the set_config 42 method. 43 config_change_callback: callback function to be called when the connector config changes. 44 streams: list of stream names to select for reading. If set to "*", all streams will be 45 selected. If not provided, you can set it later via the `select_streams()` or 46 `select_all_streams()` method. 47 version: connector version - if not provided, the currently installed version will be used. 48 If no version is installed, the latest available version will be used. The version can 49 also be set to "latest" to force the use of the latest available version. 50 use_python: (Optional.) Python interpreter specification: 51 - True: Use current Python interpreter. (Inferred if `pip_url` is set.) 52 - False: Use Docker instead. 53 - Path: Use interpreter at this path. 54 - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet 55 installed, it will be installed by uv. (This generally adds less than 3 seconds 56 to install times.) 57 pip_url: connector pip URL - if not provided, the pip url will be inferred from the 58 connector name. 59 local_executable: If set, the connector will be assumed to already be installed and will be 60 executed using this path or executable name. Otherwise, the connector will be installed 61 automatically in a virtual environment. 62 docker_image: If set, the connector will be executed using Docker. You can specify `True` 63 to use the default image for the connector, or you can specify a custom image name. 64 If `version` is specified and your image name does not already contain a tag 65 (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`). 66 use_host_network: If set, along with docker_image, the connector will be executed using 67 the host network. This is useful for connectors that need to access resources on 68 the host machine, such as a local database. This parameter is ignored when 69 `docker_image` is not set. 70 install_if_missing: Whether to install the connector if it is not available locally. This 71 parameter is ignored when local_executable is set. 72 install_root: (Optional.) The root directory where the virtual environment will be 73 created. If not provided, the current working directory will be used. 74 no_executor: If True, use NoOpExecutor which fetches specs from the registry without 75 local installation. This is useful for scenarios where you need to validate 76 configurations but don't need to run the connector locally (e.g., deploying to Cloud). 
77 """ 78 executor = get_connector_executor( 79 name=name, 80 version=version, 81 use_python=use_python, 82 pip_url=pip_url, 83 local_executable=local_executable, 84 docker_image=docker_image, 85 use_host_network=use_host_network, 86 install_if_missing=install_if_missing, 87 install_root=install_root, 88 no_executor=no_executor, 89 ) 90 91 return Destination( 92 name=name, 93 config=config, 94 config_change_callback=config_change_callback, 95 executor=executor, 96 )
Get a connector by name and version.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- use_python: (Optional.) Python interpreter specification:
  - True: Use the current Python interpreter. (Inferred if pip_url is set.)
  - False: Use Docker instead.
  - Path: Use the interpreter at this path.
  - str: Use a specific Python version, e.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
- pip_url: connector pip URL - if not provided, the pip URL will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
- use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
- no_executor: If True, use NoOpExecutor, which fetches specs from the registry without local installation. This is useful for scenarios where you need to validate configurations but don't need to run the connector locally (e.g., deploying to Cloud).
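As a sketch, the no_executor option can be used to obtain a destination object for configuration validation without installing anything locally (the connector name and config key are illustrative):

```python
import airbyte as ab

destination = ab.get_destination(
    "destination-duckdb",                            # placeholder connector name
    config={"destination_path": "/tmp/out.duckdb"},  # placeholder config
    no_executor=True,                                # fetch the spec only; no local install
)
```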
64def get_secret( 65 secret_name: str, 66 /, 67 *, 68 sources: list[SecretManager | SecretSourceEnum] | None = None, 69 default: str | SecretString | None = None, 70 allow_prompt: bool = True, 71 **kwargs: dict[str, Any], 72) -> SecretString: 73 """Get a secret from the environment. 74 75 The optional `sources` argument of enum type `SecretSourceEnum` or list of `SecretSourceEnum` 76 options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum` 77 entries is passed, then the sources will be checked using the provided ordering. 78 79 If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then 80 the user will be prompted to enter the secret if it is not found in any of the other sources. 81 82 Raises: 83 PyAirbyteSecretNotFoundError: If the secret is not found in any of the configured sources, 84 and if no default value is provided. 85 PyAirbyteInputError: If an invalid source name is provided in the `sources` argument. 86 """ 87 if secret_name.startswith(SECRETS_HYDRATION_PREFIX): 88 # If the secret name starts with the hydration prefix, we assume it's a secret reference. 89 # We strip the prefix and get the actual secret name. 90 secret_name = secret_name.removeprefix(SECRETS_HYDRATION_PREFIX).lstrip() 91 92 if "source" in kwargs: 93 warnings.warn( 94 message="The `source` argument is deprecated. Use the `sources` argument instead.", 95 category=DeprecationWarning, 96 stacklevel=2, 97 ) 98 sources = kwargs.pop("source") # type: ignore [assignment] 99 100 available_sources: dict[str, SecretManager] = {} 101 for available_source in _get_secret_sources(): 102 # Add available sources to the dict. Order matters. 103 available_sources[available_source.name] = available_source 104 105 if sources is None: 106 # If ANY is in the list, then we don't need to check any other sources. 107 # This is the default behavior. 108 sources = list(available_sources.values()) 109 110 elif not isinstance(sources, list): 111 sources = [sources] # type: ignore [unreachable] # This is a 'just in case' catch. 112 113 # Replace any SecretSourceEnum strings with the matching SecretManager object 114 for source in list(sources): 115 if isinstance(source, SecretSourceEnum): 116 if source not in available_sources: 117 raise exc.PyAirbyteInputError( 118 guidance="Invalid secret source name.", 119 input_value=source, 120 context={ 121 "Available Sources": list(available_sources.keys()), 122 }, 123 ) 124 125 sources[sources.index(source)] = available_sources[source] 126 127 secret_managers = cast("list[SecretManager]", sources) 128 129 if SecretSourceEnum.PROMPT in secret_managers: 130 prompt_source = secret_managers.pop( 131 # Mis-typed, but okay here since we have equality logic for the enum comparison: 132 secret_managers.index(SecretSourceEnum.PROMPT), # type: ignore [arg-type] 133 ) 134 135 if allow_prompt: 136 # Always check prompt last. Add it to the end of the list. 137 secret_managers.append(prompt_source) 138 139 for secret_mgr in secret_managers: 140 val = secret_mgr.get_secret(secret_name) 141 if val: 142 return SecretString(val) 143 144 if default: 145 return SecretString(default) 146 147 raise exc.PyAirbyteSecretNotFoundError( 148 secret_name=secret_name, 149 sources=[str(s) for s in available_sources], 150 )
Get a secret from the environment.
The optional sources argument accepts a SecretSourceEnum value or a list of SecretSourceEnum options. If left blank, all available sources will be checked. If a list of SecretSourceEnum entries is passed, then the sources will be checked using the provided ordering.
If allow_prompt is True or if SecretSourceEnum.PROMPT is declared in the sources arg, then the user will be prompted to enter the secret if it is not found in any of the other sources.
Raises:
- PyAirbyteSecretNotFoundError: If the secret is not found in any of the configured sources, and if no default value is provided.
- PyAirbyteInputError: If an invalid source name is provided in the sources argument.
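A short sketch (the secret name MY_API_KEY is hypothetical):

```python
import airbyte as ab

# Check the available secret sources; fall back to a default rather than prompting.
api_key = ab.get_secret(
    "MY_API_KEY",
    default="dummy-value-for-local-testing",
    allow_prompt=False,
)
```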
48def get_source( # noqa: PLR0913 # Too many arguments 49 name: str, 50 config: dict[str, Any] | None = None, 51 *, 52 config_change_callback: ConfigChangeCallback | None = None, 53 streams: str | list[str] | None = None, 54 version: str | None = None, 55 use_python: bool | Path | str | None = None, 56 pip_url: str | None = None, 57 local_executable: Path | str | None = None, 58 docker_image: bool | str | None = None, 59 use_host_network: bool = False, 60 source_manifest: bool | dict | Path | str | None = None, 61 install_if_missing: bool = True, 62 install_root: Path | None = None, 63 no_executor: bool = False, 64) -> Source: 65 """Get a connector by name and version. 66 67 If an explicit install or execution method is requested (e.g. `local_executable`, 68 `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method. 69 70 Otherwise, an appropriate method will be selected based on the available connector metadata: 71 1. If the connector is registered and has a YAML source manifest is available, the YAML manifest 72 will be downloaded and used to to execute the connector. 73 2. Else, if the connector is registered and has a PyPI package, it will be installed via pip. 74 3. Else, if the connector is registered and has a Docker image, and if Docker is available, it 75 will be executed using Docker. 76 77 Args: 78 name: connector name 79 config: connector config - if not provided, you need to set it later via the set_config 80 method. 81 config_change_callback: callback function to be called when the connector config changes. 82 streams: list of stream names to select for reading. If set to "*", all streams will be 83 selected. If not provided, you can set it later via the `select_streams()` or 84 `select_all_streams()` method. 85 version: connector version - if not provided, the currently installed version will be used. 86 If no version is installed, the latest available version will be used. The version can 87 also be set to "latest" to force the use of the latest available version. 88 use_python: (Optional.) Python interpreter specification: 89 - True: Use current Python interpreter. (Inferred if `pip_url` is set.) 90 - False: Use Docker instead. 91 - Path: Use interpreter at this path. 92 - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet 93 installed, it will be installed by uv. (This generally adds less than 3 seconds 94 to install times.) 95 pip_url: connector pip URL - if not provided, the pip url will be inferred from the 96 connector name. 97 local_executable: If set, the connector will be assumed to already be installed and will be 98 executed using this path or executable name. Otherwise, the connector will be installed 99 automatically in a virtual environment. 100 docker_image: If set, the connector will be executed using Docker. You can specify `True` 101 to use the default image for the connector, or you can specify a custom image name. 102 If `version` is specified and your image name does not already contain a tag 103 (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`). 104 use_host_network: If set, along with docker_image, the connector will be executed using 105 the host network. This is useful for connectors that need to access resources on 106 the host machine, such as a local database. This parameter is ignored when 107 `docker_image` is not set. 108 source_manifest: If set, the connector will be executed based on a declarative YAML 109 source definition. 
This input can be `True` to attempt to auto-download a YAML spec, 110 `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from 111 the local file system, or `str` to pull the definition from a web URL. 112 install_if_missing: Whether to install the connector if it is not available locally. This 113 parameter is ignored when `local_executable` or `source_manifest` are set. 114 install_root: (Optional.) The root directory where the virtual environment will be 115 created. If not provided, the current working directory will be used. 116 no_executor: If True, use NoOpExecutor which fetches specs from the registry without 117 local installation. This is useful for scenarios where you need to validate 118 configurations but don't need to run the connector locally (e.g., deploying to Cloud). 119 """ 120 executor = get_connector_executor( 121 name=name, 122 version=version, 123 use_python=use_python, 124 pip_url=pip_url, 125 local_executable=local_executable, 126 docker_image=docker_image, 127 use_host_network=use_host_network, 128 source_manifest=source_manifest, 129 install_if_missing=install_if_missing, 130 install_root=install_root, 131 no_executor=no_executor, 132 ) 133 134 return Source( 135 name=name, 136 config=config, 137 config_change_callback=config_change_callback, 138 streams=streams, 139 executor=executor, 140 )
Get a connector by name and version.
If an explicit install or execution method is requested (e.g. local_executable,
docker_image, pip_url, source_manifest), the connector will be executed using this method.
Otherwise, an appropriate method will be selected based on the available connector metadata:
- If the connector is registered and a YAML source manifest is available, the manifest will be downloaded and used to execute the connector.
- Else, if the connector is registered and has a PyPI package, it will be installed via pip.
- Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- use_python: (Optional.) Python interpreter specification:
  - True: Use the current Python interpreter. (Inferred if pip_url is set.)
  - False: Use Docker instead.
  - Path: Use the interpreter at this path.
  - str: Use a specific Python version, e.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
- pip_url: connector pip URL - if not provided, the pip URL will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
- use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
- source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
- no_executor: If True, use NoOpExecutor, which fetches specs from the registry without local installation. This is useful for scenarios where you need to validate configurations but don't need to run the connector locally (e.g., deploying to Cloud).
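A minimal sketch tying these arguments together (the config keys and stream names assume the source-faker connector):

```python
import airbyte as ab

source = ab.get_source(
    "source-faker",
    config={"count": 5_000},
    streams=["users", "products"],
)
source.check()                                      # validate config against the connector spec
first_records = list(source.get_records("users"))   # peek at records without caching
```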
44def new_local_cache( 45 cache_name: str | None = None, 46 cache_dir: str | Path | None = None, 47 *, 48 cleanup: bool = True, 49) -> DuckDBCache: 50 """Get a local cache for storing data, using a name string to seed the path. 51 52 Args: 53 cache_name: Name to use for the cache. Defaults to None. 54 cache_dir: Root directory to store the cache in. Defaults to None. 55 cleanup: Whether to clean up temporary files. Defaults to True. 56 57 Cache files are stored in the `.cache` directory, relative to the current 58 working directory. 59 """ 60 if cache_name: 61 if " " in cache_name: 62 raise exc.PyAirbyteInputError( 63 message="Cache name cannot contain spaces.", 64 input_value=cache_name, 65 ) 66 67 if not cache_name.replace("_", "").isalnum(): 68 raise exc.PyAirbyteInputError( 69 message="Cache name can only contain alphanumeric characters and underscores.", 70 input_value=cache_name, 71 ) 72 73 cache_name = cache_name or str(ulid.ULID()) 74 cache_dir = cache_dir or (DEFAULT_CACHE_ROOT / cache_name) 75 if not isinstance(cache_dir, Path): 76 cache_dir = Path(cache_dir) 77 78 return DuckDBCache( 79 db_path=cache_dir / f"db_{cache_name}.duckdb", 80 cache_dir=cache_dir, 81 cleanup=cleanup, 82 )
Get a local cache for storing data, using a name string to seed the path.
Arguments:
- cache_name: Name to use for the cache. Defaults to None.
- cache_dir: Root directory to store the cache in. Defaults to None.
- cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the .cache directory, relative to the current
working directory.
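For example, a sketch using a named cache in a custom directory (values are illustrative):

```python
import airbyte as ab

# The cache name seeds the DuckDB file name and may contain only alphanumeric
# characters and underscores.
cache = ab.new_local_cache(
    cache_name="my_project_cache",
    cache_dir="./my_cache_dir",
)
```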
39class BigQueryCache(BigQueryConfig, CacheBase): 40 """The BigQuery cache implementation.""" 41 42 _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor 43 44 paired_destination_name: ClassVar[str | None] = "destination-bigquery" 45 paired_destination_config_class: ClassVar[type | None] = DestinationBigquery 46 47 @property 48 def paired_destination_config(self) -> DestinationBigquery: 49 """Return a dictionary of destination configuration values.""" 50 return bigquery_cache_to_destination_configuration(cache=self) 51 52 def get_arrow_dataset( 53 self, 54 stream_name: str, 55 *, 56 max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE, 57 ) -> NoReturn: 58 """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`. 59 60 See: https://github.com/airbytehq/PyAirbyte/issues/165 61 """ 62 raise NotImplementedError( 63 "BigQuery doesn't currently support to_arrow" 64 "Please consider using a different cache implementation for these functionalities." 65 )
The BigQuery cache implementation.
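A construction sketch, assuming the cache accepts a GCP project name, a dataset name, and a service-account credentials path (parameter names and values here are assumptions; see the BigQueryConfig base class for the authoritative field list):

```python
import airbyte as ab

cache = ab.BigQueryCache(
    project_name="my-gcp-project",                       # assumed parameter name
    dataset_name="pyairbyte_demo",                       # assumed parameter name
    credentials_path="/path/to/service_account.json",    # assumed parameter name
)
```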
47 @property 48 def paired_destination_config(self) -> DestinationBigquery: 49 """Return a dictionary of destination configuration values.""" 50 return bigquery_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
52 def get_arrow_dataset( 53 self, 54 stream_name: str, 55 *, 56 max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE, 57 ) -> NoReturn: 58 """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`. 59 60 See: https://github.com/airbytehq/PyAirbyte/issues/165 61 """ 62 raise NotImplementedError( 63 "BigQuery doesn't currently support to_arrow" 64 "Please consider using a different cache implementation for these functionalities." 65 )
Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.
150class CachedDataset(SQLDataset): 151 """A dataset backed by a SQL table cache. 152 153 Because this dataset includes all records from the underlying table, we also expose the 154 underlying table as a SQLAlchemy Table object. 155 """ 156 157 def __init__( 158 self, 159 cache: CacheBase, 160 stream_name: str, 161 stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None, 162 ) -> None: 163 """We construct the query statement by selecting all columns from the table. 164 165 This prevents the need to scan the table schema to construct the query statement. 166 167 If stream_configuration is None, we attempt to retrieve the stream configuration from the 168 cache processor. This is useful when constructing a dataset from a CachedDataset object, 169 which already has the stream configuration. 170 171 If stream_configuration is set to False, we skip the stream configuration retrieval. 172 """ 173 table_name = cache.processor.get_sql_table_name(stream_name) 174 schema_name = cache.schema_name 175 query = select("*").select_from(text(f"{schema_name}.{table_name}")) 176 super().__init__( 177 cache=cache, 178 stream_name=stream_name, 179 query_statement=query, 180 stream_configuration=stream_configuration, 181 ) 182 183 @overrides 184 def to_pandas(self) -> DataFrame: 185 """Return the underlying dataset data as a pandas DataFrame.""" 186 return self._cache.get_pandas_dataframe(self._stream_name) 187 188 @overrides 189 def to_arrow( 190 self, 191 *, 192 max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE, 193 ) -> Dataset: 194 """Return an Arrow Dataset containing the data from the specified stream. 195 196 Args: 197 stream_name (str): Name of the stream to retrieve data from. 198 max_chunk_size (int): max number of records to include in each batch of pyarrow dataset. 199 200 Returns: 201 pa.dataset.Dataset: Arrow Dataset containing the stream's data. 202 """ 203 return self._cache.get_arrow_dataset( 204 stream_name=self._stream_name, 205 max_chunk_size=max_chunk_size, 206 ) 207 208 def to_sql_table(self) -> Table: 209 """Return the underlying SQL table as a SQLAlchemy Table object.""" 210 return self._cache.processor.get_sql_table(self.stream_name) 211 212 def __eq__(self, value: object) -> bool: 213 """Return True if the value is a CachedDataset with the same cache and stream name. 214 215 In the case of CachedDataset objects, we can simply compare the cache and stream name. 216 217 Note that this equality check is only supported on CachedDataset objects and not for 218 the base SQLDataset implementation. This is because of the complexity and computational 219 cost of comparing two arbitrary SQL queries that could be bound to different variables, 220 as well as the chance that two queries can be syntactically equivalent without being 221 text-wise equivalent. 222 """ 223 if not isinstance(value, SQLDataset): 224 return False 225 226 if self._cache is not value._cache: 227 return False 228 229 return not self._stream_name != value._stream_name 230 231 def __hash__(self) -> int: 232 return hash(self._stream_name)
A dataset backed by a SQL table cache.
Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
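A short sketch of working with the cached datasets returned by a read (the stream name assumes the source-faker connector):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()
result = source.read()

dataset = result["users"]        # a CachedDataset for the "users" stream
df = dataset.to_pandas()         # pandas DataFrame of all cached records
table = dataset.to_sql_table()   # SQLAlchemy Table for ad-hoc SQL queries
```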
157 def __init__( 158 self, 159 cache: CacheBase, 160 stream_name: str, 161 stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None, 162 ) -> None: 163 """We construct the query statement by selecting all columns from the table. 164 165 This prevents the need to scan the table schema to construct the query statement. 166 167 If stream_configuration is None, we attempt to retrieve the stream configuration from the 168 cache processor. This is useful when constructing a dataset from a CachedDataset object, 169 which already has the stream configuration. 170 171 If stream_configuration is set to False, we skip the stream configuration retrieval. 172 """ 173 table_name = cache.processor.get_sql_table_name(stream_name) 174 schema_name = cache.schema_name 175 query = select("*").select_from(text(f"{schema_name}.{table_name}")) 176 super().__init__( 177 cache=cache, 178 stream_name=stream_name, 179 query_statement=query, 180 stream_configuration=stream_configuration, 181 )
We construct the query statement by selecting all columns from the table.
This prevents the need to scan the table schema to construct the query statement.
If stream_configuration is None, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.
If stream_configuration is set to False, we skip the stream configuration retrieval.
183 @overrides 184 def to_pandas(self) -> DataFrame: 185 """Return the underlying dataset data as a pandas DataFrame.""" 186 return self._cache.get_pandas_dataframe(self._stream_name)
Return the underlying dataset data as a pandas DataFrame.
188 @overrides 189 def to_arrow( 190 self, 191 *, 192 max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE, 193 ) -> Dataset: 194 """Return an Arrow Dataset containing the data from the specified stream. 195 196 Args: 197 stream_name (str): Name of the stream to retrieve data from. 198 max_chunk_size (int): max number of records to include in each batch of pyarrow dataset. 199 200 Returns: 201 pa.dataset.Dataset: Arrow Dataset containing the stream's data. 202 """ 203 return self._cache.get_arrow_dataset( 204 stream_name=self._stream_name, 205 max_chunk_size=max_chunk_size, 206 )
Return an Arrow Dataset containing the data from the specified stream.
Arguments:
- stream_name (str): Name of the stream to retrieve data from.
- max_chunk_size (int): Maximum number of records to include in each batch of the Arrow dataset.
Returns:
pa.dataset.Dataset: Arrow Dataset containing the stream's data.
39class Destination(ConnectorBase, AirbyteWriterInterface): 40 """A class representing a destination that can be called.""" 41 42 connector_type = "destination" 43 44 def __init__( 45 self, 46 executor: Executor, 47 name: str, 48 config: dict[str, Any] | None = None, 49 *, 50 config_change_callback: ConfigChangeCallback | None = None, 51 validate: bool = False, 52 ) -> None: 53 """Initialize the source. 54 55 If config is provided, it will be validated against the spec if validate is True. 56 """ 57 super().__init__( 58 executor=executor, 59 name=name, 60 config=config, 61 config_change_callback=config_change_callback, 62 validate=validate, 63 ) 64 65 def write( # noqa: PLR0912, PLR0915 # Too many arguments/statements 66 self, 67 source_data: Source | ReadResult, 68 *, 69 streams: list[str] | Literal["*"] | None = None, 70 cache: CacheBase | Literal[False] | None = None, 71 state_cache: CacheBase | Literal[False] | None = None, 72 write_strategy: WriteStrategy = WriteStrategy.AUTO, 73 force_full_refresh: bool = False, 74 ) -> WriteResult: 75 """Write data from source connector or already cached source data. 76 77 Caching is enabled by default, unless explicitly disabled. 78 79 Args: 80 source_data: The source data to write. Can be a `Source` or a `ReadResult` object. 81 streams: The streams to write to the destination. If omitted or if "*" is provided, 82 all streams will be written. If `source_data` is a source, then streams must be 83 selected here or on the source. If both are specified, this setting will override 84 the stream selection on the source. 85 cache: The cache to use for reading source_data. If `None`, no cache will be used. If 86 False, the cache will be disabled. This must be `None` if `source_data` is already 87 a `Cache` object. 88 state_cache: A cache to use for storing incremental state. You do not need to set this 89 if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to 90 disable state management. 91 write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector 92 will decide the best strategy to use. 93 force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any 94 existing state will be ignored and all source data will be reloaded. 95 96 For incremental syncs, `cache` or `state_cache` will be checked for matching state values. 97 If the cache has tracked state, this will be used for the sync. Otherwise, if there is 98 a known destination state, the destination-specific state will be used. If neither are 99 available, a full refresh will be performed. 100 """ 101 if not isinstance(source_data, ReadResult | Source): 102 raise exc.PyAirbyteInputError( 103 message="Invalid source_data type for `source_data` arg.", 104 context={ 105 "source_data_type_provided": type(source_data).__name__, 106 }, 107 ) 108 109 # Resolve `source`, `read_result`, and `source_name` 110 source: Source | None = source_data if isinstance(source_data, Source) else None 111 read_result: ReadResult | None = ( 112 source_data if isinstance(source_data, ReadResult) else None 113 ) 114 source_name: str = source.name if source else cast("ReadResult", read_result).source_name 115 116 # State providers and writers default to no-op, unless overridden below. 
117 cache_state_provider: StateProviderBase = StaticInputState([]) 118 """Provides the state of the cache's data.""" 119 cache_state_writer: StateWriterBase = NoOpStateWriter() 120 """Writes updates for the state of the cache's data.""" 121 destination_state_provider: StateProviderBase = StaticInputState([]) 122 """Provides the state of the destination's data, from `cache` or `state_cache`.""" 123 destination_state_writer: StateWriterBase = NoOpStateWriter() 124 """Writes updates for the state of the destination's data, to `cache` or `state_cache`.""" 125 126 # If caching not explicitly disabled 127 if cache is not False: 128 # Resolve `cache`, `cache_state_provider`, and `cache_state_writer` 129 if isinstance(source_data, ReadResult): 130 cache = source_data.cache 131 132 cache = cache or get_default_cache() 133 cache_state_provider = cache.get_state_provider( 134 source_name=source_name, 135 destination_name=None, # This will just track the cache state 136 ) 137 cache_state_writer = cache.get_state_writer( 138 source_name=source_name, 139 destination_name=None, # This will just track the cache state 140 ) 141 142 # Resolve `state_cache` 143 if state_cache is None: 144 state_cache = cache or get_default_cache() 145 146 # Resolve `destination_state_writer` and `destination_state_provider` 147 if state_cache: 148 destination_state_writer = state_cache.get_state_writer( 149 source_name=source_name, 150 destination_name=self.name, 151 ) 152 if not force_full_refresh: 153 destination_state_provider = state_cache.get_state_provider( 154 source_name=source_name, 155 destination_name=self.name, 156 ) 157 elif state_cache is not False: 158 warnings.warn( 159 "No state backend or cache provided. State will not be tracked." 160 "To track state, provide a cache or state backend." 161 "To silence this warning, set `state_cache=False` explicitly.", 162 category=exc.PyAirbyteWarning, 163 stacklevel=2, 164 ) 165 166 # Resolve `catalog_provider` 167 if source: 168 catalog_provider = CatalogProvider( 169 configured_catalog=source.get_configured_catalog( 170 streams=streams, 171 force_full_refresh=force_full_refresh, 172 ) 173 ) 174 elif read_result: 175 catalog_provider = CatalogProvider.from_read_result(read_result) 176 else: 177 raise exc.PyAirbyteInternalError( 178 message="`source_data` must be a `Source` or `ReadResult` object.", 179 ) 180 181 progress_tracker = ProgressTracker( 182 source=source if isinstance(source_data, Source) else None, 183 cache=cache or None, 184 destination=self, 185 expected_streams=catalog_provider.stream_names, 186 ) 187 188 source_state_provider: StateProviderBase 189 source_state_provider = JoinedStateProvider( 190 primary=cache_state_provider, 191 secondary=destination_state_provider, 192 ) 193 194 if source: 195 if cache is False: 196 # Get message iterator for source (caching disabled) 197 message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator( # noqa: SLF001 # Non-public API 198 streams=streams, 199 state_provider=source_state_provider, 200 progress_tracker=progress_tracker, 201 force_full_refresh=force_full_refresh, 202 ) 203 else: 204 # Caching enabled and we are reading from a source. 205 # Read the data to cache if caching is enabled. 
206 read_result = source._read_to_cache( # noqa: SLF001 # Non-public API 207 cache=cache, 208 state_provider=source_state_provider, 209 state_writer=cache_state_writer, 210 catalog_provider=catalog_provider, 211 stream_names=catalog_provider.stream_names, 212 write_strategy=write_strategy, 213 force_full_refresh=force_full_refresh, 214 skip_validation=False, 215 progress_tracker=progress_tracker, 216 ) 217 message_iterator = AirbyteMessageIterator.from_read_result( 218 read_result=read_result, 219 ) 220 else: # Else we are reading from a read result 221 assert read_result is not None 222 message_iterator = AirbyteMessageIterator.from_read_result( 223 read_result=read_result, 224 ) 225 226 # Write the data to the destination 227 try: 228 self._write_airbyte_message_stream( 229 stdin=message_iterator, 230 catalog_provider=catalog_provider, 231 write_strategy=write_strategy, 232 state_writer=destination_state_writer, 233 progress_tracker=progress_tracker, 234 ) 235 except Exception as ex: 236 progress_tracker.log_failure(exception=ex) 237 raise 238 else: 239 # No exceptions were raised, so log success 240 progress_tracker.log_success() 241 242 return WriteResult( 243 destination=self, 244 source_data=source_data, 245 catalog_provider=catalog_provider, 246 state_writer=destination_state_writer, 247 progress_tracker=progress_tracker, 248 ) 249 250 def _write_airbyte_message_stream( 251 self, 252 stdin: IO[str] | AirbyteMessageIterator, 253 *, 254 catalog_provider: CatalogProvider, 255 write_strategy: WriteStrategy, 256 state_writer: StateWriterBase | None = None, 257 progress_tracker: ProgressTracker, 258 ) -> None: 259 """Read from the connector and write to the cache.""" 260 # Run optional validation step 261 if state_writer is None: 262 state_writer = StdOutStateWriter() 263 264 # Apply the write strategy to the catalog provider before sending to the destination 265 catalog_provider = catalog_provider.with_write_strategy(write_strategy) 266 267 with as_temp_files( 268 files_contents=[ 269 self._hydrated_config, 270 catalog_provider.configured_catalog.model_dump_json(exclude_none=True), 271 ] 272 ) as [ 273 config_file, 274 catalog_file, 275 ]: 276 try: 277 # We call the connector to write the data, tallying the inputs and outputs 278 for destination_message in progress_tracker.tally_confirmed_writes( 279 messages=self._execute( 280 args=[ 281 "write", 282 "--config", 283 config_file, 284 "--catalog", 285 catalog_file, 286 ], 287 stdin=AirbyteMessageIterator( 288 progress_tracker.tally_pending_writes( 289 stdin, 290 ) 291 ), 292 ) 293 ): 294 if destination_message.state: 295 state_writer.write_state(state_message=destination_message.state) 296 297 except exc.AirbyteConnectorFailedError as ex: 298 raise exc.AirbyteConnectorWriteError( 299 connector_name=self.name, 300 log_text=self._last_log_messages, 301 original_exception=ex, 302 ) from None
A class representing a destination that can be called.
44 def __init__( 45 self, 46 executor: Executor, 47 name: str, 48 config: dict[str, Any] | None = None, 49 *, 50 config_change_callback: ConfigChangeCallback | None = None, 51 validate: bool = False, 52 ) -> None: 53 """Initialize the source. 54 55 If config is provided, it will be validated against the spec if validate is True. 56 """ 57 super().__init__( 58 executor=executor, 59 name=name, 60 config=config, 61 config_change_callback=config_change_callback, 62 validate=validate, 63 )
Initialize the destination.
If config is provided, it will be validated against the spec if validate is True.
65 def write( # noqa: PLR0912, PLR0915 # Too many arguments/statements 66 self, 67 source_data: Source | ReadResult, 68 *, 69 streams: list[str] | Literal["*"] | None = None, 70 cache: CacheBase | Literal[False] | None = None, 71 state_cache: CacheBase | Literal[False] | None = None, 72 write_strategy: WriteStrategy = WriteStrategy.AUTO, 73 force_full_refresh: bool = False, 74 ) -> WriteResult: 75 """Write data from source connector or already cached source data. 76 77 Caching is enabled by default, unless explicitly disabled. 78 79 Args: 80 source_data: The source data to write. Can be a `Source` or a `ReadResult` object. 81 streams: The streams to write to the destination. If omitted or if "*" is provided, 82 all streams will be written. If `source_data` is a source, then streams must be 83 selected here or on the source. If both are specified, this setting will override 84 the stream selection on the source. 85 cache: The cache to use for reading source_data. If `None`, no cache will be used. If 86 False, the cache will be disabled. This must be `None` if `source_data` is already 87 a `Cache` object. 88 state_cache: A cache to use for storing incremental state. You do not need to set this 89 if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to 90 disable state management. 91 write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector 92 will decide the best strategy to use. 93 force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any 94 existing state will be ignored and all source data will be reloaded. 95 96 For incremental syncs, `cache` or `state_cache` will be checked for matching state values. 97 If the cache has tracked state, this will be used for the sync. Otherwise, if there is 98 a known destination state, the destination-specific state will be used. If neither are 99 available, a full refresh will be performed. 100 """ 101 if not isinstance(source_data, ReadResult | Source): 102 raise exc.PyAirbyteInputError( 103 message="Invalid source_data type for `source_data` arg.", 104 context={ 105 "source_data_type_provided": type(source_data).__name__, 106 }, 107 ) 108 109 # Resolve `source`, `read_result`, and `source_name` 110 source: Source | None = source_data if isinstance(source_data, Source) else None 111 read_result: ReadResult | None = ( 112 source_data if isinstance(source_data, ReadResult) else None 113 ) 114 source_name: str = source.name if source else cast("ReadResult", read_result).source_name 115 116 # State providers and writers default to no-op, unless overridden below. 
117 cache_state_provider: StateProviderBase = StaticInputState([]) 118 """Provides the state of the cache's data.""" 119 cache_state_writer: StateWriterBase = NoOpStateWriter() 120 """Writes updates for the state of the cache's data.""" 121 destination_state_provider: StateProviderBase = StaticInputState([]) 122 """Provides the state of the destination's data, from `cache` or `state_cache`.""" 123 destination_state_writer: StateWriterBase = NoOpStateWriter() 124 """Writes updates for the state of the destination's data, to `cache` or `state_cache`.""" 125 126 # If caching not explicitly disabled 127 if cache is not False: 128 # Resolve `cache`, `cache_state_provider`, and `cache_state_writer` 129 if isinstance(source_data, ReadResult): 130 cache = source_data.cache 131 132 cache = cache or get_default_cache() 133 cache_state_provider = cache.get_state_provider( 134 source_name=source_name, 135 destination_name=None, # This will just track the cache state 136 ) 137 cache_state_writer = cache.get_state_writer( 138 source_name=source_name, 139 destination_name=None, # This will just track the cache state 140 ) 141 142 # Resolve `state_cache` 143 if state_cache is None: 144 state_cache = cache or get_default_cache() 145 146 # Resolve `destination_state_writer` and `destination_state_provider` 147 if state_cache: 148 destination_state_writer = state_cache.get_state_writer( 149 source_name=source_name, 150 destination_name=self.name, 151 ) 152 if not force_full_refresh: 153 destination_state_provider = state_cache.get_state_provider( 154 source_name=source_name, 155 destination_name=self.name, 156 ) 157 elif state_cache is not False: 158 warnings.warn( 159 "No state backend or cache provided. State will not be tracked." 160 "To track state, provide a cache or state backend." 161 "To silence this warning, set `state_cache=False` explicitly.", 162 category=exc.PyAirbyteWarning, 163 stacklevel=2, 164 ) 165 166 # Resolve `catalog_provider` 167 if source: 168 catalog_provider = CatalogProvider( 169 configured_catalog=source.get_configured_catalog( 170 streams=streams, 171 force_full_refresh=force_full_refresh, 172 ) 173 ) 174 elif read_result: 175 catalog_provider = CatalogProvider.from_read_result(read_result) 176 else: 177 raise exc.PyAirbyteInternalError( 178 message="`source_data` must be a `Source` or `ReadResult` object.", 179 ) 180 181 progress_tracker = ProgressTracker( 182 source=source if isinstance(source_data, Source) else None, 183 cache=cache or None, 184 destination=self, 185 expected_streams=catalog_provider.stream_names, 186 ) 187 188 source_state_provider: StateProviderBase 189 source_state_provider = JoinedStateProvider( 190 primary=cache_state_provider, 191 secondary=destination_state_provider, 192 ) 193 194 if source: 195 if cache is False: 196 # Get message iterator for source (caching disabled) 197 message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator( # noqa: SLF001 # Non-public API 198 streams=streams, 199 state_provider=source_state_provider, 200 progress_tracker=progress_tracker, 201 force_full_refresh=force_full_refresh, 202 ) 203 else: 204 # Caching enabled and we are reading from a source. 205 # Read the data to cache if caching is enabled. 
206 read_result = source._read_to_cache( # noqa: SLF001 # Non-public API 207 cache=cache, 208 state_provider=source_state_provider, 209 state_writer=cache_state_writer, 210 catalog_provider=catalog_provider, 211 stream_names=catalog_provider.stream_names, 212 write_strategy=write_strategy, 213 force_full_refresh=force_full_refresh, 214 skip_validation=False, 215 progress_tracker=progress_tracker, 216 ) 217 message_iterator = AirbyteMessageIterator.from_read_result( 218 read_result=read_result, 219 ) 220 else: # Else we are reading from a read result 221 assert read_result is not None 222 message_iterator = AirbyteMessageIterator.from_read_result( 223 read_result=read_result, 224 ) 225 226 # Write the data to the destination 227 try: 228 self._write_airbyte_message_stream( 229 stdin=message_iterator, 230 catalog_provider=catalog_provider, 231 write_strategy=write_strategy, 232 state_writer=destination_state_writer, 233 progress_tracker=progress_tracker, 234 ) 235 except Exception as ex: 236 progress_tracker.log_failure(exception=ex) 237 raise 238 else: 239 # No exceptions were raised, so log success 240 progress_tracker.log_success() 241 242 return WriteResult( 243 destination=self, 244 source_data=source_data, 245 catalog_provider=catalog_provider, 246 state_writer=destination_state_writer, 247 progress_tracker=progress_tracker, 248 )
Write data from source connector or already cached source data.
Caching is enabled by default, unless explicitly disabled.
Arguments:
- source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
- streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If `source_data` is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
- cache: The cache to use for reading source_data. If `None`, the default cache will be used. If `False`, the cache will be disabled. This must be `None` if `source_data` is already a `ReadResult` object.
- state_cache: A cache to use for storing incremental state. You do not need to set this if `cache` is specified or if `source_data` is a `ReadResult` object. Set to `False` to disable state management.
- write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector will decide the best strategy to use.
- force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any existing state will be ignored and all source data will be reloaded.
For incremental syncs, cache or state_cache will be checked for matching state values.
If the cache has tracked state, this will be used for the sync. Otherwise, if there is
a known destination state, the destination-specific state will be used. If neither are
available, a full refresh will be performed.
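For illustration, here is a minimal sketch of writing from a source directly to a destination connector. The connector names are real, but the config keys shown (`count`, `destination_path`) are assumptions about those connectors' specs and may differ by version:

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100}, streams="*")
destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/pyairbyte_demo.duckdb"},  # assumed config key for this connector
)

# Caching is enabled by default; pass cache=False to stream records straight through.
write_result = destination.write(source, force_full_refresh=True)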
44class DuckDBCache(DuckDBConfig, CacheBase): 45 """A DuckDB cache.""" 46 47 _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor 48 49 paired_destination_name: ClassVar[str | None] = "destination-duckdb" 50 paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb 51 52 @property 53 def paired_destination_config(self) -> DestinationDuckdb: 54 """Return a dictionary of destination configuration values.""" 55 return duckdb_cache_to_destination_configuration(cache=self)
A DuckDB cache.
52 @property 53 def paired_destination_config(self) -> DestinationDuckdb: 54 """Return a dictionary of destination configuration values.""" 55 return duckdb_cache_to_destination_configuration(cache=self)
Return a dictionary of destination configuration values.
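As a minimal sketch (assuming `db_path` points to a writable local file), a DuckDB cache can be constructed explicitly and inspected:

from airbyte.caches import DuckDBCache

cache = DuckDBCache(db_path="./pyairbyte_cache.duckdb")
# The paired destination config mirrors the cache settings for destination-duckdb.
print(cache.paired_destination_config)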
33class ReadResult(Mapping[str, CachedDataset]): 34 """The result of a read operation. 35 36 This class is used to return information about the read operation, such as the number of 37 records read. It should not be created directly, but instead returned by the write method 38 of a destination. 39 """ 40 41 def __init__( 42 self, 43 *, 44 source_name: str, 45 processed_streams: list[str], 46 cache: CacheBase, 47 progress_tracker: ProgressTracker, 48 ) -> None: 49 """Initialize a read result. 50 51 This class should not be created directly. Instead, it should be returned by the `read` 52 method of the `Source` class. 53 """ 54 self.source_name = source_name 55 self._progress_tracker = progress_tracker 56 self._cache = cache 57 self._processed_streams = processed_streams 58 59 def __getitem__(self, stream: str) -> CachedDataset: 60 """Return the cached dataset for a given stream name.""" 61 if stream not in self._processed_streams: 62 raise KeyError(stream) 63 64 return CachedDataset(self._cache, stream) 65 66 def __contains__(self, stream: object) -> bool: 67 """Return whether a given stream name was included in processing.""" 68 if not isinstance(stream, str): 69 return False 70 71 return stream in self._processed_streams 72 73 def __iter__(self) -> Iterator[str]: 74 """Return an iterator over the stream names that were processed.""" 75 return self._processed_streams.__iter__() 76 77 def __len__(self) -> int: 78 """Return the number of streams that were processed.""" 79 return len(self._processed_streams) 80 81 def get_sql_engine(self) -> Engine: 82 """Return the SQL engine used by the cache.""" 83 return self._cache.get_sql_engine() 84 85 @property 86 def processed_records(self) -> int: 87 """The total number of records read from the source.""" 88 return self._progress_tracker.total_records_read 89 90 @property 91 def streams(self) -> Mapping[str, CachedDataset]: 92 """Return a mapping of stream names to cached datasets.""" 93 return { 94 stream_name: CachedDataset(self._cache, stream_name) 95 for stream_name in self._processed_streams 96 } 97 98 @property 99 def cache(self) -> CacheBase: 100 """Return the cache object.""" 101 return self._cache
The result of a read operation.
This class is used to return information about the read operation, such as the number of records read. It should not be created directly, but is instead returned by the `read` method of the `Source` class.
41 def __init__( 42 self, 43 *, 44 source_name: str, 45 processed_streams: list[str], 46 cache: CacheBase, 47 progress_tracker: ProgressTracker, 48 ) -> None: 49 """Initialize a read result. 50 51 This class should not be created directly. Instead, it should be returned by the `read` 52 method of the `Source` class. 53 """ 54 self.source_name = source_name 55 self._progress_tracker = progress_tracker 56 self._cache = cache 57 self._processed_streams = processed_streams
Initialize a read result.
This class should not be created directly. Instead, it should be returned by the `read` method of the `Source` class.
81 def get_sql_engine(self) -> Engine: 82 """Return the SQL engine used by the cache.""" 83 return self._cache.get_sql_engine()
Return the SQL engine used by the cache.
85 @property 86 def processed_records(self) -> int: 87 """The total number of records read from the source.""" 88 return self._progress_tracker.total_records_read
The total number of records read from the source.
90 @property 91 def streams(self) -> Mapping[str, CachedDataset]: 92 """Return a mapping of stream names to cached datasets.""" 93 return { 94 stream_name: CachedDataset(self._cache, stream_name) 95 for stream_name in self._processed_streams 96 }
Return a mapping of stream names to cached datasets.
98 @property 99 def cache(self) -> CacheBase: 100 """Return the cache object.""" 101 return self._cache
Return the cache object.
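Putting the pieces above together, a `ReadResult` behaves like a read-only mapping of stream names to cached datasets. A short sketch, assuming `read_result` came from an earlier `source.read()` call that processed a hypothetical `users` stream (the `to_pandas()` conversion is the standard dataset helper):

print(read_result.processed_records)   # total records read from the source
print(list(read_result))               # stream names that were processed

if "users" in read_result:
    users_df = read_result["users"].to_pandas()  # CachedDataset -> pandas DataFrame

engine = read_result.get_sql_engine()  # SQLAlchemy engine backing the cache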
24class SecretSourceEnum(str, Enum): 25 """Enumeration of secret sources supported by PyAirbyte.""" 26 27 ENV = "env" 28 DOTENV = "dotenv" 29 GOOGLE_COLAB = "google_colab" 30 GOOGLE_GSM = "google_gsm" # Not enabled by default 31 32 PROMPT = "prompt" 33 34 def __str__(self) -> str: 35 """Return the string representation of the enum value.""" 36 return self.value
Enumeration of secret sources supported by PyAirbyte.
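As a hedged sketch, these enum values can be passed to `airbyte.get_secret` to restrict where a secret is looked up; the keyword names shown (`sources`, `allow_prompt`) are assumptions that may vary by version:

import airbyte as ab
from airbyte.secrets import SecretSourceEnum

# Look up MY_API_KEY from environment variables or a local .env file only.
api_key = ab.get_secret(
    "MY_API_KEY",
    sources=[SecretSourceEnum.ENV, SecretSourceEnum.DOTENV],
    allow_prompt=False,
)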
68class Source(ConnectorBase): # noqa: PLR0904 69 """A class representing a source that can be called.""" 70 71 connector_type = "source" 72 73 def __init__( 74 self, 75 executor: Executor, 76 name: str, 77 config: dict[str, Any] | None = None, 78 *, 79 config_change_callback: ConfigChangeCallback | None = None, 80 streams: str | list[str] | None = None, 81 validate: bool = False, 82 cursor_key_overrides: dict[str, str] | None = None, 83 primary_key_overrides: dict[str, str | list[str]] | None = None, 84 ) -> None: 85 """Initialize the source. 86 87 If config is provided, it will be validated against the spec if validate is True. 88 """ 89 self._to_be_selected_streams: list[str] | str = [] 90 """Used to hold selection criteria before catalog is known.""" 91 92 super().__init__( 93 executor=executor, 94 name=name, 95 config=config, 96 config_change_callback=config_change_callback, 97 validate=validate, 98 ) 99 self._config_dict: dict[str, Any] | None = None 100 self._last_log_messages: list[str] = [] 101 self._discovered_catalog: AirbyteCatalog | None = None 102 self._selected_stream_names: list[str] = [] 103 104 self._cursor_key_overrides: dict[str, str] = {} 105 """A mapping of lower-cased stream names to cursor key overrides.""" 106 107 self._primary_key_overrides: dict[str, list[str]] = {} 108 """A mapping of lower-cased stream names to primary key overrides.""" 109 110 if config is not None: 111 self.set_config(config, validate=validate) 112 if streams is not None: 113 self.select_streams(streams) 114 if cursor_key_overrides is not None: 115 self.set_cursor_keys(**cursor_key_overrides) 116 if primary_key_overrides is not None: 117 self.set_primary_keys(**primary_key_overrides) 118 119 def set_streams(self, streams: list[str]) -> None: 120 """Deprecated. See select_streams().""" 121 warnings.warn( 122 "The 'set_streams' method is deprecated and will be removed in a future version. " 123 "Please use the 'select_streams' method instead.", 124 DeprecationWarning, 125 stacklevel=2, 126 ) 127 self.select_streams(streams) 128 129 def set_cursor_key( 130 self, 131 stream_name: str, 132 cursor_key: str, 133 ) -> None: 134 """Set the cursor for a single stream. 135 136 Note: 137 - This does not unset previously set cursors. 138 - The cursor key must be a single field name. 139 - Not all streams support custom cursors. If a stream does not support custom cursors, 140 the override may be ignored. 141 - Stream names are case insensitive, while field names are case sensitive. 142 - Stream names are not validated by PyAirbyte. If the stream name 143 does not exist in the catalog, the override may be ignored. 144 """ 145 self._cursor_key_overrides[stream_name.lower()] = cursor_key 146 147 def set_cursor_keys( 148 self, 149 **kwargs: str, 150 ) -> None: 151 """Override the cursor key for one or more streams. 152 153 Usage: 154 source.set_cursor_keys( 155 stream1="cursor1", 156 stream2="cursor2", 157 ) 158 159 Note: 160 - This does not unset previously set cursors. 161 - The cursor key must be a single field name. 162 - Not all streams support custom cursors. If a stream does not support custom cursors, 163 the override may be ignored. 164 - Stream names are case insensitive, while field names are case sensitive. 165 - Stream names are not validated by PyAirbyte. If the stream name 166 does not exist in the catalog, the override may be ignored. 
167 """ 168 self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()}) 169 170 def set_primary_key( 171 self, 172 stream_name: str, 173 primary_key: str | list[str], 174 ) -> None: 175 """Set the primary key for a single stream. 176 177 Note: 178 - This does not unset previously set primary keys. 179 - The primary key must be a single field name or a list of field names. 180 - Not all streams support overriding primary keys. If a stream does not support overriding 181 primary keys, the override may be ignored. 182 - Stream names are case insensitive, while field names are case sensitive. 183 - Stream names are not validated by PyAirbyte. If the stream name 184 does not exist in the catalog, the override may be ignored. 185 """ 186 self._primary_key_overrides[stream_name.lower()] = ( 187 primary_key if isinstance(primary_key, list) else [primary_key] 188 ) 189 190 def set_primary_keys( 191 self, 192 **kwargs: str | list[str], 193 ) -> None: 194 """Override the primary keys for one or more streams. 195 196 This does not unset previously set primary keys. 197 198 Usage: 199 source.set_primary_keys( 200 stream1="pk1", 201 stream2=["pk1", "pk2"], 202 ) 203 204 Note: 205 - This does not unset previously set primary keys. 206 - The primary key must be a single field name or a list of field names. 207 - Not all streams support overriding primary keys. If a stream does not support overriding 208 primary keys, the override may be ignored. 209 - Stream names are case insensitive, while field names are case sensitive. 210 - Stream names are not validated by PyAirbyte. If the stream name 211 does not exist in the catalog, the override may be ignored. 212 """ 213 self._primary_key_overrides.update( 214 {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()} 215 ) 216 217 def _log_warning_preselected_stream(self, streams: str | list[str]) -> None: 218 """Logs a warning message indicating stream selection which are not selected yet.""" 219 if streams == "*": 220 print( 221 "Warning: Config is not set yet. All streams will be selected after config is set.", 222 file=sys.stderr, 223 ) 224 else: 225 print( 226 "Warning: Config is not set yet. " 227 f"Streams to be selected after config is set: {streams}", 228 file=sys.stderr, 229 ) 230 231 def select_all_streams(self) -> None: 232 """Select all streams. 233 234 This is a more streamlined equivalent to: 235 > source.select_streams(source.get_available_streams()). 236 """ 237 if self._config_dict is None: 238 self._to_be_selected_streams = "*" 239 self._log_warning_preselected_stream(self._to_be_selected_streams) 240 return 241 242 self._selected_stream_names = self.get_available_streams() 243 244 def select_streams(self, streams: str | list[str]) -> None: 245 """Select the stream names that should be read from the connector. 246 247 Args: 248 streams: A list of stream names to select. If set to "*", all streams will be selected. 249 250 Currently, if this is not set, all streams will be read. 
251 """ 252 if self._config_dict is None: 253 self._to_be_selected_streams = streams 254 self._log_warning_preselected_stream(streams) 255 return 256 257 if streams == "*": 258 self.select_all_streams() 259 return 260 261 if isinstance(streams, str): 262 # If a single stream is provided, convert it to a one-item list 263 streams = [streams] 264 265 available_streams = self.get_available_streams() 266 for stream in streams: 267 if stream not in available_streams: 268 raise exc.AirbyteStreamNotFoundError( 269 stream_name=stream, 270 connector_name=self.name, 271 available_streams=available_streams, 272 ) 273 self._selected_stream_names = streams 274 275 def get_selected_streams(self) -> list[str]: 276 """Get the selected streams. 277 278 If no streams are selected, return an empty list. 279 """ 280 return self._selected_stream_names 281 282 def set_config( 283 self, 284 config: dict[str, Any], 285 *, 286 validate: bool = True, 287 ) -> None: 288 """Set the config for the connector. 289 290 If validate is True, raise an exception if the config fails validation. 291 292 If validate is False, validation will be deferred until check() or validate_config() 293 is called. 294 """ 295 if validate: 296 self.validate_config(config) 297 298 self._config_dict = config 299 300 if self._to_be_selected_streams: 301 self.select_streams(self._to_be_selected_streams) 302 self._to_be_selected_streams = [] 303 304 def _discover(self) -> AirbyteCatalog: 305 """Call discover on the connector. 306 307 This involves the following steps: 308 - Write the config to a temporary file 309 - execute the connector with discover --config <config_file> 310 - Listen to the messages and return the first AirbyteCatalog that comes along. 311 - Make sure the subprocess is killed when the function returns. 312 """ 313 with as_temp_files([self._hydrated_config]) as [config_file]: 314 for msg in self._execute(["discover", "--config", config_file]): 315 if msg.type == Type.CATALOG and msg.catalog: 316 return msg.catalog 317 raise exc.AirbyteConnectorMissingCatalogError( 318 connector_name=self.name, 319 log_text=self._last_log_messages, 320 ) 321 322 def get_available_streams(self) -> list[str]: 323 """Get the available streams from the spec.""" 324 return [s.name for s in self.discovered_catalog.streams] 325 326 def _get_incremental_stream_names(self) -> list[str]: 327 """Get the name of streams that support incremental sync.""" 328 return [ 329 stream.name 330 for stream in self.discovered_catalog.streams 331 if SyncMode.incremental in stream.supported_sync_modes 332 ] 333 334 @override 335 def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: 336 """Call spec on the connector. 337 338 This involves the following steps: 339 * execute the connector with spec 340 * Listen to the messages and return the first AirbyteCatalog that comes along. 341 * Make sure the subprocess is killed when the function returns. 342 """ 343 if force_refresh or self._spec is None: 344 for msg in self._execute(["spec"]): 345 if msg.type == Type.SPEC and msg.spec: 346 self._spec = msg.spec 347 break 348 349 if self._spec: 350 return self._spec 351 352 raise exc.AirbyteConnectorMissingSpecError( 353 connector_name=self.name, 354 log_text=self._last_log_messages, 355 ) 356 357 @property 358 def config_spec(self) -> dict[str, Any]: 359 """Generate a configuration spec for this connector, as a JSON Schema definition. 
360 361 This function generates a JSON Schema dictionary with configuration specs for the 362 current connector, as a dictionary. 363 364 Returns: 365 dict: The JSON Schema configuration spec as a dictionary. 366 """ 367 return self._get_spec(force_refresh=True).connectionSpecification 368 369 @property 370 def _yaml_spec(self) -> str: 371 """Get the spec as a yaml string. 372 373 For now, the primary use case is for writing and debugging a valid config for a source. 374 375 This is private for now because we probably want better polish before exposing this 376 as a stable interface. This will also get easier when we have docs links with this info 377 for each connector. 378 """ 379 spec_obj: ConnectorSpecification = self._get_spec() 380 spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True) 381 # convert to a yaml string 382 return yaml.dump(spec_dict) 383 384 @property 385 def docs_url(self) -> str: 386 """Get the URL to the connector's documentation.""" 387 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 388 "source-", "" 389 ) 390 391 @property 392 def discovered_catalog(self) -> AirbyteCatalog: 393 """Get the raw catalog for the given streams. 394 395 If the catalog is not yet known, we call discover to get it. 396 """ 397 if self._discovered_catalog is None: 398 self._discovered_catalog = self._discover() 399 400 return self._discovered_catalog 401 402 @property 403 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 404 """Get the configured catalog for the given streams. 405 406 If the raw catalog is not yet known, we call discover to get it. 407 408 If no specific streams are selected, we return a catalog that syncs all available streams. 409 410 TODO: We should consider disabling by default the streams that the connector would 411 disable by default. (For instance, streams that require a premium license are sometimes 412 disabled by default within the connector.) 413 """ 414 # Ensure discovered catalog is cached before we start 415 _ = self.discovered_catalog 416 417 # Filter for selected streams if set, otherwise use all available streams: 418 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 419 return self.get_configured_catalog(streams=streams_filter) 420 421 def get_configured_catalog( 422 self, 423 streams: Literal["*"] | list[str] | None = None, 424 *, 425 force_full_refresh: bool = False, 426 ) -> ConfiguredAirbyteCatalog: 427 """Get a configured catalog for the given streams. 428 429 If no streams are provided, the selected streams will be used. If no streams are selected, 430 all available streams will be used. 431 432 If '*' is provided, all available streams will be used. 433 434 If force_full_refresh is True, streams will be configured with full_refresh sync mode 435 when supported by the stream. Otherwise, incremental sync mode is used when supported. 
436 """ 437 selected_streams: list[str] = [] 438 if streams is None: 439 selected_streams = self._selected_stream_names or self.get_available_streams() 440 elif streams == "*": 441 selected_streams = self.get_available_streams() 442 elif isinstance(streams, list): 443 selected_streams = streams 444 else: 445 raise exc.PyAirbyteInputError( 446 message="Invalid streams argument.", 447 input_value=streams, 448 ) 449 450 def _get_sync_mode(stream: AirbyteStream) -> SyncMode: 451 """Determine the sync mode for a stream based on force_full_refresh and support.""" 452 # Use getattr to handle mocks or streams without supported_sync_modes attribute 453 supported_modes = getattr(stream, "supported_sync_modes", None) 454 455 if force_full_refresh: 456 # When force_full_refresh is True, prefer full_refresh if supported 457 if supported_modes and SyncMode.full_refresh in supported_modes: 458 return SyncMode.full_refresh 459 # Fall back to incremental if full_refresh is not supported 460 return SyncMode.incremental 461 462 # Default behavior: preserve previous semantics (always incremental) 463 return SyncMode.incremental 464 465 return ConfiguredAirbyteCatalog( 466 streams=[ 467 ConfiguredAirbyteStream( 468 stream=stream, 469 destination_sync_mode=DestinationSyncMode.overwrite, 470 sync_mode=_get_sync_mode(stream), 471 primary_key=( 472 [self._primary_key_overrides[stream.name.lower()]] 473 if stream.name.lower() in self._primary_key_overrides 474 else stream.source_defined_primary_key 475 ), 476 cursor_field=( 477 [self._cursor_key_overrides[stream.name.lower()]] 478 if stream.name.lower() in self._cursor_key_overrides 479 else stream.default_cursor_field 480 ), 481 # These are unused in the current implementation: 482 generation_id=None, 483 minimum_generation_id=None, 484 sync_id=None, 485 ) 486 for stream in self.discovered_catalog.streams 487 if stream.name in selected_streams 488 ], 489 ) 490 491 def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]: 492 """Return the JSON Schema spec for the specified stream name.""" 493 catalog: AirbyteCatalog = self.discovered_catalog 494 found: list[AirbyteStream] = [ 495 stream for stream in catalog.streams if stream.name == stream_name 496 ] 497 498 if len(found) == 0: 499 raise exc.PyAirbyteInputError( 500 message="Stream name does not exist in catalog.", 501 input_value=stream_name, 502 ) 503 504 if len(found) > 1: 505 raise exc.PyAirbyteInternalError( 506 message="Duplicate streams found with the same name.", 507 context={ 508 "found_streams": found, 509 }, 510 ) 511 512 return found[0].json_schema 513 514 def get_records( 515 self, 516 stream: str, 517 *, 518 limit: int | None = None, 519 stop_event: threading.Event | None = None, 520 normalize_field_names: bool = False, 521 prune_undeclared_fields: bool = True, 522 ) -> LazyDataset: 523 """Read a stream from the connector. 524 525 Args: 526 stream: The name of the stream to read. 527 limit: The maximum number of records to read. If None, all records will be read. 528 stop_event: If set, the event can be triggered by the caller to stop reading records 529 and terminate the process. 530 normalize_field_names: When `True`, field names will be normalized to lower case, with 531 special characters removed. This matches the behavior of PyAirbyte caches and most 532 Airbyte destinations. 
533 prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, 534 which generally matches the behavior of PyAirbyte caches and most Airbyte 535 destinations, specifically when you expect the catalog may be stale. You can disable 536 this to keep all fields in the records. 537 538 This involves the following steps: 539 * Call discover to get the catalog 540 * Generate a configured catalog that syncs the given stream in full_refresh mode 541 * Write the configured catalog and the config to a temporary file 542 * execute the connector with read --config <config_file> --catalog <catalog_file> 543 * Listen to the messages and return the first AirbyteRecordMessages that come along. 544 * Make sure the subprocess is killed when the function returns. 545 """ 546 stop_event = stop_event or threading.Event() 547 configured_catalog = self.get_configured_catalog(streams=[stream]) 548 if len(configured_catalog.streams) == 0: 549 raise exc.PyAirbyteInputError( 550 message="Requested stream does not exist.", 551 context={ 552 "stream": stream, 553 "available_streams": self.get_available_streams(), 554 "connector_name": self.name, 555 }, 556 ) from KeyError(stream) 557 558 configured_stream = configured_catalog.streams[0] 559 560 def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]: 561 yield from records 562 563 stream_record_handler = StreamRecordHandler( 564 json_schema=self.get_stream_json_schema(stream), 565 prune_extra_fields=prune_undeclared_fields, 566 normalize_keys=normalize_field_names, 567 ) 568 569 # This method is non-blocking, so we use "PLAIN" to avoid a live progress display 570 progress_tracker = ProgressTracker( 571 ProgressStyle.PLAIN, 572 source=self, 573 cache=None, 574 destination=None, 575 expected_streams=[stream], 576 ) 577 578 iterator: Iterator[dict[str, Any]] = ( 579 StreamRecord.from_record_message( 580 record_message=record.record, 581 stream_record_handler=stream_record_handler, 582 ) 583 for record in self._read_with_catalog( 584 catalog=configured_catalog, 585 progress_tracker=progress_tracker, 586 stop_event=stop_event, 587 ) 588 if record.record 589 ) 590 if limit is not None: 591 # Stop the iterator after the limit is reached 592 iterator = islice(iterator, limit) 593 594 return LazyDataset( 595 iterator, 596 stream_metadata=configured_stream, 597 stop_event=stop_event, 598 progress_tracker=progress_tracker, 599 ) 600 601 def get_documents( 602 self, 603 stream: str, 604 title_property: str | None = None, 605 content_properties: list[str] | None = None, 606 metadata_properties: list[str] | None = None, 607 *, 608 render_metadata: bool = False, 609 ) -> Iterable[Document]: 610 """Read a stream from the connector and return the records as documents. 611 612 If metadata_properties is not set, all properties that are not content will be added to 613 the metadata. 614 615 If render_metadata is True, metadata will be rendered in the document, as well as the 616 the main content. 
617 """ 618 return self.get_records(stream).to_documents( 619 title_property=title_property, 620 content_properties=content_properties, 621 metadata_properties=metadata_properties, 622 render_metadata=render_metadata, 623 ) 624 625 def get_samples( 626 self, 627 streams: list[str] | Literal["*"] | None = None, 628 *, 629 limit: int = 5, 630 on_error: Literal["raise", "ignore", "log"] = "raise", 631 ) -> dict[str, InMemoryDataset | None]: 632 """Get a sample of records from the given streams.""" 633 if streams == "*": 634 streams = self.get_available_streams() 635 elif streams is None: 636 streams = self.get_selected_streams() 637 638 results: dict[str, InMemoryDataset | None] = {} 639 for stream in streams: 640 stop_event = threading.Event() 641 try: 642 results[stream] = self.get_records( 643 stream, 644 limit=limit, 645 stop_event=stop_event, 646 ).fetch_all() 647 stop_event.set() 648 except Exception as ex: 649 results[stream] = None 650 if on_error == "ignore": 651 continue 652 653 if on_error == "raise": 654 raise ex from None 655 656 if on_error == "log": 657 print(f"Error fetching sample for stream '{stream}': {ex}") 658 659 return results 660 661 def print_samples( 662 self, 663 streams: list[str] | Literal["*"] | None = None, 664 *, 665 limit: int = 5, 666 on_error: Literal["raise", "ignore", "log"] = "log", 667 ) -> None: 668 """Print a sample of records from the given streams.""" 669 internal_cols: list[str] = [ 670 AB_EXTRACTED_AT_COLUMN, 671 AB_META_COLUMN, 672 AB_RAW_ID_COLUMN, 673 ] 674 col_limit = 10 675 if streams == "*": 676 streams = self.get_available_streams() 677 elif streams is None: 678 streams = self.get_selected_streams() 679 680 console = Console() 681 682 console.print( 683 Markdown( 684 f"# Sample Records from `{self.name}` ({len(streams)} selected streams)", 685 justify="left", 686 ) 687 ) 688 689 for stream in streams: 690 console.print(Markdown(f"## `{stream}` Stream Sample", justify="left")) 691 samples = self.get_samples( 692 streams=[stream], 693 limit=limit, 694 on_error=on_error, 695 ) 696 dataset = samples[stream] 697 698 table = Table( 699 show_header=True, 700 show_lines=True, 701 ) 702 if dataset is None: 703 console.print( 704 Markdown("**⚠️ `Error fetching sample records.` ⚠️**"), 705 ) 706 continue 707 708 if len(dataset.column_names) > col_limit: 709 # We'll pivot the columns so each column is its own row 710 table.add_column("Column Name") 711 for _ in range(len(dataset)): 712 table.add_column(overflow="fold") 713 for col in dataset.column_names: 714 table.add_row( 715 Markdown(f"**`{col}`**"), 716 *[escape(str(record[col])) for record in dataset], 717 ) 718 else: 719 for col in dataset.column_names: 720 table.add_column( 721 Markdown(f"**`{col}`**"), 722 overflow="fold", 723 ) 724 725 for record in dataset: 726 table.add_row( 727 *[ 728 escape(str(val)) 729 for key, val in record.items() 730 # Exclude internal Airbyte columns. 
731 if key not in internal_cols 732 ] 733 ) 734 735 console.print(table) 736 737 console.print(Markdown("--------------")) 738 739 def _get_airbyte_message_iterator( 740 self, 741 *, 742 streams: Literal["*"] | list[str] | None = None, 743 state_provider: StateProviderBase | None = None, 744 progress_tracker: ProgressTracker, 745 force_full_refresh: bool = False, 746 ) -> AirbyteMessageIterator: 747 """Get an AirbyteMessageIterator for this source.""" 748 return AirbyteMessageIterator( 749 self._read_with_catalog( 750 catalog=self.get_configured_catalog( 751 streams=streams, 752 force_full_refresh=force_full_refresh, 753 ), 754 state=state_provider if not force_full_refresh else None, 755 progress_tracker=progress_tracker, 756 ) 757 ) 758 759 def _read_with_catalog( 760 self, 761 catalog: ConfiguredAirbyteCatalog, 762 progress_tracker: ProgressTracker, 763 *, 764 state: StateProviderBase | None = None, 765 stop_event: threading.Event | None = None, 766 ) -> Generator[AirbyteMessage, None, None]: 767 """Call read on the connector. 768 769 This involves the following steps: 770 * Write the config to a temporary file 771 * execute the connector with read --config <config_file> --catalog <catalog_file> 772 * Listen to the messages and return the AirbyteRecordMessages that come along. 773 * Send out telemetry on the performed sync (with information about which source was used and 774 the type of the cache) 775 """ 776 with as_temp_files( 777 [ 778 self._hydrated_config, 779 catalog.model_dump_json(exclude_none=True), 780 state.to_state_input_file_text() if state else "[]", 781 ] 782 ) as [ 783 config_file, 784 catalog_file, 785 state_file, 786 ]: 787 message_generator = self._execute( 788 [ 789 "read", 790 "--config", 791 config_file, 792 "--catalog", 793 catalog_file, 794 "--state", 795 state_file, 796 ], 797 progress_tracker=progress_tracker, 798 ) 799 for message in progress_tracker.tally_records_read(message_generator): 800 if stop_event and stop_event.is_set(): 801 progress_tracker._log_sync_cancel() # noqa: SLF001 802 time.sleep(0.1) 803 return 804 805 yield message 806 807 progress_tracker.log_read_complete() 808 809 def _peek_airbyte_message( 810 self, 811 message: AirbyteMessage, 812 *, 813 raise_on_error: bool = True, 814 ) -> None: 815 """Process an Airbyte message. 816 817 This method handles reading Airbyte messages and taking action, if needed, based on the 818 message type. For instance, log messages are logged, records are tallied, and errors are 819 raised as exceptions if `raise_on_error` is True. 820 821 Raises: 822 AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted. 823 """ 824 super()._peek_airbyte_message(message, raise_on_error=raise_on_error) 825 826 def _log_incremental_streams( 827 self, 828 *, 829 incremental_streams: set[str] | None = None, 830 ) -> None: 831 """Log the streams which are using incremental sync mode.""" 832 log_message = ( 833 "The following streams are currently using incremental sync:\n" 834 f"{incremental_streams}\n" 835 "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method." 836 ) 837 print(log_message, file=sys.stderr) 838 839 def read( 840 self, 841 cache: CacheBase | None = None, 842 *, 843 streams: str | list[str] | None = None, 844 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 845 force_full_refresh: bool = False, 846 skip_validation: bool = False, 847 ) -> ReadResult: 848 """Read from the connector and write to the cache. 849 850 Args: 851 cache: The cache to write to. 
If not set, a default cache will be used. 852 streams: Optional if already set. A list of stream names to select for reading. If set 853 to "*", all streams will be selected. 854 write_strategy: The strategy to use when writing to the cache. If a string, it must be 855 one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one 856 of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or 857 WriteStrategy.AUTO. 858 force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, 859 streams will be read in incremental mode if supported by the connector. This option 860 must be True when using the "replace" strategy. 861 skip_validation: If True, PyAirbyte will not pre-validate the input configuration before 862 running the connector. This can be helpful in debugging, when you want to send 863 configurations to the connector that otherwise might be rejected by JSON Schema 864 validation rules. 865 """ 866 cache = cache or get_default_cache() 867 progress_tracker = ProgressTracker( 868 source=self, 869 cache=cache, 870 destination=None, 871 expected_streams=None, # Will be set later 872 ) 873 874 # Set up state provider if not in full refresh mode 875 if force_full_refresh: 876 state_provider: StateProviderBase | None = None 877 else: 878 state_provider = cache.get_state_provider( 879 source_name=self._name, 880 ) 881 state_writer = cache.get_state_writer(source_name=self._name) 882 883 if streams: 884 self.select_streams(streams) 885 886 if not self._selected_stream_names: 887 raise exc.PyAirbyteNoStreamsSelectedError( 888 connector_name=self.name, 889 available_streams=self.get_available_streams(), 890 ) 891 892 try: 893 result = self._read_to_cache( 894 cache=cache, 895 catalog_provider=CatalogProvider( 896 self.get_configured_catalog(force_full_refresh=force_full_refresh) 897 ), 898 stream_names=self._selected_stream_names, 899 state_provider=state_provider, 900 state_writer=state_writer, 901 write_strategy=write_strategy, 902 force_full_refresh=force_full_refresh, 903 skip_validation=skip_validation, 904 progress_tracker=progress_tracker, 905 ) 906 except exc.PyAirbyteInternalError as ex: 907 progress_tracker.log_failure(exception=ex) 908 raise exc.AirbyteConnectorFailedError( 909 connector_name=self.name, 910 log_text=self._last_log_messages, 911 ) from ex 912 except Exception as ex: 913 progress_tracker.log_failure(exception=ex) 914 raise 915 916 progress_tracker.log_success() 917 return result 918 919 def _read_to_cache( # noqa: PLR0913 # Too many arguments 920 self, 921 cache: CacheBase, 922 *, 923 catalog_provider: CatalogProvider, 924 stream_names: list[str], 925 state_provider: StateProviderBase | None, 926 state_writer: StateWriterBase | None, 927 write_strategy: str | WriteStrategy = WriteStrategy.AUTO, 928 force_full_refresh: bool = False, 929 skip_validation: bool = False, 930 progress_tracker: ProgressTracker, 931 ) -> ReadResult: 932 """Internal read method.""" 933 if write_strategy == WriteStrategy.REPLACE and not force_full_refresh: 934 warnings.warn( 935 message=( 936 "Using `REPLACE` strategy without also setting `force_full_refresh=True` " 937 "could result in data loss. 
" 938 "To silence this warning, use the following: " 939 'warnings.filterwarnings("ignore", ' 940 'category="airbyte.warnings.PyAirbyteDataLossWarning")`' 941 ), 942 category=exc.PyAirbyteDataLossWarning, 943 stacklevel=1, 944 ) 945 if isinstance(write_strategy, str): 946 try: 947 write_strategy = WriteStrategy(write_strategy) 948 except ValueError: 949 raise exc.PyAirbyteInputError( 950 message="Invalid strategy", 951 context={ 952 "write_strategy": write_strategy, 953 "available_strategies": [ 954 s.value 955 for s in WriteStrategy # pyrefly: ignore[not-iterable] 956 ], 957 }, 958 ) from None 959 960 # Run optional validation step 961 if not skip_validation: 962 self.validate_config() 963 964 # Log incremental stream if incremental streams are known 965 if state_provider and state_provider.known_stream_names: 966 # Retrieve set of the known streams support which support incremental sync 967 incremental_streams = ( 968 set(self._get_incremental_stream_names()) 969 & state_provider.known_stream_names 970 & set(self.get_selected_streams()) 971 ) 972 if incremental_streams: 973 self._log_incremental_streams(incremental_streams=incremental_streams) 974 975 airbyte_message_iterator = AirbyteMessageIterator( 976 self._read_with_catalog( 977 catalog=catalog_provider.configured_catalog, 978 state=state_provider, 979 progress_tracker=progress_tracker, 980 ) 981 ) 982 cache._write_airbyte_message_stream( # noqa: SLF001 # Non-public API 983 stdin=airbyte_message_iterator, 984 catalog_provider=catalog_provider, 985 write_strategy=write_strategy, 986 state_writer=state_writer, 987 progress_tracker=progress_tracker, 988 ) 989 990 # Flush the WAL, if applicable 991 cache.processor._do_checkpoint() # noqa: SLF001 # Non-public API 992 993 return ReadResult( 994 source_name=self.name, 995 progress_tracker=progress_tracker, 996 processed_streams=stream_names, 997 cache=cache, 998 )
A class representing a source that can be called.
73 def __init__( 74 self, 75 executor: Executor, 76 name: str, 77 config: dict[str, Any] | None = None, 78 *, 79 config_change_callback: ConfigChangeCallback | None = None, 80 streams: str | list[str] | None = None, 81 validate: bool = False, 82 cursor_key_overrides: dict[str, str] | None = None, 83 primary_key_overrides: dict[str, str | list[str]] | None = None, 84 ) -> None: 85 """Initialize the source. 86 87 If config is provided, it will be validated against the spec if validate is True. 88 """ 89 self._to_be_selected_streams: list[str] | str = [] 90 """Used to hold selection criteria before catalog is known.""" 91 92 super().__init__( 93 executor=executor, 94 name=name, 95 config=config, 96 config_change_callback=config_change_callback, 97 validate=validate, 98 ) 99 self._config_dict: dict[str, Any] | None = None 100 self._last_log_messages: list[str] = [] 101 self._discovered_catalog: AirbyteCatalog | None = None 102 self._selected_stream_names: list[str] = [] 103 104 self._cursor_key_overrides: dict[str, str] = {} 105 """A mapping of lower-cased stream names to cursor key overrides.""" 106 107 self._primary_key_overrides: dict[str, list[str]] = {} 108 """A mapping of lower-cased stream names to primary key overrides.""" 109 110 if config is not None: 111 self.set_config(config, validate=validate) 112 if streams is not None: 113 self.select_streams(streams) 114 if cursor_key_overrides is not None: 115 self.set_cursor_keys(**cursor_key_overrides) 116 if primary_key_overrides is not None: 117 self.set_primary_keys(**primary_key_overrides)
Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
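In practice these constructor parameters are usually supplied through `get_source`. A brief sketch, assuming the `streams` argument is forwarded to the constructor shown above (the `count` config key is specific to source-faker):

import airbyte as ab

source = ab.get_source(
    "source-faker",
    config={"count": 1_000},
    streams=["users"],  # pre-select streams; can also be done later via select_streams()
)
source.check()  # validate the config and test connectivity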
119 def set_streams(self, streams: list[str]) -> None: 120 """Deprecated. See select_streams().""" 121 warnings.warn( 122 "The 'set_streams' method is deprecated and will be removed in a future version. " 123 "Please use the 'select_streams' method instead.", 124 DeprecationWarning, 125 stacklevel=2, 126 ) 127 self.select_streams(streams)
Deprecated. See select_streams().
129 def set_cursor_key( 130 self, 131 stream_name: str, 132 cursor_key: str, 133 ) -> None: 134 """Set the cursor for a single stream. 135 136 Note: 137 - This does not unset previously set cursors. 138 - The cursor key must be a single field name. 139 - Not all streams support custom cursors. If a stream does not support custom cursors, 140 the override may be ignored. 141 - Stream names are case insensitive, while field names are case sensitive. 142 - Stream names are not validated by PyAirbyte. If the stream name 143 does not exist in the catalog, the override may be ignored. 144 """ 145 self._cursor_key_overrides[stream_name.lower()] = cursor_key
Set the cursor for a single stream.
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
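For example (the stream and field names are hypothetical; assumes `source` was created with `get_source`):

# Ignored if the stream does not support custom cursors.
source.set_cursor_key("users", "updated_at")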
147 def set_cursor_keys( 148 self, 149 **kwargs: str, 150 ) -> None: 151 """Override the cursor key for one or more streams. 152 153 Usage: 154 source.set_cursor_keys( 155 stream1="cursor1", 156 stream2="cursor2", 157 ) 158 159 Note: 160 - This does not unset previously set cursors. 161 - The cursor key must be a single field name. 162 - Not all streams support custom cursors. If a stream does not support custom cursors, 163 the override may be ignored. 164 - Stream names are case insensitive, while field names are case sensitive. 165 - Stream names are not validated by PyAirbyte. If the stream name 166 does not exist in the catalog, the override may be ignored. 167 """ 168 self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
Override the cursor key for one or more streams.
Usage:
source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)
Note:
- This does not unset previously set cursors.
- The cursor key must be a single field name.
- Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
170 def set_primary_key( 171 self, 172 stream_name: str, 173 primary_key: str | list[str], 174 ) -> None: 175 """Set the primary key for a single stream. 176 177 Note: 178 - This does not unset previously set primary keys. 179 - The primary key must be a single field name or a list of field names. 180 - Not all streams support overriding primary keys. If a stream does not support overriding 181 primary keys, the override may be ignored. 182 - Stream names are case insensitive, while field names are case sensitive. 183 - Stream names are not validated by PyAirbyte. If the stream name 184 does not exist in the catalog, the override may be ignored. 185 """ 186 self._primary_key_overrides[stream_name.lower()] = ( 187 primary_key if isinstance(primary_key, list) else [primary_key] 188 )
Set the primary key for a single stream.
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
190 def set_primary_keys( 191 self, 192 **kwargs: str | list[str], 193 ) -> None: 194 """Override the primary keys for one or more streams. 195 196 This does not unset previously set primary keys. 197 198 Usage: 199 source.set_primary_keys( 200 stream1="pk1", 201 stream2=["pk1", "pk2"], 202 ) 203 204 Note: 205 - This does not unset previously set primary keys. 206 - The primary key must be a single field name or a list of field names. 207 - Not all streams support overriding primary keys. If a stream does not support overriding 208 primary keys, the override may be ignored. 209 - Stream names are case insensitive, while field names are case sensitive. 210 - Stream names are not validated by PyAirbyte. If the stream name 211 does not exist in the catalog, the override may be ignored. 212 """ 213 self._primary_key_overrides.update( 214 {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()} 215 )
Override the primary keys for one or more streams.
This does not unset previously set primary keys.
Usage:
source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)
Note:
- This does not unset previously set primary keys.
- The primary key must be a single field name or a list of field names.
- Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
- Stream names are case insensitive, while field names are case sensitive.
- Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
231 def select_all_streams(self) -> None: 232 """Select all streams. 233 234 This is a more streamlined equivalent to: 235 > source.select_streams(source.get_available_streams()). 236 """ 237 if self._config_dict is None: 238 self._to_be_selected_streams = "*" 239 self._log_warning_preselected_stream(self._to_be_selected_streams) 240 return 241 242 self._selected_stream_names = self.get_available_streams()
Select all streams.
This is a more streamlined equivalent to:
source.select_streams(source.get_available_streams()).
244 def select_streams(self, streams: str | list[str]) -> None: 245 """Select the stream names that should be read from the connector. 246 247 Args: 248 streams: A list of stream names to select. If set to "*", all streams will be selected. 249 250 Currently, if this is not set, all streams will be read. 251 """ 252 if self._config_dict is None: 253 self._to_be_selected_streams = streams 254 self._log_warning_preselected_stream(streams) 255 return 256 257 if streams == "*": 258 self.select_all_streams() 259 return 260 261 if isinstance(streams, str): 262 # If a single stream is provided, convert it to a one-item list 263 streams = [streams] 264 265 available_streams = self.get_available_streams() 266 for stream in streams: 267 if stream not in available_streams: 268 raise exc.AirbyteStreamNotFoundError( 269 stream_name=stream, 270 connector_name=self.name, 271 available_streams=available_streams, 272 ) 273 self._selected_stream_names = streams
Select the stream names that should be read from the connector.
Arguments:
- streams: A list of stream names to select. If set to "*", all streams will be selected.
Currently, if this is not set, all streams will be read.
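A brief sketch (stream names are hypothetical and are validated against the discovered catalog once the config is set):

source.select_streams(["users", "purchases"])  # select an explicit subset
source.select_streams("*")                     # or select all available streams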
275 def get_selected_streams(self) -> list[str]: 276 """Get the selected streams. 277 278 If no streams are selected, return an empty list. 279 """ 280 return self._selected_stream_names
Get the selected streams.
If no streams are selected, return an empty list.
282 def set_config( 283 self, 284 config: dict[str, Any], 285 *, 286 validate: bool = True, 287 ) -> None: 288 """Set the config for the connector. 289 290 If validate is True, raise an exception if the config fails validation. 291 292 If validate is False, validation will be deferred until check() or validate_config() 293 is called. 294 """ 295 if validate: 296 self.validate_config(config) 297 298 self._config_dict = config 299 300 if self._to_be_selected_streams: 301 self.select_streams(self._to_be_selected_streams) 302 self._to_be_selected_streams = []
Set the config for the connector.
If validate is True, raise an exception if the config fails validation.
If validate is False, validation will be deferred until check() or validate_config() is called.
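For example, deferring validation until later (the config key and secret name are hypothetical):

import airbyte as ab

source.set_config({"api_key": ab.get_secret("MY_API_KEY")}, validate=False)
source.validate_config()  # or source.check() to also test connectivity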
322 def get_available_streams(self) -> list[str]: 323 """Get the available streams from the spec.""" 324 return [s.name for s in self.discovered_catalog.streams]
Get the available streams from the spec.
357 @property 358 def config_spec(self) -> dict[str, Any]: 359 """Generate a configuration spec for this connector, as a JSON Schema definition. 360 361 This function generates a JSON Schema dictionary with configuration specs for the 362 current connector, as a dictionary. 363 364 Returns: 365 dict: The JSON Schema configuration spec as a dictionary. 366 """ 367 return self._get_spec(force_refresh=True).connectionSpecification
Generate a configuration spec for this connector, as a JSON Schema definition.
This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.
Returns:
dict: The JSON Schema configuration spec as a dictionary.
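For example, to inspect the configuration fields a connector expects (assumes `source` was created with `get_source`):

import json

# Pretty-print the JSON Schema for the connector's configuration.
print(json.dumps(source.config_spec, indent=2))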
384 @property 385 def docs_url(self) -> str: 386 """Get the URL to the connector's documentation.""" 387 return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace( 388 "source-", "" 389 )
Get the URL to the connector's documentation.
391 @property 392 def discovered_catalog(self) -> AirbyteCatalog: 393 """Get the raw catalog for the given streams. 394 395 If the catalog is not yet known, we call discover to get it. 396 """ 397 if self._discovered_catalog is None: 398 self._discovered_catalog = self._discover() 399 400 return self._discovered_catalog
Get the raw catalog for the given streams.
If the catalog is not yet known, we call discover to get it.
402 @property 403 def configured_catalog(self) -> ConfiguredAirbyteCatalog: 404 """Get the configured catalog for the given streams. 405 406 If the raw catalog is not yet known, we call discover to get it. 407 408 If no specific streams are selected, we return a catalog that syncs all available streams. 409 410 TODO: We should consider disabling by default the streams that the connector would 411 disable by default. (For instance, streams that require a premium license are sometimes 412 disabled by default within the connector.) 413 """ 414 # Ensure discovered catalog is cached before we start 415 _ = self.discovered_catalog 416 417 # Filter for selected streams if set, otherwise use all available streams: 418 streams_filter: list[str] = self._selected_stream_names or self.get_available_streams() 419 return self.get_configured_catalog(streams=streams_filter)
Get the configured catalog for the given streams.
If the raw catalog is not yet known, we call discover to get it.
If no specific streams are selected, we return a catalog that syncs all available streams.
TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
def get_configured_catalog(
    self,
    streams: Literal["*"] | list[str] | None = None,
    *,
    force_full_refresh: bool = False,
) -> ConfiguredAirbyteCatalog:
    """Get a configured catalog for the given streams.

    If no streams are provided, the selected streams will be used. If no streams are selected,
    all available streams will be used.

    If '*' is provided, all available streams will be used.

    If force_full_refresh is True, streams will be configured with full_refresh sync mode
    when supported by the stream. Otherwise, incremental sync mode is used when supported.
    """
    selected_streams: list[str] = []
    if streams is None:
        selected_streams = self._selected_stream_names or self.get_available_streams()
    elif streams == "*":
        selected_streams = self.get_available_streams()
    elif isinstance(streams, list):
        selected_streams = streams
    else:
        raise exc.PyAirbyteInputError(
            message="Invalid streams argument.",
            input_value=streams,
        )

    def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
        """Determine the sync mode for a stream based on force_full_refresh and support."""
        # Use getattr to handle mocks or streams without supported_sync_modes attribute
        supported_modes = getattr(stream, "supported_sync_modes", None)

        if force_full_refresh:
            # When force_full_refresh is True, prefer full_refresh if supported
            if supported_modes and SyncMode.full_refresh in supported_modes:
                return SyncMode.full_refresh
            # Fall back to incremental if full_refresh is not supported
            return SyncMode.incremental

        # Default behavior: preserve previous semantics (always incremental)
        return SyncMode.incremental

    return ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=stream,
                destination_sync_mode=DestinationSyncMode.overwrite,
                sync_mode=_get_sync_mode(stream),
                primary_key=(
                    [self._primary_key_overrides[stream.name.lower()]]
                    if stream.name.lower() in self._primary_key_overrides
                    else stream.source_defined_primary_key
                ),
                cursor_field=(
                    [self._cursor_key_overrides[stream.name.lower()]]
                    if stream.name.lower() in self._cursor_key_overrides
                    else stream.default_cursor_field
                ),
                # These are unused in the current implementation:
                generation_id=None,
                minimum_generation_id=None,
                sync_id=None,
            )
            for stream in self.discovered_catalog.streams
            if stream.name in selected_streams
        ],
    )
Get a configured catalog for the given streams.
If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.
If '*' is provided, all available streams will be used.
If force_full_refresh is True, streams will be configured with full_refresh sync mode when supported by the stream. Otherwise, incremental sync mode is used when supported.
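For example, a sketch of building catalogs for specific streams (the stream name is illustrative; use get_available_streams() to list the real ones):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})

catalog = source.get_configured_catalog(streams=["users"])
all_streams_catalog = source.get_configured_catalog(streams="*", force_full_refresh=True)
print([configured.stream.name for configured in catalog.streams])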
def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
    """Return the JSON Schema spec for the specified stream name."""
    catalog: AirbyteCatalog = self.discovered_catalog
    found: list[AirbyteStream] = [
        stream for stream in catalog.streams if stream.name == stream_name
    ]

    if len(found) == 0:
        raise exc.PyAirbyteInputError(
            message="Stream name does not exist in catalog.",
            input_value=stream_name,
        )

    if len(found) > 1:
        raise exc.PyAirbyteInternalError(
            message="Duplicate streams found with the same name.",
            context={
                "found_streams": found,
            },
        )

    return found[0].json_schema
Return the JSON Schema spec for the specified stream name.
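A short sketch of inspecting a stream's schema (connector and stream names are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
schema = source.get_stream_json_schema("users")
print(sorted(schema.get("properties", {}).keys()))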
def get_records(
    self,
    stream: str,
    *,
    limit: int | None = None,
    stop_event: threading.Event | None = None,
    normalize_field_names: bool = False,
    prune_undeclared_fields: bool = True,
) -> LazyDataset:
    """Read a stream from the connector.

    Args:
        stream: The name of the stream to read.
        limit: The maximum number of records to read. If None, all records will be read.
        stop_event: If set, the event can be triggered by the caller to stop reading records
            and terminate the process.
        normalize_field_names: When `True`, field names will be normalized to lower case, with
            special characters removed. This matches the behavior of PyAirbyte caches and most
            Airbyte destinations.
        prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
            which generally matches the behavior of PyAirbyte caches and most Airbyte
            destinations, specifically when you expect the catalog may be stale. You can disable
            this to keep all fields in the records.

    This involves the following steps:
    * Call discover to get the catalog
    * Generate a configured catalog that syncs the given stream in full_refresh mode
    * Write the configured catalog and the config to a temporary file
    * Execute the connector with read --config <config_file> --catalog <catalog_file>
    * Listen to the messages and return the first AirbyteRecordMessages that come along.
    * Make sure the subprocess is killed when the function returns.
    """
    stop_event = stop_event or threading.Event()
    configured_catalog = self.get_configured_catalog(streams=[stream])
    if len(configured_catalog.streams) == 0:
        raise exc.PyAirbyteInputError(
            message="Requested stream does not exist.",
            context={
                "stream": stream,
                "available_streams": self.get_available_streams(),
                "connector_name": self.name,
            },
        ) from KeyError(stream)

    configured_stream = configured_catalog.streams[0]

    def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
        yield from records

    stream_record_handler = StreamRecordHandler(
        json_schema=self.get_stream_json_schema(stream),
        prune_extra_fields=prune_undeclared_fields,
        normalize_keys=normalize_field_names,
    )

    # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
    progress_tracker = ProgressTracker(
        ProgressStyle.PLAIN,
        source=self,
        cache=None,
        destination=None,
        expected_streams=[stream],
    )

    iterator: Iterator[dict[str, Any]] = (
        StreamRecord.from_record_message(
            record_message=record.record,
            stream_record_handler=stream_record_handler,
        )
        for record in self._read_with_catalog(
            catalog=configured_catalog,
            progress_tracker=progress_tracker,
            stop_event=stop_event,
        )
        if record.record
    )
    if limit is not None:
        # Stop the iterator after the limit is reached
        iterator = islice(iterator, limit)

    return LazyDataset(
        iterator,
        stream_metadata=configured_stream,
        stop_event=stop_event,
        progress_tracker=progress_tracker,
    )
Read a stream from the connector.
Arguments:
- stream: The name of the stream to read.
- limit: The maximum number of records to read. If None, all records will be read.
- stop_event: If set, the event can be triggered by the caller to stop reading records and terminate the process.
- normalize_field_names: When `True`, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
- prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.
This involves the following steps:
- Call discover to get the catalog
- Generate a configured catalog that syncs the given stream in full_refresh mode
- Write the configured catalog and the config to a temporary file
- Execute the connector with `read --config <config_file> --catalog <catalog_file>`
- Listen to the messages and return the first AirbyteRecordMessages that come along.
- Make sure the subprocess is killed when the function returns.
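For example, a sketch of peeking at a few records without caching them (connector name, config, and stream name are illustrative):

import threading

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})

stop_event = threading.Event()
dataset = source.get_records("users", limit=10, stop_event=stop_event)
for record in dataset:
    print(record)

# The caller can also set stop_event to stop reading and terminate the subprocess early.
stop_event.set()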
def get_documents(
    self,
    stream: str,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Read a stream from the connector and return the records as documents.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as the
    main content.
    """
    return self.get_records(stream).to_documents(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content.
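A sketch of converting records to LLM documents (the property names below are illustrative and depend on the stream's schema):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})

for document in source.get_documents(
    stream="users",
    title_property="name",
    content_properties=["email"],
    render_metadata=True,
):
    print(document)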
def get_samples(
    self,
    streams: list[str] | Literal["*"] | None = None,
    *,
    limit: int = 5,
    on_error: Literal["raise", "ignore", "log"] = "raise",
) -> dict[str, InMemoryDataset | None]:
    """Get a sample of records from the given streams."""
    if streams == "*":
        streams = self.get_available_streams()
    elif streams is None:
        streams = self.get_selected_streams()

    results: dict[str, InMemoryDataset | None] = {}
    for stream in streams:
        stop_event = threading.Event()
        try:
            results[stream] = self.get_records(
                stream,
                limit=limit,
                stop_event=stop_event,
            ).fetch_all()
            stop_event.set()
        except Exception as ex:
            results[stream] = None
            if on_error == "ignore":
                continue

            if on_error == "raise":
                raise ex from None

            if on_error == "log":
                print(f"Error fetching sample for stream '{stream}': {ex}")

    return results
Get a sample of records from the given streams.
def print_samples(
    self,
    streams: list[str] | Literal["*"] | None = None,
    *,
    limit: int = 5,
    on_error: Literal["raise", "ignore", "log"] = "log",
) -> None:
    """Print a sample of records from the given streams."""
    internal_cols: list[str] = [
        AB_EXTRACTED_AT_COLUMN,
        AB_META_COLUMN,
        AB_RAW_ID_COLUMN,
    ]
    col_limit = 10
    if streams == "*":
        streams = self.get_available_streams()
    elif streams is None:
        streams = self.get_selected_streams()

    console = Console()

    console.print(
        Markdown(
            f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
            justify="left",
        )
    )

    for stream in streams:
        console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
        samples = self.get_samples(
            streams=[stream],
            limit=limit,
            on_error=on_error,
        )
        dataset = samples[stream]

        table = Table(
            show_header=True,
            show_lines=True,
        )
        if dataset is None:
            console.print(
                Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
            )
            continue

        if len(dataset.column_names) > col_limit:
            # We'll pivot the columns so each column is its own row
            table.add_column("Column Name")
            for _ in range(len(dataset)):
                table.add_column(overflow="fold")
            for col in dataset.column_names:
                table.add_row(
                    Markdown(f"**`{col}`**"),
                    *[escape(str(record[col])) for record in dataset],
                )
        else:
            for col in dataset.column_names:
                table.add_column(
                    Markdown(f"**`{col}`**"),
                    overflow="fold",
                )

            for record in dataset:
                table.add_row(
                    *[
                        escape(str(val))
                        for key, val in record.items()
                        # Exclude internal Airbyte columns.
                        if key not in internal_cols
                    ]
                )

        console.print(table)

        console.print(Markdown("--------------"))
Print a sample of records from the given streams.
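A sketch of sampling streams before running a full sync (connector name and config are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()

# Print rich-formatted samples to the console, logging per-stream errors instead of raising:
source.print_samples(limit=3, on_error="log")

# Or fetch samples programmatically:
samples = source.get_samples(streams="*", limit=3)
for stream_name, dataset in samples.items():
    print(stream_name, len(dataset) if dataset is not None else "error")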
def read(
    self,
    cache: CacheBase | None = None,
    *,
    streams: str | list[str] | None = None,
    write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
    force_full_refresh: bool = False,
    skip_validation: bool = False,
) -> ReadResult:
    """Read from the connector and write to the cache.

    Args:
        cache: The cache to write to. If not set, a default cache will be used.
        streams: Optional if already set. A list of stream names to select for reading. If set
            to "*", all streams will be selected.
        write_strategy: The strategy to use when writing to the cache. If a string, it must be
            one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
            of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
            WriteStrategy.AUTO.
        force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
            streams will be read in incremental mode if supported by the connector. This option
            must be True when using the "replace" strategy.
        skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
            running the connector. This can be helpful in debugging, when you want to send
            configurations to the connector that otherwise might be rejected by JSON Schema
            validation rules.
    """
    cache = cache or get_default_cache()
    progress_tracker = ProgressTracker(
        source=self,
        cache=cache,
        destination=None,
        expected_streams=None,  # Will be set later
    )

    # Set up state provider if not in full refresh mode
    if force_full_refresh:
        state_provider: StateProviderBase | None = None
    else:
        state_provider = cache.get_state_provider(
            source_name=self._name,
        )
    state_writer = cache.get_state_writer(source_name=self._name)

    if streams:
        self.select_streams(streams)

    if not self._selected_stream_names:
        raise exc.PyAirbyteNoStreamsSelectedError(
            connector_name=self.name,
            available_streams=self.get_available_streams(),
        )

    try:
        result = self._read_to_cache(
            cache=cache,
            catalog_provider=CatalogProvider(
                self.get_configured_catalog(force_full_refresh=force_full_refresh)
            ),
            stream_names=self._selected_stream_names,
            state_provider=state_provider,
            state_writer=state_writer,
            write_strategy=write_strategy,
            force_full_refresh=force_full_refresh,
            skip_validation=skip_validation,
            progress_tracker=progress_tracker,
        )
    except exc.PyAirbyteInternalError as ex:
        progress_tracker.log_failure(exception=ex)
        raise exc.AirbyteConnectorFailedError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        ) from ex
    except Exception as ex:
        progress_tracker.log_failure(exception=ex)
        raise

    progress_tracker.log_success()
    return result
Read from the connector and write to the cache.
Arguments:
- cache: The cache to write to. If not set, a default cache will be used.
- streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
- write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
- force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
- skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
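A sketch of a full read into a local cache (connector name, config, and the "users" stream are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})
cache = ab.new_local_cache()  # or omit to use the default DuckDB cache

result = source.read(
    cache=cache,
    streams="*",
    write_strategy="replace",
    force_full_refresh=True,  # required when using the "replace" strategy
)
print(result["users"].to_pandas().head())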
class StreamRecord(dict[str, Any]):
    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

    It has these behaviors:
    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
      case-insensitive manner.
    - The original case is stored in a separate dictionary, so that the original case can be
      retrieved when needed.
    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
      Python dictionary.
    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.

    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
    references.

    There are two ways this class can store keys internally:
    - If normalize_keys is True, the keys are normalized using the given normalizer.
    - If normalize_keys is False, the original case of the keys is stored.

    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
    dictionary will be initialized with the given keys. If a key is not found in the input data, it
    will be initialized with a value of None. When provided, the 'expected_keys' input will also
    determine the original case of the keys.
    """

    def __init__(
        self,
        from_dict: dict,
        *,
        stream_record_handler: StreamRecordHandler,
        with_internal_columns: bool = True,
        extracted_at: datetime | None = None,
    ) -> None:
        """Initialize the dictionary with the given data.

        Args:
            from_dict: The dictionary to initialize the StreamRecord with.
            stream_record_handler: The StreamRecordHandler to use for processing the record.
            with_internal_columns: If `True`, the internal columns will be added to the record.
            extracted_at: The time the record was extracted. If not provided, the current time will
                be used.
        """
        self._stream_handler: StreamRecordHandler = stream_record_handler

        # Start by initializing all values to None
        self.update(dict.fromkeys(stream_record_handler.index_keys))

        # Update the dictionary with the given data
        if self._stream_handler.prune_extra_fields:
            self.update(
                {
                    self._stream_handler.to_index_case(k): v
                    for k, v in from_dict.items()
                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
                }
            )
        else:
            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})

        if with_internal_columns:
            self.update(
                {
                    AB_RAW_ID_COLUMN: uuid7str(),
                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(timezone.utc),
                    AB_META_COLUMN: {},
                }
            )

    @classmethod
    def from_record_message(
        cls,
        record_message: AirbyteRecordMessage,
        *,
        stream_record_handler: StreamRecordHandler,
    ) -> StreamRecord:
        """Return a StreamRecord from a RecordMessage."""
        data_dict: dict[str, Any] = record_message.data.copy()
        return cls(
            from_dict=data_dict,
            stream_record_handler=stream_record_handler,
            with_internal_columns=True,
            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=timezone.utc),
        )

    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
        """Return the item with the given key."""
        try:
            return super().__getitem__(key)
        except KeyError:
            return super().__getitem__(self._stream_handler.to_index_case(key))

    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
        """Set the item with the given key to the given value."""
        index_case_key = self._stream_handler.to_index_case(key)
        if (
            self._stream_handler.prune_extra_fields
            and index_case_key not in self._stream_handler.index_keys
        ):
            return

        super().__setitem__(index_case_key, value)

    def __delitem__(self, key: str) -> None:
        """Delete the item with the given key."""
        try:
            super().__delitem__(key)
        except KeyError:
            index_case_key = self._stream_handler.to_index_case(key)
            if super().__contains__(index_case_key):
                super().__delitem__(index_case_key)
                return
        else:
            # No failure. Key was deleted.
            return

        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """Return whether the dictionary contains the given key."""
        assert isinstance(key, str), "Key must be a string."
        return super().__contains__(key) or super().__contains__(
            self._stream_handler.to_index_case(key)
        )

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the keys of the dictionary."""
        return iter(super().__iter__())

    def __len__(self) -> int:
        """Return the number of items in the dictionary."""
        return super().__len__()

    def __eq__(self, other: object) -> bool:
        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
        if isinstance(other, StreamRecord):
            return dict(self) == dict(other)

        if isinstance(other, dict):
            return {k.lower(): v for k, v in self.items()} == {
                k.lower(): v for k, v in other.items()
            }
        return False

    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
        """Return the hash of the dictionary with keys sorted."""
        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
        return hash(tuple(sorted(items)))
The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
It has these behaviors:
- When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
- The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
- Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal Python dictionary.
- In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.
This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.
There are two ways this class can store keys internally:
- If normalize_keys is True, the keys are normalized using the given normalizer.
- If normalize_keys is False, the original case of the keys is stored.
In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the dictionary will be initialized with the given keys. If a key is not found in the input data, it will be initialized with a value of None. When provided, the 'expected_keys' input will also determine the original case of the keys.
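As a sketch of this behavior in practice, records yielded by Source.get_records are StreamRecord instances (the connector, stream, and the "name" field below are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 10})
record = next(iter(source.get_records("users", limit=1)))

# Key lookups are case-insensitive:
assert record["NAME"] == record["name"]

# Airbyte metadata columns are included alongside the stream's own fields:
print(record["_airbyte_raw_id"], record["_airbyte_extracted_at"])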
@classmethod
def from_record_message(
    cls,
    record_message: AirbyteRecordMessage,
    *,
    stream_record_handler: StreamRecordHandler,
) -> StreamRecord:
    """Return a StreamRecord from a RecordMessage."""
    data_dict: dict[str, Any] = record_message.data.copy()
    return cls(
        from_dict=data_dict,
        stream_record_handler=stream_record_handler,
        with_internal_columns=True,
        extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=timezone.utc),
    )
Return a StreamRecord from a RecordMessage.
class WriteResult:
    """The result of a write operation.

    This class is used to return information about the write operation, such as the number of
    records written. It should not be created directly, but instead returned by the write method
    of a destination.
    """

    def __init__(
        self,
        *,
        destination: AirbyteWriterInterface | Destination,
        source_data: Source | ReadResult,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Initialize a write result.

        This class should not be created directly. Instead, it should be returned by the `write`
        method of the `Destination` class.
        """
        self._destination: AirbyteWriterInterface | Destination = destination
        self._source_data: Source | ReadResult = source_data
        self._catalog_provider: CatalogProvider = catalog_provider
        self._state_writer: StateWriterBase = state_writer
        self._progress_tracker: ProgressTracker = progress_tracker

    @property
    def processed_records(self) -> int:
        """The total number of records written to the destination."""
        return self._progress_tracker.total_destination_records_delivered

    def get_state_provider(self) -> StateProviderBase:
        """Return the state writer as a state provider.

        As a public interface, we only expose the state writer as a state provider. This is because
        the state writer itself is only intended for internal use. As a state provider, the state
        writer can be used to read the state artifacts that were written. This can be useful for
        testing or debugging.
        """
        return self._state_writer
The result of a write operation.
This class is used to return information about the write operation, such as the number of records written. It should not be created directly, but instead returned by the write method of a destination.
def __init__(
    self,
    *,
    destination: AirbyteWriterInterface | Destination,
    source_data: Source | ReadResult,
    catalog_provider: CatalogProvider,
    state_writer: StateWriterBase,
    progress_tracker: ProgressTracker,
) -> None:
    """Initialize a write result.

    This class should not be created directly. Instead, it should be returned by the `write`
    method of the `Destination` class.
    """
    self._destination: AirbyteWriterInterface | Destination = destination
    self._source_data: Source | ReadResult = source_data
    self._catalog_provider: CatalogProvider = catalog_provider
    self._state_writer: StateWriterBase = state_writer
    self._progress_tracker: ProgressTracker = progress_tracker
Initialize a write result.
This class should not be created directly. Instead, it should be returned by the `write` method of the `Destination` class.
@property
def processed_records(self) -> int:
    """The total number of records written to the destination."""
    return self._progress_tracker.total_destination_records_delivered
The total number of records written to the destination.
def get_state_provider(self) -> StateProviderBase:
    """Return the state writer as a state provider.

    As a public interface, we only expose the state writer as a state provider. This is because
    the state writer itself is only intended for internal use. As a state provider, the state
    writer can be used to read the state artifacts that were written. This can be useful for
    testing or debugging.
    """
    return self._state_writer
Return the state writer as a state provider.
As a public interface, we only expose the state writer as a state provider. This is because the state writer itself is only intended for internal use. As a state provider, the state writer can be used to read the state artifacts that were written. This can be useful for testing or debugging.
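A sketch of how a WriteResult is typically obtained and inspected (the connector names and config values here are illustrative; see the destination connector's docs for real options):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()

destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/out.duckdb"},
)

write_result = destination.write(source)
print(write_result.processed_records)

# Read back the state artifacts written during the sync, e.g. for debugging:
state_provider = write_result.get_state_provider()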