airbyte

PyAirbyte brings the power of Airbyte to every Python developer.


Getting Started

Reading Data

You can connect to any of hundreds of sources using the get_source method. You can then read data from sources using the Source.read method.

from airbyte import get_source

source = get_source(
    "source-faker",
    config={},
)
read_result = source.read()

for record in read_result["users"].records:
    print(record)

For more information, see the airbyte.sources module.

Writing to SQL Caches

Data can be written to caches using a number of SQL-based cache implementations, including Postgres, BigQuery, Snowflake, DuckDB, and MotherDuck. If you do not specify a cache, PyAirbyte will automatically use a local DuckDB cache by default.
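
For example, here is a minimal sketch reusing the source-faker connector from above; the cache name "faker_demo" is an arbitrary placeholder, and any other cache implementation from airbyte.caches could be passed in its place:

```python
import airbyte as ab

source = ab.get_source("source-faker", config={}, streams="*")

# Create a named local DuckDB cache; other cache implementations from
# `airbyte.caches` can be passed to `read()` in the same way.
cache = ab.new_local_cache("faker_demo")

# Omitting `cache=` falls back to the default local DuckDB cache.
read_result = source.read(cache=cache)
print(read_result["users"].to_pandas().head())
```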

For more information, see the airbyte.caches module.

Writing to Destination Connectors

Data can be written to destinations using the Destination.write method. You can connect to destinations using the get_destination method. PyAirbyte supports all Airbyte destinations, but Docker is required on your machine in order to run Java-based destinations.

Note: When loading to a SQL database, we recommend using a SQL cache (where available, see above) instead of a destination connector. This is because SQL caches are Python-native and therefore more portable across Python-based environments that might not have Docker container support. Destinations in PyAirbyte are uniquely suited for loading to non-SQL platforms such as vector stores and other reverse ETL-type use cases.
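
As a sketch, the flow looks like the following. The destination-duckdb connector and its destination_path config key are used purely for illustration; substitute the connector name and config schema of the destination you actually target.

```python
import airbyte as ab

source = ab.get_source("source-faker", config={}, streams="*")

# Connector name and config keys below are illustrative; each destination
# defines its own configuration schema.
destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/pyairbyte_demo.duckdb"},
)

# `write` accepts either a Source or a ReadResult; caching is enabled by default.
write_result = destination.write(source)
```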

For more information, see the airbyte.destinations module and the full list of destination connectors here.

PyAirbyte API

Importing as ab

Most examples in the PyAirbyte documentation use the import airbyte as ab convention. The ab alias is recommended because it keeps code concise and readable. When getting started, it also saves you from digging through submodules to find the classes and functions you need, since frequently-used classes and functions are available at the top level of the airbyte module.
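
For example, the typical pattern looks like this (a minimal sketch reusing the source-faker connector shown earlier):

```python
import airbyte as ab

# Frequently-used helpers and classes hang directly off the `ab` namespace.
print(len(ab.get_available_connectors()))

source = ab.get_source("source-faker", config={}, streams="*")
cache = ab.get_default_cache()
read_result = source.read(cache=cache)
```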

While many PyAirbyte classes and functions are available at the top level of the airbyte module, you can also import classes and functions from submodules directly. For example, while you can import the Source class from airbyte, you can also import it from the sources submodule like this:

from airbyte.sources import Source

Whether you import from the top level or from a submodule, the classes and functions are the same. We expect that most users will import from the top level when getting started, and then import from submodules when they are deploying more complex implementations.

For quick reference, top-level modules are listed in the left sidebar of this page.

Other Resources

  • PyAirbyte GitHub Readme: https://github.com/airbytehq/pyairbyte
  • PyAirbyte Issue Tracker: https://github.com/airbytehq/pyairbyte/issues
  • Frequently Asked Questions: https://github.com/airbytehq/PyAirbyte/blob/main/docs/faq.md
  • PyAirbyte Contributors Guide: https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md
  • GitHub Releases: https://github.com/airbytehq/PyAirbyte/releases

API Reference

Below is a list of all classes, functions, and modules available in the top-level airbyte module. (This is a long list!) If you are just starting out, we recommend beginning by selecting a submodule to navigate to from the left sidebar or from the list below:

Each module has its own documentation and code samples for using its capabilities effectively.

  • airbyte.cloud - Working with Airbyte Cloud, including running jobs remotely.
  • airbyte.caches - Working with caches, including how to inspect a cache and get data from it.
  • airbyte.datasets - Working with datasets, including how to read from datasets and convert to other formats, such as Pandas, Arrow, and LLM Document formats.
  • airbyte.destinations - Working with destinations, including how to write to Airbyte destination connectors.
  • airbyte.documents - Working with LLM documents, including how to convert records into document formats, for instance, when working with AI libraries like LangChain.
  • airbyte.exceptions - Definitions of all exception and warning classes used in PyAirbyte.
  • airbyte.experimental - Experimental features and utilities that do not yet have a stable API.
  • airbyte.logs - Logging functionality and configuration.
  • airbyte.records - Internal record handling classes.
  • airbyte.results - Documents the classes returned when working with results from Source.read and Destination.write.
  • airbyte.secrets - Tools for managing secrets in PyAirbyte.
  • airbyte.sources - Tools for creating and reading from Airbyte sources. This includes airbyte.sources.get_source to declare a source, Source.read for reading data, and Source.get_records() to peek at records without caching or writing them directly.


  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""***PyAirbyte brings the power of Airbyte to every Python developer.***
  3
  4[![PyPI version](https://badge.fury.io/py/airbyte.svg)](https://badge.fury.io/py/airbyte)
  5[![PyPI - Downloads](https://img.shields.io/pypi/dm/airbyte)](https://pypi.org/project/airbyte/)
  6[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/airbyte)](https://pypi.org/project/airbyte/)
  7[![Star on GitHub](https://img.shields.io/github/stars/airbytehq/pyairbyte.svg?style=social&label=★%20on%20GitHub)](https://github.com/airbytehq/pyairbyte)
  8
  9# Getting Started
 10
 11## Reading Data
 12
 13You can connect to any of [hundreds of sources](https://docs.airbyte.com/integrations/sources/)
 14using the `get_source` method. You can then read data from sources using `Source.read` method.
 15
 16```python
 17from airbyte import get_source
 18
 19source = get_source(
 20    "source-faker",
 21    config={},
 22)
 23read_result = source.read()
 24
 25for record in read_result["users"].records:
 26    print(record)
 27```
 28
 29For more information, see the `airbyte.sources` module.
 30
 31## Writing to SQL Caches
 32
 33Data can be written to caches using a number of SQL-based cache implementations, including
 34Postgres, BigQuery, Snowflake, DuckDB, and MotherDuck. If you do not specify a cache, PyAirbyte
 35will automatically use a local DuckDB cache by default.
 36
 37For more information, see the `airbyte.caches` module.
 38
 39## Writing to Destination Connectors
 40
 41Data can be written to destinations using the `Destination.write` method. You can connect to
 42destinations using the `get_destination` method. PyAirbyte supports all Airbyte destinations, but
 43Docker is required on your machine in order to run Java-based destinations.
 44
 45**Note:** When loading to a SQL database, we recommend using a SQL cache (where available,
 46[see above](#writing-to-sql-caches)) instead of a destination connector. This is because SQL caches
 47are Python-native and therefore more portable when run from different Python-based environments which
 48might not have Docker container support. Destinations in PyAirbyte are uniquely suited for loading
 49to non-SQL platforms such as vector stores and other reverse ETL-type use cases.
 50
 51For more information, see the `airbyte.destinations` module and the full list of destination
 52connectors [here](https://docs.airbyte.com/integrations/destinations/).
 53
 54# PyAirbyte API
 55
 56## Importing as `ab`
 57
 58Most examples in the PyAirbyte documentation use the `import airbyte as ab` convention. The `ab`
 59alias is recommended, making code more concise and readable. When getting started, this
 60also saves you from digging in submodules to find the classes and functions you need, since
 61frequently-used classes and functions are available at the top level of the `airbyte` module.
 62
 63## Navigating the API
 64
 65While many PyAirbyte classes and functions are available at the top level of the `airbyte` module,
 66you can also import classes and functions from submodules directly. For example, while you can
 67import the `Source` class from `airbyte`, you can also import it from the `sources` submodule like
 68this:
 69
 70```python
 71from airbyte.sources import Source
 72```
 73
 74Whether you import from the top level or from a submodule, the classes and functions are the same.
 75We expect that most users will import from the top level when getting started, and then import from
 76submodules when they are deploying more complex implementations.
 77
 78For quick reference, top-level modules are listed in the left sidebar of this page.
 79
 80# Other Resources
 81
 82- [PyAirbyte GitHub Readme](https://github.com/airbytehq/pyairbyte)
 83- [PyAirbyte Issue Tracker](https://github.com/airbytehq/pyairbyte/issues)
 84- [Frequently Asked Questions](https://github.com/airbytehq/PyAirbyte/blob/main/docs/faq.md)
 85- [PyAirbyte Contributors Guide](https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md)
 86- [GitHub Releases](https://github.com/airbytehq/PyAirbyte/releases)
 87
 88----------------------
 89
 90# API Reference
 91
 92Below is a list of all classes, functions, and modules available in the top-level `airbyte`
 93module. (This is a long list!) If you are just starting out, we recommend beginning by selecting a
 94submodule to navigate to from the left sidebar or from the list below:
 95
 96Each module
 97has its own documentation and code samples related to effectively using the related capabilities.
 98
 99- **`airbyte.cloud`** - Working with Airbyte Cloud, including running jobs remotely.
100- **`airbyte.caches`** - Working with caches, including how to inspect a cache and get data from it.
101- **`airbyte.datasets`** - Working with datasets, including how to read from datasets and convert to
102    other formats, such as Pandas, Arrow, and LLM Document formats.
103- **`airbyte.destinations`** - Working with destinations, including how to write to Airbyte
104    destinations connectors.
105- **`airbyte.documents`** - Working with LLM documents, including how to convert records into
106    document formats, for instance, when working with AI libraries like LangChain.
107- **`airbyte.exceptions`** - Definitions of all exception and warning classes used in PyAirbyte.
108- **`airbyte.experimental`** - Experimental features and utilities that do not yet have a stable
109    API.
110- **`airbyte.logs`** - Logging functionality and configuration.
111- **`airbyte.records`** - Internal record handling classes.
112- **`airbyte.results`** - Documents the classes returned when working with results from
113    `Source.read` and `Destination.write`
114- **`airbyte.secrets`** - Tools for managing secrets in PyAirbyte.
115- **`airbyte.sources`** - Tools for creating and reading from Airbyte sources. This includes
116    `airbyte.source.get_source` to declare a source, `airbyte.source.Source.read` for reading data,
117    and `airbyte.source.Source.get_records()` to peek at records without caching or writing them
118    directly.
119
120----------------------
121
122"""  # noqa: D415
123
124from __future__ import annotations
125
126from typing import TYPE_CHECKING
127
128from airbyte.caches.bigquery import BigQueryCache
129from airbyte.caches.duckdb import DuckDBCache
130from airbyte.caches.util import get_colab_cache, get_default_cache, new_local_cache
131from airbyte.datasets import CachedDataset
132from airbyte.destinations.base import Destination
133from airbyte.destinations.util import get_destination
134from airbyte.records import StreamRecord
135from airbyte.results import ReadResult, WriteResult
136from airbyte.secrets import SecretSourceEnum, get_secret
137from airbyte.sources import registry
138from airbyte.sources.base import Source
139from airbyte.sources.registry import get_available_connectors
140from airbyte.sources.util import get_source
141
142
143# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
144if TYPE_CHECKING:
145    # ruff: noqa: TC004  # imports used for more than type checking
146    from airbyte import (
147        caches,
148        callbacks,
149        cli,
150        cloud,
151        constants,
152        datasets,
153        destinations,
154        documents,
155        exceptions,  # noqa: ICN001  # No 'exc' alias for top-level module
156        experimental,
157        logs,
158        mcp,
159        records,
160        results,
161        secrets,
162        sources,
163    )
164
165
166__all__ = [
167    # Modules
168    "caches",
169    "callbacks",
170    "cli",
171    "cloud",
172    "constants",
173    "datasets",
174    "destinations",
175    "documents",
176    "exceptions",
177    "experimental",
178    "logs",
179    "mcp",
180    "records",
181    "registry",
182    "results",
183    "secrets",
184    "sources",
185    # Factories
186    "get_available_connectors",
187    "get_colab_cache",
188    "get_default_cache",
189    "get_destination",
190    "get_secret",
191    "get_source",
192    "new_local_cache",
193    # Classes
194    "BigQueryCache",
195    "CachedDataset",
196    "Destination",
197    "DuckDBCache",
198    "ReadResult",
199    "SecretSourceEnum",
200    "Source",
201    "StreamRecord",
202    "WriteResult",
203]
204
205__docformat__ = "google"
def get_available_connectors( install_type: airbyte.sources.registry.InstallType | str | None = None) -> list[str]:
222def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
223    """Return a list of all available connectors.
224
225    Connectors will be returned in alphabetical order, with the standard prefix "source-".
226    """
227    if install_type is None:
228        # No install type specified. Filter for whatever is runnable.
229        if is_docker_installed():
230            logger.info("Docker is detected. Returning all connectors.")
231            # If Docker is available, return all connectors.
232            return sorted(conn.name for conn in _get_registry_cache().values())
233
234        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
235
236        # If Docker is not available, return only Python and Manifest-based connectors.
237        return sorted(
238            conn.name
239            for conn in _get_registry_cache().values()
240            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
241        )
242
243    if not isinstance(install_type, InstallType):
244        install_type = InstallType(install_type)
245
246    if install_type == InstallType.PYTHON:
247        return sorted(
248            conn.name
249            for conn in _get_registry_cache().values()
250            if conn.pypi_package_name is not None
251        )
252
253    if install_type == InstallType.JAVA:
254        warnings.warn(
255            message="Java connectors are not yet supported.",
256            stacklevel=2,
257        )
258        return sorted(
259            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
260        )
261
262    if install_type == InstallType.DOCKER:
263        return sorted(conn.name for conn in _get_registry_cache().values())
264
265    if install_type == InstallType.YAML:
266        return sorted(
267            conn.name
268            for conn in _get_registry_cache().values()
269            if InstallType.YAML in conn.install_types
270        )
271
272    # pragma: no cover  # Should never be reached.
273    raise exc.PyAirbyteInputError(
274        message="Invalid install type.",
275        context={
276            "install_type": install_type,
277        },
278    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".
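
For example (a minimal sketch; the result depends on whether Docker is available in your environment):

```python
import airbyte as ab

connectors = ab.get_available_connectors()
print(f"{len(connectors)} connectors available")

# The list contains plain connector names, so it can be filtered like any other list.
print([name for name in connectors if name.startswith("source-file")])
```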

def get_colab_cache( cache_name: str = 'default_cache', sub_dir: str = 'Airbyte/cache', schema_name: str = 'main', table_prefix: str | None = '', drive_name: str = 'MyDrive', mount_path: str = '/content/drive') -> DuckDBCache:
 81def get_colab_cache(
 82    cache_name: str = "default_cache",
 83    sub_dir: str = "Airbyte/cache",
 84    schema_name: str = "main",
 85    table_prefix: str | None = "",
 86    drive_name: str = _MY_DRIVE,
 87    mount_path: str = _GOOGLE_DRIVE_DEFAULT_MOUNT_PATH,
 88) -> DuckDBCache:
 89    """Get a local cache for storing data, using the default database path.
 90
 91    Unlike the default `DuckDBCache`, this implementation will easily persist data across multiple
 92    Colab sessions.
 93
 94    Please note that Google Colab may prompt you to authenticate with your Google account to access
 95    your Google Drive. When prompted, click the link and follow the instructions.
 96
 97    Colab will require access to read and write files in your Google Drive, so please be sure to
 98    grant the necessary permissions when prompted.
 99
100    All arguments are optional and have default values that are suitable for most use cases.
101
102    Args:
103        cache_name: The name to use for the cache. Defaults to "default_cache". Override this if you
104            want to use a different database for different projects.
105        sub_dir: The subdirectory to store the cache in. Defaults to "Airbyte/cache". Override this
106            if you want to store the cache in a different subdirectory than the default.
107        schema_name: The name of the schema to write to. Defaults to "main". Override this if you
108            want to write to a different schema.
109        table_prefix: The prefix to use for all tables in the cache. Defaults to "". Override this
110            if you want to use a different prefix for all tables.
111        drive_name: The name of the Google Drive to use. Defaults to "MyDrive". Override this if you
112            want to store data in a shared drive instead of your personal drive.
113        mount_path: The path to mount Google Drive to. Defaults to "/content/drive". Override this
114            if you want to mount Google Drive to a different path (not recommended).
115
116    ## Usage Examples
117
118    The default `get_colab_cache` arguments are suitable for most use cases:
119
120    ```python
121    from airbyte.caches.colab import get_colab_cache
122
123    colab_cache = get_colab_cache()
124    ```
125
126    Or you can call `get_colab_cache` with custom arguments:
127
128    ```python
129    custom_cache = get_colab_cache(
130        cache_name="my_custom_cache",
131        sub_dir="Airbyte/custom_cache",
132        drive_name="My Company Drive",
133    )
134    ```
135    """
136    try:
137        from google.colab import drive  # noqa: PLC0415 # type: ignore[reportMissingImports]
138    except ImportError:
139        drive = None
140        msg = (
141            "The `google.colab` interface is only available in Google Colab. "
142            "Please run this code in a Google Colab notebook."
143        )
144        raise ImportError(msg) from None
145
146    drive.mount(mount_path)
147    drive_root = (
148        Path(mount_path) / drive_name
149        if drive_name == _MY_DRIVE
150        else Path(mount_path) / "Shareddrives" / drive_name
151    )
152
153    cache_dir = drive_root / sub_dir
154    cache_dir.mkdir(parents=True, exist_ok=True)
155    db_file_path = cache_dir / f"{cache_name}.duckdb"
156
157    print(f"Using persistent PyAirbyte cache in Google Drive: `{db_file_path}`.")
158    return DuckDBCache(
159        db_path=db_file_path,
160        cache_dir=cache_dir,
161        schema_name=schema_name,
162        table_prefix=table_prefix,
163    )

Get a local cache for storing data, using the default database path.

Unlike the default DuckDBCache, this implementation will easily persist data across multiple Colab sessions.

Please note that Google Colab may prompt you to authenticate with your Google account to access your Google Drive. When prompted, click the link and follow the instructions.

Colab will require access to read and write files in your Google Drive, so please be sure to grant the necessary permissions when prompted.

All arguments are optional and have default values that are suitable for most use cases.

Arguments:
  • cache_name: The name to use for the cache. Defaults to "default_cache". Override this if you want to use a different database for different projects.
  • sub_dir: The subdirectory to store the cache in. Defaults to "Airbyte/cache". Override this if you want to store the cache in a different subdirectory than the default.
  • schema_name: The name of the schema to write to. Defaults to "main". Override this if you want to write to a different schema.
  • table_prefix: The prefix to use for all tables in the cache. Defaults to "". Override this if you want to use a different prefix for all tables.
  • drive_name: The name of the Google Drive to use. Defaults to "MyDrive". Override this if you want to store data in a shared drive instead of your personal drive.
  • mount_path: The path to mount Google Drive to. Defaults to "/content/drive". Override this if you want to mount Google Drive to a different path (not recommended).

Usage Examples

The default get_colab_cache arguments are suitable for most use cases:

from airbyte.caches.colab import get_colab_cache

colab_cache = get_colab_cache()

Or you can call get_colab_cache with custom arguments:

custom_cache = get_colab_cache(
    cache_name="my_custom_cache",
    sub_dir="Airbyte/custom_cache",
    drive_name="My Company Drive",
)
def get_default_cache() -> DuckDBCache:
27def get_default_cache() -> DuckDBCache:
28    """Get a local cache for storing data, using the default database path.
29
30    Cache files are stored in the `.cache` directory, relative to the current
31    working directory.
32    """
33    cache_dir = Path("./.cache/default_cache")
34    return DuckDBCache(
35        db_path=cache_dir / "default_cache.duckdb",
36        cache_dir=cache_dir,
37    )

Get a local cache for storing data, using the default database path.

Cache files are stored in the .cache directory, relative to the current working directory.
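
A short sketch; the db_path attribute printed below mirrors the constructor argument shown in the source above:

```python
import airbyte as ab

cache = ab.get_default_cache()
# A DuckDBCache pointed at ./.cache/default_cache/default_cache.duckdb
print(cache.db_path)

# Any subsequent `Source.read()` call without an explicit cache reuses this location.
```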

def get_destination( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, version: str | None = None, use_python: bool | pathlib.Path | str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: str | bool | None = None, use_host_network: bool = False, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> Destination:
22def get_destination(  # noqa: PLR0913 # Too many arguments
23    name: str,
24    config: dict[str, Any] | None = None,
25    *,
26    config_change_callback: ConfigChangeCallback | None = None,
27    version: str | None = None,
28    use_python: bool | Path | str | None = None,
29    pip_url: str | None = None,
30    local_executable: Path | str | None = None,
31    docker_image: str | bool | None = None,
32    use_host_network: bool = False,
33    install_if_missing: bool = True,
34    install_root: Path | None = None,
35) -> Destination:
36    """Get a connector by name and version.
37
38    Args:
39        name: connector name
40        config: connector config - if not provided, you need to set it later via the set_config
41            method.
42        config_change_callback: callback function to be called when the connector config changes.
43        streams: list of stream names to select for reading. If set to "*", all streams will be
44            selected. If not provided, you can set it later via the `select_streams()` or
45            `select_all_streams()` method.
46        version: connector version - if not provided, the currently installed version will be used.
47            If no version is installed, the latest available version will be used. The version can
48            also be set to "latest" to force the use of the latest available version.
49        use_python: (Optional.) Python interpreter specification:
50            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
51            - False: Use Docker instead.
52            - Path: Use interpreter at this path.
53            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
54                installed, it will be installed by uv. (This generally adds less than 3 seconds
55                to install times.)
56        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
57            connector name.
58        local_executable: If set, the connector will be assumed to already be installed and will be
59            executed using this path or executable name. Otherwise, the connector will be installed
60            automatically in a virtual environment.
61        docker_image: If set, the connector will be executed using Docker. You can specify `True`
62            to use the default image for the connector, or you can specify a custom image name.
63            If `version` is specified and your image name does not already contain a tag
64            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
65        use_host_network: If set, along with docker_image, the connector will be executed using
66            the host network. This is useful for connectors that need to access resources on
67            the host machine, such as a local database. This parameter is ignored when
68            `docker_image` is not set.
69        install_if_missing: Whether to install the connector if it is not available locally. This
70            parameter is ignored when local_executable is set.
71        install_root: (Optional.) The root directory where the virtual environment will be
72            created. If not provided, the current working directory will be used.
73    """
74    return Destination(
75        name=name,
76        config=config,
77        config_change_callback=config_change_callback,
78        executor=get_connector_executor(
79            name=name,
80            version=version,
81            use_python=use_python,
82            pip_url=pip_url,
83            local_executable=local_executable,
84            docker_image=docker_image,
85            use_host_network=use_host_network,
86            install_if_missing=install_if_missing,
87            install_root=install_root,
88        ),
89    )

Get a connector by name and version.

Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • use_python: (Optional.) Python interpreter specification:
    • True: Use current Python interpreter. (Inferred if pip_url is set.)
    • False: Use Docker instead.
    • Path: Use interpreter at this path.
    • str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
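
For example, this sketch pins a version and forces Docker-based execution; the connector name and config keys are placeholders for whichever destination you use:

```python
import airbyte as ab

destination = ab.get_destination(
    "destination-duckdb",                              # illustrative connector name
    config={"destination_path": "/tmp/demo.duckdb"},   # config schema varies per connector
    version="latest",     # force the latest available connector version
    docker_image=True,    # run via the connector's default Docker image
)
```
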
def get_secret( secret_name: str, /, *, sources: list[airbyte.secrets.SecretManager | SecretSourceEnum] | None = None, allow_prompt: bool = True, **kwargs: dict[str, typing.Any]) -> airbyte.secrets.SecretString:
 33def get_secret(
 34    secret_name: str,
 35    /,
 36    *,
 37    sources: list[SecretManager | SecretSourceEnum] | None = None,
 38    allow_prompt: bool = True,
 39    **kwargs: dict[str, Any],
 40) -> SecretString:
 41    """Get a secret from the environment.
 42
 43    The optional `sources` argument of enum type `SecretSourceEnum` or list of `SecretSourceEnum`
 44    options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum`
 45    entries is passed, then the sources will be checked using the provided ordering.
 46
 47    If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then
 48    the user will be prompted to enter the secret if it is not found in any of the other sources.
 49    """
 50    if secret_name.startswith(SECRETS_HYDRATION_PREFIX):
 51        # If the secret name starts with the hydration prefix, we assume it's a secret reference.
 52        # We strip the prefix and get the actual secret name.
 53        secret_name = secret_name.removeprefix(SECRETS_HYDRATION_PREFIX).lstrip()
 54
 55    if "source" in kwargs:
 56        warnings.warn(
 57            message="The `source` argument is deprecated. Use the `sources` argument instead.",
 58            category=DeprecationWarning,
 59            stacklevel=2,
 60        )
 61        sources = kwargs.pop("source")  # type: ignore [assignment]
 62
 63    available_sources: dict[str, SecretManager] = {}
 64    for available_source in _get_secret_sources():
 65        # Add available sources to the dict. Order matters.
 66        available_sources[available_source.name] = available_source
 67
 68    if sources is None:
 69        # If ANY is in the list, then we don't need to check any other sources.
 70        # This is the default behavior.
 71        sources = list(available_sources.values())
 72
 73    elif not isinstance(sources, list):
 74        sources = [sources]  # type: ignore [unreachable]  # This is a 'just in case' catch.
 75
 76    # Replace any SecretSourceEnum strings with the matching SecretManager object
 77    for source in list(sources):
 78        if isinstance(source, SecretSourceEnum):
 79            if source not in available_sources:
 80                raise exc.PyAirbyteInputError(
 81                    guidance="Invalid secret source name.",
 82                    input_value=source,
 83                    context={
 84                        "Available Sources": list(available_sources.keys()),
 85                    },
 86                )
 87
 88            sources[sources.index(source)] = available_sources[source]
 89
 90    secret_managers = cast("list[SecretManager]", sources)
 91
 92    if SecretSourceEnum.PROMPT in secret_managers:
 93        prompt_source = secret_managers.pop(
 94            # Mis-typed, but okay here since we have equality logic for the enum comparison:
 95            secret_managers.index(SecretSourceEnum.PROMPT),  # type: ignore [arg-type]
 96        )
 97
 98        if allow_prompt:
 99            # Always check prompt last. Add it to the end of the list.
100            secret_managers.append(prompt_source)
101
102    for secret_mgr in secret_managers:
103        val = secret_mgr.get_secret(secret_name)
104        if val:
105            return SecretString(val)
106
107    raise exc.PyAirbyteSecretNotFoundError(
108        secret_name=secret_name,
109        sources=[str(s) for s in available_sources],
110    )

Get a secret from the environment.

The optional sources argument of enum type SecretSourceEnum or list of SecretSourceEnum options. If left blank, all available sources will be checked. If a list of SecretSourceEnum entries is passed, then the sources will be checked using the provided ordering.

If allow_prompt is True or if SecretSourceEnum.PROMPT is declared in the source arg, then the user will be prompted to enter the secret if it is not found in any of the other sources.
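
For example (MY_API_KEY is a placeholder name; the value could come from an environment variable or another configured secret manager):

```python
import airbyte as ab

# Look up a secret without hard-coding it in your script.
api_key = ab.get_secret("MY_API_KEY")

# Disable the interactive fallback (useful in CI) so a missing secret raises instead.
api_key = ab.get_secret("MY_API_KEY", allow_prompt=False)
```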

def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, use_python: bool | pathlib.Path | str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    use_python: bool | Path | str | None = None,
 56    pip_url: str | None = None,
 57    local_executable: Path | str | None = None,
 58    docker_image: bool | str | None = None,
 59    use_host_network: bool = False,
 60    source_manifest: bool | dict | Path | str | None = None,
 61    install_if_missing: bool = True,
 62    install_root: Path | None = None,
 63) -> Source:
 64    """Get a connector by name and version.
 65
 66    If an explicit install or execution method is requested (e.g. `local_executable`,
 67    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 68
 69    Otherwise, an appropriate method will be selected based on the available connector metadata:
 70    1. If the connector is registered and a YAML source manifest is available, the YAML manifest
 71       will be downloaded and used to execute the connector.
 72    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 73    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 74       will be executed using Docker.
 75
 76    Args:
 77        name: connector name
 78        config: connector config - if not provided, you need to set it later via the set_config
 79            method.
 80        config_change_callback: callback function to be called when the connector config changes.
 81        streams: list of stream names to select for reading. If set to "*", all streams will be
 82            selected. If not provided, you can set it later via the `select_streams()` or
 83            `select_all_streams()` method.
 84        version: connector version - if not provided, the currently installed version will be used.
 85            If no version is installed, the latest available version will be used. The version can
 86            also be set to "latest" to force the use of the latest available version.
 87        use_python: (Optional.) Python interpreter specification:
 88            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
 89            - False: Use Docker instead.
 90            - Path: Use interpreter at this path.
 91            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
 92                installed, it will be installed by uv. (This generally adds less than 3 seconds
 93                to install times.)
 94        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 95            connector name.
 96        local_executable: If set, the connector will be assumed to already be installed and will be
 97            executed using this path or executable name. Otherwise, the connector will be installed
 98            automatically in a virtual environment.
 99        docker_image: If set, the connector will be executed using Docker. You can specify `True`
100            to use the default image for the connector, or you can specify a custom image name.
101            If `version` is specified and your image name does not already contain a tag
102            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
103        use_host_network: If set, along with docker_image, the connector will be executed using
104            the host network. This is useful for connectors that need to access resources on
105            the host machine, such as a local database. This parameter is ignored when
106            `docker_image` is not set.
107        source_manifest: If set, the connector will be executed based on a declarative YAML
108            source definition. This input can be `True` to attempt to auto-download a YAML spec,
109            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
110            the local file system, or `str` to pull the definition from a web URL.
111        install_if_missing: Whether to install the connector if it is not available locally. This
112            parameter is ignored when `local_executable` or `source_manifest` are set.
113        install_root: (Optional.) The root directory where the virtual environment will be
114            created. If not provided, the current working directory will be used.
115    """
116    return Source(
117        name=name,
118        config=config,
119        config_change_callback=config_change_callback,
120        streams=streams,
121        executor=get_connector_executor(
122            name=name,
123            version=version,
124            use_python=use_python,
125            pip_url=pip_url,
126            local_executable=local_executable,
127            docker_image=docker_image,
128            use_host_network=use_host_network,
129            source_manifest=source_manifest,
130            install_if_missing=install_if_missing,
131            install_root=install_root,
132        ),
133    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • use_python: (Optional.) Python interpreter specification:
    • True: Use current Python interpreter. (Inferred if pip_url is set.)
    • False: Use Docker instead.
    • Path: Use interpreter at this path.
    • str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
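
For example, a sketch that selects streams up front; source-faker's count option and its users/products streams are used here for illustration:

```python
import airbyte as ab

source = ab.get_source(
    "source-faker",
    config={"count": 1_000},        # source-faker generates this many fake records
    streams=["users", "products"],  # select streams here, or later via select_streams()
)
source.check()                      # verify connectivity and config before reading
read_result = source.read()
```
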
def new_local_cache( cache_name: str | None = None, cache_dir: str | pathlib.Path | None = None, *, cleanup: bool = True) -> DuckDBCache:
40def new_local_cache(
41    cache_name: str | None = None,
42    cache_dir: str | Path | None = None,
43    *,
44    cleanup: bool = True,
45) -> DuckDBCache:
46    """Get a local cache for storing data, using a name string to seed the path.
47
48    Args:
49        cache_name: Name to use for the cache. Defaults to None.
50        cache_dir: Root directory to store the cache in. Defaults to None.
51        cleanup: Whether to clean up temporary files. Defaults to True.
52
53    Cache files are stored in the `.cache` directory, relative to the current
54    working directory.
55    """
56    if cache_name:
57        if " " in cache_name:
58            raise exc.PyAirbyteInputError(
59                message="Cache name cannot contain spaces.",
60                input_value=cache_name,
61            )
62
63        if not cache_name.replace("_", "").isalnum():
64            raise exc.PyAirbyteInputError(
65                message="Cache name can only contain alphanumeric characters and underscores.",
66                input_value=cache_name,
67            )
68
69    cache_name = cache_name or str(ulid.ULID())
70    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
71    if not isinstance(cache_dir, Path):
72        cache_dir = Path(cache_dir)
73
74    return DuckDBCache(
75        db_path=cache_dir / f"db_{cache_name}.duckdb",
76        cache_dir=cache_dir,
77        cleanup=cleanup,
78    )

Get a local cache for storing data, using a name string to seed the path.

Arguments:
  • cache_name: Name to use for the cache. Defaults to None.
  • cache_dir: Root directory to store the cache in. Defaults to None.
  • cleanup: Whether to clean up temporary files. Defaults to True.

Cache files are stored in the .cache directory, relative to the current working directory.
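
For example (cache names may contain only alphanumeric characters and underscores):

```python
import airbyte as ab

# Creates ./.cache/my_project/db_my_project.duckdb
cache = ab.new_local_cache("my_project", cache_dir="./.cache/my_project")

# Omitting the name generates a ULID-based cache name instead.
throwaway_cache = ab.new_local_cache()
```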

class BigQueryCache(airbyte._processors.sql.bigquery.BigQueryConfig, airbyte.caches.base.CacheBase):
39class BigQueryCache(BigQueryConfig, CacheBase):
40    """The BigQuery cache implementation."""
41
42    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = BigQuerySqlProcessor
43
44    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
45    paired_destination_config_class: ClassVar[type | None] = DestinationBigquery
46
47    @property
48    def paired_destination_config(self) -> DestinationBigquery:
49        """Return a dictionary of destination configuration values."""
50        return bigquery_cache_to_destination_configuration(cache=self)
51
52    def get_arrow_dataset(
53        self,
54        stream_name: str,
55        *,
56        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
57    ) -> NoReturn:
58        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
59
60        See: https://github.com/airbytehq/PyAirbyte/issues/165
61        """
62        raise NotImplementedError(
63            "BigQuery doesn't currently support to_arrow"
64            "Please consider using a different cache implementation for these functionalities."
65        )

The BigQuery cache implementation.
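
A construction sketch; the field names follow the inherited BigQueryConfig members listed below, and all values are placeholders:

```python
import airbyte as ab

cache = ab.BigQueryCache(
    project_name="my-gcp-project",                      # placeholder GCP project
    dataset_name="pyairbyte_cache",                     # placeholder BigQuery dataset
    credentials_path="/path/to/service_account.json",   # service-account key file
)

# Read source data directly into BigQuery instead of a local DuckDB file.
read_result = ab.get_source("source-faker", config={}, streams="*").read(cache=cache)
```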

paired_destination_name: ClassVar[str | None] = 'destination-bigquery'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_bigquery.DestinationBigquery'>
paired_destination_config: airbyte_api.models.destination_bigquery.DestinationBigquery
47    @property
48    def paired_destination_config(self) -> DestinationBigquery:
49        """Return a dictionary of destination configuration values."""
50        return bigquery_cache_to_destination_configuration(cache=self)

Return a dictionary of destination configuration values.

def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = 100000) -> NoReturn:
52    def get_arrow_dataset(
53        self,
54        stream_name: str,
55        *,
56        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
57    ) -> NoReturn:
58        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
59
60        See: https://github.com/airbytehq/PyAirbyte/issues/165
61        """
62        raise NotImplementedError(
63            "BigQuery doesn't currently support to_arrow"
64            "Please consider using a different cache implementation for these functionalities."
65        )

Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.

See: https://github.com/airbytehq/PyAirbyte/issues/165

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
run_sql_query
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
create_source_tables
airbyte._processors.sql.bigquery.BigQueryConfig
database_name
schema_name
credentials_path
dataset_location
project_name
dataset_name
get_sql_alchemy_url
get_database_name
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_sql_alchemy_connect_args
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class CachedDataset(airbyte.datasets._sql.SQLDataset):
150class CachedDataset(SQLDataset):
151    """A dataset backed by a SQL table cache.
152
153    Because this dataset includes all records from the underlying table, we also expose the
154    underlying table as a SQLAlchemy Table object.
155    """
156
157    def __init__(
158        self,
159        cache: CacheBase,
160        stream_name: str,
161        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
162    ) -> None:
163        """We construct the query statement by selecting all columns from the table.
164
165        This prevents the need to scan the table schema to construct the query statement.
166
167        If stream_configuration is None, we attempt to retrieve the stream configuration from the
168        cache processor. This is useful when constructing a dataset from a CachedDataset object,
169        which already has the stream configuration.
170
171        If stream_configuration is set to False, we skip the stream configuration retrieval.
172        """
173        table_name = cache.processor.get_sql_table_name(stream_name)
174        schema_name = cache.schema_name
175        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
176        super().__init__(
177            cache=cache,
178            stream_name=stream_name,
179            query_statement=query,
180            stream_configuration=stream_configuration,
181        )
182
183    @overrides
184    def to_pandas(self) -> DataFrame:
185        """Return the underlying dataset data as a pandas DataFrame."""
186        return self._cache.get_pandas_dataframe(self._stream_name)
187
188    @overrides
189    def to_arrow(
190        self,
191        *,
192        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
193    ) -> Dataset:
194        """Return an Arrow Dataset containing the data from the specified stream.
195
196        Args:
197            stream_name (str): Name of the stream to retrieve data from.
198            max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.
199
200        Returns:
201            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
202        """
203        return self._cache.get_arrow_dataset(
204            stream_name=self._stream_name,
205            max_chunk_size=max_chunk_size,
206        )
207
208    def to_sql_table(self) -> Table:
209        """Return the underlying SQL table as a SQLAlchemy Table object."""
210        return self._cache.processor.get_sql_table(self.stream_name)
211
212    def __eq__(self, value: object) -> bool:
213        """Return True if the value is a CachedDataset with the same cache and stream name.
214
215        In the case of CachedDataset objects, we can simply compare the cache and stream name.
216
217        Note that this equality check is only supported on CachedDataset objects and not for
218        the base SQLDataset implementation. This is because of the complexity and computational
219        cost of comparing two arbitrary SQL queries that could be bound to different variables,
220        as well as the chance that two queries can be syntactically equivalent without being
221        text-wise equivalent.
222        """
223        if not isinstance(value, SQLDataset):
224            return False
225
226        if self._cache is not value._cache:
227            return False
228
229        return not self._stream_name != value._stream_name
230
231    def __hash__(self) -> int:
232        return hash(self._stream_name)

A dataset backed by a SQL table cache.

Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
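
For example, a sketch of the typical access patterns after a read (using the source-faker users stream from earlier):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={}, streams="*")
read_result = source.read()

users = read_result["users"]   # a CachedDataset backed by the cache's SQL table
df = users.to_pandas()         # full stream as a pandas DataFrame
table = users.to_sql_table()   # SQLAlchemy Table object for ad-hoc queries
```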

CachedDataset( cache: airbyte.caches.CacheBase, stream_name: str, stream_configuration: Union[airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream, Literal[False], NoneType] = None)
157    def __init__(
158        self,
159        cache: CacheBase,
160        stream_name: str,
161        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
162    ) -> None:
163        """We construct the query statement by selecting all columns from the table.
164
165        This prevents the need to scan the table schema to construct the query statement.
166
167        If stream_configuration is None, we attempt to retrieve the stream configuration from the
168        cache processor. This is useful when constructing a dataset from a CachedDataset object,
169        which already has the stream configuration.
170
171        If stream_configuration is set to False, we skip the stream configuration retrieval.
172        """
173        table_name = cache.processor.get_sql_table_name(stream_name)
174        schema_name = cache.schema_name
175        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
176        super().__init__(
177            cache=cache,
178            stream_name=stream_name,
179            query_statement=query,
180            stream_configuration=stream_configuration,
181        )

We construct the query statement by selecting all columns from the table.

This prevents the need to scan the table schema to construct the query statement.

If stream_configuration is None, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.

If stream_configuration is set to False, we skip the stream configuration retrieval.

@overrides
def to_pandas(self) -> pandas.core.frame.DataFrame:
183    @overrides
184    def to_pandas(self) -> DataFrame:
185        """Return the underlying dataset data as a pandas DataFrame."""
186        return self._cache.get_pandas_dataframe(self._stream_name)

Return the underlying dataset data as a pandas DataFrame.

@overrides
def to_arrow(self, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
188    @overrides
189    def to_arrow(
190        self,
191        *,
192        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
193    ) -> Dataset:
194        """Return an Arrow Dataset containing the data from the specified stream.
195
196        Args:
197            stream_name (str): Name of the stream to retrieve data from.
198            max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.
199
200        Returns:
201            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
202        """
203        return self._cache.get_arrow_dataset(
204            stream_name=self._stream_name,
205            max_chunk_size=max_chunk_size,
206        )

Return an Arrow Dataset containing the data from the specified stream.

Arguments:
  • stream_name (str): Name of the stream to retrieve data from.
  • max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.
Returns:

pa.dataset.Dataset: Arrow Dataset containing the stream's data.

def to_sql_table(self) -> sqlalchemy.sql.schema.Table:
208    def to_sql_table(self) -> Table:
209        """Return the underlying SQL table as a SQLAlchemy Table object."""
210        return self._cache.processor.get_sql_table(self.stream_name)

Return the underlying SQL table as a SQLAlchemy Table object.

Inherited Members
airbyte.datasets._sql.SQLDataset
stream_name
with_filter
column_names
airbyte.datasets._base.DatasetBase
to_documents
class Destination(airbyte._connector_base.ConnectorBase, airbyte._writers.base.AirbyteWriterInterface):
 39class Destination(ConnectorBase, AirbyteWriterInterface):
 40    """A class representing a destination that can be called."""
 41
 42    connector_type = "destination"
 43
 44    def __init__(
 45        self,
 46        executor: Executor,
 47        name: str,
 48        config: dict[str, Any] | None = None,
 49        *,
 50        config_change_callback: ConfigChangeCallback | None = None,
 51        validate: bool = False,
 52    ) -> None:
 53        """Initialize the source.
 54
 55        If config is provided, it will be validated against the spec if validate is True.
 56        """
 57        super().__init__(
 58            executor=executor,
 59            name=name,
 60            config=config,
 61            config_change_callback=config_change_callback,
 62            validate=validate,
 63        )
 64
 65    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
 66        self,
 67        source_data: Source | ReadResult,
 68        *,
 69        streams: list[str] | Literal["*"] | None = None,
 70        cache: CacheBase | Literal[False] | None = None,
 71        state_cache: CacheBase | Literal[False] | None = None,
 72        write_strategy: WriteStrategy = WriteStrategy.AUTO,
 73        force_full_refresh: bool = False,
 74    ) -> WriteResult:
 75        """Write data from source connector or already cached source data.
 76
 77        Caching is enabled by default, unless explicitly disabled.
 78
 79        Args:
 80            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
 81            streams: The streams to write to the destination. If omitted or if "*" is provided,
 82                all streams will be written. If `source_data` is a source, then streams must be
 83                selected here or on the source. If both are specified, this setting will override
 84                the stream selection on the source.
 85            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
 86                False, the cache will be disabled. This must be `None` if `source_data` is already
 87                a `Cache` object.
 88            state_cache: A cache to use for storing incremental state. You do not need to set this
 89                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
 90                disable state management.
 91            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
 92                will decide the best strategy to use.
 93            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
 94                existing state will be ignored and all source data will be reloaded.
 95
 96        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
 97        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
 98        a known destination state, the destination-specific state will be used. If neither are
 99        available, a full refresh will be performed.
100        """
101        if not isinstance(source_data, ReadResult | Source):
102            raise exc.PyAirbyteInputError(
103                message="Invalid source_data type for `source_data` arg.",
104                context={
105                    "source_data_type_provided": type(source_data).__name__,
106                },
107            )
108
109        # Resolve `source`, `read_result`, and `source_name`
110        source: Source | None = source_data if isinstance(source_data, Source) else None
111        read_result: ReadResult | None = (
112            source_data if isinstance(source_data, ReadResult) else None
113        )
114        source_name: str = source.name if source else cast("ReadResult", read_result).source_name
115
116        # State providers and writers default to no-op, unless overridden below.
117        cache_state_provider: StateProviderBase = StaticInputState([])
118        """Provides the state of the cache's data."""
119        cache_state_writer: StateWriterBase = NoOpStateWriter()
120        """Writes updates for the state of the cache's data."""
121        destination_state_provider: StateProviderBase = StaticInputState([])
122        """Provides the state of the destination's data, from `cache` or `state_cache`."""
123        destination_state_writer: StateWriterBase = NoOpStateWriter()
124        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""
125
126        # If caching not explicitly disabled
127        if cache is not False:
128            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
129            if isinstance(source_data, ReadResult):
130                cache = source_data.cache
131
132            cache = cache or get_default_cache()
133            cache_state_provider = cache.get_state_provider(
134                source_name=source_name,
135                destination_name=None,  # This will just track the cache state
136            )
137            cache_state_writer = cache.get_state_writer(
138                source_name=source_name,
139                destination_name=None,  # This will just track the cache state
140            )
141
142        # Resolve `state_cache`
143        if state_cache is None:
144            state_cache = cache or get_default_cache()
145
146        # Resolve `destination_state_writer` and `destination_state_provider`
147        if state_cache:
148            destination_state_writer = state_cache.get_state_writer(
149                source_name=source_name,
150                destination_name=self.name,
151            )
152            if not force_full_refresh:
153                destination_state_provider = state_cache.get_state_provider(
154                    source_name=source_name,
155                    destination_name=self.name,
156                )
157        elif state_cache is not False:
158            warnings.warn(
159                "No state backend or cache provided. State will not be tracked. "
160                "To track state, provide a cache or state backend. "
161                "To silence this warning, set `state_cache=False` explicitly.",
162                category=exc.PyAirbyteWarning,
163                stacklevel=2,
164            )
165
166        # Resolve `catalog_provider`
167        if source:
168            catalog_provider = CatalogProvider(
169                configured_catalog=source.get_configured_catalog(
170                    streams=streams,
171                )
172            )
173        elif read_result:
174            catalog_provider = CatalogProvider.from_read_result(read_result)
175        else:
176            raise exc.PyAirbyteInternalError(
177                message="`source_data` must be a `Source` or `ReadResult` object.",
178            )
179
180        progress_tracker = ProgressTracker(
181            source=source if isinstance(source_data, Source) else None,
182            cache=cache or None,
183            destination=self,
184            expected_streams=catalog_provider.stream_names,
185        )
186
187        source_state_provider: StateProviderBase
188        source_state_provider = JoinedStateProvider(
189            primary=cache_state_provider,
190            secondary=destination_state_provider,
191        )
192
193        if source:
194            if cache is False:
195                # Get message iterator for source (caching disabled)
196                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
197                    streams=streams,
198                    state_provider=source_state_provider,
199                    progress_tracker=progress_tracker,
200                    force_full_refresh=force_full_refresh,
201                )
202            else:
203                # Caching enabled and we are reading from a source.
204                # Read the data to cache if caching is enabled.
205                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
206                    cache=cache,
207                    state_provider=source_state_provider,
208                    state_writer=cache_state_writer,
209                    catalog_provider=catalog_provider,
210                    stream_names=catalog_provider.stream_names,
211                    write_strategy=write_strategy,
212                    force_full_refresh=force_full_refresh,
213                    skip_validation=False,
214                    progress_tracker=progress_tracker,
215                )
216                message_iterator = AirbyteMessageIterator.from_read_result(
217                    read_result=read_result,
218                )
219        else:  # Else we are reading from a read result
220            assert read_result is not None
221            message_iterator = AirbyteMessageIterator.from_read_result(
222                read_result=read_result,
223            )
224
225        # Write the data to the destination
226        try:
227            self._write_airbyte_message_stream(
228                stdin=message_iterator,
229                catalog_provider=catalog_provider,
230                write_strategy=write_strategy,
231                state_writer=destination_state_writer,
232                progress_tracker=progress_tracker,
233            )
234        except Exception as ex:
235            progress_tracker.log_failure(exception=ex)
236            raise
237        else:
238            # No exceptions were raised, so log success
239            progress_tracker.log_success()
240
241        return WriteResult(
242            destination=self,
243            source_data=source_data,
244            catalog_provider=catalog_provider,
245            state_writer=destination_state_writer,
246            progress_tracker=progress_tracker,
247        )
248
249    def _write_airbyte_message_stream(
250        self,
251        stdin: IO[str] | AirbyteMessageIterator,
252        *,
253        catalog_provider: CatalogProvider,
254        write_strategy: WriteStrategy,
255        state_writer: StateWriterBase | None = None,
256        progress_tracker: ProgressTracker,
257    ) -> None:
258        """Read from the connector and write to the cache."""
259        # Run optional validation step
260        if state_writer is None:
261            state_writer = StdOutStateWriter()
262
263        # Apply the write strategy to the catalog provider before sending to the destination
264        catalog_provider = catalog_provider.with_write_strategy(write_strategy)
265
266        with as_temp_files(
267            files_contents=[
268                self._hydrated_config,
269                catalog_provider.configured_catalog.model_dump_json(),
270            ]
271        ) as [
272            config_file,
273            catalog_file,
274        ]:
275            try:
276                # We call the connector to write the data, tallying the inputs and outputs
277                for destination_message in progress_tracker.tally_confirmed_writes(
278                    messages=self._execute(
279                        args=[
280                            "write",
281                            "--config",
282                            config_file,
283                            "--catalog",
284                            catalog_file,
285                        ],
286                        stdin=AirbyteMessageIterator(
287                            progress_tracker.tally_pending_writes(
288                                stdin,
289                            )
290                        ),
291                    )
292                ):
293                    if destination_message.state:
294                        state_writer.write_state(state_message=destination_message.state)
295
296            except exc.AirbyteConnectorFailedError as ex:
297                raise exc.AirbyteConnectorWriteError(
298                    connector_name=self.name,
299                    log_text=self._last_log_messages,
300                    original_exception=ex,
301                ) from None

A class representing a destination that can be called.
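
As a minimal sketch, a destination connector can be obtained and verified like this (the destination-duckdb config values are illustrative only):

import airbyte as ab

destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/destination.duckdb"},  # illustrative config
)
destination.check()  # verify the connector can run with this config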

Destination( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, validate: bool = False)
44    def __init__(
45        self,
46        executor: Executor,
47        name: str,
48        config: dict[str, Any] | None = None,
49        *,
50        config_change_callback: ConfigChangeCallback | None = None,
51        validate: bool = False,
52    ) -> None:
 53        """Initialize the destination.
54
55        If config is provided, it will be validated against the spec if validate is True.
56        """
57        super().__init__(
58            executor=executor,
59            name=name,
60            config=config,
61            config_change_callback=config_change_callback,
62            validate=validate,
63        )

Initialize the destination.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'destination'
def write( self, source_data: Source | ReadResult, *, streams: Union[list[str], Literal['*'], NoneType] = None, cache: Union[airbyte.caches.CacheBase, Literal[False], NoneType] = None, state_cache: Union[airbyte.caches.CacheBase, Literal[False], NoneType] = None, write_strategy: airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False) -> WriteResult:
 65    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
 66        self,
 67        source_data: Source | ReadResult,
 68        *,
 69        streams: list[str] | Literal["*"] | None = None,
 70        cache: CacheBase | Literal[False] | None = None,
 71        state_cache: CacheBase | Literal[False] | None = None,
 72        write_strategy: WriteStrategy = WriteStrategy.AUTO,
 73        force_full_refresh: bool = False,
 74    ) -> WriteResult:
 75        """Write data from source connector or already cached source data.
 76
 77        Caching is enabled by default, unless explicitly disabled.
 78
 79        Args:
 80            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
 81            streams: The streams to write to the destination. If omitted or if "*" is provided,
 82                all streams will be written. If `source_data` is a source, then streams must be
 83                selected here or on the source. If both are specified, this setting will override
 84                the stream selection on the source.
 85            cache: The cache to use for reading source_data. If `None`, a default cache will be
 86                used. If `False`, caching will be disabled. If `source_data` is a `ReadResult`,
 87                its cache will be used and this argument should be left as `None`.
 88            state_cache: A cache to use for storing incremental state. You do not need to set this
 89                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
 90                disable state management.
 91            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
 92                will decide the best strategy to use.
 93            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
 94                existing state will be ignored and all source data will be reloaded.
 95
 96        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
 97        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
 98        a known destination state, the destination-specific state will be used. If neither are
 99        available, a full refresh will be performed.
100        """
101        if not isinstance(source_data, ReadResult | Source):
102            raise exc.PyAirbyteInputError(
103                message="Invalid source_data type for `source_data` arg.",
104                context={
105                    "source_data_type_provided": type(source_data).__name__,
106                },
107            )
108
109        # Resolve `source`, `read_result`, and `source_name`
110        source: Source | None = source_data if isinstance(source_data, Source) else None
111        read_result: ReadResult | None = (
112            source_data if isinstance(source_data, ReadResult) else None
113        )
114        source_name: str = source.name if source else cast("ReadResult", read_result).source_name
115
116        # State providers and writers default to no-op, unless overridden below.
117        cache_state_provider: StateProviderBase = StaticInputState([])
118        """Provides the state of the cache's data."""
119        cache_state_writer: StateWriterBase = NoOpStateWriter()
120        """Writes updates for the state of the cache's data."""
121        destination_state_provider: StateProviderBase = StaticInputState([])
122        """Provides the state of the destination's data, from `cache` or `state_cache`."""
123        destination_state_writer: StateWriterBase = NoOpStateWriter()
124        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""
125
126        # If caching not explicitly disabled
127        if cache is not False:
128            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
129            if isinstance(source_data, ReadResult):
130                cache = source_data.cache
131
132            cache = cache or get_default_cache()
133            cache_state_provider = cache.get_state_provider(
134                source_name=source_name,
135                destination_name=None,  # This will just track the cache state
136            )
137            cache_state_writer = cache.get_state_writer(
138                source_name=source_name,
139                destination_name=None,  # This will just track the cache state
140            )
141
142        # Resolve `state_cache`
143        if state_cache is None:
144            state_cache = cache or get_default_cache()
145
146        # Resolve `destination_state_writer` and `destination_state_provider`
147        if state_cache:
148            destination_state_writer = state_cache.get_state_writer(
149                source_name=source_name,
150                destination_name=self.name,
151            )
152            if not force_full_refresh:
153                destination_state_provider = state_cache.get_state_provider(
154                    source_name=source_name,
155                    destination_name=self.name,
156                )
157        elif state_cache is not False:
158            warnings.warn(
159                "No state backend or cache provided. State will not be tracked. "
160                "To track state, provide a cache or state backend. "
161                "To silence this warning, set `state_cache=False` explicitly.",
162                category=exc.PyAirbyteWarning,
163                stacklevel=2,
164            )
165
166        # Resolve `catalog_provider`
167        if source:
168            catalog_provider = CatalogProvider(
169                configured_catalog=source.get_configured_catalog(
170                    streams=streams,
171                )
172            )
173        elif read_result:
174            catalog_provider = CatalogProvider.from_read_result(read_result)
175        else:
176            raise exc.PyAirbyteInternalError(
177                message="`source_data` must be a `Source` or `ReadResult` object.",
178            )
179
180        progress_tracker = ProgressTracker(
181            source=source if isinstance(source_data, Source) else None,
182            cache=cache or None,
183            destination=self,
184            expected_streams=catalog_provider.stream_names,
185        )
186
187        source_state_provider: StateProviderBase
188        source_state_provider = JoinedStateProvider(
189            primary=cache_state_provider,
190            secondary=destination_state_provider,
191        )
192
193        if source:
194            if cache is False:
195                # Get message iterator for source (caching disabled)
196                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
197                    streams=streams,
198                    state_provider=source_state_provider,
199                    progress_tracker=progress_tracker,
200                    force_full_refresh=force_full_refresh,
201                )
202            else:
203                # Caching enabled and we are reading from a source.
204                # Read the data to cache if caching is enabled.
205                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
206                    cache=cache,
207                    state_provider=source_state_provider,
208                    state_writer=cache_state_writer,
209                    catalog_provider=catalog_provider,
210                    stream_names=catalog_provider.stream_names,
211                    write_strategy=write_strategy,
212                    force_full_refresh=force_full_refresh,
213                    skip_validation=False,
214                    progress_tracker=progress_tracker,
215                )
216                message_iterator = AirbyteMessageIterator.from_read_result(
217                    read_result=read_result,
218                )
219        else:  # Else we are reading from a read result
220            assert read_result is not None
221            message_iterator = AirbyteMessageIterator.from_read_result(
222                read_result=read_result,
223            )
224
225        # Write the data to the destination
226        try:
227            self._write_airbyte_message_stream(
228                stdin=message_iterator,
229                catalog_provider=catalog_provider,
230                write_strategy=write_strategy,
231                state_writer=destination_state_writer,
232                progress_tracker=progress_tracker,
233            )
234        except Exception as ex:
235            progress_tracker.log_failure(exception=ex)
236            raise
237        else:
238            # No exceptions were raised, so log success
239            progress_tracker.log_success()
240
241        return WriteResult(
242            destination=self,
243            source_data=source_data,
244            catalog_provider=catalog_provider,
245            state_writer=destination_state_writer,
246            progress_tracker=progress_tracker,
247        )

Write data from source connector or already cached source data.

Caching is enabled by default, unless explicitly disabled.

Arguments:
  • source_data: The source data to write. Can be a Source or a ReadResult object.
  • streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If source_data is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
  • cache: The cache to use for reading source_data. If None, a default cache will be used. If False, caching will be disabled. If source_data is a ReadResult, its cache will be used and this argument should be left as None.
  • state_cache: A cache to use for storing incremental state. You do not need to set this if cache is specified or if source_data is a Cache object. Set to False to disable state management.
  • write_strategy: The strategy to use for writing source_data. If AUTO, the connector will decide the best strategy to use.
  • force_full_refresh: Whether to force a full refresh of the source_data. If True, any existing state will be ignored and all source data will be reloaded.

For incremental syncs, cache or state_cache will be checked for matching state values. If the cache has tracked state, this will be used for the sync. Otherwise, if there is a known destination state, the destination-specific state will be used. If neither are available, a full refresh will be performed.
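
A hedged sketch of the documented call patterns (connector names and configs are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_streams(["users"])
destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/destination.duckdb"},  # illustrative config
)

# Pass the source directly; records are staged in a default cache, and state is tracked there.
write_result = destination.write(source)

# Or write from an existing ReadResult without re-reading the source:
read_result = source.read()
write_result = destination.write(read_result)

# To stream straight through without any local cache or state tracking:
write_result = destination.write(source, cache=False, state_cache=False)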

Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
set_config
get_config
config_hash
validate_config
config_spec
print_config_spec
docs_url
connector_version
check
install
uninstall
class DuckDBCache(airbyte._processors.sql.duckdb.DuckDBConfig, airbyte.caches.base.CacheBase):
44class DuckDBCache(DuckDBConfig, CacheBase):
45    """A DuckDB cache."""
46
47    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = DuckDBSqlProcessor
48
49    paired_destination_name: ClassVar[str | None] = "destination-duckdb"
50    paired_destination_config_class: ClassVar[type | None] = DestinationDuckdb
51
52    @property
53    def paired_destination_config(self) -> DestinationDuckdb:
54        """Return a dictionary of destination configuration values."""
55        return duckdb_cache_to_destination_configuration(cache=self)

A DuckDB cache.
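
A minimal sketch of configuring a file-backed DuckDB cache and reading into it (the file path and the source-faker config are illustrative):

import airbyte as ab
from airbyte.caches import DuckDBCache

# db_path and schema_name come from the inherited DuckDBConfig fields.
cache = DuckDBCache(db_path="./pyairbyte_cache.duckdb", schema_name="main")

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_streams(["users"])
result = source.read(cache=cache)
print(result["users"].to_pandas().head())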

paired_destination_name: ClassVar[str | None] = 'destination-duckdb'
paired_destination_config_class: ClassVar[type | None] = <class 'airbyte_api.models.destination_duckdb.DestinationDuckdb'>
paired_destination_config: airbyte_api.models.destination_duckdb.DestinationDuckdb
52    @property
53    def paired_destination_config(self) -> DestinationDuckdb:
54        """Return a dictionary of destination configuration values."""
55        return duckdb_cache_to_destination_configuration(cache=self)

Return a dictionary of destination configuration values.
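
A brief, hedged illustration of inspecting the paired configuration on a cache instance such as the one above:

from airbyte.caches import DuckDBCache

cache = DuckDBCache(db_path="./pyairbyte_cache.duckdb")
# The paired destination config mirrors this cache's settings as a DestinationDuckdb model.
print(cache.paired_destination_config)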

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; it should be a dictionary conforming to pydantic's ConfigDict.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
122                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
123                        """We need to both initialize private attributes and call the user-defined model_post_init
124                        method.
125                        """
126                        init_private_attributes(self, context)
127                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
run_sql_query
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
create_source_tables
airbyte._processors.sql.duckdb.DuckDBConfig
db_path
schema_name
get_sql_alchemy_url
get_database_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_sql_alchemy_connect_args
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
airbyte._writers.base.AirbyteWriterInterface
name
class ReadResult(collections.abc.Mapping[str, airbyte.datasets._sql.CachedDataset]):
 33class ReadResult(Mapping[str, CachedDataset]):
 34    """The result of a read operation.
 35
 36    This class is used to return information about the read operation, such as the number of
 37    records read. It should not be created directly, but instead returned by the read method
 38    of a source.
 39    """
 40
 41    def __init__(
 42        self,
 43        *,
 44        source_name: str,
 45        processed_streams: list[str],
 46        cache: CacheBase,
 47        progress_tracker: ProgressTracker,
 48    ) -> None:
 49        """Initialize a read result.
 50
 51        This class should not be created directly. Instead, it should be returned by the `read`
 52        method of the `Source` class.
 53        """
 54        self.source_name = source_name
 55        self._progress_tracker = progress_tracker
 56        self._cache = cache
 57        self._processed_streams = processed_streams
 58
 59    def __getitem__(self, stream: str) -> CachedDataset:
 60        """Return the cached dataset for a given stream name."""
 61        if stream not in self._processed_streams:
 62            raise KeyError(stream)
 63
 64        return CachedDataset(self._cache, stream)
 65
 66    def __contains__(self, stream: object) -> bool:
 67        """Return whether a given stream name was included in processing."""
 68        if not isinstance(stream, str):
 69            return False
 70
 71        return stream in self._processed_streams
 72
 73    def __iter__(self) -> Iterator[str]:
 74        """Return an iterator over the stream names that were processed."""
 75        return self._processed_streams.__iter__()
 76
 77    def __len__(self) -> int:
 78        """Return the number of streams that were processed."""
 79        return len(self._processed_streams)
 80
 81    def get_sql_engine(self) -> Engine:
 82        """Return the SQL engine used by the cache."""
 83        return self._cache.get_sql_engine()
 84
 85    @property
 86    def processed_records(self) -> int:
 87        """The total number of records read from the source."""
 88        return self._progress_tracker.total_records_read
 89
 90    @property
 91    def streams(self) -> Mapping[str, CachedDataset]:
 92        """Return a mapping of stream names to cached datasets."""
 93        return {
 94            stream_name: CachedDataset(self._cache, stream_name)
 95            for stream_name in self._processed_streams
 96        }
 97
 98    @property
 99    def cache(self) -> CacheBase:
100        """Return the cache object."""
101        return self._cache

The result of a read operation.

This class is used to return information about the read operation, such as the number of records read. It should not be created directly, but instead returned by the read method of a source.
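
A brief sketch of the Mapping-style access this class provides (the source-faker connector and its stream names are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_streams(["users", "products"])
result = source.read()

# ReadResult behaves like a Mapping of stream name -> CachedDataset.
print(list(result))                  # processed stream names
print(result.processed_records)      # total records read across streams
for stream_name, dataset in result.streams.items():
    print(stream_name, len(dataset.to_pandas()))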

ReadResult( *, source_name: str, processed_streams: list[str], cache: airbyte.caches.CacheBase, progress_tracker: airbyte.progress.ProgressTracker)
41    def __init__(
42        self,
43        *,
44        source_name: str,
45        processed_streams: list[str],
46        cache: CacheBase,
47        progress_tracker: ProgressTracker,
48    ) -> None:
49        """Initialize a read result.
50
51        This class should not be created directly. Instead, it should be returned by the `read`
52        method of the `Source` class.
53        """
54        self.source_name = source_name
55        self._progress_tracker = progress_tracker
56        self._cache = cache
57        self._processed_streams = processed_streams

Initialize a read result.

This class should not be created directly. Instead, it should be returned by the read method of the Source class.

source_name
def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
81    def get_sql_engine(self) -> Engine:
82        """Return the SQL engine used by the cache."""
83        return self._cache.get_sql_engine()

Return the SQL engine used by the cache.
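
For instance, a hedged sketch of ad-hoc SQL access through the cache's engine (pandas is used here only for display):

import airbyte as ab
import pandas as pd

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_streams(["users"])
result = source.read()

# Query the cached stream's table directly via the SQLAlchemy engine.
engine = result.get_sql_engine()
users_table = result["users"].to_sql_table()
df = pd.read_sql_table(users_table.name, engine, schema=users_table.schema)
print(df.shape)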

processed_records: int
85    @property
86    def processed_records(self) -> int:
87        """The total number of records read from the source."""
88        return self._progress_tracker.total_records_read

The total number of records read from the source.

streams: Mapping[str, CachedDataset]
90    @property
91    def streams(self) -> Mapping[str, CachedDataset]:
92        """Return a mapping of stream names to cached datasets."""
93        return {
94            stream_name: CachedDataset(self._cache, stream_name)
95            for stream_name in self._processed_streams
96        }

Return a mapping of stream names to cached datasets.

cache: airbyte.caches.CacheBase
 98    @property
 99    def cache(self) -> CacheBase:
100        """Return the cache object."""
101        return self._cache

Return the cache object.

Inherited Members
collections.abc.Mapping
get
keys
items
values
class SecretSourceEnum(builtins.str, enum.Enum):
24class SecretSourceEnum(str, Enum):
25    """Enumeration of secret sources supported by PyAirbyte."""
26
27    ENV = "env"
28    DOTENV = "dotenv"
29    GOOGLE_COLAB = "google_colab"
30    GOOGLE_GSM = "google_gsm"  # Not enabled by default
31
32    PROMPT = "prompt"

Enumeration of secret sources supported by PyAirbyte.

ENV = <SecretSourceEnum.ENV: 'env'>
DOTENV = <SecretSourceEnum.DOTENV: 'dotenv'>
GOOGLE_COLAB = <SecretSourceEnum.GOOGLE_COLAB: 'google_colab'>
GOOGLE_GSM = <SecretSourceEnum.GOOGLE_GSM: 'google_gsm'>
PROMPT = <SecretSourceEnum.PROMPT: 'prompt'>
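
As a hedged illustration, these enum values can be passed to PyAirbyte's secret lookup to restrict where a secret may come from; the secret name below is hypothetical, and the exact get_secret signature may vary by version:

import airbyte as ab

# Look up a hypothetical secret, allowing only environment variables and .env files.
api_key = ab.get_secret(
    "MY_SOURCE_API_KEY",
    sources=[ab.SecretSourceEnum.ENV, ab.SecretSourceEnum.DOTENV],
)
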
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Source(airbyte._connector_base.ConnectorBase):
 68class Source(ConnectorBase):  # noqa: PLR0904
 69    """A class representing a source that can be called."""
 70
 71    connector_type = "source"
 72
 73    def __init__(
 74        self,
 75        executor: Executor,
 76        name: str,
 77        config: dict[str, Any] | None = None,
 78        *,
 79        config_change_callback: ConfigChangeCallback | None = None,
 80        streams: str | list[str] | None = None,
 81        validate: bool = False,
 82        cursor_key_overrides: dict[str, str] | None = None,
 83        primary_key_overrides: dict[str, str | list[str]] | None = None,
 84    ) -> None:
 85        """Initialize the source.
 86
 87        If config is provided, it will be validated against the spec if validate is True.
 88        """
 89        self._to_be_selected_streams: list[str] | str = []
 90        """Used to hold selection criteria before catalog is known."""
 91
 92        super().__init__(
 93            executor=executor,
 94            name=name,
 95            config=config,
 96            config_change_callback=config_change_callback,
 97            validate=validate,
 98        )
 99        self._config_dict: dict[str, Any] | None = None
100        self._last_log_messages: list[str] = []
101        self._discovered_catalog: AirbyteCatalog | None = None
102        self._selected_stream_names: list[str] = []
103
104        self._cursor_key_overrides: dict[str, str] = {}
105        """A mapping of lower-cased stream names to cursor key overrides."""
106
107        self._primary_key_overrides: dict[str, list[str]] = {}
108        """A mapping of lower-cased stream names to primary key overrides."""
109
110        if config is not None:
111            self.set_config(config, validate=validate)
112        if streams is not None:
113            self.select_streams(streams)
114        if cursor_key_overrides is not None:
115            self.set_cursor_keys(**cursor_key_overrides)
116        if primary_key_overrides is not None:
117            self.set_primary_keys(**primary_key_overrides)
118
119    def set_streams(self, streams: list[str]) -> None:
120        """Deprecated. See select_streams()."""
121        warnings.warn(
122            "The 'set_streams' method is deprecated and will be removed in a future version. "
123            "Please use the 'select_streams' method instead.",
124            DeprecationWarning,
125            stacklevel=2,
126        )
127        self.select_streams(streams)
128
129    def set_cursor_key(
130        self,
131        stream_name: str,
132        cursor_key: str,
133    ) -> None:
134        """Set the cursor for a single stream.
135
136        Note:
137        - This does not unset previously set cursors.
138        - The cursor key must be a single field name.
139        - Not all streams support custom cursors. If a stream does not support custom cursors,
140          the override may be ignored.
141        - Stream names are case insensitive, while field names are case sensitive.
142        - Stream names are not validated by PyAirbyte. If the stream name
143          does not exist in the catalog, the override may be ignored.
144        """
145        self._cursor_key_overrides[stream_name.lower()] = cursor_key
146
147    def set_cursor_keys(
148        self,
149        **kwargs: str,
150    ) -> None:
151        """Override the cursor key for one or more streams.
152
153        Usage:
154            source.set_cursor_keys(
155                stream1="cursor1",
156                stream2="cursor2",
157            )
158
159        Note:
160        - This does not unset previously set cursors.
161        - The cursor key must be a single field name.
162        - Not all streams support custom cursors. If a stream does not support custom cursors,
163          the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
169
170    def set_primary_key(
171        self,
172        stream_name: str,
173        primary_key: str | list[str],
174    ) -> None:
175        """Set the primary key for a single stream.
176
177        Note:
178        - This does not unset previously set primary keys.
179        - The primary key must be a single field name or a list of field names.
180        - Not all streams support overriding primary keys. If a stream does not support overriding
181          primary keys, the override may be ignored.
182        - Stream names are case insensitive, while field names are case sensitive.
183        - Stream names are not validated by PyAirbyte. If the stream name
184          does not exist in the catalog, the override may be ignored.
185        """
186        self._primary_key_overrides[stream_name.lower()] = (
187            primary_key if isinstance(primary_key, list) else [primary_key]
188        )
189
190    def set_primary_keys(
191        self,
192        **kwargs: str | list[str],
193    ) -> None:
194        """Override the primary keys for one or more streams.
195
196        This does not unset previously set primary keys.
197
198        Usage:
199            source.set_primary_keys(
200                stream1="pk1",
201                stream2=["pk1", "pk2"],
202            )
203
204        Note:
205        - This does not unset previously set primary keys.
206        - The primary key must be a single field name or a list of field names.
207        - Not all streams support overriding primary keys. If a stream does not support overriding
208          primary keys, the override may be ignored.
209        - Stream names are case insensitive, while field names are case sensitive.
210        - Stream names are not validated by PyAirbyte. If the stream name
211          does not exist in the catalog, the override may be ignored.
212        """
213        self._primary_key_overrides.update(
214            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
215        )
216
217    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
218        """Logs a warning message indicating stream selection which are not selected yet."""
219        if streams == "*":
220            print(
221                "Warning: Config is not set yet. All streams will be selected after config is set.",
222                file=sys.stderr,
223            )
224        else:
225            print(
226                "Warning: Config is not set yet. "
227                f"Streams to be selected after config is set: {streams}",
228                file=sys.stderr,
229            )
230
231    def select_all_streams(self) -> None:
232        """Select all streams.
233
234        This is a more streamlined equivalent to:
235        > source.select_streams(source.get_available_streams()).
236        """
237        if self._config_dict is None:
238            self._to_be_selected_streams = "*"
239            self._log_warning_preselected_stream(self._to_be_selected_streams)
240            return
241
242        self._selected_stream_names = self.get_available_streams()
243
244    def select_streams(self, streams: str | list[str]) -> None:
245        """Select the stream names that should be read from the connector.
246
247        Args:
248            streams: A list of stream names to select. If set to "*", all streams will be selected.
249
250        Currently, if this is not set, all streams will be read.
251        """
252        if self._config_dict is None:
253            self._to_be_selected_streams = streams
254            self._log_warning_preselected_stream(streams)
255            return
256
257        if streams == "*":
258            self.select_all_streams()
259            return
260
261        if isinstance(streams, str):
262            # If a single stream is provided, convert it to a one-item list
263            streams = [streams]
264
265        available_streams = self.get_available_streams()
266        for stream in streams:
267            if stream not in available_streams:
268                raise exc.AirbyteStreamNotFoundError(
269                    stream_name=stream,
270                    connector_name=self.name,
271                    available_streams=available_streams,
272                )
273        self._selected_stream_names = streams
274
275    def get_selected_streams(self) -> list[str]:
276        """Get the selected streams.
277
278        If no streams are selected, return an empty list.
279        """
280        return self._selected_stream_names
281
282    def set_config(
283        self,
284        config: dict[str, Any],
285        *,
286        validate: bool = True,
287    ) -> None:
288        """Set the config for the connector.
289
290        If validate is True, raise an exception if the config fails validation.
291
292        If validate is False, validation will be deferred until check() or validate_config()
293        is called.
294        """
295        if validate:
296            self.validate_config(config)
297
298        self._config_dict = config
299
300        if self._to_be_selected_streams:
301            self.select_streams(self._to_be_selected_streams)
302            self._to_be_selected_streams = []
303
304    def _discover(self) -> AirbyteCatalog:
305        """Call discover on the connector.
306
307        This involves the following steps:
308        - Write the config to a temporary file
309        - execute the connector with discover --config <config_file>
310        - Listen to the messages and return the first AirbyteCatalog that comes along.
311        - Make sure the subprocess is killed when the function returns.
312        """
313        with as_temp_files([self._hydrated_config]) as [config_file]:
314            for msg in self._execute(["discover", "--config", config_file]):
315                if msg.type == Type.CATALOG and msg.catalog:
316                    return msg.catalog
317            raise exc.AirbyteConnectorMissingCatalogError(
318                connector_name=self.name,
319                log_text=self._last_log_messages,
320            )
321
322    def get_available_streams(self) -> list[str]:
323        """Get the available streams from the spec."""
324        return [s.name for s in self.discovered_catalog.streams]
325
326    def _get_incremental_stream_names(self) -> list[str]:
327        """Get the name of streams that support incremental sync."""
328        return [
329            stream.name
330            for stream in self.discovered_catalog.streams
331            if SyncMode.incremental in stream.supported_sync_modes
332        ]
333
334    @override
335    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
336        """Call spec on the connector.
337
338        This involves the following steps:
339        * execute the connector with spec
340        * Listen to the messages and return the first AirbyteCatalog that comes along.
341        * Make sure the subprocess is killed when the function returns.
342        """
343        if force_refresh or self._spec is None:
344            for msg in self._execute(["spec"]):
345                if msg.type == Type.SPEC and msg.spec:
346                    self._spec = msg.spec
347                    break
348
349        if self._spec:
350            return self._spec
351
352        raise exc.AirbyteConnectorMissingSpecError(
353            connector_name=self.name,
354            log_text=self._last_log_messages,
355        )
356
357    @property
358    def config_spec(self) -> dict[str, Any]:
359        """Generate a configuration spec for this connector, as a JSON Schema definition.
360
361        This function generates a JSON Schema dictionary with configuration specs for the
362        current connector, as a dictionary.
363
364        Returns:
365            dict: The JSON Schema configuration spec as a dictionary.
366        """
367        return self._get_spec(force_refresh=True).connectionSpecification
368
369    @property
370    def _yaml_spec(self) -> str:
371        """Get the spec as a yaml string.
372
373        For now, the primary use case is for writing and debugging a valid config for a source.
374
375        This is private for now because we probably want better polish before exposing this
376        as a stable interface. This will also get easier when we have docs links with this info
377        for each connector.
378        """
379        spec_obj: ConnectorSpecification = self._get_spec()
380        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
381        # convert to a yaml string
382        return yaml.dump(spec_dict)
383
384    @property
385    def docs_url(self) -> str:
386        """Get the URL to the connector's documentation."""
387        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
388            "source-", ""
389        )
390
391    @property
392    def discovered_catalog(self) -> AirbyteCatalog:
393        """Get the raw catalog for the given streams.
394
395        If the catalog is not yet known, we call discover to get it.
396        """
397        if self._discovered_catalog is None:
398            self._discovered_catalog = self._discover()
399
400        return self._discovered_catalog
401
402    @property
403    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
404        """Get the configured catalog for the given streams.
405
406        If the raw catalog is not yet known, we call discover to get it.
407
408        If no specific streams are selected, we return a catalog that syncs all available streams.
409
410        TODO: We should consider disabling by default the streams that the connector would
411        disable by default. (For instance, streams that require a premium license are sometimes
412        disabled by default within the connector.)
413        """
414        # Ensure discovered catalog is cached before we start
415        _ = self.discovered_catalog
416
417        # Filter for selected streams if set, otherwise use all available streams:
418        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
419        return self.get_configured_catalog(streams=streams_filter)
420
421    def get_configured_catalog(
422        self,
423        streams: Literal["*"] | list[str] | None = None,
424    ) -> ConfiguredAirbyteCatalog:
425        """Get a configured catalog for the given streams.
426
427        If no streams are provided, the selected streams will be used. If no streams are selected,
428        all available streams will be used.
429
430        If '*' is provided, all available streams will be used.
431        """
432        selected_streams: list[str] = []
433        if streams is None:
434            selected_streams = self._selected_stream_names or self.get_available_streams()
435        elif streams == "*":
436            selected_streams = self.get_available_streams()
437        elif isinstance(streams, list):
438            selected_streams = streams
439        else:
440            raise exc.PyAirbyteInputError(
441                message="Invalid streams argument.",
442                input_value=streams,
443            )
444
445        return ConfiguredAirbyteCatalog(
446            streams=[
447                ConfiguredAirbyteStream(
448                    stream=stream,
449                    destination_sync_mode=DestinationSyncMode.overwrite,
450                    sync_mode=SyncMode.incremental,
451                    primary_key=(
452                        [self._primary_key_overrides[stream.name.lower()]]
453                        if stream.name.lower() in self._primary_key_overrides
454                        else stream.source_defined_primary_key
455                    ),
456                    cursor_field=(
457                        [self._cursor_key_overrides[stream.name.lower()]]
458                        if stream.name.lower() in self._cursor_key_overrides
459                        else stream.default_cursor_field
460                    ),
461                    # These are unused in the current implementation:
462                    generation_id=None,
463                    minimum_generation_id=None,
464                    sync_id=None,
465                )
466                for stream in self.discovered_catalog.streams
467                if stream.name in selected_streams
468            ],
469        )
470
471    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
472        """Return the JSON Schema spec for the specified stream name."""
473        catalog: AirbyteCatalog = self.discovered_catalog
474        found: list[AirbyteStream] = [
475            stream for stream in catalog.streams if stream.name == stream_name
476        ]
477
478        if len(found) == 0:
479            raise exc.PyAirbyteInputError(
480                message="Stream name does not exist in catalog.",
481                input_value=stream_name,
482            )
483
484        if len(found) > 1:
485            raise exc.PyAirbyteInternalError(
486                message="Duplicate streams found with the same name.",
487                context={
488                    "found_streams": found,
489                },
490            )
491
492        return found[0].json_schema
493
494    def get_records(
495        self,
496        stream: str,
497        *,
498        limit: int | None = None,
499        stop_event: threading.Event | None = None,
500        normalize_field_names: bool = False,
501        prune_undeclared_fields: bool = True,
502    ) -> LazyDataset:
503        """Read a stream from the connector.
504
505        Args:
506            stream: The name of the stream to read.
507            limit: The maximum number of records to read. If None, all records will be read.
508            stop_event: If set, the event can be triggered by the caller to stop reading records
509                and terminate the process.
510            normalize_field_names: When `True`, field names will be normalized to lower case, with
511                special characters removed. This matches the behavior of PyAirbyte caches and most
512                Airbyte destinations.
513            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
514                which generally matches the behavior of PyAirbyte caches and most Airbyte
515                destinations, specifically when you expect the catalog may be stale. You can disable
516                this to keep all fields in the records.
517
518        This involves the following steps:
519        * Call discover to get the catalog
520        * Generate a configured catalog that syncs the given stream in full_refresh mode
521        * Write the configured catalog and the config to a temporary file
522        * execute the connector with read --config <config_file> --catalog <catalog_file>
523        * Listen to the messages and return the first AirbyteRecordMessages that come along.
524        * Make sure the subprocess is killed when the function returns.
525        """
526        stop_event = stop_event or threading.Event()
527        configured_catalog = self.get_configured_catalog(streams=[stream])
528        if len(configured_catalog.streams) == 0:
529            raise exc.PyAirbyteInputError(
530                message="Requested stream does not exist.",
531                context={
532                    "stream": stream,
533                    "available_streams": self.get_available_streams(),
534                    "connector_name": self.name,
535                },
536            ) from KeyError(stream)
537
538        configured_stream = configured_catalog.streams[0]
539
540        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
541            yield from records
542
543        stream_record_handler = StreamRecordHandler(
544            json_schema=self.get_stream_json_schema(stream),
545            prune_extra_fields=prune_undeclared_fields,
546            normalize_keys=normalize_field_names,
547        )
548
549        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
550        progress_tracker = ProgressTracker(
551            ProgressStyle.PLAIN,
552            source=self,
553            cache=None,
554            destination=None,
555            expected_streams=[stream],
556        )
557
558        iterator: Iterator[dict[str, Any]] = (
559            StreamRecord.from_record_message(
560                record_message=record.record,
561                stream_record_handler=stream_record_handler,
562            )
563            for record in self._read_with_catalog(
564                catalog=configured_catalog,
565                progress_tracker=progress_tracker,
566                stop_event=stop_event,
567            )
568            if record.record
569        )
570        if limit is not None:
571            # Stop the iterator after the limit is reached
572            iterator = islice(iterator, limit)
573
574        return LazyDataset(
575            iterator,
576            stream_metadata=configured_stream,
577            stop_event=stop_event,
578            progress_tracker=progress_tracker,
579        )
580
581    def get_documents(
582        self,
583        stream: str,
584        title_property: str | None = None,
585        content_properties: list[str] | None = None,
586        metadata_properties: list[str] | None = None,
587        *,
588        render_metadata: bool = False,
589    ) -> Iterable[Document]:
590        """Read a stream from the connector and return the records as documents.
591
592        If metadata_properties is not set, all properties that are not content will be added to
593        the metadata.
594
595        If render_metadata is True, metadata will be rendered in the document, as well as the
596        the main content.
597        """
598        return self.get_records(stream).to_documents(
599            title_property=title_property,
600            content_properties=content_properties,
601            metadata_properties=metadata_properties,
602            render_metadata=render_metadata,
603        )
604
605    def get_samples(
606        self,
607        streams: list[str] | Literal["*"] | None = None,
608        *,
609        limit: int = 5,
610        on_error: Literal["raise", "ignore", "log"] = "raise",
611    ) -> dict[str, InMemoryDataset | None]:
612        """Get a sample of records from the given streams."""
613        if streams == "*":
614            streams = self.get_available_streams()
615        elif streams is None:
616            streams = self.get_selected_streams()
617
618        results: dict[str, InMemoryDataset | None] = {}
619        for stream in streams:
620            stop_event = threading.Event()
621            try:
622                results[stream] = self.get_records(
623                    stream,
624                    limit=limit,
625                    stop_event=stop_event,
626                ).fetch_all()
627                stop_event.set()
628            except Exception as ex:
629                results[stream] = None
630                if on_error == "ignore":
631                    continue
632
633                if on_error == "raise":
634                    raise ex from None
635
636                if on_error == "log":
637                    print(f"Error fetching sample for stream '{stream}': {ex}")
638
639        return results
640
641    def print_samples(
642        self,
643        streams: list[str] | Literal["*"] | None = None,
644        *,
645        limit: int = 5,
646        on_error: Literal["raise", "ignore", "log"] = "log",
647    ) -> None:
648        """Print a sample of records from the given streams."""
649        internal_cols: list[str] = [
650            AB_EXTRACTED_AT_COLUMN,
651            AB_META_COLUMN,
652            AB_RAW_ID_COLUMN,
653        ]
654        col_limit = 10
655        if streams == "*":
656            streams = self.get_available_streams()
657        elif streams is None:
658            streams = self.get_selected_streams()
659
660        console = Console()
661
662        console.print(
663            Markdown(
664                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
665                justify="left",
666            )
667        )
668
669        for stream in streams:
670            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
671            samples = self.get_samples(
672                streams=[stream],
673                limit=limit,
674                on_error=on_error,
675            )
676            dataset = samples[stream]
677
678            table = Table(
679                show_header=True,
680                show_lines=True,
681            )
682            if dataset is None:
683                console.print(
684                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
685                )
686                continue
687
688            if len(dataset.column_names) > col_limit:
689                # We'll pivot the columns so each column is its own row
690                table.add_column("Column Name")
691                for _ in range(len(dataset)):
692                    table.add_column(overflow="fold")
693                for col in dataset.column_names:
694                    table.add_row(
695                        Markdown(f"**`{col}`**"),
696                        *[escape(str(record[col])) for record in dataset],
697                    )
698            else:
699                for col in dataset.column_names:
700                    table.add_column(
701                        Markdown(f"**`{col}`**"),
702                        overflow="fold",
703                    )
704
705                for record in dataset:
706                    table.add_row(
707                        *[
708                            escape(str(val))
709                            for key, val in record.items()
710                            # Exclude internal Airbyte columns.
711                            if key not in internal_cols
712                        ]
713                    )
714
715            console.print(table)
716
717        console.print(Markdown("--------------"))
718
719    def _get_airbyte_message_iterator(
720        self,
721        *,
722        streams: Literal["*"] | list[str] | None = None,
723        state_provider: StateProviderBase | None = None,
724        progress_tracker: ProgressTracker,
725        force_full_refresh: bool = False,
726    ) -> AirbyteMessageIterator:
727        """Get an AirbyteMessageIterator for this source."""
728        return AirbyteMessageIterator(
729            self._read_with_catalog(
730                catalog=self.get_configured_catalog(streams=streams),
731                state=state_provider if not force_full_refresh else None,
732                progress_tracker=progress_tracker,
733            )
734        )
735
736    def _read_with_catalog(
737        self,
738        catalog: ConfiguredAirbyteCatalog,
739        progress_tracker: ProgressTracker,
740        *,
741        state: StateProviderBase | None = None,
742        stop_event: threading.Event | None = None,
743    ) -> Generator[AirbyteMessage, None, None]:
744        """Call read on the connector.
745
746        This involves the following steps:
747        * Write the config to a temporary file
748        * execute the connector with read --config <config_file> --catalog <catalog_file>
749        * Listen to the messages and return the AirbyteRecordMessages that come along.
750        * Send out telemetry on the performed sync (with information about which source was used and
751          the type of the cache)
752        """
753        with as_temp_files(
754            [
755                self._hydrated_config,
756                catalog.model_dump_json(),
757                state.to_state_input_file_text() if state else "[]",
758            ]
759        ) as [
760            config_file,
761            catalog_file,
762            state_file,
763        ]:
764            message_generator = self._execute(
765                [
766                    "read",
767                    "--config",
768                    config_file,
769                    "--catalog",
770                    catalog_file,
771                    "--state",
772                    state_file,
773                ],
774                progress_tracker=progress_tracker,
775            )
776            for message in progress_tracker.tally_records_read(message_generator):
777                if stop_event and stop_event.is_set():
778                    progress_tracker._log_sync_cancel()  # noqa: SLF001
779                    time.sleep(0.1)
780                    return
781
782                yield message
783
784        progress_tracker.log_read_complete()
785
786    def _peek_airbyte_message(
787        self,
788        message: AirbyteMessage,
789        *,
790        raise_on_error: bool = True,
791    ) -> None:
792        """Process an Airbyte message.
793
794        This method handles reading Airbyte messages and taking action, if needed, based on the
795        message type. For instance, log messages are logged, records are tallied, and errors are
796        raised as exceptions if `raise_on_error` is True.
797
798        Raises:
799            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
800        """
801        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
802
803    def _log_incremental_streams(
804        self,
805        *,
806        incremental_streams: set[str] | None = None,
807    ) -> None:
808        """Log the streams which are using incremental sync mode."""
809        log_message = (
810            "The following streams are currently using incremental sync:\n"
811            f"{incremental_streams}\n"
812            "To perform a full refresh, set 'force_full_refresh=True' in the 'Source.read()' method."
813        )
814        print(log_message, file=sys.stderr)
815
816    def read(
817        self,
818        cache: CacheBase | None = None,
819        *,
820        streams: str | list[str] | None = None,
821        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
822        force_full_refresh: bool = False,
823        skip_validation: bool = False,
824    ) -> ReadResult:
825        """Read from the connector and write to the cache.
826
827        Args:
828            cache: The cache to write to. If not set, a default cache will be used.
829            streams: Optional if already set. A list of stream names to select for reading. If set
830                to "*", all streams will be selected.
831            write_strategy: The strategy to use when writing to the cache. If a string, it must be
832                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
833                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
834                WriteStrategy.AUTO.
835            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
836                streams will be read in incremental mode if supported by the connector. This option
837                must be True when using the "replace" strategy.
838            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
839                running the connector. This can be helpful in debugging, when you want to send
840                configurations to the connector that otherwise might be rejected by JSON Schema
841                validation rules.
842        """
843        cache = cache or get_default_cache()
844        progress_tracker = ProgressTracker(
845            source=self,
846            cache=cache,
847            destination=None,
848            expected_streams=None,  # Will be set later
849        )
850
851        # Set up state provider if not in full refresh mode
852        if force_full_refresh:
853            state_provider: StateProviderBase | None = None
854        else:
855            state_provider = cache.get_state_provider(
856                source_name=self._name,
857            )
858        state_writer = cache.get_state_writer(source_name=self._name)
859
860        if streams:
861            self.select_streams(streams)
862
863        if not self._selected_stream_names:
864            raise exc.PyAirbyteNoStreamsSelectedError(
865                connector_name=self.name,
866                available_streams=self.get_available_streams(),
867            )
868
869        try:
870            result = self._read_to_cache(
871                cache=cache,
872                catalog_provider=CatalogProvider(self.configured_catalog),
873                stream_names=self._selected_stream_names,
874                state_provider=state_provider,
875                state_writer=state_writer,
876                write_strategy=write_strategy,
877                force_full_refresh=force_full_refresh,
878                skip_validation=skip_validation,
879                progress_tracker=progress_tracker,
880            )
881        except exc.PyAirbyteInternalError as ex:
882            progress_tracker.log_failure(exception=ex)
883            raise exc.AirbyteConnectorFailedError(
884                connector_name=self.name,
885                log_text=self._last_log_messages,
886            ) from ex
887        except Exception as ex:
888            progress_tracker.log_failure(exception=ex)
889            raise
890
891        progress_tracker.log_success()
892        return result
893
894    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
895        self,
896        cache: CacheBase,
897        *,
898        catalog_provider: CatalogProvider,
899        stream_names: list[str],
900        state_provider: StateProviderBase | None,
901        state_writer: StateWriterBase | None,
902        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
903        force_full_refresh: bool = False,
904        skip_validation: bool = False,
905        progress_tracker: ProgressTracker,
906    ) -> ReadResult:
907        """Internal read method."""
908        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
909            warnings.warn(
910                message=(
911                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
912                    "could result in data loss. "
913                    "To silence this warning, use the following: "
914                    'warnings.filterwarnings("ignore", '
915                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
916                ),
917                category=exc.PyAirbyteDataLossWarning,
918                stacklevel=1,
919            )
920        if isinstance(write_strategy, str):
921            try:
922                write_strategy = WriteStrategy(write_strategy)
923            except ValueError:
924                raise exc.PyAirbyteInputError(
925                    message="Invalid strategy",
926                    context={
927                        "write_strategy": write_strategy,
928                        "available_strategies": [s.value for s in WriteStrategy],
929                    },
930                ) from None
931
932        # Run optional validation step
933        if not skip_validation:
934            self.validate_config()
935
936        # Log incremental stream if incremental streams are known
937        if state_provider and state_provider.known_stream_names:
938            # Retrieve the set of known streams which support incremental sync
939            incremental_streams = (
940                set(self._get_incremental_stream_names())
941                & state_provider.known_stream_names
942                & set(self.get_selected_streams())
943            )
944            if incremental_streams:
945                self._log_incremental_streams(incremental_streams=incremental_streams)
946
947        airbyte_message_iterator = AirbyteMessageIterator(
948            self._read_with_catalog(
949                catalog=catalog_provider.configured_catalog,
950                state=state_provider,
951                progress_tracker=progress_tracker,
952            )
953        )
954        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
955            stdin=airbyte_message_iterator,
956            catalog_provider=catalog_provider,
957            write_strategy=write_strategy,
958            state_writer=state_writer,
959            progress_tracker=progress_tracker,
960        )
961
962        # Flush the WAL, if applicable
963        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
964
965        return ReadResult(
966            source_name=self.name,
967            progress_tracker=progress_tracker,
968            processed_streams=stream_names,
969            cache=cache,
970        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False, cursor_key_overrides: dict[str, str] | None = None, primary_key_overrides: dict[str, str | list[str]] | None = None)
 73    def __init__(
 74        self,
 75        executor: Executor,
 76        name: str,
 77        config: dict[str, Any] | None = None,
 78        *,
 79        config_change_callback: ConfigChangeCallback | None = None,
 80        streams: str | list[str] | None = None,
 81        validate: bool = False,
 82        cursor_key_overrides: dict[str, str] | None = None,
 83        primary_key_overrides: dict[str, str | list[str]] | None = None,
 84    ) -> None:
 85        """Initialize the source.
 86
 87        If config is provided, it will be validated against the spec if validate is True.
 88        """
 89        self._to_be_selected_streams: list[str] | str = []
 90        """Used to hold selection criteria before catalog is known."""
 91
 92        super().__init__(
 93            executor=executor,
 94            name=name,
 95            config=config,
 96            config_change_callback=config_change_callback,
 97            validate=validate,
 98        )
 99        self._config_dict: dict[str, Any] | None = None
100        self._last_log_messages: list[str] = []
101        self._discovered_catalog: AirbyteCatalog | None = None
102        self._selected_stream_names: list[str] = []
103
104        self._cursor_key_overrides: dict[str, str] = {}
105        """A mapping of lower-cased stream names to cursor key overrides."""
106
107        self._primary_key_overrides: dict[str, list[str]] = {}
108        """A mapping of lower-cased stream names to primary key overrides."""
109
110        if config is not None:
111            self.set_config(config, validate=validate)
112        if streams is not None:
113            self.select_streams(streams)
114        if cursor_key_overrides is not None:
115            self.set_cursor_keys(**cursor_key_overrides)
116        if primary_key_overrides is not None:
117            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
119    def set_streams(self, streams: list[str]) -> None:
120        """Deprecated. See select_streams()."""
121        warnings.warn(
122            "The 'set_streams' method is deprecated and will be removed in a future version. "
123            "Please use the 'select_streams' method instead.",
124            DeprecationWarning,
125            stacklevel=2,
126        )
127        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
129    def set_cursor_key(
130        self,
131        stream_name: str,
132        cursor_key: str,
133    ) -> None:
134        """Set the cursor for a single stream.
135
136        Note:
137        - This does not unset previously set cursors.
138        - The cursor key must be a single field name.
139        - Not all streams support custom cursors. If a stream does not support custom cursors,
140          the override may be ignored.
141        - Stream names are case insensitive, while field names are case sensitive.
142        - Stream names are not validated by PyAirbyte. If the stream name
143          does not exist in the catalog, the override may be ignored.
144        """
145        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_cursor_keys(self, **kwargs: str) -> None:
147    def set_cursor_keys(
148        self,
149        **kwargs: str,
150    ) -> None:
151        """Override the cursor key for one or more streams.
152
153        Usage:
154            source.set_cursor_keys(
155                stream1="cursor1",
156                stream2="cursor2",
157            )
158
159        Note:
160        - This does not unset previously set cursors.
161        - The cursor key must be a single field name.
162        - Not all streams support custom cursors. If a stream does not support custom cursors,
163          the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
170    def set_primary_key(
171        self,
172        stream_name: str,
173        primary_key: str | list[str],
174    ) -> None:
175        """Set the primary key for a single stream.
176
177        Note:
178        - This does not unset previously set primary keys.
179        - The primary key must be a single field name or a list of field names.
180        - Not all streams support overriding primary keys. If a stream does not support overriding
181          primary keys, the override may be ignored.
182        - Stream names are case insensitive, while field names are case sensitive.
183        - Stream names are not validated by PyAirbyte. If the stream name
184          does not exist in the catalog, the override may be ignored.
185        """
186        self._primary_key_overrides[stream_name.lower()] = (
187            primary_key if isinstance(primary_key, list) else [primary_key]
188        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
190    def set_primary_keys(
191        self,
192        **kwargs: str | list[str],
193    ) -> None:
194        """Override the primary keys for one or more streams.
195
196        This does not unset previously set primary keys.
197
198        Usage:
199            source.set_primary_keys(
200                stream1="pk1",
201                stream2=["pk1", "pk2"],
202            )
203
204        Note:
205        - This does not unset previously set primary keys.
206        - The primary key must be a single field name or a list of field names.
207        - Not all streams support overriding primary keys. If a stream does not support overriding
208          primary keys, the override may be ignored.
209        - Stream names are case insensitive, while field names are case sensitive.
210        - Stream names are not validated by PyAirbyte. If the stream name
211          does not exist in the catalog, the override may be ignored.
212        """
213        self._primary_key_overrides.update(
214            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
215        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def select_all_streams(self) -> None:
231    def select_all_streams(self) -> None:
232        """Select all streams.
233
234        This is a more streamlined equivalent to:
235        > source.select_streams(source.get_available_streams()).
236        """
237        if self._config_dict is None:
238            self._to_be_selected_streams = "*"
239            self._log_warning_preselected_stream(self._to_be_selected_streams)
240            return
241
242        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
244    def select_streams(self, streams: str | list[str]) -> None:
245        """Select the stream names that should be read from the connector.
246
247        Args:
248            streams: A list of stream names to select. If set to "*", all streams will be selected.
249
250        Currently, if this is not set, all streams will be read.
251        """
252        if self._config_dict is None:
253            self._to_be_selected_streams = streams
254            self._log_warning_preselected_stream(streams)
255            return
256
257        if streams == "*":
258            self.select_all_streams()
259            return
260
261        if isinstance(streams, str):
262            # If a single stream is provided, convert it to a one-item list
263            streams = [streams]
264
265        available_streams = self.get_available_streams()
266        for stream in streams:
267            if stream not in available_streams:
268                raise exc.AirbyteStreamNotFoundError(
269                    stream_name=stream,
270                    connector_name=self.name,
271                    available_streams=available_streams,
272                )
273        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
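
For example, a minimal sketch of selecting streams on a configured source (the "users" stream name is specific to source-faker and will differ for other connectors):

import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_streams(["users"])   # select a subset of streams by name
# source.select_streams("*")       # or select every available stream
print(source.get_selected_streams())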

def get_selected_streams(self) -> list[str]:
275    def get_selected_streams(self) -> list[str]:
276        """Get the selected streams.
277
278        If no streams are selected, return an empty list.
279        """
280        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
282    def set_config(
283        self,
284        config: dict[str, Any],
285        *,
286        validate: bool = True,
287    ) -> None:
288        """Set the config for the connector.
289
290        If validate is True, raise an exception if the config fails validation.
291
292        If validate is False, validation will be deferred until check() or validate_config()
293        is called.
294        """
295        if validate:
296            self.validate_config(config)
297
298        self._config_dict = config
299
300        if self._to_be_selected_streams:
301            self.select_streams(self._to_be_selected_streams)
302            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
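
As a usage sketch, assuming the connector's spec accepts the keys you pass ("count" is a source-faker option, shown for illustration only):

import airbyte as ab

source = ab.get_source("source-faker")
source.set_config({"count": 1_000})                  # validated against the spec by default
source.set_config({"count": 1_000}, validate=False)  # defer validation until check() or validate_config()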

def get_available_streams(self) -> list[str]:
322    def get_available_streams(self) -> list[str]:
323        """Get the available streams from the spec."""
324        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.

config_spec: dict[str, typing.Any]
357    @property
358    def config_spec(self) -> dict[str, Any]:
359        """Generate a configuration spec for this connector, as a JSON Schema definition.
360
361        This function generates a JSON Schema dictionary with configuration specs for the
362        current connector, as a dictionary.
363
364        Returns:
365            dict: The JSON Schema configuration spec as a dictionary.
366        """
367        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.
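
For instance, a quick way to see which configuration keys a connector expects (a sketch; the available properties depend entirely on the connector's spec):

import airbyte as ab

source = ab.get_source("source-faker", config={})
spec = source.config_spec                    # a JSON Schema dict
print(sorted(spec.get("properties", {}).keys()))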

docs_url: str
384    @property
385    def docs_url(self) -> str:
386        """Get the URL to the connector's documentation."""
387        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
388            "source-", ""
389        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
391    @property
392    def discovered_catalog(self) -> AirbyteCatalog:
393        """Get the raw catalog for the given streams.
394
395        If the catalog is not yet known, we call discover to get it.
396        """
397        if self._discovered_catalog is None:
398            self._discovered_catalog = self._discover()
399
400        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
402    @property
403    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
404        """Get the configured catalog for the given streams.
405
406        If the raw catalog is not yet known, we call discover to get it.
407
408        If no specific streams are selected, we return a catalog that syncs all available streams.
409
410        TODO: We should consider disabling by default the streams that the connector would
411        disable by default. (For instance, streams that require a premium license are sometimes
412        disabled by default within the connector.)
413        """
414        # Ensure discovered catalog is cached before we start
415        _ = self.discovered_catalog
416
417        # Filter for selected streams if set, otherwise use all available streams:
418        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
419        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
421    def get_configured_catalog(
422        self,
423        streams: Literal["*"] | list[str] | None = None,
424    ) -> ConfiguredAirbyteCatalog:
425        """Get a configured catalog for the given streams.
426
427        If no streams are provided, the selected streams will be used. If no streams are selected,
428        all available streams will be used.
429
430        If '*' is provided, all available streams will be used.
431        """
432        selected_streams: list[str] = []
433        if streams is None:
434            selected_streams = self._selected_stream_names or self.get_available_streams()
435        elif streams == "*":
436            selected_streams = self.get_available_streams()
437        elif isinstance(streams, list):
438            selected_streams = streams
439        else:
440            raise exc.PyAirbyteInputError(
441                message="Invalid streams argument.",
442                input_value=streams,
443            )
444
445        return ConfiguredAirbyteCatalog(
446            streams=[
447                ConfiguredAirbyteStream(
448                    stream=stream,
449                    destination_sync_mode=DestinationSyncMode.overwrite,
450                    sync_mode=SyncMode.incremental,
451                    primary_key=(
452                        [self._primary_key_overrides[stream.name.lower()]]
453                        if stream.name.lower() in self._primary_key_overrides
454                        else stream.source_defined_primary_key
455                    ),
456                    cursor_field=(
457                        [self._cursor_key_overrides[stream.name.lower()]]
458                        if stream.name.lower() in self._cursor_key_overrides
459                        else stream.default_cursor_field
460                    ),
461                    # These are unused in the current implementation:
462                    generation_id=None,
463                    minimum_generation_id=None,
464                    sync_id=None,
465                )
466                for stream in self.discovered_catalog.streams
467                if stream.name in selected_streams
468            ],
469        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.
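
A brief sketch of inspecting the configured catalog (the "users" stream name is illustrative and connector-specific):

import airbyte as ab

source = ab.get_source("source-faker", config={})
catalog = source.get_configured_catalog(streams=["users"])
for configured_stream in catalog.streams:
    print(configured_stream.stream.name, configured_stream.sync_mode)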

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
471    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
472        """Return the JSON Schema spec for the specified stream name."""
473        catalog: AirbyteCatalog = self.discovered_catalog
474        found: list[AirbyteStream] = [
475            stream for stream in catalog.streams if stream.name == stream_name
476        ]
477
478        if len(found) == 0:
479            raise exc.PyAirbyteInputError(
480                message="Stream name does not exist in catalog.",
481                input_value=stream_name,
482            )
483
484        if len(found) > 1:
485            raise exc.PyAirbyteInternalError(
486                message="Duplicate streams found with the same name.",
487                context={
488                    "found_streams": found,
489                },
490            )
491
492        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
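
For example, assuming a "users" stream (as exposed by source-faker):

import airbyte as ab

source = ab.get_source("source-faker", config={})
schema = source.get_stream_json_schema("users")
print(sorted(schema.get("properties", {}).keys()))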

def get_records( self, stream: str, *, limit: int | None = None, stop_event: threading.Event | None = None, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
494    def get_records(
495        self,
496        stream: str,
497        *,
498        limit: int | None = None,
499        stop_event: threading.Event | None = None,
500        normalize_field_names: bool = False,
501        prune_undeclared_fields: bool = True,
502    ) -> LazyDataset:
503        """Read a stream from the connector.
504
505        Args:
506            stream: The name of the stream to read.
507            limit: The maximum number of records to read. If None, all records will be read.
508            stop_event: If set, the event can be triggered by the caller to stop reading records
509                and terminate the process.
510            normalize_field_names: When `True`, field names will be normalized to lower case, with
511                special characters removed. This matches the behavior of PyAirbyte caches and most
512                Airbyte destinations.
513            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
514                which generally matches the behavior of PyAirbyte caches and most Airbyte
515                destinations, specifically when you expect the catalog may be stale. You can disable
516                this to keep all fields in the records.
517
518        This involves the following steps:
519        * Call discover to get the catalog
520        * Generate a configured catalog that syncs the given stream in full_refresh mode
521        * Write the configured catalog and the config to a temporary file
522        * execute the connector with read --config <config_file> --catalog <catalog_file>
523        * Listen to the messages and return the first AirbyteRecordMessages that come along.
524        * Make sure the subprocess is killed when the function returns.
525        """
526        stop_event = stop_event or threading.Event()
527        configured_catalog = self.get_configured_catalog(streams=[stream])
528        if len(configured_catalog.streams) == 0:
529            raise exc.PyAirbyteInputError(
530                message="Requested stream does not exist.",
531                context={
532                    "stream": stream,
533                    "available_streams": self.get_available_streams(),
534                    "connector_name": self.name,
535                },
536            ) from KeyError(stream)
537
538        configured_stream = configured_catalog.streams[0]
539
540        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
541            yield from records
542
543        stream_record_handler = StreamRecordHandler(
544            json_schema=self.get_stream_json_schema(stream),
545            prune_extra_fields=prune_undeclared_fields,
546            normalize_keys=normalize_field_names,
547        )
548
549        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
550        progress_tracker = ProgressTracker(
551            ProgressStyle.PLAIN,
552            source=self,
553            cache=None,
554            destination=None,
555            expected_streams=[stream],
556        )
557
558        iterator: Iterator[dict[str, Any]] = (
559            StreamRecord.from_record_message(
560                record_message=record.record,
561                stream_record_handler=stream_record_handler,
562            )
563            for record in self._read_with_catalog(
564                catalog=configured_catalog,
565                progress_tracker=progress_tracker,
566                stop_event=stop_event,
567            )
568            if record.record
569        )
570        if limit is not None:
571            # Stop the iterator after the limit is reached
572            iterator = islice(iterator, limit)
573
574        return LazyDataset(
575            iterator,
576            stream_metadata=configured_stream,
577            stop_event=stop_event,
578            progress_tracker=progress_tracker,
579        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • limit: The maximum number of records to read. If None, all records will be read.
  • stop_event: If set, the event can be triggered by the caller to stop reading records and terminate the process.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the first AirbyteRecordMessages that come along.
  • Make sure the subprocess is killed when the function returns.
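
For example, a sketch of lazily sampling a few records without writing to a cache (the stream name is illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={})
for record in source.get_records("users", limit=3):
    print(record)
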
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
581    def get_documents(
582        self,
583        stream: str,
584        title_property: str | None = None,
585        content_properties: list[str] | None = None,
586        metadata_properties: list[str] | None = None,
587        *,
588        render_metadata: bool = False,
589    ) -> Iterable[Document]:
590        """Read a stream from the connector and return the records as documents.
591
592        If metadata_properties is not set, all properties that are not content will be added to
593        the metadata.
594
595        If render_metadata is True, metadata will be rendered in the document, as well as
596        the main content.
597        """
598        return self.get_records(stream).to_documents(
599            title_property=title_property,
600            content_properties=content_properties,
601            metadata_properties=metadata_properties,
602            render_metadata=render_metadata,
603        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
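
A sketch of turning records into documents, e.g. for loading into a vector store (the stream and property names below are illustrative and depend on your connector's schema):

import airbyte as ab

source = ab.get_source("source-faker", config={})
docs = source.get_documents(
    "products",
    title_property="make",          # illustrative field names
    content_properties=["model"],
    render_metadata=True,
)
for doc in docs:
    print(doc)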

def get_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'raise') -> dict[str, airbyte.datasets._inmemory.InMemoryDataset | None]:
605    def get_samples(
606        self,
607        streams: list[str] | Literal["*"] | None = None,
608        *,
609        limit: int = 5,
610        on_error: Literal["raise", "ignore", "log"] = "raise",
611    ) -> dict[str, InMemoryDataset | None]:
612        """Get a sample of records from the given streams."""
613        if streams == "*":
614            streams = self.get_available_streams()
615        elif streams is None:
616            streams = self.get_selected_streams()
617
618        results: dict[str, InMemoryDataset | None] = {}
619        for stream in streams:
620            stop_event = threading.Event()
621            try:
622                results[stream] = self.get_records(
623                    stream,
624                    limit=limit,
625                    stop_event=stop_event,
626                ).fetch_all()
627                stop_event.set()
628            except Exception as ex:
629                results[stream] = None
630                if on_error == "ignore":
631                    continue
632
633                if on_error == "raise":
634                    raise ex from None
635
636                if on_error == "log":
637                    print(f"Error fetching sample for stream '{stream}': {ex}")
638
639        return results

Get a sample of records from the given streams.
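
For example, a sketch of pulling a small in-memory sample for every available stream:

import airbyte as ab

source = ab.get_source("source-faker", config={})
samples = source.get_samples(streams="*", limit=2, on_error="log")
for stream_name, dataset in samples.items():
    print(stream_name, "->", "error" if dataset is None else len(dataset))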

def print_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'log') -> None:
641    def print_samples(
642        self,
643        streams: list[str] | Literal["*"] | None = None,
644        *,
645        limit: int = 5,
646        on_error: Literal["raise", "ignore", "log"] = "log",
647    ) -> None:
648        """Print a sample of records from the given streams."""
649        internal_cols: list[str] = [
650            AB_EXTRACTED_AT_COLUMN,
651            AB_META_COLUMN,
652            AB_RAW_ID_COLUMN,
653        ]
654        col_limit = 10
655        if streams == "*":
656            streams = self.get_available_streams()
657        elif streams is None:
658            streams = self.get_selected_streams()
659
660        console = Console()
661
662        console.print(
663            Markdown(
664                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
665                justify="left",
666            )
667        )
668
669        for stream in streams:
670            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
671            samples = self.get_samples(
672                streams=[stream],
673                limit=limit,
674                on_error=on_error,
675            )
676            dataset = samples[stream]
677
678            table = Table(
679                show_header=True,
680                show_lines=True,
681            )
682            if dataset is None:
683                console.print(
684                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
685                )
686                continue
687
688            if len(dataset.column_names) > col_limit:
689                # We'll pivot the columns so each column is its own row
690                table.add_column("Column Name")
691                for _ in range(len(dataset)):
692                    table.add_column(overflow="fold")
693                for col in dataset.column_names:
694                    table.add_row(
695                        Markdown(f"**`{col}`**"),
696                        *[escape(str(record[col])) for record in dataset],
697                    )
698            else:
699                for col in dataset.column_names:
700                    table.add_column(
701                        Markdown(f"**`{col}`**"),
702                        overflow="fold",
703                    )
704
705                for record in dataset:
706                    table.add_row(
707                        *[
708                            escape(str(val))
709                            for key, val in record.items()
710                            # Exclude internal Airbyte columns.
711                            if key not in internal_cols
712                        ]
713                    )
714
715            console.print(table)
716
717        console.print(Markdown("--------------"))

Print a sample of records from the given streams.
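
For example (a sketch; output is rendered as rich tables in the console):

import airbyte as ab

source = ab.get_source("source-faker", config={})
source.print_samples(streams="*", limit=3)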

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> ReadResult:
816    def read(
817        self,
818        cache: CacheBase | None = None,
819        *,
820        streams: str | list[str] | None = None,
821        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
822        force_full_refresh: bool = False,
823        skip_validation: bool = False,
824    ) -> ReadResult:
825        """Read from the connector and write to the cache.
826
827        Args:
828            cache: The cache to write to. If not set, a default cache will be used.
829            streams: Optional if already set. A list of stream names to select for reading. If set
830                to "*", all streams will be selected.
831            write_strategy: The strategy to use when writing to the cache. If a string, it must be
832                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
833                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
834                WriteStrategy.AUTO.
835            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
836                streams will be read in incremental mode if supported by the connector. This option
837                must be True when using the "replace" strategy.
838            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
839                running the connector. This can be helpful in debugging, when you want to send
840                configurations to the connector that otherwise might be rejected by JSON Schema
841                validation rules.
842        """
843        cache = cache or get_default_cache()
844        progress_tracker = ProgressTracker(
845            source=self,
846            cache=cache,
847            destination=None,
848            expected_streams=None,  # Will be set later
849        )
850
851        # Set up state provider if not in full refresh mode
852        if force_full_refresh:
853            state_provider: StateProviderBase | None = None
854        else:
855            state_provider = cache.get_state_provider(
856                source_name=self._name,
857            )
858        state_writer = cache.get_state_writer(source_name=self._name)
859
860        if streams:
861            self.select_streams(streams)
862
863        if not self._selected_stream_names:
864            raise exc.PyAirbyteNoStreamsSelectedError(
865                connector_name=self.name,
866                available_streams=self.get_available_streams(),
867            )
868
869        try:
870            result = self._read_to_cache(
871                cache=cache,
872                catalog_provider=CatalogProvider(self.configured_catalog),
873                stream_names=self._selected_stream_names,
874                state_provider=state_provider,
875                state_writer=state_writer,
876                write_strategy=write_strategy,
877                force_full_refresh=force_full_refresh,
878                skip_validation=skip_validation,
879                progress_tracker=progress_tracker,
880            )
881        except exc.PyAirbyteInternalError as ex:
882            progress_tracker.log_failure(exception=ex)
883            raise exc.AirbyteConnectorFailedError(
884                connector_name=self.name,
885                log_text=self._last_log_messages,
886            ) from ex
887        except Exception as ex:
888            progress_tracker.log_failure(exception=ex)
889            raise
890
891        progress_tracker.log_success()
892        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
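
As a usage sketch, reading all streams into the default local DuckDB cache and then into pandas (the "users" stream name is specific to source-faker):

import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_streams("*")
result = source.read()                   # uses the default local DuckDB cache
# result = source.read(write_strategy="replace", force_full_refresh=True)  # full rewrite of the cache
print(len(result["users"].to_pandas()))  # each stream is exposed as a dataset
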
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
get_config
config_hash
validate_config
print_config_spec
connector_version
check
install
uninstall
class StreamRecord(dict[str, typing.Any]):
176class StreamRecord(dict[str, Any]):
177    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
178
179    It has these behaviors:
180    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
181      case-insensitive manner.
182    - The original case is stored in a separate dictionary, so that the original case can be
183      retrieved when needed.
184    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
185      Python dictionary.
186    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
187      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.
188
189    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
190    references.
191
192    There are two ways this class can store keys internally:
193    - If normalize_keys is True, the keys are normalized using the given normalizer.
194    - If normalize_keys is False, the original case of the keys is stored.
195
196    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
197    dictionary will be initialized with the given keys. If a key is not found in the input data, it
198    will be initialized with a value of None. When provided, the 'expected_keys' input will also
199    determine the original case of the keys.
200    """
201
202    def __init__(
203        self,
204        from_dict: dict,
205        *,
206        stream_record_handler: StreamRecordHandler,
207        with_internal_columns: bool = True,
208        extracted_at: datetime | None = None,
209    ) -> None:
210        """Initialize the dictionary with the given data.
211
212        Args:
213            from_dict: The dictionary to initialize the StreamRecord with.
214            stream_record_handler: The StreamRecordHandler to use for processing the record.
215            with_internal_columns: If `True`, the internal columns will be added to the record.
216            extracted_at: The time the record was extracted. If not provided, the current time will
217                be used.
218        """
219        self._stream_handler: StreamRecordHandler = stream_record_handler
220
221        # Start by initializing all values to None
222        self.update(dict.fromkeys(stream_record_handler.index_keys))
223
224        # Update the dictionary with the given data
225        if self._stream_handler.prune_extra_fields:
226            self.update(
227                {
228                    self._stream_handler.to_index_case(k): v
229                    for k, v in from_dict.items()
230                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
231                }
232            )
233        else:
234            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})
235
236        if with_internal_columns:
237            self.update(
238                {
239                    AB_RAW_ID_COLUMN: uuid7str(),
240                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(pytz.utc),
241                    AB_META_COLUMN: {},
242                }
243            )
244
245    @classmethod
246    def from_record_message(
247        cls,
248        record_message: AirbyteRecordMessage,
249        *,
250        stream_record_handler: StreamRecordHandler,
251    ) -> StreamRecord:
252        """Return a StreamRecord from a RecordMessage."""
253        data_dict: dict[str, Any] = record_message.data.copy()
254        return cls(
255            from_dict=data_dict,
256            stream_record_handler=stream_record_handler,
257            with_internal_columns=True,
258            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
259        )
260
261    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
262        """Return the item with the given key."""
263        try:
264            return super().__getitem__(key)
265        except KeyError:
266            return super().__getitem__(self._stream_handler.to_index_case(key))
267
268    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
269        """Set the item with the given key to the given value."""
270        index_case_key = self._stream_handler.to_index_case(key)
271        if (
272            self._stream_handler.prune_extra_fields
273            and index_case_key not in self._stream_handler.index_keys
274        ):
275            return
276
277        super().__setitem__(index_case_key, value)
278
279    def __delitem__(self, key: str) -> None:
280        """Delete the item with the given key."""
281        try:
282            super().__delitem__(key)
283        except KeyError:
284            index_case_key = self._stream_handler.to_index_case(key)
285            if super().__contains__(index_case_key):
286                super().__delitem__(index_case_key)
287                return
288        else:
289            # No failure. Key was deleted.
290            return
291
292        raise KeyError(key)
293
294    def __contains__(self, key: object) -> bool:
295        """Return whether the dictionary contains the given key."""
296        assert isinstance(key, str), "Key must be a string."
297        return super().__contains__(key) or super().__contains__(
298            self._stream_handler.to_index_case(key)
299        )
300
301    def __iter__(self) -> Iterator[str]:
302        """Return an iterator over the keys of the dictionary."""
303        return iter(super().__iter__())
304
305    def __len__(self) -> int:
306        """Return the number of items in the dictionary."""
307        return super().__len__()
308
309    def __eq__(self, other: object) -> bool:
310        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
311        if isinstance(other, StreamRecord):
312            return dict(self) == dict(other)
313
314        if isinstance(other, dict):
315            return {k.lower(): v for k, v in self.items()} == {
316                k.lower(): v for k, v in other.items()
317            }
318        return False
319
320    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
321        """Return the hash of the dictionary with keys sorted."""
322        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
323        return hash(tuple(sorted(items)))

The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

It has these behaviors:

  • When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
  • The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
  • Because it is subclassed from dict, the StreamRecord class can be passed as a normal Python dictionary.
  • In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta.

This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.

There are two ways this class can store keys internally:

  • If normalize_keys is True, the keys are normalized using the given normalizer.
  • If normalize_keys is False, the original case of the keys is stored.

In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the dictionary will be initialized with the given keys. If a key is not found in the input data, it will be initialized with a value of None. When provided, the 'expected_keys' input will also determine the original case of the keys.
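
A small illustration of the case-insensitive lookups (a sketch; StreamRecord instances are normally produced for you by Source.get_records() rather than constructed directly, and the "id" field is illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={})
record = next(iter(source.get_records("users", limit=1)))
# Key lookups are case-insensitive, so both spellings resolve to the same field:
print(record["id"], record["ID"])
# Airbyte metadata columns are also present on each record:
print(record["_airbyte_extracted_at"])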

@classmethod
def from_record_message( cls, record_message: airbyte_protocol.models.airbyte_protocol.AirbyteRecordMessage, *, stream_record_handler: airbyte.records.StreamRecordHandler) -> StreamRecord:
245    @classmethod
246    def from_record_message(
247        cls,
248        record_message: AirbyteRecordMessage,
249        *,
250        stream_record_handler: StreamRecordHandler,
251    ) -> StreamRecord:
252        """Return a StreamRecord from a RecordMessage."""
253        data_dict: dict[str, Any] = record_message.data.copy()
254        return cls(
255            from_dict=data_dict,
256            stream_record_handler=stream_record_handler,
257            with_internal_columns=True,
258            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
259        )

Return a StreamRecord from a RecordMessage.

Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class WriteResult:
104class WriteResult:
105    """The result of a write operation.
106
107    This class is used to return information about the write operation, such as the number of
108    records written. It should not be created directly, but instead returned by the write method
109    of a destination.
110    """
111
112    def __init__(
113        self,
114        *,
115        destination: AirbyteWriterInterface | Destination,
116        source_data: Source | ReadResult,
117        catalog_provider: CatalogProvider,
118        state_writer: StateWriterBase,
119        progress_tracker: ProgressTracker,
120    ) -> None:
121        """Initialize a write result.
122
123        This class should not be created directly. Instead, it should be returned by the `write`
124        method of the `Destination` class.
125        """
126        self._destination: AirbyteWriterInterface | Destination = destination
127        self._source_data: Source | ReadResult = source_data
128        self._catalog_provider: CatalogProvider = catalog_provider
129        self._state_writer: StateWriterBase = state_writer
130        self._progress_tracker: ProgressTracker = progress_tracker
131
132    @property
133    def processed_records(self) -> int:
134        """The total number of records written to the destination."""
135        return self._progress_tracker.total_destination_records_delivered
136
137    def get_state_provider(self) -> StateProviderBase:
138        """Return the state writer as a state provider.
139
140        As a public interface, we only expose the state writer as a state provider. This is because
141        the state writer itself is only intended for internal use. As a state provider, the state
142        writer can be used to read the state artifacts that were written. This can be useful for
143        testing or debugging.
144        """
145        return self._state_writer

The result of a write operation.

This class is used to return information about the write operation, such as the number of records written. It should not be created directly, but instead returned by the write method of a destination.
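
For example, a WriteResult is obtained from Destination.write() and can be inspected afterwards. The destination name and config below are placeholders; substitute any destination you have configured:

import airbyte as ab

source = ab.get_source("source-faker", config={})
source.select_all_streams()

# Placeholder destination; swap in your own connector name and config.
destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/output.duckdb"},
)

# write() returns the WriteResult; it is never constructed directly.
write_result = destination.write(source)
print(f"Records written: {write_result.processed_records}")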

WriteResult(*, destination: airbyte._writers.base.AirbyteWriterInterface | Destination, source_data: Source | ReadResult, catalog_provider: airbyte.shared.catalog_providers.CatalogProvider, state_writer: airbyte.shared.state_writers.StateWriterBase, progress_tracker: airbyte.progress.ProgressTracker)
112    def __init__(
113        self,
114        *,
115        destination: AirbyteWriterInterface | Destination,
116        source_data: Source | ReadResult,
117        catalog_provider: CatalogProvider,
118        state_writer: StateWriterBase,
119        progress_tracker: ProgressTracker,
120    ) -> None:
121        """Initialize a write result.
122
123        This class should not be created directly. Instead, it should be returned by the `write`
124        method of the `Destination` class.
125        """
126        self._destination: AirbyteWriterInterface | Destination = destination
127        self._source_data: Source | ReadResult = source_data
128        self._catalog_provider: CatalogProvider = catalog_provider
129        self._state_writer: StateWriterBase = state_writer
130        self._progress_tracker: ProgressTracker = progress_tracker

Initialize a write result.

This class should not be created directly. Instead, it should be returned by the write method of the Destination class.

processed_records: int
132    @property
133    def processed_records(self) -> int:
134        """The total number of records written to the destination."""
135        return self._progress_tracker.total_destination_records_delivered

The total number of records written to the destination.

def get_state_provider(self) -> airbyte.shared.state_providers.StateProviderBase:
137    def get_state_provider(self) -> StateProviderBase:
138        """Return the state writer as a state provider.
139
140        As a public interface, we only expose the state writer as a state provider. This is because
141        the state writer itself is only intended for internal use. As a state provider, the state
142        writer can be used to read the state artifacts that were written. This can be useful for
143        testing or debugging.
144        """
145        return self._state_writer

Return the state writer as a state provider.

As a public interface, we only expose the state writer as a state provider. This is because the state writer itself is only intended for internal use. As a state provider, the state writer can be used to read the state artifacts that were written. This can be useful for testing or debugging.
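
Given a write_result returned by Destination.write() (as in the example above), here is a hedged sketch of reading back the written state. The property names used (known_stream_names, state_message_artifacts) are assumptions about StateProviderBase, so check airbyte.shared.state_providers for the actual interface:

state_provider = write_result.get_state_provider()

# Assumed properties; the names below are illustrative only.
print(state_provider.known_stream_names)
for state_message in state_provider.state_message_artifacts:
    print(state_message)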