airbyte.sources.util
Utility functions for working with sources.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Utility functions for working with sources."""

from __future__ import annotations

import warnings
from decimal import Decimal, InvalidOperation
from typing import TYPE_CHECKING, Any

from airbyte._executors.util import get_connector_executor
from airbyte.exceptions import PyAirbyteInputError
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from pathlib import Path

    from airbyte.callbacks import ConfigChangeCallback


def get_connector(
    name: str,
    config: dict[str, Any] | None = None,
    *,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    install_if_missing: bool = True,
) -> Source:
    """Deprecated. Use get_source instead.

    Emits a `DeprecationWarning` and forwards the given arguments to `get_source()`.
    Options that only `get_source()` accepts (e.g. `docker_image`, `streams`) are not
    available through this function.
    """
    warnings.warn(
        # The two literals are implicitly concatenated, so the trailing space is required.
        "The `get_connector()` function is deprecated and will be removed in a future version. "
        "Please use `get_source()` instead.",
        DeprecationWarning,
        stacklevel=2,  # Attribute the warning to the caller, not to this wrapper.
    )
    return get_source(
        name=name,
        config=config,
        version=version,
        pip_url=pip_url,
        local_executable=local_executable,
        install_if_missing=install_if_missing,
    )


def get_source(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    If an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

    Otherwise, an appropriate method will be selected based on the available connector metadata:
    1. If the connector is registered and has a YAML source manifest available, the YAML manifest
       will be downloaded and used to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
       will be executed using Docker.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to attempt to auto-download a YAML spec,
            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
    """
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            source_manifest=source_manifest,
            install_if_missing=install_if_missing,
            install_root=install_root,
        ),
    )


def get_benchmark_source(
    num_records: int | str = "5e5",
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records (int | str): The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records. String values with a
            fractional part are truncated toward zero (e.g. `"1.9"` becomes `1`).

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that is not a finite number
            (e.g. `"abc"`, `"NaN"`, or `"Infinity"`).
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` raises InvalidOperation for non-numeric text, while `int()` raises
            # ValueError for NaN and OverflowError for Infinity. Report all of them the same
            # way instead of leaking a raw Python exception to the caller.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        # `True` selects the connector's default Docker image.
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
    )


__all__ = [
    "get_source",
    "get_benchmark_source",
]
def get_source(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    Choosing how the connector is installed and run is delegated to
    `get_connector_executor()`. The resulting executor is wrapped in a `Source` together
    with the config, config-change callback and stream selection.

    When an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector is executed using that method.

    Otherwise, a method is chosen from the available connector metadata:
    1. If the connector is registered and has a YAML source manifest available, the YAML manifest
       is downloaded and used to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it is installed via pip.
    3. Else, if the connector is registered and has a Docker image, and Docker is available, it
       is executed using Docker.

    Args:
        name: connector name.
        config: connector config. If omitted, set it later via the `set_config` method.
        config_change_callback: callback invoked when the connector config changes.
        streams: stream name(s) to select for reading. `"*"` selects all streams. If omitted,
            select them later via `select_streams()` or `select_all_streams()`.
        version: connector version. If omitted, the currently installed version is used; if no
            version is installed, the latest available version is used. Pass `"latest"` to force
            the latest available version.
        pip_url: connector pip URL. If omitted, it is inferred from the connector name.
        local_executable: if set, the connector is assumed to be installed already and is run
            through this path or executable name. Otherwise, the connector is installed
            automatically in a virtual environment.
        docker_image: if set, the connector is executed using Docker. Pass `True` for the
            connector's default image, or a custom image name. If `version` is given and the
            image name has no tag (e.g. `my-image:latest`), the version is appended as a tag
            (e.g. `my-image:0.1.0`).
        use_host_network: if set together with `docker_image`, the container uses the host
            network, which helps connectors reach host resources such as a local database.
            Ignored when `docker_image` is not set.
        source_manifest: if set, the connector runs from a declarative YAML source definition.
            `True` attempts to auto-download a YAML spec, a `dict` is used directly as the
            manifest, a `Path` is read from the local file system, and a `str` is fetched as a
            web URL.
        install_if_missing: whether to install the connector if it is not available locally.
            Ignored when `local_executable` or `source_manifest` is set.
        install_root: (Optional.) root directory for the virtual environment. If omitted, the
            current working directory is used.

    Returns:
        Source: a source wrapping the resolved connector executor.
    """
    # Resolve the installation/execution strategy first, then hand it to the Source.
    connector_executor = get_connector_executor(
        name=name,
        version=version,
        pip_url=pip_url,
        local_executable=local_executable,
        docker_image=docker_image,
        use_host_network=use_host_network,
        source_manifest=source_manifest,
        install_if_missing=install_if_missing,
        install_root=install_root,
    )
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=connector_executor,
    )
Get a connector by name and version.
If an explicit install or execution method is requested (e.g. `local_executable`, `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
Otherwise, an appropriate method will be selected based on the available connector metadata:
- If the connector is registered and has a YAML source manifest available, the YAML manifest will be downloaded and used to execute the connector.
- Else, if the connector is registered and has a PyPI package, it will be installed via pip.
- Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with `docker_image`, the connector will be executed using
the host network. This is useful for connectors that need to access resources on
the host machine, such as a local database. This parameter is ignored when
`docker_image` is not set.
- source_manifest: If set, the connector will be executed based on a declarative YAML
source definition. This input can be `True` to attempt to auto-download a YAML spec, `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from the local file system, or `str` to pull the definition from a web URL.
- install_if_missing: Whether to install the connector if it is not available locally. This
parameter is ignored when `local_executable` or `source_manifest` are set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
def get_benchmark_source(
    num_records: int | str = "5e5",
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records (int | str): The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records. String values with a
            fractional part are truncated toward zero (e.g. `"1.9"` becomes `1`).

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that is not a finite number
            (e.g. `"abc"`, `"NaN"`, or `"Infinity"`).
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` raises InvalidOperation for non-numeric text, while `int()` raises
            # ValueError for NaN and OverflowError for Infinity. Report all of them the same
            # way instead of leaking a raw Python exception to the caller.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        # `True` selects the connector's default Docker image.
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
    )
Get a source for benchmarking.
This source will generate dummy records for performance benchmarking purposes.
You can specify the number of records to generate using the `num_records` parameter.
The `num_records` parameter can be an integer or a string in scientific notation.
For example, `"5e6"` will generate 5 million records. If underscores are provided
within a numeric string, they will be ignored.
Arguments:
- num_records (int | str): The number of records to generate. Defaults to "5e5", or
500,000 records.
Can be an integer (`1000`) or a string in scientific notation. For example, `"5e6"`
will generate 5 million records.
Returns:
Source: The source object for benchmarking.