airbyte.sources.util

Utility functions for working with sources.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Utility functions for working with sources."""
  3
  4from __future__ import annotations
  5
  6import warnings
  7from decimal import Decimal, InvalidOperation
  8from typing import TYPE_CHECKING, Any
  9
 10from airbyte._executors.util import get_connector_executor
 11from airbyte.exceptions import PyAirbyteInputError
 12from airbyte.sources.base import Source
 13
 14
 15if TYPE_CHECKING:
 16    from pathlib import Path
 17
 18    from airbyte.callbacks import ConfigChangeCallback
 19
 20
 21def get_connector(
 22    name: str,
 23    config: dict[str, Any] | None = None,
 24    *,
 25    version: str | None = None,
 26    pip_url: str | None = None,
 27    local_executable: Path | str | None = None,
 28    install_if_missing: bool = True,
 29) -> Source:
 30    """Deprecated. Use get_source instead."""
 31    warnings.warn(
 32        "The `get_connector()` function is deprecated and will be removed in a future version."
 33        "Please use `get_source()` instead.",
 34        DeprecationWarning,
 35        stacklevel=2,
 36    )
 37    return get_source(
 38        name=name,
 39        config=config,
 40        version=version,
 41        pip_url=pip_url,
 42        local_executable=local_executable,
 43        install_if_missing=install_if_missing,
 44    )
 45
 46
 47def get_source(  # noqa: PLR0913 # Too many arguments
 48    name: str,
 49    config: dict[str, Any] | None = None,
 50    *,
 51    config_change_callback: ConfigChangeCallback | None = None,
 52    streams: str | list[str] | None = None,
 53    version: str | None = None,
 54    pip_url: str | None = None,
 55    local_executable: Path | str | None = None,
 56    docker_image: bool | str | None = None,
 57    use_host_network: bool = False,
 58    source_manifest: bool | dict | Path | str | None = None,
 59    install_if_missing: bool = True,
 60    install_root: Path | None = None,
 61) -> Source:
 62    """Get a connector by name and version.
 63
 64    If an explicit install or execution method is requested (e.g. `local_executable`,
 65    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 66
 67    Otherwise, an appropriate method will be selected based on the available connector metadata:
 68    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 69       will be downloaded and used to to execute the connector.
 70    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 71    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 72       will be executed using Docker.
 73
 74    Args:
 75        name: connector name
 76        config: connector config - if not provided, you need to set it later via the set_config
 77            method.
 78        config_change_callback: callback function to be called when the connector config changes.
 79        streams: list of stream names to select for reading. If set to "*", all streams will be
 80            selected. If not provided, you can set it later via the `select_streams()` or
 81            `select_all_streams()` method.
 82        version: connector version - if not provided, the currently installed version will be used.
 83            If no version is installed, the latest available version will be used. The version can
 84            also be set to "latest" to force the use of the latest available version.
 85        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 86            connector name.
 87        local_executable: If set, the connector will be assumed to already be installed and will be
 88            executed using this path or executable name. Otherwise, the connector will be installed
 89            automatically in a virtual environment.
 90        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 91            to use the default image for the connector, or you can specify a custom image name.
 92            If `version` is specified and your image name does not already contain a tag
 93            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
 94        use_host_network: If set, along with docker_image, the connector will be executed using
 95            the host network. This is useful for connectors that need to access resources on
 96            the host machine, such as a local database. This parameter is ignored when
 97            `docker_image` is not set.
 98        source_manifest: If set, the connector will be executed based on a declarative YAML
 99            source definition. This input can be `True` to attempt to auto-download a YAML spec,
100            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
101            the local file system, or `str` to pull the definition from a web URL.
102        install_if_missing: Whether to install the connector if it is not available locally. This
103            parameter is ignored when `local_executable` or `source_manifest` are set.
104        install_root: (Optional.) The root directory where the virtual environment will be
105            created. If not provided, the current working directory will be used.
106    """
107    return Source(
108        name=name,
109        config=config,
110        config_change_callback=config_change_callback,
111        streams=streams,
112        executor=get_connector_executor(
113            name=name,
114            version=version,
115            pip_url=pip_url,
116            local_executable=local_executable,
117            docker_image=docker_image,
118            use_host_network=use_host_network,
119            source_manifest=source_manifest,
120            install_if_missing=install_if_missing,
121            install_root=install_root,
122        ),
123    )
124
125
def get_benchmark_source(
    num_records: int | str = "5e5",
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records (int | str): The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be parsed as a finite
            number (e.g. `"abc"`, `"nan"`, or `"inf"`).
    """
    if isinstance(num_records, str):
        try:
            # `Decimal` parses scientific notation such as "5e5"; underscores are stripped
            # first so that e.g. "1_000_000" is also accepted.
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # InvalidOperation: not a number at all ("abc").
            # ValueError: `int()` of a NaN ("nan").
            # OverflowError: `int()` of an infinity ("inf").
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
    )
