airbyte.sources.util

Utility functions for working with sources.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Utility functions for working with sources."""
  3
  4from __future__ import annotations
  5
  6import warnings
  7from decimal import Decimal, InvalidOperation
  8from typing import TYPE_CHECKING, Any
  9
 10from airbyte._executors.util import get_connector_executor
 11from airbyte.exceptions import PyAirbyteInputError
 12from airbyte.sources.base import Source
 13
 14
 15if TYPE_CHECKING:
 16    from pathlib import Path
 17
 18    from airbyte.callbacks import ConfigChangeCallback
 19
 20
 21def get_connector(
 22    name: str,
 23    config: dict[str, Any] | None = None,
 24    *,
 25    version: str | None = None,
 26    pip_url: str | None = None,
 27    local_executable: Path | str | None = None,
 28    install_if_missing: bool = True,
 29) -> Source:
 30    """Deprecated. Use get_source instead."""
 31    warnings.warn(
 32        "The `get_connector()` function is deprecated and will be removed in a future version."
 33        "Please use `get_source()` instead.",
 34        DeprecationWarning,
 35        stacklevel=2,
 36    )
 37    return get_source(
 38        name=name,
 39        config=config,
 40        version=version,
 41        pip_url=pip_url,
 42        local_executable=local_executable,
 43        install_if_missing=install_if_missing,
 44    )
 45
 46
 47def get_source(  # noqa: PLR0913 # Too many arguments
 48    name: str,
 49    config: dict[str, Any] | None = None,
 50    *,
 51    config_change_callback: ConfigChangeCallback | None = None,
 52    streams: str | list[str] | None = None,
 53    version: str | None = None,
 54    use_python: bool | Path | str | None = None,
 55    pip_url: str | None = None,
 56    local_executable: Path | str | None = None,
 57    docker_image: bool | str | None = None,
 58    use_host_network: bool = False,
 59    source_manifest: bool | dict | Path | str | None = None,
 60    install_if_missing: bool = True,
 61    install_root: Path | None = None,
 62    no_executor: bool = False,
 63) -> Source:
 64    """Get a connector by name and version.
 65
 66    If an explicit install or execution method is requested (e.g. `local_executable`,
 67    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 68
 69    Otherwise, an appropriate method will be selected based on the available connector metadata:
 70    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 71       will be downloaded and used to to execute the connector.
 72    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 73    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 74       will be executed using Docker.
 75
 76    Args:
 77        name: connector name
 78        config: connector config - if not provided, you need to set it later via the set_config
 79            method.
 80        config_change_callback: callback function to be called when the connector config changes.
 81        streams: list of stream names to select for reading. If set to "*", all streams will be
 82            selected. If not provided, you can set it later via the `select_streams()` or
 83            `select_all_streams()` method.
 84        version: connector version - if not provided, the currently installed version will be used.
 85            If no version is installed, the latest available version will be used. The version can
 86            also be set to "latest" to force the use of the latest available version.
 87        use_python: (Optional.) Python interpreter specification:
 88            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
 89            - False: Use Docker instead.
 90            - Path: Use interpreter at this path.
 91            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
 92                installed, it will be installed by uv. (This generally adds less than 3 seconds
 93                to install times.)
 94        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 95            connector name.
 96        local_executable: If set, the connector will be assumed to already be installed and will be
 97            executed using this path or executable name. Otherwise, the connector will be installed
 98            automatically in a virtual environment.
 99        docker_image: If set, the connector will be executed using Docker. You can specify `True`
100            to use the default image for the connector, or you can specify a custom image name.
101            If `version` is specified and your image name does not already contain a tag
102            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
103        use_host_network: If set, along with docker_image, the connector will be executed using
104            the host network. This is useful for connectors that need to access resources on
105            the host machine, such as a local database. This parameter is ignored when
106            `docker_image` is not set.
107        source_manifest: If set, the connector will be executed based on a declarative YAML
108            source definition. This input can be `True` to attempt to auto-download a YAML spec,
109            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
110            the local file system, or `str` to pull the definition from a web URL.
111        install_if_missing: Whether to install the connector if it is not available locally. This
112            parameter is ignored when `local_executable` or `source_manifest` are set.
113        install_root: (Optional.) The root directory where the virtual environment will be
114            created. If not provided, the current working directory will be used.
115        no_executor: If True, use NoOpExecutor which fetches specs from the registry without
116            local installation. This is useful for scenarios where you need to validate
117            configurations but don't need to run the connector locally (e.g., deploying to Cloud).
118    """
119    executor = get_connector_executor(
120        name=name,
121        version=version,
122        use_python=use_python,
123        pip_url=pip_url,
124        local_executable=local_executable,
125        docker_image=docker_image,
126        use_host_network=use_host_network,
127        source_manifest=source_manifest,
128        install_if_missing=install_if_missing,
129        install_root=install_root,
130        no_executor=no_executor,
131    )
132
133    return Source(
134        name=name,
135        config=config,
136        config_change_callback=config_change_callback,
137        streams=streams,
138        executor=executor,
139    )
140
141
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
            A string with a fractional part is truncated toward zero by `int()`.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be converted to an
            integer, either because it is not a number or because it is a non-finite
            value such as "NaN" or "Infinity".
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` raises `InvalidOperation` for unparseable text, while `int()`
            # raises `ValueError` for NaN and `OverflowError` for infinities. All three
            # mean the caller passed an unusable number, so report them the same way.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )
190
191
# Names exported by `from airbyte.sources.util import *`. The deprecated
# `get_connector()` wrapper is not listed here.
__all__ = [
    "get_source",
    "get_benchmark_source",
]
def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, use_python: bool | pathlib.Path | str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None, no_executor: bool = False) -> airbyte.Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    use_python: bool | Path | str | None = None,
 56    pip_url: str | None = None,
 57    local_executable: Path | str | None = None,
 58    docker_image: bool | str | None = None,
 59    use_host_network: bool = False,
 60    source_manifest: bool | dict | Path | str | None = None,
 61    install_if_missing: bool = True,
 62    install_root: Path | None = None,
 63    no_executor: bool = False,
 64) -> Source:
 65    """Get a connector by name and version.
 66
 67    If an explicit install or execution method is requested (e.g. `local_executable`,
 68    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 69
 70    Otherwise, an appropriate method will be selected based on the available connector metadata:
 71    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 72       will be downloaded and used to to execute the connector.
 73    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 74    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 75       will be executed using Docker.
 76
 77    Args:
 78        name: connector name
 79        config: connector config - if not provided, you need to set it later via the set_config
 80            method.
 81        config_change_callback: callback function to be called when the connector config changes.
 82        streams: list of stream names to select for reading. If set to "*", all streams will be
 83            selected. If not provided, you can set it later via the `select_streams()` or
 84            `select_all_streams()` method.
 85        version: connector version - if not provided, the currently installed version will be used.
 86            If no version is installed, the latest available version will be used. The version can
 87            also be set to "latest" to force the use of the latest available version.
 88        use_python: (Optional.) Python interpreter specification:
 89            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
 90            - False: Use Docker instead.
 91            - Path: Use interpreter at this path.
 92            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
 93                installed, it will be installed by uv. (This generally adds less than 3 seconds
 94                to install times.)
 95        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 96            connector name.
 97        local_executable: If set, the connector will be assumed to already be installed and will be
 98            executed using this path or executable name. Otherwise, the connector will be installed
 99            automatically in a virtual environment.
100        docker_image: If set, the connector will be executed using Docker. You can specify `True`
101            to use the default image for the connector, or you can specify a custom image name.
102            If `version` is specified and your image name does not already contain a tag
103            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
104        use_host_network: If set, along with docker_image, the connector will be executed using
105            the host network. This is useful for connectors that need to access resources on
106            the host machine, such as a local database. This parameter is ignored when
107            `docker_image` is not set.
108        source_manifest: If set, the connector will be executed based on a declarative YAML
109            source definition. This input can be `True` to attempt to auto-download a YAML spec,
110            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
111            the local file system, or `str` to pull the definition from a web URL.
112        install_if_missing: Whether to install the connector if it is not available locally. This
113            parameter is ignored when `local_executable` or `source_manifest` are set.
114        install_root: (Optional.) The root directory where the virtual environment will be
115            created. If not provided, the current working directory will be used.
116        no_executor: If True, use NoOpExecutor which fetches specs from the registry without
117            local installation. This is useful for scenarios where you need to validate
118            configurations but don't need to run the connector locally (e.g., deploying to Cloud).
119    """
120    executor = get_connector_executor(
121        name=name,
122        version=version,
123        use_python=use_python,
124        pip_url=pip_url,
125        local_executable=local_executable,
126        docker_image=docker_image,
127        use_host_network=use_host_network,
128        source_manifest=source_manifest,
129        install_if_missing=install_if_missing,
130        install_root=install_root,
131        no_executor=no_executor,
132    )
133
134    return Source(
135        name=name,
136        config=config,
137        config_change_callback=config_change_callback,
138        streams=streams,
139        executor=executor,
140    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • use_python: (Optional.) Python interpreter specification:
    • True: Use current Python interpreter. (Inferred if pip_url is set.)
    • False: Use Docker instead.
    • Path: Use interpreter at this path.
    • str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
  • no_executor: If True, use NoOpExecutor which fetches specs from the registry without local installation. This is useful for scenarios where you need to validate configurations but don't need to run the connector locally (e.g., deploying to Cloud).
def get_benchmark_source( num_records: int | str = '5e5', *, install_if_missing: bool = True) -> airbyte.Source:
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
            A string with a fractional part is truncated toward zero by `int()`.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be converted to an
            integer, either because it is not a number or because it is a non-finite
            value such as "NaN" or "Infinity".
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` raises `InvalidOperation` for unparseable text, while `int()`
            # raises `ValueError` for NaN and `OverflowError` for infinities. All three
            # mean the caller passed an unusable number, so report them the same way.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
  • install_if_missing: Whether to install the source if it is not available locally.
Returns:

Source: The source object for benchmarking.