airbyte.sources.util

Utility functions for working with sources.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Utility functions for working with sources."""
  3
  4from __future__ import annotations
  5
  6import warnings
  7from decimal import Decimal, InvalidOperation
  8from typing import TYPE_CHECKING, Any
  9
 10from airbyte._executors.util import get_connector_executor
 11from airbyte.exceptions import PyAirbyteInputError
 12from airbyte.sources.base import Source
 13
 14
 15if TYPE_CHECKING:
 16    from pathlib import Path
 17
 18    from airbyte.callbacks import ConfigChangeCallback
 19
 20
 21def get_connector(
 22    name: str,
 23    config: dict[str, Any] | None = None,
 24    *,
 25    version: str | None = None,
 26    pip_url: str | None = None,
 27    local_executable: Path | str | None = None,
 28    install_if_missing: bool = True,
 29) -> Source:
 30    """Deprecated. Use get_source instead."""
 31    warnings.warn(
 32        "The `get_connector()` function is deprecated and will be removed in a future version."
 33        "Please use `get_source()` instead.",
 34        DeprecationWarning,
 35        stacklevel=2,
 36    )
 37    return get_source(
 38        name=name,
 39        config=config,
 40        version=version,
 41        pip_url=pip_url,
 42        local_executable=local_executable,
 43        install_if_missing=install_if_missing,
 44    )
 45
 46
 47def get_source(  # noqa: PLR0913 # Too many arguments
 48    name: str,
 49    config: dict[str, Any] | None = None,
 50    *,
 51    config_change_callback: ConfigChangeCallback | None = None,
 52    streams: str | list[str] | None = None,
 53    version: str | None = None,
 54    use_python: bool | Path | str | None = None,
 55    pip_url: str | None = None,
 56    local_executable: Path | str | None = None,
 57    docker_image: bool | str | None = None,
 58    use_host_network: bool = False,
 59    source_manifest: bool | dict | Path | str | None = None,
 60    install_if_missing: bool = True,
 61    install_root: Path | None = None,
 62) -> Source:
 63    """Get a connector by name and version.
 64
 65    If an explicit install or execution method is requested (e.g. `local_executable`,
 66    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 67
 68    Otherwise, an appropriate method will be selected based on the available connector metadata:
 69    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 70       will be downloaded and used to to execute the connector.
 71    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 72    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 73       will be executed using Docker.
 74
 75    Args:
 76        name: connector name
 77        config: connector config - if not provided, you need to set it later via the set_config
 78            method.
 79        config_change_callback: callback function to be called when the connector config changes.
 80        streams: list of stream names to select for reading. If set to "*", all streams will be
 81            selected. If not provided, you can set it later via the `select_streams()` or
 82            `select_all_streams()` method.
 83        version: connector version - if not provided, the currently installed version will be used.
 84            If no version is installed, the latest available version will be used. The version can
 85            also be set to "latest" to force the use of the latest available version.
 86        use_python: (Optional.) Python interpreter specification:
 87            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
 88            - False: Use Docker instead.
 89            - Path: Use interpreter at this path.
 90            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
 91                installed, it will be installed by uv. (This generally adds less than 3 seconds
 92                to install times.)
 93        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 94            connector name.
 95        local_executable: If set, the connector will be assumed to already be installed and will be
 96            executed using this path or executable name. Otherwise, the connector will be installed
 97            automatically in a virtual environment.
 98        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 99            to use the default image for the connector, or you can specify a custom image name.
100            If `version` is specified and your image name does not already contain a tag
101            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
102        use_host_network: If set, along with docker_image, the connector will be executed using
103            the host network. This is useful for connectors that need to access resources on
104            the host machine, such as a local database. This parameter is ignored when
105            `docker_image` is not set.
106        source_manifest: If set, the connector will be executed based on a declarative YAML
107            source definition. This input can be `True` to attempt to auto-download a YAML spec,
108            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
109            the local file system, or `str` to pull the definition from a web URL.
110        install_if_missing: Whether to install the connector if it is not available locally. This
111            parameter is ignored when `local_executable` or `source_manifest` are set.
112        install_root: (Optional.) The root directory where the virtual environment will be
113            created. If not provided, the current working directory will be used.
114    """
115    return Source(
116        name=name,
117        config=config,
118        config_change_callback=config_change_callback,
119        streams=streams,
120        executor=get_connector_executor(
121            name=name,
122            version=version,
123            use_python=use_python,
124            pip_url=pip_url,
125            local_executable=local_executable,
126            docker_image=docker_image,
127            use_host_network=use_host_network,
128            source_manifest=source_manifest,
129            install_if_missing=install_if_missing,
130            install_root=install_root,
131        ),
132    )
133
134
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records. A string with a
            fractional part is truncated toward zero (e.g. `"1.5"` becomes `1`).
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that does not parse as a finite
            number (this includes "inf" and "nan").
    """
    if isinstance(num_records, str):
        try:
            # Decimal (rather than float) parses scientific notation exactly.
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # InvalidOperation: the string is not a number at all.
            # ValueError / OverflowError: Decimal accepted "nan" / "inf", but int()
            # cannot convert a non-finite value.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )
183
184
# Public names exported by this module. The deprecated `get_connector()` is not listed.
__all__ = ["get_source", "get_benchmark_source"]
def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, use_python: bool | pathlib.Path | str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> airbyte.Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    use_python: bool | Path | str | None = None,
 56    pip_url: str | None = None,
 57    local_executable: Path | str | None = None,
 58    docker_image: bool | str | None = None,
 59    use_host_network: bool = False,
 60    source_manifest: bool | dict | Path | str | None = None,
 61    install_if_missing: bool = True,
 62    install_root: Path | None = None,
 63) -> Source:
 64    """Get a connector by name and version.
 65
 66    If an explicit install or execution method is requested (e.g. `local_executable`,
 67    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 68
 69    Otherwise, an appropriate method will be selected based on the available connector metadata:
 70    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 71       will be downloaded and used to to execute the connector.
 72    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 73    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 74       will be executed using Docker.
 75
 76    Args:
 77        name: connector name
 78        config: connector config - if not provided, you need to set it later via the set_config
 79            method.
 80        config_change_callback: callback function to be called when the connector config changes.
 81        streams: list of stream names to select for reading. If set to "*", all streams will be
 82            selected. If not provided, you can set it later via the `select_streams()` or
 83            `select_all_streams()` method.
 84        version: connector version - if not provided, the currently installed version will be used.
 85            If no version is installed, the latest available version will be used. The version can
 86            also be set to "latest" to force the use of the latest available version.
 87        use_python: (Optional.) Python interpreter specification:
 88            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
 89            - False: Use Docker instead.
 90            - Path: Use interpreter at this path.
 91            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
 92                installed, it will be installed by uv. (This generally adds less than 3 seconds
 93                to install times.)
 94        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 95            connector name.
 96        local_executable: If set, the connector will be assumed to already be installed and will be
 97            executed using this path or executable name. Otherwise, the connector will be installed
 98            automatically in a virtual environment.
 99        docker_image: If set, the connector will be executed using Docker. You can specify `True`
100            to use the default image for the connector, or you can specify a custom image name.
101            If `version` is specified and your image name does not already contain a tag
102            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
103        use_host_network: If set, along with docker_image, the connector will be executed using
104            the host network. This is useful for connectors that need to access resources on
105            the host machine, such as a local database. This parameter is ignored when
106            `docker_image` is not set.
107        source_manifest: If set, the connector will be executed based on a declarative YAML
108            source definition. This input can be `True` to attempt to auto-download a YAML spec,
109            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
110            the local file system, or `str` to pull the definition from a web URL.
111        install_if_missing: Whether to install the connector if it is not available locally. This
112            parameter is ignored when `local_executable` or `source_manifest` are set.
113        install_root: (Optional.) The root directory where the virtual environment will be
114            created. If not provided, the current working directory will be used.
115    """
116    return Source(
117        name=name,
118        config=config,
119        config_change_callback=config_change_callback,
120        streams=streams,
121        executor=get_connector_executor(
122            name=name,
123            version=version,
124            use_python=use_python,
125            pip_url=pip_url,
126            local_executable=local_executable,
127            docker_image=docker_image,
128            use_host_network=use_host_network,
129            source_manifest=source_manifest,
130            install_if_missing=install_if_missing,
131            install_root=install_root,
132        ),
133    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • use_python: (Optional.) Python interpreter specification:
    • True: Use current Python interpreter. (Inferred if pip_url is set.)
    • False: Use Docker instead.
    • Path: Use interpreter at this path.
    • str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
def get_benchmark_source( num_records: int | str = '5e5', *, install_if_missing: bool = True) -> airbyte.Source:
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records. A string with a
            fractional part is truncated toward zero (e.g. `"1.5"` becomes `1`).
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that does not parse as a finite
            number (this includes "inf" and "nan").
    """
    if isinstance(num_records, str):
        try:
            # Decimal (rather than float) parses scientific notation exactly.
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # InvalidOperation: the string is not a number at all.
            # ValueError / OverflowError: Decimal accepted "nan" / "inf", but int()
            # cannot convert a non-finite value.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
  • install_if_missing: Whether to install the source if it is not available locally.
Returns:

Source: The source object for benchmarking.