airbyte.sources.util

Utility functions for working with sources.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Utility functions for working with sources."""
  3
  4from __future__ import annotations
  5
  6import warnings
  7from decimal import Decimal, InvalidOperation
  8from typing import TYPE_CHECKING, Any
  9
 10from airbyte._executors.util import get_connector_executor
 11from airbyte.exceptions import PyAirbyteInputError
 12from airbyte.sources.base import Source
 13
 14
 15if TYPE_CHECKING:
 16    from pathlib import Path
 17
 18    from airbyte.callbacks import ConfigChangeCallback
 19
 20
 21def get_connector(
 22    name: str,
 23    config: dict[str, Any] | None = None,
 24    *,
 25    version: str | None = None,
 26    pip_url: str | None = None,
 27    local_executable: Path | str | None = None,
 28    install_if_missing: bool = True,
 29) -> Source:
 30    """Deprecated. Use get_source instead."""
 31    warnings.warn(
 32        "The `get_connector()` function is deprecated and will be removed in a future version."
 33        "Please use `get_source()` instead.",
 34        DeprecationWarning,
 35        stacklevel=2,
 36    )
 37    return get_source(
 38        name=name,
 39        config=config,
 40        version=version,
 41        pip_url=pip_url,
 42        local_executable=local_executable,
 43        install_if_missing=install_if_missing,
 44    )
 45
 46
 47def get_source(  # noqa: PLR0913 # Too many arguments
 48    name: str,
 49    config: dict[str, Any] | None = None,
 50    *,
 51    config_change_callback: ConfigChangeCallback | None = None,
 52    streams: str | list[str] | None = None,
 53    version: str | None = None,
 54    pip_url: str | None = None,
 55    local_executable: Path | str | None = None,
 56    docker_image: bool | str | None = None,
 57    use_host_network: bool = False,
 58    source_manifest: bool | dict | Path | str | None = None,
 59    install_if_missing: bool = True,
 60    install_root: Path | None = None,
 61) -> Source:
 62    """Get a connector by name and version.
 63
 64    If an explicit install or execution method is requested (e.g. `local_executable`,
 65    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 66
 67    Otherwise, an appropriate method will be selected based on the available connector metadata:
 68    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 69       will be downloaded and used to to execute the connector.
 70    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 71    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 72       will be executed using Docker.
 73
 74    Args:
 75        name: connector name
 76        config: connector config - if not provided, you need to set it later via the set_config
 77            method.
 78        config_change_callback: callback function to be called when the connector config changes.
 79        streams: list of stream names to select for reading. If set to "*", all streams will be
 80            selected. If not provided, you can set it later via the `select_streams()` or
 81            `select_all_streams()` method.
 82        version: connector version - if not provided, the currently installed version will be used.
 83            If no version is installed, the latest available version will be used. The version can
 84            also be set to "latest" to force the use of the latest available version.
 85        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 86            connector name.
 87        local_executable: If set, the connector will be assumed to already be installed and will be
 88            executed using this path or executable name. Otherwise, the connector will be installed
 89            automatically in a virtual environment.
 90        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 91            to use the default image for the connector, or you can specify a custom image name.
 92            If `version` is specified and your image name does not already contain a tag
 93            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
 94        use_host_network: If set, along with docker_image, the connector will be executed using
 95            the host network. This is useful for connectors that need to access resources on
 96            the host machine, such as a local database. This parameter is ignored when
 97            `docker_image` is not set.
 98        source_manifest: If set, the connector will be executed based on a declarative YAML
 99            source definition. This input can be `True` to attempt to auto-download a YAML spec,
100            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
101            the local file system, or `str` to pull the definition from a web URL.
102        install_if_missing: Whether to install the connector if it is not available locally. This
103            parameter is ignored when `local_executable` or `source_manifest` are set.
104        install_root: (Optional.) The root directory where the virtual environment will be
105            created. If not provided, the current working directory will be used.
106    """
107    return Source(
108        name=name,
109        config=config,
110        config_change_callback=config_change_callback,
111        streams=streams,
112        executor=get_connector_executor(
113            name=name,
114            version=version,
115            pip_url=pip_url,
116            local_executable=local_executable,
117            docker_image=docker_image,
118            use_host_network=use_host_network,
119            source_manifest=source_manifest,
120            install_if_missing=install_if_missing,
121            install_root=install_root,
122        ),
123    )
124
125
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
            String values are parsed with `Decimal` and converted with `int()`, so any
            fractional part is truncated.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be converted to an
            integer (malformed text, "NaN", or "Infinity").
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` raises `InvalidOperation` for malformed text, whereas `int()` raises
            # `ValueError` for NaN and `OverflowError` for infinity. Report all three the same
            # way. `num_records` is still the caller's original string at this point.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,  # `True` requests the connector's default Docker image.
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",  # Select all streams.
        install_if_missing=install_if_missing,
    )
174
175
# Public API of this module. The deprecated `get_connector()` wrapper is not listed here.
__all__ = [
    "get_source",
    "get_benchmark_source",
]
def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> airbyte.Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    pip_url: str | None = None,
 56    local_executable: Path | str | None = None,
 57    docker_image: bool | str | None = None,
 58    use_host_network: bool = False,
 59    source_manifest: bool | dict | Path | str | None = None,
 60    install_if_missing: bool = True,
 61    install_root: Path | None = None,
 62) -> Source:
 63    """Get a connector by name and version.
 64
 65    If an explicit install or execution method is requested (e.g. `local_executable`,
 66    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 67
 68    Otherwise, an appropriate method will be selected based on the available connector metadata:
 69    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 70       will be downloaded and used to to execute the connector.
 71    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 72    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 73       will be executed using Docker.
 74
 75    Args:
 76        name: connector name
 77        config: connector config - if not provided, you need to set it later via the set_config
 78            method.
 79        config_change_callback: callback function to be called when the connector config changes.
 80        streams: list of stream names to select for reading. If set to "*", all streams will be
 81            selected. If not provided, you can set it later via the `select_streams()` or
 82            `select_all_streams()` method.
 83        version: connector version - if not provided, the currently installed version will be used.
 84            If no version is installed, the latest available version will be used. The version can
 85            also be set to "latest" to force the use of the latest available version.
 86        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 87            connector name.
 88        local_executable: If set, the connector will be assumed to already be installed and will be
 89            executed using this path or executable name. Otherwise, the connector will be installed
 90            automatically in a virtual environment.
 91        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 92            to use the default image for the connector, or you can specify a custom image name.
 93            If `version` is specified and your image name does not already contain a tag
 94            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
 95        use_host_network: If set, along with docker_image, the connector will be executed using
 96            the host network. This is useful for connectors that need to access resources on
 97            the host machine, such as a local database. This parameter is ignored when
 98            `docker_image` is not set.
 99        source_manifest: If set, the connector will be executed based on a declarative YAML
100            source definition. This input can be `True` to attempt to auto-download a YAML spec,
101            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
102            the local file system, or `str` to pull the definition from a web URL.
103        install_if_missing: Whether to install the connector if it is not available locally. This
104            parameter is ignored when `local_executable` or `source_manifest` are set.
105        install_root: (Optional.) The root directory where the virtual environment will be
106            created. If not provided, the current working directory will be used.
107    """
108    return Source(
109        name=name,
110        config=config,
111        config_change_callback=config_change_callback,
112        streams=streams,
113        executor=get_connector_executor(
114            name=name,
115            version=version,
116            pip_url=pip_url,
117            local_executable=local_executable,
118            docker_image=docker_image,
119            use_host_network=use_host_network,
120            source_manifest=source_manifest,
121            install_if_missing=install_if_missing,
122            install_root=install_root,
123        ),
124    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
def get_benchmark_source( num_records: int | str = '5e5', *, install_if_missing: bool = True) -> airbyte.Source:
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
            String values are parsed with `Decimal` and converted with `int()`, so any
            fractional part is truncated.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be converted to an
            integer (malformed text, "NaN", or "Infinity").
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` raises `InvalidOperation` for malformed text, whereas `int()` raises
            # `ValueError` for NaN and `OverflowError` for infinity. Report all three the same
            # way. `num_records` is still the caller's original string at this point.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,  # `True` requests the connector's default Docker image.
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",  # Select all streams.
        install_if_missing=install_if_missing,
    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
  • install_if_missing: Whether to install the source if it is not available locally.
Returns:

Source: The source object for benchmarking.