170
171
# Names exported by `from airbyte.sources.util import *`.
# `get_connector()` is deprecated and is intentionally absent from this list.
__all__ = ["get_source", "get_benchmark_source"]
def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> airbyte.Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    pip_url: str | None = None,
 56    local_executable: Path | str | None = None,
 57    docker_image: bool | str | None = None,
 58    use_host_network: bool = False,
 59    source_manifest: bool | dict | Path | str | None = None,
 60    install_if_missing: bool = True,
 61    install_root: Path | None = None,
 62) -> Source:
 63    """Get a connector by name and version.
 64
 65    If an explicit install or execution method is requested (e.g. `local_executable`,
 66    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 67
 68    Otherwise, an appropriate method will be selected based on the available connector metadata:
 69    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 70       will be downloaded and used to to execute the connector.
 71    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 72    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 73       will be executed using Docker.
 74
 75    Args:
 76        name: connector name
 77        config: connector config - if not provided, you need to set it later via the set_config
 78            method.
 79        config_change_callback: callback function to be called when the connector config changes.
 80        streams: list of stream names to select for reading. If set to "*", all streams will be
 81            selected. If not provided, you can set it later via the `select_streams()` or
 82            `select_all_streams()` method.
 83        version: connector version - if not provided, the currently installed version will be used.
 84            If no version is installed, the latest available version will be used. The version can
 85            also be set to "latest" to force the use of the latest available version.
 86        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 87            connector name.
 88        local_executable: If set, the connector will be assumed to already be installed and will be
 89            executed using this path or executable name. Otherwise, the connector will be installed
 90            automatically in a virtual environment.
 91        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 92            to use the default image for the connector, or you can specify a custom image name.
 93            If `version` is specified and your image name does not already contain a tag
 94            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
 95        use_host_network: If set, along with docker_image, the connector will be executed using
 96            the host network. This is useful for connectors that need to access resources on
 97            the host machine, such as a local database. This parameter is ignored when
 98            `docker_image` is not set.
 99        source_manifest: If set, the connector will be executed based on a declarative YAML
100            source definition. This input can be `True` to attempt to auto-download a YAML spec,
101            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
102            the local file system, or `str` to pull the definition from a web URL.
103        install_if_missing: Whether to install the connector if it is not available locally. This
104            parameter is ignored when `local_executable` or `source_manifest` are set.
105        install_root: (Optional.) The root directory where the virtual environment will be
106            created. If not provided, the current working directory will be used.
107    """
108    return Source(
109        name=name,
110        config=config,
111        config_change_callback=config_change_callback,
112        streams=streams,
113        executor=get_connector_executor(
114            name=name,
115            version=version,
116            pip_url=pip_url,
117            local_executable=local_executable,
118            docker_image=docker_image,
119            use_host_network=use_host_network,
120            source_manifest=source_manifest,
121            install_if_missing=install_if_missing,
122            install_root=install_root,
123        ),
124    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
def get_benchmark_source(num_records: int | str = '5e5') -> airbyte.Source:
def get_benchmark_source(
    num_records: int | str = "5e5",
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records (int | str): The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be parsed as a finite
            number (e.g. `"abc"`, `"nan"`, or `"inf"`).
    """
    if isinstance(num_records, str):
        try:
            # `Decimal` parses scientific notation such as "5e5"; underscores are stripped
            # first so that e.g. "1_000_000" is also accepted.
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # InvalidOperation: not a number at all ("abc").
            # ValueError: `int()` of a NaN ("nan").
            # OverflowError: `int()` of an infinity ("inf").
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records (int | str): The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
Returns:

Source: The source object for benchmarking.