airbyte

PyAirbyte brings the power of Airbyte to every Python developer.

Getting Started

Reading Data

You can connect to any of hundreds of sources using the get_source method. You can then read data from sources using the Source.read method.

from airbyte import get_source

source = get_source(
    "source-faker",
    config={},
)
read_result = source.read()

for record in read_result["users"].records:
    print(record)

For more information, see the airbyte.sources module.

Writing to SQL Caches

Data can be written to caches using a number of SQL-based cache implementations, including Postgres, BigQuery, Snowflake, DuckDB, and MotherDuck. If you do not specify a cache, PyAirbyte will automatically use a local DuckDB cache by default.
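
For example, here is a minimal sketch (assuming the source-faker connector shown above) that reads into an explicitly named local DuckDB cache instead of the default one:

import airbyte as ab

source = ab.get_source("source-faker", config={}, streams="*")

# Omitting the cache argument would fall back to the default local DuckDB cache.
cache = ab.new_local_cache("my_cache")
read_result = source.read(cache=cache)
print(read_result.cache is cache)  # True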

For more information, see the airbyte.caches module.

Writing to Destination Connectors

Data can be written to destinations using the Destination.write method. You can connect to destinations using the get_destination method. PyAirbyte supports all Airbyte destinations, but Docker is required on your machine in order to run Java-based destinations.
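
As a rough sketch, writing a source's data to a destination looks like the following. The destination name and its destination_path config key are shown for illustration only; check the destination connector's spec for its actual fields.

import airbyte as ab

source = ab.get_source("source-faker", config={}, streams="*")

# Illustrative destination name and config key:
destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/pyairbyte_demo.duckdb"},
)
write_result = destination.write(source)  # reads from the source, caches, then writes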

Note: When loading to a SQL database, we recommend using a SQL cache (where available, see above) instead of a destination connector. This is because SQL caches are Python-native and therefore more portable when run from different Python-based environments which might not have Docker container support. Destinations in PyAirbyte are uniquely suited for loading to non-SQL platforms such as vector stores and other reverse ETL-type use cases.

For more information, see the airbyte.destinations module and the full list of destination connectors here.

PyAirbyte API

Importing as ab

Most examples in the PyAirbyte documentation use the import airbyte as ab convention. The ab alias is recommended, making code more concise and readable. When getting started, this also saves you from digging in submodules to find the classes and functions you need, since frequently-used classes and functions are available at the top level of the airbyte module.
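
For example, a typical session using the recommended alias might look like this (the source name and empty config are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={}, streams="*")
cache = ab.get_default_cache()
read_result = source.read(cache=cache)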

Navigating the API

While many PyAirbyte classes and functions are available at the top level of the airbyte module, you can also import classes and functions from submodules directly. For example, while you can import the Source class from airbyte, you can also import it from the sources submodule like this:

from airbyte.sources import Source

Whether you import from the top level or from a submodule, the classes and functions are the same. We expect that most users will import from the top level when getting started, and then import from submodules when they are deploying more complex implementations.
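
As a quick sanity check, here is a short sketch (assuming a standard PyAirbyte install) showing that both import paths resolve to the same class object:

import airbyte as ab
from airbyte.sources import Source

print(Source is ab.Source)  # True: same class, two import paths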

For quick reference, top-level modules are listed in the left sidebar of this page.

Other Resources

  • PyAirbyte GitHub Readme: https://github.com/airbytehq/pyairbyte
  • PyAirbyte Issue Tracker: https://github.com/airbytehq/pyairbyte/issues
  • Frequently Asked Questions: https://github.com/airbytehq/PyAirbyte/blob/main/docs/faq.md
  • PyAirbyte Contributors Guide: https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md
  • GitHub Releases: https://github.com/airbytehq/PyAirbyte/releases

API Reference

Below is a list of all classes, functions, and modules available in the top-level airbyte module. (This is a long list!) If you are just starting out, we recommend beginning by selecting a submodule to navigate to from the left sidebar or from the list below:

Each module has its own documentation and code samples related to effectively using the related capabilities:

  • airbyte.cloud - Working with Airbyte Cloud, including running jobs remotely.
  • airbyte.caches - Working with caches, including how to inspect a cache and get data from it.
  • airbyte.datasets - Working with datasets, including how to read from datasets and convert to other formats, such as Pandas, Arrow, and LLM Document formats.
  • airbyte.destinations - Working with destinations, including how to write to Airbyte destination connectors.
  • airbyte.documents - Working with LLM documents, including how to convert records into document formats, for instance, when working with AI libraries like LangChain.
  • airbyte.exceptions - Definitions of all exception and warning classes used in PyAirbyte.
  • airbyte.experimental - Experimental features and utilities that do not yet have a stable API.
  • airbyte.logs - Logging functionality and configuration.
  • airbyte.records - Internal record handling classes.
  • airbyte.results - Documents the classes returned when working with results from Source.read and Destination.write.
  • airbyte.secrets - Tools for managing secrets in PyAirbyte.
  • airbyte.sources - Tools for creating and reading from Airbyte sources, including get_source to declare a source, Source.read for reading data, and Source.get_records() to peek at records without caching or writing them directly.


  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""***PyAirbyte brings the power of Airbyte to every Python developer.***
  3
  4[![PyPI version](https://badge.fury.io/py/airbyte.svg)](https://badge.fury.io/py/airbyte)
  5[![PyPI - Downloads](https://img.shields.io/pypi/dm/airbyte)](https://pypi.org/project/airbyte/)
  6[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/airbyte)](https://pypi.org/project/airbyte/)
  7[![Star on GitHub](https://img.shields.io/github/stars/airbytehq/pyairbyte.svg?style=social&label=★%20on%20GitHub)](https://github.com/airbytehq/pyairbyte)
  8
  9# Getting Started
 10
 11## Reading Data
 12
 13You can connect to any of [hundreds of sources](https://docs.airbyte.com/integrations/sources/)
 14using the `get_source` method. You can then read data from sources using the `Source.read` method.
 15
 16```python
 17from airbyte import get_source
 18
 19source = get_source(
 20    "source-faker",
 21    config={},
 22)
 23read_result = source.read()
 24
 25for record in read_result["users"].records:
 26    print(record)
 27```
 28
 29For more information, see the `airbyte.sources` module.
 30
 31## Writing to SQL Caches
 32
 33Data can be written to caches using a number of SQL-based cache implementations, including
 34Postgres, BigQuery, Snowflake, DuckDB, and MotherDuck. If you do not specify a cache, PyAirbyte
 35will automatically use a local DuckDB cache by default.
 36
 37For more information, see the `airbyte.caches` module.
 38
 39## Writing to Destination Connectors
 40
 41Data can be written to destinations using the `Destination.write` method. You can connect to
 42destinations using the `get_destination` method. PyAirbyte supports all Airbyte destinations, but
 43Docker is required on your machine in order to run Java-based destinations.
 44
 45**Note:** When loading to a SQL database, we recommend using a SQL cache (where available,
 46[see above](#writing-to-sql-caches)) instead of a destination connector. This is because SQL caches
 47are Python-native and therefore more portable when run from different Python-based environments which
 48might not have Docker container support. Destinations in PyAirbyte are uniquely suited for loading
 49to non-SQL platforms such as vector stores and other reverse ETL-type use cases.
 50
 51For more information, see the `airbyte.destinations` module and the full list of destination
 52connectors [here](https://docs.airbyte.com/integrations/destinations/).
 53
 54# PyAirbyte API
 55
 56## Importing as `ab`
 57
 58Most examples in the PyAirbyte documentation use the `import airbyte as ab` convention. The `ab`
 59alias is recommended, making code more concise and readable. When getting started, this
 60also saves you from digging in submodules to find the classes and functions you need, since
 61frequently-used classes and functions are available at the top level of the `airbyte` module.
 62
 63## Navigating the API
 64
 65While many PyAirbyte classes and functions are available at the top level of the `airbyte` module,
 66you can also import classes and functions from submodules directly. For example, while you can
 67import the `Source` class from `airbyte`, you can also import it from the `sources` submodule like
 68this:
 69
 70```python
 71from airbyte.sources import Source
 72```
 73
 74Whether you import from the top level or from a submodule, the classes and functions are the same.
 75We expect that most users will import from the top level when getting started, and then import from
 76submodules when they are deploying more complex implementations.
 77
 78For quick reference, top-level modules are listed in the left sidebar of this page.
 79
 80# Other Resources
 81
 82- [PyAirbyte GitHub Readme](https://github.com/airbytehq/pyairbyte)
 83- [PyAirbyte Issue Tracker](https://github.com/airbytehq/pyairbyte/issues)
 84- [Frequently Asked Questions](https://github.com/airbytehq/PyAirbyte/blob/main/docs/faq.md)
 85- [PyAirbyte Contributors Guide](https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md)
 86- [GitHub Releases](https://github.com/airbytehq/PyAirbyte/releases)
 87
 88----------------------
 89
 90# API Reference
 91
 92Below is a list of all classes, functions, and modules available in the top-level `airbyte`
 93module. (This is a long list!) If you are just starting out, we recommend beginning by selecting a
 94submodule to navigate to from the left sidebar or from the list below:
 95
 96Each module
 97has its own documentation and code samples related to effectively using the related capabilities.
 98
 99- **`airbyte.cloud`** - Working with Airbyte Cloud, including running jobs remotely.
100- **`airbyte.caches`** - Working with caches, including how to inspect a cache and get data from it.
101- **`airbyte.datasets`** - Working with datasets, including how to read from datasets and convert to
102    other formats, such as Pandas, Arrow, and LLM Document formats.
103- **`airbyte.destinations`** - Working with destinations, including how to write to Airbyte
104    destination connectors.
105- **`airbyte.documents`** - Working with LLM documents, including how to convert records into
106    document formats, for instance, when working with AI libraries like LangChain.
107- **`airbyte.exceptions`** - Definitions of all exception and warning classes used in PyAirbyte.
108- **`airbyte.experimental`** - Experimental features and utilities that do not yet have a stable
109    API.
110- **`airbyte.logs`** - Logging functionality and configuration.
111- **`airbyte.records`** - Internal record handling classes.
112- **`airbyte.results`** - Documents the classes returned when working with results from
113    `Source.read` and `Destination.write`
114- **`airbyte.secrets`** - Tools for managing secrets in PyAirbyte.
115- **`airbyte.sources`** - Tools for creating and reading from Airbyte sources. This includes
116    `airbyte.source.get_source` to declare a source, `airbyte.source.Source.read` for reading data,
117    and `airbyte.source.Source.get_records()` to peek at records without caching or writing them
118    directly.
119
120----------------------
121
122"""  # noqa: D415
123
124from __future__ import annotations
125
126from airbyte import (
127    caches,
128    callbacks,
129    # cli,  # Causes circular import if included
130    cloud,
131    constants,
132    datasets,
133    destinations,
134    documents,
135    exceptions,  # noqa: ICN001  # No 'exc' alias for top-level module
136    experimental,
137    logs,
138    records,
139    results,
140    secrets,
141    sources,
142)
143from airbyte.caches.bigquery import BigQueryCache
144from airbyte.caches.duckdb import DuckDBCache
145from airbyte.caches.util import get_colab_cache, get_default_cache, new_local_cache
146from airbyte.datasets import CachedDataset
147from airbyte.destinations.base import Destination
148from airbyte.destinations.util import get_destination
149from airbyte.records import StreamRecord
150from airbyte.results import ReadResult, WriteResult
151from airbyte.secrets import SecretSourceEnum, get_secret
152from airbyte.sources import registry
153from airbyte.sources.base import Source
154from airbyte.sources.registry import get_available_connectors
155from airbyte.sources.util import get_source
156
157
158__all__ = [
159    # Modules
160    "caches",
161    "callbacks",
162    # "cli",  # Causes circular import if included
163    "cloud",
164    "constants",
165    "datasets",
166    "destinations",
167    "documents",
168    "exceptions",
169    "experimental",
170    "logs",
171    "records",
172    "registry",
173    "results",
174    "secrets",
175    "sources",
176    # Factories
177    "get_available_connectors",
178    "get_colab_cache",
179    "get_default_cache",
180    "get_destination",
181    "get_secret",
182    "get_source",
183    "new_local_cache",
184    # Classes
185    "BigQueryCache",
186    "CachedDataset",
187    "Destination",
188    "DuckDBCache",
189    "ReadResult",
190    "SecretSourceEnum",
191    "Source",
192    "StreamRecord",
193    "WriteResult",
194]
195
196__docformat__ = "google"
def get_available_connectors( install_type: airbyte.sources.registry.InstallType | str | None = None) -> list[str]:
311def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
312    """Return a list of all available connectors.
313
314    Connectors will be returned in alphabetical order, with the standard prefix "source-".
315    """
316    if install_type is None:
317        # No install type specified. Filter for whatever is runnable.
318        if is_docker_installed():
319            logging.info("Docker is detected. Returning all connectors.")
320            # If Docker is available, return all connectors.
321            return sorted(conn.name for conn in _get_registry_cache().values())
322
323        logging.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
324
325        # If Docker is not available, return only Python and Manifest-based connectors.
326        return sorted(
327            conn.name
328            for conn in _get_registry_cache().values()
329            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
330        )
331
332    if not isinstance(install_type, InstallType):
333        install_type = InstallType(install_type)
334
335    if install_type == InstallType.PYTHON:
336        return sorted(
337            conn.name
338            for conn in _get_registry_cache().values()
339            if conn.pypi_package_name is not None
340        )
341
342    if install_type == InstallType.JAVA:
343        warnings.warn(
344            message="Java connectors are not yet supported.",
345            stacklevel=2,
346        )
347        return sorted(
348            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
349        )
350
351    if install_type == InstallType.DOCKER:
352        return sorted(conn.name for conn in _get_registry_cache().values())
353
354    if install_type == InstallType.YAML:
355        return sorted(
356            conn.name
357            for conn in _get_registry_cache().values()
358            if InstallType.YAML in conn.install_types
359            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
360        )
361
362    # pragma: no cover  # Should never be reached.
363    raise exc.PyAirbyteInputError(
364        message="Invalid install type.",
365        context={
366            "install_type": install_type,
367        },
368    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".
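
For example, here is a small sketch that lists whatever connectors are runnable in the current environment (all registered connectors when Docker is detected, otherwise only Python and manifest-only connectors), and then filters by install type via the InstallType enum:

import airbyte as ab
from airbyte.sources.registry import InstallType

for connector_name in ab.get_available_connectors():
    print(connector_name)

# Only connectors installable as Python packages from PyPI:
python_connectors = ab.get_available_connectors(install_type=InstallType.PYTHON)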

def get_colab_cache( cache_name: str = 'default_cache', sub_dir: str = 'Airbyte/cache', schema_name: str = 'main', table_prefix: str | None = '', drive_name: str = 'MyDrive', mount_path: str = '/content/drive') -> DuckDBCache:
 81def get_colab_cache(
 82    cache_name: str = "default_cache",
 83    sub_dir: str = "Airbyte/cache",
 84    schema_name: str = "main",
 85    table_prefix: str | None = "",
 86    drive_name: str = _MY_DRIVE,
 87    mount_path: str = _GOOGLE_DRIVE_DEFAULT_MOUNT_PATH,
 88) -> DuckDBCache:
 89    """Get a local cache for storing data, using the default database path.
 90
 91    Unlike the default `DuckDBCache`, this implementation will easily persist data across multiple
 92    Colab sessions.
 93
 94    Please note that Google Colab may prompt you to authenticate with your Google account to access
 95    your Google Drive. When prompted, click the link and follow the instructions.
 96
 97    Colab will require access to read and write files in your Google Drive, so please be sure to
 98    grant the necessary permissions when prompted.
 99
100    All arguments are optional and have default values that are suitable for most use cases.
101
102    Args:
103        cache_name: The name to use for the cache. Defaults to "default_cache". Override this if you
104            want to use a different database for different projects.
105        sub_dir: The subdirectory to store the cache in. Defaults to "Airbyte/cache". Override this
106            if you want to store the cache in a different subdirectory than the default.
107        schema_name: The name of the schema to write to. Defaults to "main". Override this if you
108            want to write to a different schema.
109        table_prefix: The prefix to use for all tables in the cache. Defaults to "". Override this
110            if you want to use a different prefix for all tables.
111        drive_name: The name of the Google Drive to use. Defaults to "MyDrive". Override this if you
112            want to store data in a shared drive instead of your personal drive.
113        mount_path: The path to mount Google Drive to. Defaults to "/content/drive". Override this
114            if you want to mount Google Drive to a different path (not recommended).
115
116    ## Usage Examples
117
118    The default `get_colab_cache` arguments are suitable for most use cases:
119
120    ```python
121    from airbyte.caches.colab import get_colab_cache
122
123    colab_cache = get_colab_cache()
124    ```
125
126    Or you can call `get_colab_cache` with custom arguments:
127
128    ```python
129    custom_cache = get_colab_cache(
130        cache_name="my_custom_cache",
131        sub_dir="Airbyte/custom_cache",
132        drive_name="My Company Drive",
133    )
134    ```
135    """
136    try:
137        from google.colab import drive  # noqa: PLC0415 # type: ignore[reportMissingImports]
138    except ImportError:
139        drive = None
140        msg = (
141            "The `google.colab` interface is only available in Google Colab. "
142            "Please run this code in a Google Colab notebook."
143        )
144        raise ImportError(msg) from None
145
146    drive.mount(mount_path)
147    drive_root = (
148        Path(mount_path) / drive_name
149        if drive_name == _MY_DRIVE
150        else Path(mount_path) / "Shareddrives" / drive_name
151    )
152
153    cache_dir = drive_root / sub_dir
154    cache_dir.mkdir(parents=True, exist_ok=True)
155    db_file_path = cache_dir / f"{cache_name}.duckdb"
156
157    print(f"Using persistent PyAirbyte cache in Google Drive: `{db_file_path}`.")
158    return DuckDBCache(
159        db_path=db_file_path,
160        cache_dir=cache_dir,
161        schema_name=schema_name,
162        table_prefix=table_prefix,
163    )

Get a local cache for storing data, using the default database path.

Unlike the default DuckDBCache, this implementation will easily persist data across multiple Colab sessions.

Please note that Google Colab may prompt you to authenticate with your Google account to access your Google Drive. When prompted, click the link and follow the instructions.

Colab will require access to read and write files in your Google Drive, so please be sure to grant the necessary permissions when prompted.

All arguments are optional and have default values that are suitable for most use cases.

Arguments:
  • cache_name: The name to use for the cache. Defaults to "default_cache". Override this if you want to use a different database for different projects.
  • sub_dir: The subdirectory to store the cache in. Defaults to "Airbyte/cache". Override this if you want to store the cache in a different subdirectory than the default.
  • schema_name: The name of the schema to write to. Defaults to "main". Override this if you want to write to a different schema.
  • table_prefix: The prefix to use for all tables in the cache. Defaults to "". Override this if you want to use a different prefix for all tables.
  • drive_name: The name of the Google Drive to use. Defaults to "MyDrive". Override this if you want to store data in a shared drive instead of your personal drive.
  • mount_path: The path to mount Google Drive to. Defaults to "/content/drive". Override this if you want to mount Google Drive to a different path (not recommended).

Usage Examples

The default get_colab_cache arguments are suitable for most use cases:

from airbyte.caches.colab import get_colab_cache

colab_cache = get_colab_cache()

Or you can call get_colab_cache with custom arguments:

custom_cache = get_colab_cache(
    cache_name="my_custom_cache",
    sub_dir="Airbyte/custom_cache",
    drive_name="My Company Drive",
)
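
Either way, the returned cache can be passed to a source's read call so results persist across Colab sessions. A short sketch, assuming the colab_cache from the first example above and the source-faker connector:

import airbyte as ab

source = ab.get_source("source-faker", config={}, streams="*")
read_result = source.read(cache=colab_cache)
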
def get_default_cache() -> DuckDBCache:
27def get_default_cache() -> DuckDBCache:
28    """Get a local cache for storing data, using the default database path.
29
30    Cache files are stored in the `.cache` directory, relative to the current
31    working directory.
32    """
33    cache_dir = Path("./.cache/default_cache")
34    return DuckDBCache(
35        db_path=cache_dir / "default_cache.duckdb",
36        cache_dir=cache_dir,
37    )

Get a local cache for storing data, using the default database path.

Cache files are stored in the .cache directory, relative to the current working directory.
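
A minimal sketch showing where the default cache lives on disk (the path in the comment assumes the documented default location):

import airbyte as ab

cache = ab.get_default_cache()
print(cache.db_path)  # ./.cache/default_cache/default_cache.duckdb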

def get_destination( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: str | bool | None = None, use_host_network: bool = False, install_if_missing: bool = True) -> Destination:
22def get_destination(  # noqa: PLR0913 # Too many arguments
23    name: str,
24    config: dict[str, Any] | None = None,
25    *,
26    config_change_callback: ConfigChangeCallback | None = None,
27    version: str | None = None,
28    pip_url: str | None = None,
29    local_executable: Path | str | None = None,
30    docker_image: str | bool | None = None,
31    use_host_network: bool = False,
32    install_if_missing: bool = True,
33) -> Destination:
34    """Get a connector by name and version.
35
36    Args:
37        name: connector name
38        config: connector config - if not provided, you need to set it later via the set_config
39            method.
40        config_change_callback: callback function to be called when the connector config changes.
41        streams: list of stream names to select for reading. If set to "*", all streams will be
42            selected. If not provided, you can set it later via the `select_streams()` or
43            `select_all_streams()` method.
44        version: connector version - if not provided, the currently installed version will be used.
45            If no version is installed, the latest available version will be used. The version can
46            also be set to "latest" to force the use of the latest available version.
47        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
48            connector name.
49        local_executable: If set, the connector will be assumed to already be installed and will be
50            executed using this path or executable name. Otherwise, the connector will be installed
51            automatically in a virtual environment.
52        docker_image: If set, the connector will be executed using Docker. You can specify `True`
53            to use the default image for the connector, or you can specify a custom image name.
54            If `version` is specified and your image name does not already contain a tag
55            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
56        use_host_network: If set, along with docker_image, the connector will be executed using
57            the host network. This is useful for connectors that need to access resources on
58            the host machine, such as a local database. This parameter is ignored when
59            `docker_image` is not set.
60        install_if_missing: Whether to install the connector if it is not available locally. This
61            parameter is ignored when local_executable is set.
62    """
63    return Destination(
64        name=name,
65        config=config,
66        config_change_callback=config_change_callback,
67        executor=get_connector_executor(
68            name=name,
69            version=version,
70            pip_url=pip_url,
71            local_executable=local_executable,
72            docker_image=docker_image,
73            use_host_network=use_host_network,
74            install_if_missing=install_if_missing,
75        ),
76    )

Get a connector by name and version.

Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
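
For example, here is a hedged sketch of running a Java-based destination via Docker. It assumes Docker is installed; the connector name and config keys are illustrative, so consult the destination's spec for its real fields.

import airbyte as ab

destination = ab.get_destination(
    "destination-postgres",           # illustrative connector name
    config={
        "host": "localhost",          # config keys depend on the connector's spec
        "port": 5432,
        "database": "analytics",
        "username": "airbyte",
        "password": "s3cr3t",
        "schema": "public",
    },
    docker_image=True,                # run the connector's default Docker image
)
destination.check()                   # validate connectivity before writing
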
def get_secret( secret_name: str, /, *, sources: list[airbyte.secrets.SecretManager | SecretSourceEnum] | None = None, allow_prompt: bool = True, **kwargs: dict[str, typing.Any]) -> airbyte.secrets.SecretString:
15def get_secret(
16    secret_name: str,
17    /,
18    *,
19    sources: list[SecretManager | SecretSourceEnum] | None = None,
20    allow_prompt: bool = True,
21    **kwargs: dict[str, Any],
22) -> SecretString:
23    """Get a secret from the environment.
24
25    The optional `sources` argument of enum type `SecretSourceEnum` or list of `SecretSourceEnum`
26    options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum`
27    entries is passed, then the sources will be checked using the provided ordering.
28
29    If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then
30    the user will be prompted to enter the secret if it is not found in any of the other sources.
31    """
32    if "source" in kwargs:
33        warnings.warn(
34            message="The `source` argument is deprecated. Use the `sources` argument instead.",
35            category=DeprecationWarning,
36            stacklevel=2,
37        )
38        sources = kwargs.pop("source")  # type: ignore [assignment]
39
40    available_sources: dict[str, SecretManager] = {}
41    for available_source in _get_secret_sources():
42        # Add available sources to the dict. Order matters.
43        available_sources[available_source.name] = available_source
44
45    if sources is None:
46        # If ANY is in the list, then we don't need to check any other sources.
47        # This is the default behavior.
48        sources = list(available_sources.values())
49
50    elif not isinstance(sources, list):
51        sources = [sources]  # type: ignore [unreachable]  # This is a 'just in case' catch.
52
53    # Replace any SecretSourceEnum strings with the matching SecretManager object
54    for source in list(sources):
55        if isinstance(source, SecretSourceEnum):
56            if source not in available_sources:
57                raise exc.PyAirbyteInputError(
58                    guidance="Invalid secret source name.",
59                    input_value=source,
60                    context={
61                        "Available Sources": list(available_sources.keys()),
62                    },
63                )
64
65            sources[sources.index(source)] = available_sources[source]
66
67    secret_managers = cast(list[SecretManager], sources)
68
69    if SecretSourceEnum.PROMPT in secret_managers:
70        prompt_source = secret_managers.pop(
71            # Mis-typed, but okay here since we have equality logic for the enum comparison:
72            secret_managers.index(SecretSourceEnum.PROMPT),  # type: ignore [arg-type]
73        )
74
75        if allow_prompt:
76            # Always check prompt last. Add it to the end of the list.
77            secret_managers.append(prompt_source)
78
79    for secret_mgr in secret_managers:
80        val = secret_mgr.get_secret(secret_name)
81        if val:
82            return SecretString(val)
83
84    raise exc.PyAirbyteSecretNotFoundError(
85        secret_name=secret_name,
86        sources=[str(s) for s in available_sources],
87    )

Get a secret from the environment.

The optional sources argument of enum type SecretSourceEnum or list of SecretSourceEnum options. If left blank, all available sources will be checked. If a list of SecretSourceEnum entries is passed, then the sources will be checked using the provided ordering.

If allow_prompt is True or if SecretSourceEnum.PROMPT is declared in the source arg, then the user will be prompted to enter the secret if it is not found in any of the other sources.
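
A minimal sketch: look up a secret by name, falling back to an interactive prompt if it is not found in any other source. The secret name, connector name, and config field below are hypothetical.

import airbyte as ab

api_key = ab.get_secret("MY_API_KEY")  # hypothetical secret name
source = ab.get_source(
    "source-example",                  # illustrative connector name
    config={"api_key": api_key},       # illustrative config field
)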

def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    pip_url: str | None = None,
 56    local_executable: Path | str | None = None,
 57    docker_image: bool | str | None = None,
 58    use_host_network: bool = False,
 59    source_manifest: bool | dict | Path | str | None = None,
 60    install_if_missing: bool = True,
 61    install_root: Path | None = None,
 62) -> Source:
 63    """Get a connector by name and version.
 64
 65    If an explicit install or execution method is requested (e.g. `local_executable`,
 66    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 67
 68    Otherwise, an appropriate method will be selected based on the available connector metadata:
 69    1. If the connector is registered and a YAML source manifest is available, the YAML manifest
 70       will be downloaded and used to execute the connector.
 71    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 72    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 73       will be executed using Docker.
 74
 75    Args:
 76        name: connector name
 77        config: connector config - if not provided, you need to set it later via the set_config
 78            method.
 79        config_change_callback: callback function to be called when the connector config changes.
 80        streams: list of stream names to select for reading. If set to "*", all streams will be
 81            selected. If not provided, you can set it later via the `select_streams()` or
 82            `select_all_streams()` method.
 83        version: connector version - if not provided, the currently installed version will be used.
 84            If no version is installed, the latest available version will be used. The version can
 85            also be set to "latest" to force the use of the latest available version.
 86        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 87            connector name.
 88        local_executable: If set, the connector will be assumed to already be installed and will be
 89            executed using this path or executable name. Otherwise, the connector will be installed
 90            automatically in a virtual environment.
 91        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 92            to use the default image for the connector, or you can specify a custom image name.
 93            If `version` is specified and your image name does not already contain a tag
 94            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
 95        use_host_network: If set, along with docker_image, the connector will be executed using
 96            the host network. This is useful for connectors that need to access resources on
 97            the host machine, such as a local database. This parameter is ignored when
 98            `docker_image` is not set.
 99        source_manifest: If set, the connector will be executed based on a declarative YAML
100            source definition. This input can be `True` to attempt to auto-download a YAML spec,
101            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
102            the local file system, or `str` to pull the definition from a web URL.
103        install_if_missing: Whether to install the connector if it is not available locally. This
104            parameter is ignored when `local_executable` or `source_manifest` are set.
105        install_root: (Optional.) The root directory where the virtual environment will be
106            created. If not provided, the current working directory will be used.
107    """
108    return Source(
109        name=name,
110        config=config,
111        config_change_callback=config_change_callback,
112        streams=streams,
113        executor=get_connector_executor(
114            name=name,
115            version=version,
116            pip_url=pip_url,
117            local_executable=local_executable,
118            docker_image=docker_image,
119            use_host_network=use_host_network,
120            source_manifest=source_manifest,
121            install_if_missing=install_if_missing,
122            install_root=install_root,
123        ),
124    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.

Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
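
Putting a few of these options together, here is a sketch using the source-faker connector (its count option is shown for illustration; check the connector's spec for the real fields):

import airbyte as ab

source = ab.get_source(
    "source-faker",
    config={"count": 1_000},   # number of fake records to generate (illustrative)
    streams="*",               # select all streams up front
)
source.check()                                   # verify the config against the connector spec
first_records = list(source.get_records("users"))  # peek at records without caching them
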
def new_local_cache( cache_name: str | None = None, cache_dir: str | pathlib.Path | None = None, *, cleanup: bool = True) -> DuckDBCache:
40def new_local_cache(
41    cache_name: str | None = None,
42    cache_dir: str | Path | None = None,
43    *,
44    cleanup: bool = True,
45) -> DuckDBCache:
46    """Get a local cache for storing data, using a name string to seed the path.
47
48    Args:
49        cache_name: Name to use for the cache. Defaults to None.
50        cache_dir: Root directory to store the cache in. Defaults to None.
51        cleanup: Whether to clean up temporary files. Defaults to True.
52
53    Cache files are stored in the `.cache` directory, relative to the current
54    working directory.
55    """
56    if cache_name:
57        if " " in cache_name:
58            raise exc.PyAirbyteInputError(
59                message="Cache name cannot contain spaces.",
60                input_value=cache_name,
61            )
62
63        if not cache_name.replace("_", "").isalnum():
64            raise exc.PyAirbyteInputError(
65                message="Cache name can only contain alphanumeric characters and underscores.",
66                input_value=cache_name,
67            )
68
69    cache_name = cache_name or str(ulid.ULID())
70    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
71    if not isinstance(cache_dir, Path):
72        cache_dir = Path(cache_dir)
73
74    return DuckDBCache(
75        db_path=cache_dir / f"db_{cache_name}.duckdb",
76        cache_dir=cache_dir,
77        cleanup=cleanup,
78    )

Get a local cache for storing data, using a name string to seed the path.

Arguments:
  • cache_name: Name to use for the cache. Defaults to None.
  • cache_dir: Root directory to store the cache in. Defaults to None.
  • cleanup: Whether to clean up temporary files. Defaults to True.

Cache files are stored in the .cache directory, relative to the current working directory.
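
For example (a sketch; the cache name is arbitrary but may only contain alphanumeric characters and underscores):

import airbyte as ab

cache = ab.new_local_cache("my_project_cache")  # stored under ./.cache/my_project_cache/
source = ab.get_source("source-faker", config={}, streams="*")
read_result = source.read(cache=cache)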

class BigQueryCache(airbyte._processors.sql.bigquery.BigQueryConfig, airbyte.caches.base.CacheBase):
32class BigQueryCache(BigQueryConfig, CacheBase):
33    """The BigQuery cache implementation."""
34
35    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)
36
37    def get_arrow_dataset(
38        self,
39        stream_name: str,
40        *,
41        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
42    ) -> NoReturn:
43        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
44
45        See: https://github.com/airbytehq/PyAirbyte/issues/165
46        """
47        raise NotImplementedError(
 48            "BigQuery doesn't currently support to_arrow. "
49            "Please consider using a different cache implementation for these functionalities."
50        )

The BigQuery cache implementation.
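
A hedged construction sketch: the project, dataset, and credentials values are placeholders, and the field aliases (project_name, dataset_name) follow the model_fields listing further below.

import airbyte as ab

cache = ab.BigQueryCache(
    project_name="my-gcp-project",                      # placeholder GCP project ID
    dataset_name="airbyte_raw",                         # target BigQuery dataset
    credentials_path="/path/to/service_account.json",   # service-account key file
)
source = ab.get_source("source-faker", config={}, streams="*")
read_result = source.read(cache=cache)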

def get_arrow_dataset(self, stream_name: str, *, max_chunk_size: int = 100000) -> NoReturn:
37    def get_arrow_dataset(
38        self,
39        stream_name: str,
40        *,
41        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
42    ) -> NoReturn:
43        """Raises NotImplementedError; BigQuery doesn't support `pd.read_sql_table`.
44
45        See: https://github.com/airbytehq/PyAirbyte/issues/165
46        """
47        raise NotImplementedError(
 48            "BigQuery doesn't currently support to_arrow. "
49            "Please consider using a different cache implementation for these functionalities."
50        )

Raises NotImplementedError; BigQuery doesn't support pd.read_sql_table.

See: https://github.com/airbytehq/PyAirbyte/issues/165

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='airbyte_raw', alias='dataset_name', alias_priority=2), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'database_name': FieldInfo(annotation=str, required=True, alias='project_name', alias_priority=2), 'credentials_path': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
124                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
125                        """We need to both initialize private attributes and call the user-defined model_post_init
126                        method.
127                        """
128                        init_private_attributes(self, context)
129                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.bigquery.BigQueryConfig
database_name
schema_name
credentials_path
project_name
dataset_name
get_sql_alchemy_url
get_database_name
get_vendor_client
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_sql_engine
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class CachedDataset(airbyte.datasets._sql.SQLDataset):
140class CachedDataset(SQLDataset):
141    """A dataset backed by a SQL table cache.
142
143    Because this dataset includes all records from the underlying table, we also expose the
144    underlying table as a SQLAlchemy Table object.
145    """
146
147    def __init__(
148        self,
149        cache: CacheBase,
150        stream_name: str,
151        stream_configuration: ConfiguredAirbyteStream | None | Literal[False] = None,
152    ) -> None:
153        """We construct the query statement by selecting all columns from the table.
154
155        This prevents the need to scan the table schema to construct the query statement.
156
157        If stream_configuration is None, we attempt to retrieve the stream configuration from the
158        cache processor. This is useful when constructing a dataset from a CachedDataset object,
159        which already has the stream configuration.
160
161        If stream_configuration is set to False, we skip the stream configuration retrieval.
162        """
163        table_name = cache.processor.get_sql_table_name(stream_name)
164        schema_name = cache.schema_name
165        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
166        super().__init__(
167            cache=cache,
168            stream_name=stream_name,
169            query_statement=query,
170            stream_configuration=stream_configuration,
171        )
172
173    @overrides
174    def to_pandas(self) -> DataFrame:
175        """Return the underlying dataset data as a pandas DataFrame."""
176        return self._cache.get_pandas_dataframe(self._stream_name)
177
178    @overrides
179    def to_arrow(
180        self,
181        *,
182        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
183    ) -> Dataset:
184        """Return an Arrow Dataset containing the data from the specified stream.
185
186        Args:
187            stream_name (str): Name of the stream to retrieve data from.
188            max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.
189
190        Returns:
191            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
192        """
193        return self._cache.get_arrow_dataset(
194            stream_name=self._stream_name,
195            max_chunk_size=max_chunk_size,
196        )
197
198    def to_sql_table(self) -> Table:
199        """Return the underlying SQL table as a SQLAlchemy Table object."""
200        return self._cache.processor.get_sql_table(self.stream_name)
201
202    def __eq__(self, value: object) -> bool:
203        """Return True if the value is a CachedDataset with the same cache and stream name.
204
205        In the case of CachedDataset objects, we can simply compare the cache and stream name.
206
207        Note that this equality check is only supported on CachedDataset objects and not for
208        the base SQLDataset implementation. This is because of the complexity and computational
209        cost of comparing two arbitrary SQL queries that could be bound to different variables,
210        as well as the chance that two queries can be syntactically equivalent without being
211        text-wise equivalent.
212        """
213        if not isinstance(value, SQLDataset):
214            return False
215
216        if self._cache is not value._cache:
217            return False
218
219        return self._stream_name == value._stream_name
220
221    def __hash__(self) -> int:
222        return hash(self._stream_name)

A dataset backed by a SQL table cache.

Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
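
For instance, here is a sketch of pulling a CachedDataset out of a read result and converting it, assuming a completed read of the source-faker users stream as in the earlier examples:

import airbyte as ab

source = ab.get_source("source-faker", config={}, streams="*")
read_result = source.read()

users = read_result["users"]        # a CachedDataset
df = users.to_pandas()              # pandas DataFrame
table = users.to_sql_table()        # SQLAlchemy Table backed by the cache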

CachedDataset( cache: airbyte.caches.CacheBase, stream_name: str, stream_configuration: Union[airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream, NoneType, Literal[False]] = None)
147    def __init__(
148        self,
149        cache: CacheBase,
150        stream_name: str,
151        stream_configuration: ConfiguredAirbyteStream | None | Literal[False] = None,
152    ) -> None:
153        """We construct the query statement by selecting all columns from the table.
154
155        This prevents the need to scan the table schema to construct the query statement.
156
157        If stream_configuration is None, we attempt to retrieve the stream configuration from the
158        cache processor. This is useful when constructing a dataset from a CachedDataset object,
159        which already has the stream configuration.
160
161        If stream_configuration is set to False, we skip the stream configuration retrieval.
162        """
163        table_name = cache.processor.get_sql_table_name(stream_name)
164        schema_name = cache.schema_name
165        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
166        super().__init__(
167            cache=cache,
168            stream_name=stream_name,
169            query_statement=query,
170            stream_configuration=stream_configuration,
171        )

We construct the query statement by selecting all columns from the table.

This prevents the need to scan the table schema to construct the query statement.

If stream_configuration is None, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.

If stream_configuration is set to False, we skip the stream configuration retrieval.

@overrides
def to_pandas(self) -> pandas.core.frame.DataFrame:
173    @overrides
174    def to_pandas(self) -> DataFrame:
175        """Return the underlying dataset data as a pandas DataFrame."""
176        return self._cache.get_pandas_dataframe(self._stream_name)

Return the underlying dataset data as a pandas DataFrame.

@overrides
def to_arrow(self, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
178    @overrides
179    def to_arrow(
180        self,
181        *,
182        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
183    ) -> Dataset:
184        """Return an Arrow Dataset containing the data from the specified stream.
185
186        Args:
187            stream_name (str): Name of the stream to retrieve data from.
188            max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.
189
190        Returns:
191            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
192        """
193        return self._cache.get_arrow_dataset(
194            stream_name=self._stream_name,
195            max_chunk_size=max_chunk_size,
196        )

Return an Arrow Dataset containing the data from the specified stream.

Arguments:
  • stream_name (str): Name of the stream to retrieve data from.
  • max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.

Returns:

pa.dataset.Dataset: Arrow Dataset containing the stream's data.

def to_sql_table(self) -> sqlalchemy.sql.schema.Table:
198    def to_sql_table(self) -> Table:
199        """Return the underlying SQL table as a SQLAlchemy Table object."""
200        return self._cache.processor.get_sql_table(self.stream_name)

Return the underlying SQL table as a SQLAlchemy Table object.
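
One reason to expose the SQLAlchemy Table is ad-hoc querying against the cache. A sketch, continuing the users example above; the id column is hypothetical and depends on the stream's schema:

from sqlalchemy import select

table = users.to_sql_table()                          # `users` from the example above
stmt = select(table).where(table.c.id == "123")       # hypothetical column filter
with read_result.cache.get_sql_engine().connect() as conn:
    rows = conn.execute(stmt).fetchall()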

Inherited Members
airbyte.datasets._sql.SQLDataset
stream_name
with_filter
airbyte.datasets._base.DatasetBase
to_documents
class Destination(airbyte._connector_base.ConnectorBase, airbyte._writers.base.AirbyteWriterInterface):
 43class Destination(ConnectorBase, AirbyteWriterInterface):
 44    """A class representing a destination that can be called."""
 45
 46    connector_type: Literal["destination"] = "destination"
 47
 48    def __init__(
 49        self,
 50        executor: Executor,
 51        name: str,
 52        config: dict[str, Any] | None = None,
 53        *,
 54        config_change_callback: ConfigChangeCallback | None = None,
 55        validate: bool = False,
 56    ) -> None:
 57        """Initialize the source.
 58
 59        If config is provided, it will be validated against the spec if validate is True.
 60        """
 61        super().__init__(
 62            executor=executor,
 63            name=name,
 64            config=config,
 65            config_change_callback=config_change_callback,
 66            validate=validate,
 67        )
 68
 69    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
 70        self,
 71        source_data: Source | ReadResult,
 72        *,
 73        streams: list[str] | Literal["*"] | None = None,
 74        cache: CacheBase | None | Literal[False] = None,
 75        state_cache: CacheBase | None | Literal[False] = None,
 76        write_strategy: WriteStrategy = WriteStrategy.AUTO,
 77        force_full_refresh: bool = False,
 78    ) -> WriteResult:
 79        """Write data from source connector or already cached source data.
 80
 81        Caching is enabled by default, unless explicitly disabled.
 82
 83        Args:
 84            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
 85            streams: The streams to write to the destination. If omitted or if "*" is provided,
 86                all streams will be written. If `source_data` is a source, then streams must be
 87                selected here or on the source. If both are specified, this setting will override
 88                the stream selection on the source.
 89            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
 90                False, the cache will be disabled. This must be `None` if `source_data` is already
 91                a `Cache` object.
 92            state_cache: A cache to use for storing incremental state. You do not need to set this
 93                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
 94                disable state management.
 95            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
 96                will decide the best strategy to use.
 97            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
 98                existing state will be ignored and all source data will be reloaded.
 99
100        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
101        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
102        a known destination state, the destination-specific state will be used. If neither are
103        available, a full refresh will be performed.
104        """
105        if not isinstance(source_data, ReadResult | Source):
106            raise exc.PyAirbyteInputError(
107                message="Invalid source_data type for `source_data` arg.",
108                context={
109                    "source_data_type_provided": type(source_data).__name__,
110                },
111            )
112
113        # Resolve `source`, `read_result`, and `source_name`
114        source: Source | None = source_data if isinstance(source_data, Source) else None
115        read_result: ReadResult | None = (
116            source_data if isinstance(source_data, ReadResult) else None
117        )
118        source_name: str = source.name if source else cast(ReadResult, read_result).source_name
119
120        # State providers and writers default to no-op, unless overridden below.
121        cache_state_provider: StateProviderBase = StaticInputState([])
122        """Provides the state of the cache's data."""
123        cache_state_writer: StateWriterBase = NoOpStateWriter()
124        """Writes updates for the state of the cache's data."""
125        destination_state_provider: StateProviderBase = StaticInputState([])
126        """Provides the state of the destination's data, from `cache` or `state_cache`."""
127        destination_state_writer: StateWriterBase = NoOpStateWriter()
128        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""
129
130        # If caching not explicitly disabled
131        if cache is not False:
132            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
133            if isinstance(source_data, ReadResult):
134                cache = source_data.cache
135
136            cache = cache or get_default_cache()
137            cache_state_provider = cache.get_state_provider(
138                source_name=source_name,
139                destination_name=None,  # This will just track the cache state
140            )
141            cache_state_writer = cache.get_state_writer(
142                source_name=source_name,
143                destination_name=None,  # This will just track the cache state
144            )
145
146        # Resolve `state_cache`
147        if state_cache is None:
148            state_cache = cache or get_default_cache()
149
150        # Resolve `destination_state_writer` and `destination_state_provider`
151        if state_cache:
152            destination_state_writer = state_cache.get_state_writer(
153                source_name=source_name,
154                destination_name=self.name,
155            )
156            if not force_full_refresh:
157                destination_state_provider = state_cache.get_state_provider(
158                    source_name=source_name,
159                    destination_name=self.name,
160                )
161        elif state_cache is not False:
162            warnings.warn(
163                "No state backend or cache provided. State will not be tracked."
164                "To track state, provide a cache or state backend."
165                "To silence this warning, set `state_cache=False` explicitly.",
166                category=exc.PyAirbyteWarning,
167                stacklevel=2,
168            )
169
170        # Resolve `catalog_provider`
171        if source:
172            catalog_provider = CatalogProvider(
173                configured_catalog=source.get_configured_catalog(
174                    streams=streams,
175                )
176            )
177        elif read_result:
178            catalog_provider = CatalogProvider.from_read_result(read_result)
179        else:
180            raise exc.PyAirbyteInternalError(
181                message="`source_data` must be a `Source` or `ReadResult` object.",
182            )
183
184        progress_tracker = ProgressTracker(
185            source=source if isinstance(source_data, Source) else None,
186            cache=cache or None,
187            destination=self,
188            expected_streams=catalog_provider.stream_names,
189        )
190
191        source_state_provider: StateProviderBase
192        source_state_provider = JoinedStateProvider(
193            primary=cache_state_provider,
194            secondary=destination_state_provider,
195        )
196
197        if source:
198            if cache is False:
199                # Get message iterator for source (caching disabled)
200                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
201                    streams=streams,
202                    state_provider=source_state_provider,
203                    progress_tracker=progress_tracker,
204                    force_full_refresh=force_full_refresh,
205                )
206            else:
207                # Caching is enabled and we are reading from a source.
208                # Read the data into the cache before writing to the destination.
209                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
210                    cache=cache,
211                    state_provider=source_state_provider,
212                    state_writer=cache_state_writer,
213                    catalog_provider=catalog_provider,
214                    stream_names=catalog_provider.stream_names,
215                    write_strategy=write_strategy,
216                    force_full_refresh=force_full_refresh,
217                    skip_validation=False,
218                    progress_tracker=progress_tracker,
219                )
220                message_iterator = AirbyteMessageIterator.from_read_result(
221                    read_result=read_result,
222                )
223        else:  # Else we are reading from a read result
224            assert read_result is not None
225            message_iterator = AirbyteMessageIterator.from_read_result(
226                read_result=read_result,
227            )
228
229        # Write the data to the destination
230        try:
231            self._write_airbyte_message_stream(
232                stdin=message_iterator,
233                catalog_provider=catalog_provider,
234                write_strategy=write_strategy,
235                state_writer=destination_state_writer,
236                progress_tracker=progress_tracker,
237            )
238        except Exception as ex:
239            progress_tracker.log_failure(exception=ex)
240            raise
241        else:
242            # No exceptions were raised, so log success
243            progress_tracker.log_success()
244
245        return WriteResult(
246            destination=self,
247            source_data=source_data,
248            catalog_provider=catalog_provider,
249            state_writer=destination_state_writer,
250            progress_tracker=progress_tracker,
251        )
252
253    def _write_airbyte_message_stream(
254        self,
255        stdin: IO[str] | AirbyteMessageIterator,
256        *,
257        catalog_provider: CatalogProvider,
258        write_strategy: WriteStrategy,
259        state_writer: StateWriterBase | None = None,
260        progress_tracker: ProgressTracker,
261    ) -> None:
262        """Read from the connector and write to the cache."""
263        # Run optional validation step
264        if state_writer is None:
265            state_writer = StdOutStateWriter()
266
267        # Apply the write strategy to the catalog provider before sending to the destination
268        catalog_provider = catalog_provider.with_write_strategy(write_strategy)
269
270        with as_temp_files(
271            files_contents=[
272                self._config,
273                catalog_provider.configured_catalog.model_dump_json(),
274            ]
275        ) as [
276            config_file,
277            catalog_file,
278        ]:
279            try:
280                # We call the connector to write the data, tallying the inputs and outputs
281                for destination_message in progress_tracker.tally_confirmed_writes(
282                    messages=self._execute(
283                        args=[
284                            "write",
285                            "--config",
286                            config_file,
287                            "--catalog",
288                            catalog_file,
289                        ],
290                        stdin=AirbyteMessageIterator(
291                            progress_tracker.tally_pending_writes(
292                                stdin,
293                            )
294                        ),
295                    )
296                ):
297                    if destination_message.type is Type.STATE:
298                        state_writer.write_state(state_message=destination_message.state)
299
300            except exc.AirbyteConnectorFailedError as ex:
301                raise exc.AirbyteConnectorWriteError(
302                    connector_name=self.name,
303                    log_text=self._last_log_messages,
304                    original_exception=ex,
305                ) from None

A class representing a destination that can be called.

Destination( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, validate: bool = False)
48    def __init__(
49        self,
50        executor: Executor,
51        name: str,
52        config: dict[str, Any] | None = None,
53        *,
54        config_change_callback: ConfigChangeCallback | None = None,
55        validate: bool = False,
56    ) -> None:
57        """Initialize the source.
58
59        If config is provided, it will be validated against the spec if validate is True.
60        """
61        super().__init__(
62            executor=executor,
63            name=name,
64            config=config,
65            config_change_callback=config_change_callback,
66            validate=validate,
67        )

Initialize the destination.

If config is provided, it will be validated against the spec if validate is True.
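
In most cases you will not construct a Destination directly; use get_destination instead. The sketch below is illustrative only: the connector name and config keys are placeholders, so consult the connector's spec (for example via print_config_spec()) for the exact fields it requires.

import airbyte as ab

# Illustrative placeholders: substitute a real destination connector name and
# the config fields listed in its spec.
destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/example.duckdb"},
)
destination.check()  # runs the connector's connection check against this config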

connector_type: Literal['destination'] = 'destination'
def write( self, source_data: Source | ReadResult, *, streams: Union[list[str], Literal['*'], NoneType] = None, cache: Union[airbyte.caches.CacheBase, NoneType, Literal[False]] = None, state_cache: Union[airbyte.caches.CacheBase, NoneType, Literal[False]] = None, write_strategy: airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False) -> WriteResult:
 69    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
 70        self,
 71        source_data: Source | ReadResult,
 72        *,
 73        streams: list[str] | Literal["*"] | None = None,
 74        cache: CacheBase | None | Literal[False] = None,
 75        state_cache: CacheBase | None | Literal[False] = None,
 76        write_strategy: WriteStrategy = WriteStrategy.AUTO,
 77        force_full_refresh: bool = False,
 78    ) -> WriteResult:
 79        """Write data from source connector or already cached source data.
 80
 81        Caching is enabled by default, unless explicitly disabled.
 82
 83        Args:
 84            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
 85            streams: The streams to write to the destination. If omitted or if "*" is provided,
 86                all streams will be written. If `source_data` is a source, then streams must be
 87                selected here or on the source. If both are specified, this setting will override
 88                the stream selection on the source.
 89            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
 90                False, the cache will be disabled. This must be `None` if `source_data` is already
 91                a `Cache` object.
 92            state_cache: A cache to use for storing incremental state. You do not need to set this
 93                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
 94                disable state management.
 95            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
 96                will decide the best strategy to use.
 97            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
 98                existing state will be ignored and all source data will be reloaded.
 99
100        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
101        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
102        a known destination state, the destination-specific state will be used. If neither are
103        available, a full refresh will be performed.
104        """
105        if not isinstance(source_data, ReadResult | Source):
106            raise exc.PyAirbyteInputError(
107                message="Invalid source_data type for `source_data` arg.",
108                context={
109                    "source_data_type_provided": type(source_data).__name__,
110                },
111            )
112
113        # Resolve `source`, `read_result`, and `source_name`
114        source: Source | None = source_data if isinstance(source_data, Source) else None
115        read_result: ReadResult | None = (
116            source_data if isinstance(source_data, ReadResult) else None
117        )
118        source_name: str = source.name if source else cast(ReadResult, read_result).source_name
119
120        # State providers and writers default to no-op, unless overridden below.
121        cache_state_provider: StateProviderBase = StaticInputState([])
122        """Provides the state of the cache's data."""
123        cache_state_writer: StateWriterBase = NoOpStateWriter()
124        """Writes updates for the state of the cache's data."""
125        destination_state_provider: StateProviderBase = StaticInputState([])
126        """Provides the state of the destination's data, from `cache` or `state_cache`."""
127        destination_state_writer: StateWriterBase = NoOpStateWriter()
128        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""
129
130        # If caching not explicitly disabled
131        if cache is not False:
132            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
133            if isinstance(source_data, ReadResult):
134                cache = source_data.cache
135
136            cache = cache or get_default_cache()
137            cache_state_provider = cache.get_state_provider(
138                source_name=source_name,
139                destination_name=None,  # This will just track the cache state
140            )
141            cache_state_writer = cache.get_state_writer(
142                source_name=source_name,
143                destination_name=None,  # This will just track the cache state
144            )
145
146        # Resolve `state_cache`
147        if state_cache is None:
148            state_cache = cache or get_default_cache()
149
150        # Resolve `destination_state_writer` and `destination_state_provider`
151        if state_cache:
152            destination_state_writer = state_cache.get_state_writer(
153                source_name=source_name,
154                destination_name=self.name,
155            )
156            if not force_full_refresh:
157                destination_state_provider = state_cache.get_state_provider(
158                    source_name=source_name,
159                    destination_name=self.name,
160                )
161        elif state_cache is not False:
162            warnings.warn(
163                "No state backend or cache provided. State will not be tracked."
164                "To track state, provide a cache or state backend."
165                "To silence this warning, set `state_cache=False` explicitly.",
166                category=exc.PyAirbyteWarning,
167                stacklevel=2,
168            )
169
170        # Resolve `catalog_provider`
171        if source:
172            catalog_provider = CatalogProvider(
173                configured_catalog=source.get_configured_catalog(
174                    streams=streams,
175                )
176            )
177        elif read_result:
178            catalog_provider = CatalogProvider.from_read_result(read_result)
179        else:
180            raise exc.PyAirbyteInternalError(
181                message="`source_data` must be a `Source` or `ReadResult` object.",
182            )
183
184        progress_tracker = ProgressTracker(
185            source=source if isinstance(source_data, Source) else None,
186            cache=cache or None,
187            destination=self,
188            expected_streams=catalog_provider.stream_names,
189        )
190
191        source_state_provider: StateProviderBase
192        source_state_provider = JoinedStateProvider(
193            primary=cache_state_provider,
194            secondary=destination_state_provider,
195        )
196
197        if source:
198            if cache is False:
199                # Get message iterator for source (caching disabled)
200                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
201                    streams=streams,
202                    state_provider=source_state_provider,
203                    progress_tracker=progress_tracker,
204                    force_full_refresh=force_full_refresh,
205                )
206            else:
207                # Caching is enabled and we are reading from a source.
208                # Read the data into the cache before writing to the destination.
209                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
210                    cache=cache,
211                    state_provider=source_state_provider,
212                    state_writer=cache_state_writer,
213                    catalog_provider=catalog_provider,
214                    stream_names=catalog_provider.stream_names,
215                    write_strategy=write_strategy,
216                    force_full_refresh=force_full_refresh,
217                    skip_validation=False,
218                    progress_tracker=progress_tracker,
219                )
220                message_iterator = AirbyteMessageIterator.from_read_result(
221                    read_result=read_result,
222                )
223        else:  # Else we are reading from a read result
224            assert read_result is not None
225            message_iterator = AirbyteMessageIterator.from_read_result(
226                read_result=read_result,
227            )
228
229        # Write the data to the destination
230        try:
231            self._write_airbyte_message_stream(
232                stdin=message_iterator,
233                catalog_provider=catalog_provider,
234                write_strategy=write_strategy,
235                state_writer=destination_state_writer,
236                progress_tracker=progress_tracker,
237            )
238        except Exception as ex:
239            progress_tracker.log_failure(exception=ex)
240            raise
241        else:
242            # No exceptions were raised, so log success
243            progress_tracker.log_success()
244
245        return WriteResult(
246            destination=self,
247            source_data=source_data,
248            catalog_provider=catalog_provider,
249            state_writer=destination_state_writer,
250            progress_tracker=progress_tracker,
251        )

Write data from source connector or already cached source data.

Caching is enabled by default, unless explicitly disabled.

Arguments:
  • source_data: The source data to write. Can be a Source or a ReadResult object.
  • streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If source_data is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
  • cache: The cache to use for reading source_data. If None, no cache will be used. If False, the cache will be disabled. This must be None if source_data is already a Cache object.
  • state_cache: A cache to use for storing incremental state. You do not need to set this if cache is specified or if source_data is a Cache object. Set to False to disable state management.
  • write_strategy: The strategy to use for writing source_data. If AUTO, the connector will decide the best strategy to use.
  • force_full_refresh: Whether to force a full refresh of the source_data. If True, any existing state will be ignored and all source data will be reloaded.

For incremental syncs, cache or state_cache will be checked for matching state values. If the cache has tracked state, this will be used for the sync. Otherwise, if there is a known destination state, the destination-specific state will be used. If neither are available, a full refresh will be performed.
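
A brief, hedged sketch of the two accepted source_data inputs; the connector names and config values are placeholders:

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()

destination = ab.get_destination(
    "destination-duckdb",                                # placeholder name
    config={"destination_path": "/tmp/example.duckdb"},  # placeholder config
)

# Option 1: pass the Source directly. Data is staged through a cache first
# unless cache=False is passed.
write_result = destination.write(source)

# Option 2: read into a cache first, then write the ReadResult. Tracked state
# in the cache is reused for incremental syncs, as described above.
read_result = source.read()
write_result = destination.write(read_result)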

Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
set_config
get_config
config_hash
validate_config
config_spec
print_config_spec
docs_url
connector_version
check
install
uninstall
class DuckDBCache(airbyte._processors.sql.duckdb.DuckDBConfig, airbyte.caches.base.CacheBase):
38class DuckDBCache(DuckDBConfig, CacheBase):
39    """A DuckDB cache."""
40
41    _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)

A DuckDB cache.
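
A minimal sketch of constructing the cache explicitly rather than relying on the default; the file path is a placeholder, and db_path is the only required field (see model_fields below):

import airbyte as ab

cache = ab.DuckDBCache(db_path="./example.duckdb")  # schema_name defaults to "main"

source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()
read_result = source.read(cache=cache)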

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'schema_name': FieldInfo(annotation=str, required=False, default='main'), 'table_prefix': FieldInfo(annotation=Union[str, NoneType], required=False, default=''), 'cache_dir': FieldInfo(annotation=Path, required=False, default=PosixPath('.cache')), 'cleanup': FieldInfo(annotation=bool, required=False, default=True), 'db_path': FieldInfo(annotation=Union[Path, str], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
124                    def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
125                        """We need to both initialize private attributes and call the user-defined model_post_init
126                        method.
127                        """
128                        init_private_attributes(self, context)
129                        original_model_post_init(self, context)

We need to both initialize private attributes and call the user-defined model_post_init method.

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
config_hash
execute_sql
processor
get_record_processor
get_records
get_pandas_dataframe
get_arrow_dataset
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.duckdb.DuckDBConfig
db_path
schema_name
get_sql_alchemy_url
get_database_name
get_sql_engine
airbyte.shared.sql_processor.SqlConfig
table_prefix
get_create_table_extra_clauses
get_vendor_client
pydantic.main.BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
airbyte._writers.base.AirbyteWriterInterface
name
class ReadResult(collections.abc.Mapping[str, airbyte.datasets._sql.CachedDataset]):
 33class ReadResult(Mapping[str, CachedDataset]):
 34    """The result of a read operation.
 35
 36    This class is used to return information about the read operation, such as the number of
 37    records read. It should not be created directly, but instead returned by the read method
 38    of a source.
 39    """
 40
 41    def __init__(
 42        self,
 43        *,
 44        source_name: str,
 45        processed_streams: list[str],
 46        cache: CacheBase,
 47        progress_tracker: ProgressTracker,
 48    ) -> None:
 49        """Initialize a read result.
 50
 51        This class should not be created directly. Instead, it should be returned by the `read`
 52        method of the `Source` class.
 53        """
 54        self.source_name = source_name
 55        self._progress_tracker = progress_tracker
 56        self._cache = cache
 57        self._processed_streams = processed_streams
 58
 59    def __getitem__(self, stream: str) -> CachedDataset:
 60        """Return the cached dataset for a given stream name."""
 61        if stream not in self._processed_streams:
 62            raise KeyError(stream)
 63
 64        return CachedDataset(self._cache, stream)
 65
 66    def __contains__(self, stream: object) -> bool:
 67        """Return whether a given stream name was included in processing."""
 68        if not isinstance(stream, str):
 69            return False
 70
 71        return stream in self._processed_streams
 72
 73    def __iter__(self) -> Iterator[str]:
 74        """Return an iterator over the stream names that were processed."""
 75        return self._processed_streams.__iter__()
 76
 77    def __len__(self) -> int:
 78        """Return the number of streams that were processed."""
 79        return len(self._processed_streams)
 80
 81    def get_sql_engine(self) -> Engine:
 82        """Return the SQL engine used by the cache."""
 83        return self._cache.get_sql_engine()
 84
 85    @property
 86    def processed_records(self) -> int:
 87        """The total number of records read from the source."""
 88        return self._progress_tracker.total_records_read
 89
 90    @property
 91    def streams(self) -> Mapping[str, CachedDataset]:
 92        """Return a mapping of stream names to cached datasets."""
 93        return {
 94            stream_name: CachedDataset(self._cache, stream_name)
 95            for stream_name in self._processed_streams
 96        }
 97
 98    @property
 99    def cache(self) -> CacheBase:
100        """Return the cache object."""
101        return self._cache

The result of a read operation.

This class is used to return information about the read operation, such as the number of records read. It should not be created directly, but instead returned by the read method of a source.
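
For example (a minimal sketch; the connector config and the "users" stream name are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()
read_result = source.read()  # a ReadResult backed by the default local cache

print(read_result.processed_records)   # total records read from the source
print(list(read_result))               # names of the processed streams
users_dataset = read_result["users"]   # a CachedDataset for a single stream
users_df = users_dataset.to_pandas()   # materialize the stream as a DataFrame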

ReadResult( *, source_name: str, processed_streams: list[str], cache: airbyte.caches.CacheBase, progress_tracker: airbyte.progress.ProgressTracker)
41    def __init__(
42        self,
43        *,
44        source_name: str,
45        processed_streams: list[str],
46        cache: CacheBase,
47        progress_tracker: ProgressTracker,
48    ) -> None:
49        """Initialize a read result.
50
51        This class should not be created directly. Instead, it should be returned by the `read`
52        method of the `Source` class.
53        """
54        self.source_name = source_name
55        self._progress_tracker = progress_tracker
56        self._cache = cache
57        self._processed_streams = processed_streams

Initialize a read result.

This class should not be created directly. Instead, it should be returned by the read method of the Source class.

source_name
def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
81    def get_sql_engine(self) -> Engine:
82        """Return the SQL engine used by the cache."""
83        return self._cache.get_sql_engine()

Return the SQL engine used by the cache.
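
A hedged sketch of querying the cache behind a ReadResult with plain SQL; the config and the "users" table name are placeholders, and actual table names depend on the cache's schema_name and table_prefix:

import airbyte as ab
from sqlalchemy import text

source = ab.get_source("source-faker", config={"count": 100})
source.select_all_streams()
read_result = source.read()

with read_result.get_sql_engine().connect() as conn:
    row_count = conn.execute(text("SELECT COUNT(*) FROM users")).scalar()
    print(row_count)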

processed_records: int
85    @property
86    def processed_records(self) -> int:
87        """The total number of records read from the source."""
88        return self._progress_tracker.total_records_read

The total number of records read from the source.

streams: Mapping[str, CachedDataset]
90    @property
91    def streams(self) -> Mapping[str, CachedDataset]:
92        """Return a mapping of stream names to cached datasets."""
93        return {
94            stream_name: CachedDataset(self._cache, stream_name)
95            for stream_name in self._processed_streams
96        }

Return a mapping of stream names to cached datasets.

cache: airbyte.caches.CacheBase
 98    @property
 99    def cache(self) -> CacheBase:
100        """Return the cache object."""
101        return self._cache

Return the cache object.

Inherited Members
collections.abc.Mapping
get
keys
items
values
class SecretSourceEnum(builtins.str, enum.Enum):
24class SecretSourceEnum(str, Enum):
25    """Enumeration of secret sources supported by PyAirbyte."""
26
27    ENV = "env"
28    DOTENV = "dotenv"
29    GOOGLE_COLAB = "google_colab"
30    GOOGLE_GSM = "google_gsm"  # Not enabled by default
31
32    PROMPT = "prompt"

Enumeration of secret sources supported by PyAirbyte.

ENV = <SecretSourceEnum.ENV: 'env'>
DOTENV = <SecretSourceEnum.DOTENV: 'dotenv'>
GOOGLE_COLAB = <SecretSourceEnum.GOOGLE_COLAB: 'google_colab'>
GOOGLE_GSM = <SecretSourceEnum.GOOGLE_GSM: 'google_gsm'>
PROMPT = <SecretSourceEnum.PROMPT: 'prompt'>
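
These values identify where PyAirbyte can look for secrets; get_secret resolves a secret by name from the enabled sources. A minimal sketch (the secret name is a placeholder):

import airbyte as ab

# Resolves the value from the enabled secret sources listed above
# (environment variables, .env files, etc.).
api_key = ab.get_secret("MY_API_KEY")
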
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Source(airbyte._connector_base.ConnectorBase):
 53class Source(ConnectorBase):
 54    """A class representing a source that can be called."""
 55
 56    connector_type: Literal["source"] = "source"
 57
 58    def __init__(
 59        self,
 60        executor: Executor,
 61        name: str,
 62        config: dict[str, Any] | None = None,
 63        *,
 64        config_change_callback: ConfigChangeCallback | None = None,
 65        streams: str | list[str] | None = None,
 66        validate: bool = False,
 67    ) -> None:
 68        """Initialize the source.
 69
 70        If config is provided, it will be validated against the spec if validate is True.
 71        """
 72        self._to_be_selected_streams: list[str] | str = []
 73        """Used to hold selection criteria before catalog is known."""
 74
 75        super().__init__(
 76            executor=executor,
 77            name=name,
 78            config=config,
 79            config_change_callback=config_change_callback,
 80            validate=validate,
 81        )
 82        self._config_dict: dict[str, Any] | None = None
 83        self._last_log_messages: list[str] = []
 84        self._discovered_catalog: AirbyteCatalog | None = None
 85        self._selected_stream_names: list[str] = []
 86        if config is not None:
 87            self.set_config(config, validate=validate)
 88        if streams is not None:
 89            self.select_streams(streams)
 90
 91        self._deployed_api_root: str | None = None
 92        self._deployed_workspace_id: str | None = None
 93        self._deployed_source_id: str | None = None
 94
 95    def set_streams(self, streams: list[str]) -> None:
 96        """Deprecated. See select_streams()."""
 97        warnings.warn(
 98            "The 'set_streams' method is deprecated and will be removed in a future version. "
 99            "Please use the 'select_streams' method instead.",
100            DeprecationWarning,
101            stacklevel=2,
102        )
103        self.select_streams(streams)
104
105    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
106        """Logs a warning message indicating stream selection which are not selected yet."""
107        if streams == "*":
108            print(
109                "Warning: Config is not set yet. All streams will be selected after config is set."
110            )
111        else:
112            print(
113                "Warning: Config is not set yet. "
114                f"Streams to be selected after config is set: {streams}"
115            )
116
117    def select_all_streams(self) -> None:
118        """Select all streams.
119
120        This is a more streamlined equivalent to:
121        > source.select_streams(source.get_available_streams()).
122        """
123        if self._config_dict is None:
124            self._to_be_selected_streams = "*"
125            self._log_warning_preselected_stream(self._to_be_selected_streams)
126            return
127
128        self._selected_stream_names = self.get_available_streams()
129
130    def select_streams(self, streams: str | list[str]) -> None:
131        """Select the stream names that should be read from the connector.
132
133        Args:
134            streams: A list of stream names to select. If set to "*", all streams will be selected.
135
136        Currently, if this is not set, all streams will be read.
137        """
138        if self._config_dict is None:
139            self._to_be_selected_streams = streams
140            self._log_warning_preselected_stream(streams)
141            return
142
143        if streams == "*":
144            self.select_all_streams()
145            return
146
147        if isinstance(streams, str):
148            # If a single stream is provided, convert it to a one-item list
149            streams = [streams]
150
151        available_streams = self.get_available_streams()
152        for stream in streams:
153            if stream not in available_streams:
154                raise exc.AirbyteStreamNotFoundError(
155                    stream_name=stream,
156                    connector_name=self.name,
157                    available_streams=available_streams,
158                )
159        self._selected_stream_names = streams
160
161    def get_selected_streams(self) -> list[str]:
162        """Get the selected streams.
163
164        If no streams are selected, return an empty list.
165        """
166        return self._selected_stream_names
167
168    def set_config(
169        self,
170        config: dict[str, Any],
171        *,
172        validate: bool = True,
173    ) -> None:
174        """Set the config for the connector.
175
176        If validate is True, raise an exception if the config fails validation.
177
178        If validate is False, validation will be deferred until check() or validate_config()
179        is called.
180        """
181        if validate:
182            self.validate_config(config)
183
184        self._config_dict = config
185
186        if self._to_be_selected_streams:
187            self.select_streams(self._to_be_selected_streams)
188            self._to_be_selected_streams = []
189
190    def get_config(self) -> dict[str, Any]:
191        """Get the config for the connector."""
192        return self._config
193
194    @property
195    def _config(self) -> dict[str, Any]:
196        if self._config_dict is None:
197            raise exc.AirbyteConnectorConfigurationMissingError(
198                connector_name=self.name,
199                guidance="Provide via get_source() or set_config()",
200            )
201        return self._config_dict
202
203    def _discover(self) -> AirbyteCatalog:
204        """Call discover on the connector.
205
206        This involves the following steps:
207        - Write the config to a temporary file
208        - execute the connector with discover --config <config_file>
209        - Listen to the messages and return the first AirbyteCatalog that comes along.
210        - Make sure the subprocess is killed when the function returns.
211        """
212        with as_temp_files([self._config]) as [config_file]:
213            for msg in self._execute(["discover", "--config", config_file]):
214                if msg.type == Type.CATALOG and msg.catalog:
215                    return msg.catalog
216            raise exc.AirbyteConnectorMissingCatalogError(
217                connector_name=self.name,
218                log_text=self._last_log_messages,
219            )
220
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]
224
225    def _get_incremental_stream_names(self) -> list[str]:
226        """Get the name of streams that support incremental sync."""
227        return [
228            stream.name
229            for stream in self.discovered_catalog.streams
230            if SyncMode.incremental in stream.supported_sync_modes
231        ]
232
233    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
234        """Call spec on the connector.
235
236        This involves the following steps:
237        * execute the connector with spec
238        * Listen to the messages and return the first ConnectorSpecification that comes along.
239        * Make sure the subprocess is killed when the function returns.
240        """
241        if force_refresh or self._spec is None:
242            for msg in self._execute(["spec"]):
243                if msg.type == Type.SPEC and msg.spec:
244                    self._spec = msg.spec
245                    break
246
247        if self._spec:
248            return self._spec
249
250        raise exc.AirbyteConnectorMissingSpecError(
251            connector_name=self.name,
252            log_text=self._last_log_messages,
253        )
254
255    @property
256    def config_spec(self) -> dict[str, Any]:
257        """Generate a configuration spec for this connector, as a JSON Schema definition.
258
259        This function generates a JSON Schema dictionary with configuration specs for the
260        current connector, as a dictionary.
261
262        Returns:
263            dict: The JSON Schema configuration spec as a dictionary.
264        """
265        return self._get_spec(force_refresh=True).connectionSpecification
266
267    def print_config_spec(
268        self,
269        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
270        *,
271        output_file: Path | str | None = None,
272    ) -> None:
273        """Print the configuration spec for this connector.
274
275        Args:
276            format: The format to print the spec in. Must be "yaml" or "json".
277            output_file: Optional. If set, the spec will be written to the given file path.
278                Otherwise, it will be printed to the console.
279        """
280        if format not in {"yaml", "json"}:
281            raise exc.PyAirbyteInputError(
282                message="Invalid format. Expected 'yaml' or 'json'",
283                input_value=format,
284            )
285        if isinstance(output_file, str):
286            output_file = Path(output_file)
287
288        if format == "yaml":
289            content = yaml.dump(self.config_spec, indent=2)
290        elif format == "json":
291            content = json.dumps(self.config_spec, indent=2)
292
293        if output_file:
294            output_file.write_text(content)
295            return
296
297        syntax_highlighted = Syntax(content, format)
298        print(syntax_highlighted)
299
300    @property
301    def _yaml_spec(self) -> str:
302        """Get the spec as a yaml string.
303
304        For now, the primary use case is for writing and debugging a valid config for a source.
305
306        This is private for now because we probably want better polish before exposing this
307        as a stable interface. This will also get easier when we have docs links with this info
308        for each connector.
309        """
310        spec_obj: ConnectorSpecification = self._get_spec()
311        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
312        # convert to a yaml string
313        return yaml.dump(spec_dict)
314
315    @property
316    def docs_url(self) -> str:
317        """Get the URL to the connector's documentation."""
318        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
319            "source-", ""
320        )
321
322    @property
323    def discovered_catalog(self) -> AirbyteCatalog:
324        """Get the raw catalog for the given streams.
325
326        If the catalog is not yet known, we call discover to get it.
327        """
328        if self._discovered_catalog is None:
329            self._discovered_catalog = self._discover()
330
331        return self._discovered_catalog
332
333    @property
334    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
335        """Get the configured catalog for the given streams.
336
337        If the raw catalog is not yet known, we call discover to get it.
338
339        If no specific streams are selected, we return a catalog that syncs all available streams.
340
341        TODO: We should consider disabling by default the streams that the connector would
342        disable by default. (For instance, streams that require a premium license are sometimes
343        disabled by default within the connector.)
344        """
345        # Ensure discovered catalog is cached before we start
346        _ = self.discovered_catalog
347
348        # Filter for selected streams if set, otherwise use all available streams:
349        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
350        return self.get_configured_catalog(streams=streams_filter)
351
352    def get_configured_catalog(
353        self,
354        streams: Literal["*"] | list[str] | None = None,
355    ) -> ConfiguredAirbyteCatalog:
356        """Get a configured catalog for the given streams.
357
358        If no streams are provided, the selected streams will be used. If no streams are selected,
359        all available streams will be used.
360
361        If '*' is provided, all available streams will be used.
362        """
363        selected_streams: list[str] = []
364        if streams is None:
365            selected_streams = self._selected_stream_names or self.get_available_streams()
366        elif streams == "*":
367            selected_streams = self.get_available_streams()
368        elif isinstance(streams, list):
369            selected_streams = streams
370        else:
371            raise exc.PyAirbyteInputError(
372                message="Invalid streams argument.",
373                input_value=streams,
374            )
375
376        return ConfiguredAirbyteCatalog(
377            streams=[
378                ConfiguredAirbyteStream(
379                    stream=stream,
380                    destination_sync_mode=DestinationSyncMode.overwrite,
381                    primary_key=stream.source_defined_primary_key,
382                    sync_mode=SyncMode.incremental,
383                )
384                for stream in self.discovered_catalog.streams
385                if stream.name in selected_streams
386            ],
387        )
388
389    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
390        """Return the JSON Schema spec for the specified stream name."""
391        catalog: AirbyteCatalog = self.discovered_catalog
392        found: list[AirbyteStream] = [
393            stream for stream in catalog.streams if stream.name == stream_name
394        ]
395
396        if len(found) == 0:
397            raise exc.PyAirbyteInputError(
398                message="Stream name does not exist in catalog.",
399                input_value=stream_name,
400            )
401
402        if len(found) > 1:
403            raise exc.PyAirbyteInternalError(
404                message="Duplicate streams found with the same name.",
405                context={
406                    "found_streams": found,
407                },
408            )
409
410        return found[0].json_schema
411
412    def get_records(
413        self,
414        stream: str,
415        *,
416        normalize_field_names: bool = False,
417        prune_undeclared_fields: bool = True,
418    ) -> LazyDataset:
419        """Read a stream from the connector.
420
421        Args:
422            stream: The name of the stream to read.
423            normalize_field_names: When `True`, field names will be normalized to lower case, with
424                special characters removed. This matches the behavior of PyAirbyte caches and most
425                Airbyte destinations.
426            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
427                which generally matches the behavior of PyAirbyte caches and most Airbyte
428                destinations, specifically when you expect the catalog may be stale. You can disable
429                this to keep all fields in the records.
430
431        This involves the following steps:
432        * Call discover to get the catalog
433        * Generate a configured catalog that syncs the given stream in full_refresh mode
434        * Write the configured catalog and the config to a temporary file
435        * execute the connector with read --config <config_file> --catalog <catalog_file>
436        * Listen to the messages and return the first AirbyteRecordMessages that come along.
437        * Make sure the subprocess is killed when the function returns.
438        """
439        discovered_catalog: AirbyteCatalog = self.discovered_catalog
440        configured_catalog = ConfiguredAirbyteCatalog(
441            streams=[
442                ConfiguredAirbyteStream(
443                    stream=s,
444                    sync_mode=SyncMode.full_refresh,
445                    destination_sync_mode=DestinationSyncMode.overwrite,
446                )
447                for s in discovered_catalog.streams
448                if s.name == stream
449            ],
450        )
451        if len(configured_catalog.streams) == 0:
452            raise exc.PyAirbyteInputError(
453                message="Requested stream does not exist.",
454                context={
455                    "stream": stream,
456                    "available_streams": self.get_available_streams(),
457                    "connector_name": self.name,
458                },
459            ) from KeyError(stream)
460
461        configured_stream = configured_catalog.streams[0]
462
463        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
464            yield from records
465
466        stream_record_handler = StreamRecordHandler(
467            json_schema=self.get_stream_json_schema(stream),
468            prune_extra_fields=prune_undeclared_fields,
469            normalize_keys=normalize_field_names,
470        )
471
472        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
473        progress_tracker = ProgressTracker(
474            ProgressStyle.PLAIN,
475            source=self,
476            cache=None,
477            destination=None,
478            expected_streams=[stream],
479        )
480
481        iterator: Iterator[dict[str, Any]] = (
482            StreamRecord.from_record_message(
483                record_message=record.record,
484                stream_record_handler=stream_record_handler,
485            )
486            for record in self._read_with_catalog(
487                catalog=configured_catalog,
488                progress_tracker=progress_tracker,
489            )
490            if record.record
491        )
492        progress_tracker.log_success()
493        return LazyDataset(
494            iterator,
495            stream_metadata=configured_stream,
496        )
497
498    def get_documents(
499        self,
500        stream: str,
501        title_property: str | None = None,
502        content_properties: list[str] | None = None,
503        metadata_properties: list[str] | None = None,
504        *,
505        render_metadata: bool = False,
506    ) -> Iterable[Document]:
507        """Read a stream from the connector and return the records as documents.
508
509        If metadata_properties is not set, all properties that are not content will be added to
510        the metadata.
511
512        If render_metadata is True, metadata will be rendered in the document, as well as the
513        the main content.
514        """
515        return self.get_records(stream).to_documents(
516            title_property=title_property,
517            content_properties=content_properties,
518            metadata_properties=metadata_properties,
519            render_metadata=render_metadata,
520        )
521
522    def _get_airbyte_message_iterator(
523        self,
524        *,
525        streams: Literal["*"] | list[str] | None = None,
526        state_provider: StateProviderBase | None = None,
527        progress_tracker: ProgressTracker,
528        force_full_refresh: bool = False,
529    ) -> AirbyteMessageIterator:
530        """Get an AirbyteMessageIterator for this source."""
531        return AirbyteMessageIterator(
532            self._read_with_catalog(
533                catalog=self.get_configured_catalog(streams=streams),
534                state=state_provider if not force_full_refresh else None,
535                progress_tracker=progress_tracker,
536            )
537        )
538
539    def _read_with_catalog(
540        self,
541        catalog: ConfiguredAirbyteCatalog,
542        progress_tracker: ProgressTracker,
543        state: StateProviderBase | None = None,
544    ) -> Generator[AirbyteMessage, None, None]:
545        """Call read on the connector.
546
547        This involves the following steps:
548        * Write the config to a temporary file
549        * execute the connector with read --config <config_file> --catalog <catalog_file>
550        * Listen to the messages and return the AirbyteRecordMessages that come along.
551        * Send out telemetry on the performed sync (with information about which source was used and
552          the type of the cache)
553        """
554        with as_temp_files(
555            [
556                self._config,
557                catalog.model_dump_json(),
558                state.to_state_input_file_text() if state else "[]",
559            ]
560        ) as [
561            config_file,
562            catalog_file,
563            state_file,
564        ]:
565            message_generator = self._execute(
566                [
567                    "read",
568                    "--config",
569                    config_file,
570                    "--catalog",
571                    catalog_file,
572                    "--state",
573                    state_file,
574                ],
575                progress_tracker=progress_tracker,
576            )
577            yield from progress_tracker.tally_records_read(message_generator)
578        progress_tracker.log_read_complete()
579
580    def _peek_airbyte_message(
581        self,
582        message: AirbyteMessage,
583        *,
584        raise_on_error: bool = True,
585    ) -> None:
586        """Process an Airbyte message.
587
588        This method handles reading Airbyte messages and taking action, if needed, based on the
589        message type. For instance, log messages are logged, records are tallied, and errors are
590        raised as exceptions if `raise_on_error` is True.
591
592        Raises:
593            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
594        """
595        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
596
597    def _log_incremental_streams(
598        self,
599        *,
600        incremental_streams: set[str] | None = None,
601    ) -> None:
602        """Log the streams which are using incremental sync mode."""
603        log_message = (
604            "The following streams are currently using incremental sync:\n"
605            f"{incremental_streams}\n"
606            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
607        )
608        print(log_message)
609
610    def read(
611        self,
612        cache: CacheBase | None = None,
613        *,
614        streams: str | list[str] | None = None,
615        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
616        force_full_refresh: bool = False,
617        skip_validation: bool = False,
618    ) -> ReadResult:
619        """Read from the connector and write to the cache.
620
621        Args:
622            cache: The cache to write to. If not set, a default cache will be used.
623            streams: Optional if already set. A list of stream names to select for reading. If set
624                to "*", all streams will be selected.
625            write_strategy: The strategy to use when writing to the cache. If a string, it must be
626                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
627                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
628                WriteStrategy.AUTO.
629            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
630                streams will be read in incremental mode if supported by the connector. This option
631                must be True when using the "replace" strategy.
632            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
633                running the connector. This can be helpful in debugging, when you want to send
634                configurations to the connector that otherwise might be rejected by JSON Schema
635                validation rules.
636        """
637        cache = cache or get_default_cache()
638        progress_tracker = ProgressTracker(
639            source=self,
640            cache=cache,
641            destination=None,
642            expected_streams=None,  # Will be set later
643        )
644
645        # Set up state provider if not in full refresh mode
646        if force_full_refresh:
647            state_provider: StateProviderBase | None = None
648        else:
649            state_provider = cache.get_state_provider(
650                source_name=self._name,
651            )
652        state_writer = cache.get_state_writer(source_name=self._name)
653
654        if streams:
655            self.select_streams(streams)
656
657        if not self._selected_stream_names:
658            raise exc.PyAirbyteNoStreamsSelectedError(
659                connector_name=self.name,
660                available_streams=self.get_available_streams(),
661            )
662
663        try:
664            result = self._read_to_cache(
665                cache=cache,
666                catalog_provider=CatalogProvider(self.configured_catalog),
667                stream_names=self._selected_stream_names,
668                state_provider=state_provider,
669                state_writer=state_writer,
670                write_strategy=write_strategy,
671                force_full_refresh=force_full_refresh,
672                skip_validation=skip_validation,
673                progress_tracker=progress_tracker,
674            )
675        except exc.PyAirbyteInternalError as ex:
676            progress_tracker.log_failure(exception=ex)
677            raise exc.AirbyteConnectorFailedError(
678                connector_name=self.name,
679                log_text=self._last_log_messages,
680            ) from ex
681        except Exception as ex:
682            progress_tracker.log_failure(exception=ex)
683            raise
684
685        progress_tracker.log_success()
686        return result
687
688    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
689        self,
690        cache: CacheBase,
691        *,
692        catalog_provider: CatalogProvider,
693        stream_names: list[str],
694        state_provider: StateProviderBase | None,
695        state_writer: StateWriterBase | None,
696        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
697        force_full_refresh: bool = False,
698        skip_validation: bool = False,
699        progress_tracker: ProgressTracker,
700    ) -> ReadResult:
701        """Internal read method."""
702        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
703            warnings.warn(
704                message=(
705                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
706                    "could result in data loss. "
707                    "To silence this warning, use the following: "
708                    'warnings.filterwarnings("ignore", '
709                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
710                ),
711                category=exc.PyAirbyteDataLossWarning,
712                stacklevel=1,
713            )
714        if isinstance(write_strategy, str):
715            try:
716                write_strategy = WriteStrategy(write_strategy)
717            except ValueError:
718                raise exc.PyAirbyteInputError(
719                    message="Invalid strategy",
720                    context={
721                        "write_strategy": write_strategy,
722                        "available_strategies": [s.value for s in WriteStrategy],
723                    },
724                ) from None
725
726        # Run optional validation step
727        if not skip_validation:
728            self.validate_config()
729
730        # Log incremental stream if incremental streams are known
731        if state_provider and state_provider.known_stream_names:
732            # Retrieve the set of known streams which support incremental sync
733            incremental_streams = (
734                set(self._get_incremental_stream_names())
735                & state_provider.known_stream_names
736                & set(self.get_selected_streams())
737            )
738            if incremental_streams:
739                self._log_incremental_streams(incremental_streams=incremental_streams)
740
741        airbyte_message_iterator = AirbyteMessageIterator(
742            self._read_with_catalog(
743                catalog=catalog_provider.configured_catalog,
744                state=state_provider,
745                progress_tracker=progress_tracker,
746            )
747        )
748        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
749            stdin=airbyte_message_iterator,
750            catalog_provider=catalog_provider,
751            write_strategy=write_strategy,
752            state_writer=state_writer,
753            progress_tracker=progress_tracker,
754        )
755
756        # Flush the WAL, if applicable
757        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
758
759        return ReadResult(
760            source_name=self.name,
761            progress_tracker=progress_tracker,
762            processed_streams=stream_names,
763            cache=cache,
764        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False)
58    def __init__(
59        self,
60        executor: Executor,
61        name: str,
62        config: dict[str, Any] | None = None,
63        *,
64        config_change_callback: ConfigChangeCallback | None = None,
65        streams: str | list[str] | None = None,
66        validate: bool = False,
67    ) -> None:
68        """Initialize the source.
69
70        If config is provided, it will be validated against the spec if validate is True.
71        """
72        self._to_be_selected_streams: list[str] | str = []
73        """Used to hold selection criteria before catalog is known."""
74
75        super().__init__(
76            executor=executor,
77            name=name,
78            config=config,
79            config_change_callback=config_change_callback,
80            validate=validate,
81        )
82        self._config_dict: dict[str, Any] | None = None
83        self._last_log_messages: list[str] = []
84        self._discovered_catalog: AirbyteCatalog | None = None
85        self._selected_stream_names: list[str] = []
86        if config is not None:
87            self.set_config(config, validate=validate)
88        if streams is not None:
89            self.select_streams(streams)
90
91        self._deployed_api_root: str | None = None
92        self._deployed_workspace_id: str | None = None
93        self._deployed_source_id: str | None = None

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.
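
For illustration, a minimal sketch of creating a source with streams pre-selected and validation performed explicitly. Sources are normally obtained via airbyte.get_source, which passes these arguments through to this constructor; the connector name and config values below are placeholders.

import airbyte as ab

# Sketch only: "source-faker" stands in for any connector name.
source = ab.get_source(
    "source-faker",
    config={"count": 100},          # connector-specific configuration (assumed)
    streams=["users", "products"],  # pre-select streams before the catalog is discovered
)

# Validate the configuration explicitly before reading:
source.check()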

connector_type: Literal['source'] = 'source'
def set_streams(self, streams: list[str]) -> None:
 95    def set_streams(self, streams: list[str]) -> None:
 96        """Deprecated. See select_streams()."""
 97        warnings.warn(
 98            "The 'set_streams' method is deprecated and will be removed in a future version. "
 99            "Please use the 'select_streams' method instead.",
100            DeprecationWarning,
101            stacklevel=2,
102        )
103        self.select_streams(streams)

Deprecated. See select_streams().

def select_all_streams(self) -> None:
117    def select_all_streams(self) -> None:
118        """Select all streams.
119
120        This is a more streamlined equivalent to:
121        > source.select_streams(source.get_available_streams()).
122        """
123        if self._config_dict is None:
124            self._to_be_selected_streams = "*"
125            self._log_warning_preselected_stream(self._to_be_selected_streams)
126            return
127
128        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
130    def select_streams(self, streams: str | list[str]) -> None:
131        """Select the stream names that should be read from the connector.
132
133        Args:
134            streams: A list of stream names to select. If set to "*", all streams will be selected.
135
136        Currently, if this is not set, all streams will be read.
137        """
138        if self._config_dict is None:
139            self._to_be_selected_streams = streams
140            self._log_warning_preselected_stream(streams)
141            return
142
143        if streams == "*":
144            self.select_all_streams()
145            return
146
147        if isinstance(streams, str):
148            # If a single stream is provided, convert it to a one-item list
149            streams = [streams]
150
151        available_streams = self.get_available_streams()
152        for stream in streams:
153            if stream not in available_streams:
154                raise exc.AirbyteStreamNotFoundError(
155                    stream_name=stream,
156                    connector_name=self.name,
157                    available_streams=available_streams,
158                )
159        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
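
A quick illustration (stream names are hypothetical):

source.select_streams(["users", "purchases"])  # explicit list of stream names
source.select_streams("users")                 # a single stream name is also accepted
source.select_streams("*")                     # shorthand for select_all_streams()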

def get_selected_streams(self) -> list[str]:
161    def get_selected_streams(self) -> list[str]:
162        """Get the selected streams.
163
164        If no streams are selected, return an empty list.
165        """
166        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
168    def set_config(
169        self,
170        config: dict[str, Any],
171        *,
172        validate: bool = True,
173    ) -> None:
174        """Set the config for the connector.
175
176        If validate is True, raise an exception if the config fails validation.
177
178        If validate is False, validation will be deferred until check() or validate_config()
179        is called.
180        """
181        if validate:
182            self.validate_config(config)
183
184        self._config_dict = config
185
186        if self._to_be_selected_streams:
187            self.select_streams(self._to_be_selected_streams)
188            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
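
A short sketch of deferred validation, assuming a connector whose spec requires an api_key field:

# Set the config without validating it immediately...
source.set_config({"api_key": "..."}, validate=False)

# ...then validate later, either directly or as part of check():
source.validate_config()
source.check()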

def get_config(self) -> dict[str, typing.Any]:
190    def get_config(self) -> dict[str, Any]:
191        """Get the config for the connector."""
192        return self._config

Get the config for the connector.

def get_available_streams(self) -> list[str]:
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.

config_spec: dict[str, typing.Any]
255    @property
256    def config_spec(self) -> dict[str, Any]:
257        """Generate a configuration spec for this connector, as a JSON Schema definition.
258
259        This property returns the configuration spec for the current connector as a
260        JSON Schema dictionary.
261
262        Returns:
263            dict: The JSON Schema configuration spec as a dictionary.
264        """
265        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This property returns the configuration spec for the current connector as a JSON Schema dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.
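
Because the spec is a plain JSON Schema dictionary, it can be inspected like any other dict. For example:

spec = source.config_spec

# Standard JSON Schema keys such as "properties" and "required" describe the config shape:
print(list(spec.get("properties", {})))
print(spec.get("required", []))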

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
267    def print_config_spec(
268        self,
269        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
270        *,
271        output_file: Path | str | None = None,
272    ) -> None:
273        """Print the configuration spec for this connector.
274
275        Args:
276            format: The format to print the spec in. Must be "yaml" or "json".
277            output_file: Optional. If set, the spec will be written to the given file path.
278                Otherwise, it will be printed to the console.
279        """
280        if format not in {"yaml", "json"}:
281            raise exc.PyAirbyteInputError(
282                message="Invalid format. Expected 'yaml' or 'json'",
283                input_value=format,
284            )
285        if isinstance(output_file, str):
286            output_file = Path(output_file)
287
288        if format == "yaml":
289            content = yaml.dump(self.config_spec, indent=2)
290        elif format == "json":
291            content = json.dumps(self.config_spec, indent=2)
292
293        if output_file:
294            output_file.write_text(content)
295            return
296
297        syntax_highlighted = Syntax(content, format)
298        print(syntax_highlighted)

Print the configuration spec for this connector.

Arguments:
  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
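
For example, the spec can be printed to the console as YAML (the default) or written to a JSON file:

source.print_config_spec()                                 # pretty-printed YAML to the console
source.print_config_spec("json", output_file="spec.json")  # JSON written to a file
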
docs_url: str
315    @property
316    def docs_url(self) -> str:
317        """Get the URL to the connector's documentation."""
318        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
319            "source-", ""
320        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
322    @property
323    def discovered_catalog(self) -> AirbyteCatalog:
324        """Get the raw catalog for the given streams.
325
326        If the catalog is not yet known, we call discover to get it.
327        """
328        if self._discovered_catalog is None:
329            self._discovered_catalog = self._discover()
330
331        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
333    @property
334    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
335        """Get the configured catalog for the given streams.
336
337        If the raw catalog is not yet known, we call discover to get it.
338
339        If no specific streams are selected, we return a catalog that syncs all available streams.
340
341        TODO: We should consider disabling by default the streams that the connector would
342        disable by default. (For instance, streams that require a premium license are sometimes
343        disabled by default within the connector.)
344        """
345        # Ensure discovered catalog is cached before we start
346        _ = self.discovered_catalog
347
348        # Filter for selected streams if set, otherwise use all available streams:
349        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
350        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
352    def get_configured_catalog(
353        self,
354        streams: Literal["*"] | list[str] | None = None,
355    ) -> ConfiguredAirbyteCatalog:
356        """Get a configured catalog for the given streams.
357
358        If no streams are provided, the selected streams will be used. If no streams are selected,
359        all available streams will be used.
360
361        If '*' is provided, all available streams will be used.
362        """
363        selected_streams: list[str] = []
364        if streams is None:
365            selected_streams = self._selected_stream_names or self.get_available_streams()
366        elif streams == "*":
367            selected_streams = self.get_available_streams()
368        elif isinstance(streams, list):
369            selected_streams = streams
370        else:
371            raise exc.PyAirbyteInputError(
372                message="Invalid streams argument.",
373                input_value=streams,
374            )
375
376        return ConfiguredAirbyteCatalog(
377            streams=[
378                ConfiguredAirbyteStream(
379                    stream=stream,
380                    destination_sync_mode=DestinationSyncMode.overwrite,
381                    primary_key=stream.source_defined_primary_key,
382                    sync_mode=SyncMode.incremental,
383                )
384                for stream in self.discovered_catalog.streams
385                if stream.name in selected_streams
386            ],
387        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.
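
A brief sketch, using hypothetical stream names:

# Catalog limited to two specific streams:
catalog = source.get_configured_catalog(streams=["users", "purchases"])

# Catalog covering every available stream:
full_catalog = source.get_configured_catalog(streams="*")

print([configured.stream.name for configured in catalog.streams])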

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
389    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
390        """Return the JSON Schema spec for the specified stream name."""
391        catalog: AirbyteCatalog = self.discovered_catalog
392        found: list[AirbyteStream] = [
393            stream for stream in catalog.streams if stream.name == stream_name
394        ]
395
396        if len(found) == 0:
397            raise exc.PyAirbyteInputError(
398                message="Stream name does not exist in catalog.",
399                input_value=stream_name,
400            )
401
402        if len(found) > 1:
403            raise exc.PyAirbyteInternalError(
404                message="Duplicate streams found with the same name.",
405                context={
406                    "found_streams": found,
407                },
408            )
409
410        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
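
For example, to inspect the declared fields of a stream (stream name hypothetical):

schema = source.get_stream_json_schema("users")

# The JSON Schema "properties" key maps each declared field to its type definition:
for field_name, field_schema in schema.get("properties", {}).items():
    print(field_name, field_schema.get("type"))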

def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
412    def get_records(
413        self,
414        stream: str,
415        *,
416        normalize_field_names: bool = False,
417        prune_undeclared_fields: bool = True,
418    ) -> LazyDataset:
419        """Read a stream from the connector.
420
421        Args:
422            stream: The name of the stream to read.
423            normalize_field_names: When `True`, field names will be normalized to lower case, with
424                special characters removed. This matches the behavior of PyAirbyte caches and most
425                Airbyte destinations.
426            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
427                which generally matches the behavior of PyAirbyte caches and most Airbyte
428                destinations, specifically when you expect the catalog may be stale. You can disable
429                this to keep all fields in the records.
430
431        This involves the following steps:
432        * Call discover to get the catalog
433        * Generate a configured catalog that syncs the given stream in full_refresh mode
434        * Write the configured catalog and the config to a temporary file
435        * Execute the connector with read --config <config_file> --catalog <catalog_file>
436        * Listen to the messages and return the AirbyteRecordMessages as they come along.
437        * Make sure the subprocess is killed when the function returns.
438        """
439        discovered_catalog: AirbyteCatalog = self.discovered_catalog
440        configured_catalog = ConfiguredAirbyteCatalog(
441            streams=[
442                ConfiguredAirbyteStream(
443                    stream=s,
444                    sync_mode=SyncMode.full_refresh,
445                    destination_sync_mode=DestinationSyncMode.overwrite,
446                )
447                for s in discovered_catalog.streams
448                if s.name == stream
449            ],
450        )
451        if len(configured_catalog.streams) == 0:
452            raise exc.PyAirbyteInputError(
453                message="Requested stream does not exist.",
454                context={
455                    "stream": stream,
456                    "available_streams": self.get_available_streams(),
457                    "connector_name": self.name,
458                },
459            ) from KeyError(stream)
460
461        configured_stream = configured_catalog.streams[0]
462
463        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
464            yield from records
465
466        stream_record_handler = StreamRecordHandler(
467            json_schema=self.get_stream_json_schema(stream),
468            prune_extra_fields=prune_undeclared_fields,
469            normalize_keys=normalize_field_names,
470        )
471
472        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
473        progress_tracker = ProgressTracker(
474            ProgressStyle.PLAIN,
475            source=self,
476            cache=None,
477            destination=None,
478            expected_streams=[stream],
479        )
480
481        iterator: Iterator[dict[str, Any]] = (
482            StreamRecord.from_record_message(
483                record_message=record.record,
484                stream_record_handler=stream_record_handler,
485            )
486            for record in self._read_with_catalog(
487                catalog=configured_catalog,
488                progress_tracker=progress_tracker,
489            )
490            if record.record
491        )
492        progress_tracker.log_success()
493        return LazyDataset(
494            iterator,
495            stream_metadata=configured_stream,
496        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the AirbyteRecordMessages as they come along.
  • Make sure the subprocess is killed when the function returns.
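
A minimal sketch of iterating the resulting LazyDataset; records are yielded incrementally as the connector emits them (stream name hypothetical):

dataset = source.get_records("users")

# LazyDataset is an iterable of dict-like StreamRecord objects.
for index, record in enumerate(dataset):
    print(record)
    if index >= 9:  # stop early after peeking at ten records (illustrative only)
        break
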
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
498    def get_documents(
499        self,
500        stream: str,
501        title_property: str | None = None,
502        content_properties: list[str] | None = None,
503        metadata_properties: list[str] | None = None,
504        *,
505        render_metadata: bool = False,
506    ) -> Iterable[Document]:
507        """Read a stream from the connector and return the records as documents.
508
509        If metadata_properties is not set, all properties that are not content will be added to
510        the metadata.
511
512        If render_metadata is True, metadata will be rendered in the document, as well as
513        the main content.
514        """
515        return self.get_records(stream).to_documents(
516            title_property=title_property,
517            content_properties=content_properties,
518            metadata_properties=metadata_properties,
519            render_metadata=render_metadata,
520        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
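
A sketch of turning a stream into documents, e.g. for LLM or vector-store workflows; the stream and property names below are hypothetical:

documents = source.get_documents(
    stream="articles",
    title_property="title",
    content_properties=["body"],
    metadata_properties=["author", "published_at"],
    render_metadata=True,
)

for doc in documents:
    print(doc)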

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> ReadResult:
610    def read(
611        self,
612        cache: CacheBase | None = None,
613        *,
614        streams: str | list[str] | None = None,
615        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
616        force_full_refresh: bool = False,
617        skip_validation: bool = False,
618    ) -> ReadResult:
619        """Read from the connector and write to the cache.
620
621        Args:
622            cache: The cache to write to. If not set, a default cache will be used.
623            streams: Optional if already set. A list of stream names to select for reading. If set
624                to "*", all streams will be selected.
625            write_strategy: The strategy to use when writing to the cache. If a string, it must be
626                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
627                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
628                WriteStrategy.AUTO.
629            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
630                streams will be read in incremental mode if supported by the connector. This option
631                must be True when using the "replace" strategy.
632            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
633                running the connector. This can be helpful in debugging, when you want to send
634                configurations to the connector that otherwise might be rejected by JSON Schema
635                validation rules.
636        """
637        cache = cache or get_default_cache()
638        progress_tracker = ProgressTracker(
639            source=self,
640            cache=cache,
641            destination=None,
642            expected_streams=None,  # Will be set later
643        )
644
645        # Set up state provider if not in full refresh mode
646        if force_full_refresh:
647            state_provider: StateProviderBase | None = None
648        else:
649            state_provider = cache.get_state_provider(
650                source_name=self._name,
651            )
652        state_writer = cache.get_state_writer(source_name=self._name)
653
654        if streams:
655            self.select_streams(streams)
656
657        if not self._selected_stream_names:
658            raise exc.PyAirbyteNoStreamsSelectedError(
659                connector_name=self.name,
660                available_streams=self.get_available_streams(),
661            )
662
663        try:
664            result = self._read_to_cache(
665                cache=cache,
666                catalog_provider=CatalogProvider(self.configured_catalog),
667                stream_names=self._selected_stream_names,
668                state_provider=state_provider,
669                state_writer=state_writer,
670                write_strategy=write_strategy,
671                force_full_refresh=force_full_refresh,
672                skip_validation=skip_validation,
673                progress_tracker=progress_tracker,
674            )
675        except exc.PyAirbyteInternalError as ex:
676            progress_tracker.log_failure(exception=ex)
677            raise exc.AirbyteConnectorFailedError(
678                connector_name=self.name,
679                log_text=self._last_log_messages,
680            ) from ex
681        except Exception as ex:
682            progress_tracker.log_failure(exception=ex)
683            raise
684
685        progress_tracker.log_success()
686        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
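
A short sketch of a typical incremental read into the default DuckDB cache, followed by a forced full refresh using the "replace" strategy:

import airbyte as ab

cache = ab.get_default_cache()

# Incremental read where supported, using the default "auto" strategy:
result = source.read(cache=cache)

# Full rebuild of the selected streams:
result = source.read(
    cache=cache,
    write_strategy="replace",
    force_full_refresh=True,  # required when using the "replace" strategy
)
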
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
config_hash
validate_config
connector_version
check
install
uninstall
class StreamRecord(dict[str, typing.Any]):
173class StreamRecord(dict[str, Any]):
174    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
175
176    It has these behaviors:
177    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
178      case-insensitive manner.
179    - The original case is stored in a separate dictionary, so that the original case can be
180      retrieved when needed.
181    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
182      Python dictionary.
183    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
184      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.
185
186    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
187    references.
188
189    There are two ways this class can store keys internally:
190    - If normalize_keys is True, the keys are normalized using the given normalizer.
191    - If normalize_keys is False, the original case of the keys is stored.
192
193    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
194    dictionary will be initialized with the given keys. If a key is not found in the input data, it
195    will be initialized with a value of None. When provided, the 'expected_keys' input will also
196    determine the original case of the keys.
197    """
198
199    def __init__(
200        self,
201        from_dict: dict,
202        *,
203        stream_record_handler: StreamRecordHandler,
204        with_internal_columns: bool = True,
205        extracted_at: datetime | None = None,
206    ) -> None:
207        """Initialize the dictionary with the given data.
208
209        Args:
210            from_dict: The dictionary to initialize the StreamRecord with.
211            stream_record_handler: The StreamRecordHandler to use for processing the record.
212            with_internal_columns: If `True`, the internal columns will be added to the record.
213            extracted_at: The time the record was extracted. If not provided, the current time will
214                be used.
215        """
216        self._stream_handler: StreamRecordHandler = stream_record_handler
217
218        # Start by initializing all values to None
219        self.update(dict.fromkeys(stream_record_handler.index_keys))
220
221        # Update the dictionary with the given data
222        if self._stream_handler.prune_extra_fields:
223            self.update(
224                {
225                    self._stream_handler.to_index_case(k): v
226                    for k, v in from_dict.items()
227                    if self._stream_handler.to_index_case(k) in self._stream_handler.index_keys
228                }
229            )
230        else:
231            self.update({self._stream_handler.to_index_case(k): v for k, v in from_dict.items()})
232
233        if with_internal_columns:
234            self.update(
235                {
236                    AB_RAW_ID_COLUMN: uuid7str(),
237                    AB_EXTRACTED_AT_COLUMN: extracted_at or datetime.now(pytz.utc),
238                    AB_META_COLUMN: {},
239                }
240            )
241
242    @classmethod
243    def from_record_message(
244        cls,
245        record_message: AirbyteRecordMessage,
246        *,
247        stream_record_handler: StreamRecordHandler,
248    ) -> StreamRecord:
249        """Return a StreamRecord from a RecordMessage."""
250        data_dict: dict[str, Any] = record_message.data.copy()
251        return cls(
252            from_dict=data_dict,
253            stream_record_handler=stream_record_handler,
254            with_internal_columns=True,
255            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
256        )
257
258    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
259        """Return the item with the given key."""
260        try:
261            return super().__getitem__(key)
262        except KeyError:
263            return super().__getitem__(self._stream_handler.to_index_case(key))
264
265    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
266        """Set the item with the given key to the given value."""
267        index_case_key = self._stream_handler.to_index_case(key)
268        if (
269            self._stream_handler.prune_extra_fields
270            and index_case_key not in self._stream_handler.index_keys
271        ):
272            return
273
274        super().__setitem__(index_case_key, value)
275
276    def __delitem__(self, key: str) -> None:
277        """Delete the item with the given key."""
278        try:
279            super().__delitem__(key)
280        except KeyError:
281            index_case_key = self._stream_handler.to_index_case(key)
282            if super().__contains__(index_case_key):
283                super().__delitem__(index_case_key)
284                return
285        else:
286            # No failure. Key was deleted.
287            return
288
289        raise KeyError(key)
290
291    def __contains__(self, key: object) -> bool:
292        """Return whether the dictionary contains the given key."""
293        assert isinstance(key, str), "Key must be a string."
294        return super().__contains__(key) or super().__contains__(
295            self._stream_handler.to_index_case(key)
296        )
297
298    def __iter__(self) -> Iterator[str]:
299        """Return an iterator over the keys of the dictionary."""
300        return iter(super().__iter__())
301
302    def __len__(self) -> int:
303        """Return the number of items in the dictionary."""
304        return super().__len__()
305
306    def __eq__(self, other: object) -> bool:
307        """Return whether the StreamRecord is equal to the given dict or StreamRecord object."""
308        if isinstance(other, StreamRecord):
309            return dict(self) == dict(other)
310
311        if isinstance(other, dict):
312            return {k.lower(): v for k, v in self.items()} == {
313                k.lower(): v for k, v in other.items()
314            }
315        return False
316
317    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
318        """Return the hash of the dictionary with keys sorted."""
319        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
320        return hash(tuple(sorted(items)))

The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

It has these behaviors:

  • When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
  • The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
  • Because it is subclassed from dict, the StreamRecord class can be passed as a normal Python dictionary.
  • In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta.

This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.

There are two ways this class can store keys internally:

  • If normalize_keys is True, the keys are normalized using the given normalizer.
  • If normalize_keys is False, the original case of the keys is stored.

In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the dictionary will be initialized with the given keys. If a key is not found in the input data, it will be initialized with a value of None. When provided, the 'expected_keys' input will also determine the original case of the keys.
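
For illustration, assuming a stream that declares a UserName field (stream and field names hypothetical), key lookups are case-insensitive and the Airbyte metadata columns are always present:

record = next(iter(source.get_records("users")))

# Case-insensitive access resolves to the same value:
assert record["UserName"] == record["username"]

# Airbyte metadata columns are added automatically:
print(record["_airbyte_raw_id"], record["_airbyte_extracted_at"])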

@classmethod
def from_record_message( cls, record_message: airbyte_protocol.models.airbyte_protocol.AirbyteRecordMessage, *, stream_record_handler: airbyte.records.StreamRecordHandler) -> StreamRecord:
242    @classmethod
243    def from_record_message(
244        cls,
245        record_message: AirbyteRecordMessage,
246        *,
247        stream_record_handler: StreamRecordHandler,
248    ) -> StreamRecord:
249        """Return a StreamRecord from a RecordMessage."""
250        data_dict: dict[str, Any] = record_message.data.copy()
251        return cls(
252            from_dict=data_dict,
253            stream_record_handler=stream_record_handler,
254            with_internal_columns=True,
255            extracted_at=datetime.fromtimestamp(record_message.emitted_at / 1000, tz=pytz.utc),
256        )

Return a StreamRecord from a RecordMessage.

Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class WriteResult:
104class WriteResult:
105    """The result of a write operation.
106
107    This class is used to return information about the write operation, such as the number of
108    records written. It should not be created directly, but instead returned by the write method
109    of a destination.
110    """
111
112    def __init__(
113        self,
114        *,
115        destination: AirbyteWriterInterface | Destination,
116        source_data: Source | ReadResult,
117        catalog_provider: CatalogProvider,
118        state_writer: StateWriterBase,
119        progress_tracker: ProgressTracker,
120    ) -> None:
121        """Initialize a write result.
122
123        This class should not be created directly. Instead, it should be returned by the `write`
124        method of the `Destination` class.
125        """
126        self._destination: AirbyteWriterInterface | Destination = destination
127        self._source_data: Source | ReadResult = source_data
128        self._catalog_provider: CatalogProvider = catalog_provider
129        self._state_writer: StateWriterBase = state_writer
130        self._progress_tracker: ProgressTracker = progress_tracker
131
132    @property
133    def processed_records(self) -> int:
134        """The total number of records written to the destination."""
135        return self._progress_tracker.total_destination_records_delivered
136
137    def get_state_provider(self) -> StateProviderBase:
138        """Return the state writer as a state provider.
139
140        As a public interface, we only expose the state writer as a state provider. This is because
141        the state writer itself is only intended for internal use. As a state provider, the state
142        writer can be used to read the state artifacts that were written. This can be useful for
143        testing or debugging.
144        """
145        return self._state_writer

The result of a write operation.

This class is used to return information about the write operation, such as the number of records written. It should not be created directly, but instead returned by the write method of a destination.

WriteResult( *, destination: airbyte._writers.base.AirbyteWriterInterface | Destination, source_data: Source | ReadResult, catalog_provider: airbyte.shared.catalog_providers.CatalogProvider, state_writer: airbyte.shared.state_writers.StateWriterBase, progress_tracker: airbyte.progress.ProgressTracker)
112    def __init__(
113        self,
114        *,
115        destination: AirbyteWriterInterface | Destination,
116        source_data: Source | ReadResult,
117        catalog_provider: CatalogProvider,
118        state_writer: StateWriterBase,
119        progress_tracker: ProgressTracker,
120    ) -> None:
121        """Initialize a write result.
122
123        This class should not be created directly. Instead, it should be returned by the `write`
124        method of the `Destination` class.
125        """
126        self._destination: AirbyteWriterInterface | Destination = destination
127        self._source_data: Source | ReadResult = source_data
128        self._catalog_provider: CatalogProvider = catalog_provider
129        self._state_writer: StateWriterBase = state_writer
130        self._progress_tracker: ProgressTracker = progress_tracker

Initialize a write result.

This class should not be created directly. Instead, it should be returned by the write method of the Destination class.

processed_records: int
132    @property
133    def processed_records(self) -> int:
134        """The total number of records written to the destination."""
135        return self._progress_tracker.total_destination_records_delivered

The total number of records written to the destination.

def get_state_provider(self) -> airbyte.shared.state_providers.StateProviderBase:
137    def get_state_provider(self) -> StateProviderBase:
138        """Return the state writer as a state provider.
139
140        As a public interface, we only expose the state writer as a state provider. This is because
141        the state writer itself is only intended for internal use. As a state provider, the state
142        writer can be used to read the state artifacts that were written. This can be useful for
143        testing or debugging.
144        """
145        return self._state_writer

Return the state writer as a state provider.

As a public interface, we only expose the state writer as a state provider. This is because the state writer itself is only intended for internal use. As a state provider, the state writer can be used to read the state artifacts that were written. This can be useful for testing or debugging.
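
As a sketch (the destination name and config are placeholders), a WriteResult is returned by Destination.write and can report delivered record counts as well as the state artifacts written during the sync:

import airbyte as ab

# Placeholder destination and config; any Airbyte destination connector works here.
destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "./output.duckdb"},
)

read_result = source.read()                    # `source` is a previously configured Source
write_result = destination.write(read_result)  # returns a WriteResult

print(write_result.processed_records)          # records delivered to the destination

# State artifacts written during the sync can be read back for debugging:
state_provider = write_result.get_state_provider()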