airbyte.sources.util
Utility functions for working with sources.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Utility functions for working with sources."""

from __future__ import annotations

import warnings
from decimal import Decimal, InvalidOperation
from typing import TYPE_CHECKING, Any

from airbyte._executors.util import get_connector_executor
from airbyte.exceptions import PyAirbyteInputError
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from pathlib import Path

    from airbyte.callbacks import ConfigChangeCallback


def get_connector(
    name: str,
    config: dict[str, Any] | None = None,
    *,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    install_if_missing: bool = True,
) -> Source:
    """Deprecated. Use `get_source()` instead.

    Emits a `DeprecationWarning` and forwards the given arguments to `get_source()`.
    """
    warnings.warn(
        # NOTE: adjacent literals are concatenated; the trailing space keeps the
        # two sentences from running together ("version.Please").
        "The `get_connector()` function is deprecated and will be removed in a future version. "
        "Please use `get_source()` instead.",
        DeprecationWarning,
        # Attribute the warning to the caller of get_connector(), not to this module.
        stacklevel=2,
    )
    return get_source(
        name=name,
        config=config,
        version=version,
        pip_url=pip_url,
        local_executable=local_executable,
        install_if_missing=install_if_missing,
    )


def get_source(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    use_python: bool | Path | str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
    no_executor: bool = False,
) -> Source:
    """Get a connector by name and version.

    If an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

    Otherwise, an appropriate method will be selected based on the available connector metadata:
    1. If the connector is registered and a YAML source manifest is available, the YAML manifest
       will be downloaded and used to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
       will be executed using Docker.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        use_python: (Optional.) Python interpreter specification:
            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
            - False: Use Docker instead.
            - Path: Use interpreter at this path.
            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
              installed, it will be installed by uv. (This generally adds less than 3 seconds
              to install times.)
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to attempt to auto-download a YAML spec,
            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
        no_executor: If True, use NoOpExecutor which fetches specs from the registry without
            local installation. This is useful for scenarios where you need to validate
            configurations but don't need to run the connector locally (e.g., deploying to Cloud).

    Returns:
        Source: a `Source` bound to the executor chosen from the arguments above.
    """
    # All execution-method selection is delegated to the shared executor factory.
    executor = get_connector_executor(
        name=name,
        version=version,
        use_python=use_python,
        pip_url=pip_url,
        local_executable=local_executable,
        docker_image=docker_image,
        use_host_network=use_host_network,
        source_manifest=source_manifest,
        install_if_missing=install_if_missing,
        install_root=install_root,
        no_executor=no_executor,
    )

    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=executor,
    )


def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string (e.g. `"1_000_000"`), they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records. String values with a
            fractional part are truncated toward zero by the integer conversion.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that is not a finite number
            (e.g. `"abc"`, `"nan"`, `"inf"`).
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # InvalidOperation: the string is not numeric at all (e.g. "abc").
            # ValueError / OverflowError: Decimal parsed it as NaN / Infinity, which
            # int() cannot convert to a record count.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        # To pin a specific image instead: docker_image="airbyte/source-e2e-test:latest",
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )


__all__ = [
    "get_source",
    "get_benchmark_source",
]
def get_source(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    use_python: bool | Path | str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
    no_executor: bool = False,
) -> Source:
    """Get a connector by name and version.

    If an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

    Otherwise, an appropriate method will be selected based on the available connector metadata:
    1. If the connector is registered and a YAML source manifest is available, the YAML manifest
       will be downloaded and used to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
       will be executed using Docker.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        use_python: (Optional.) Python interpreter specification:
            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
            - False: Use Docker instead.
            - Path: Use interpreter at this path.
            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
              installed, it will be installed by uv. (This generally adds less than 3 seconds
              to install times.)
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to attempt to auto-download a YAML spec,
            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
        no_executor: If True, use NoOpExecutor which fetches specs from the registry without
            local installation. This is useful for scenarios where you need to validate
            configurations but don't need to run the connector locally (e.g., deploying to Cloud).

    Returns:
        Source: a `Source` bound to the executor chosen from the arguments above.
    """
    # All execution-method selection is delegated to the shared executor factory.
    executor = get_connector_executor(
        name=name,
        version=version,
        use_python=use_python,
        pip_url=pip_url,
        local_executable=local_executable,
        docker_image=docker_image,
        use_host_network=use_host_network,
        source_manifest=source_manifest,
        install_if_missing=install_if_missing,
        install_root=install_root,
        no_executor=no_executor,
    )

    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=executor,
    )
Get a connector by name and version.
If an explicit install or execution method is requested (e.g. local_executable,
docker_image, pip_url, source_manifest), the connector will be executed using this method.
Otherwise, an appropriate method will be selected based on the available connector metadata:
- If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
- Else, if the connector is registered and has a PyPI package, it will be installed via pip.
- Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- use_python: (Optional.) Python interpreter specification:
  - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
  - False: Use Docker instead.
  - Path: Use interpreter at this path.
  - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be `True` to attempt to auto-download a YAML spec, `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from the local file system, or `str` to pull the definition from a web URL.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when `local_executable` or `source_manifest` are set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
- no_executor: If True, use NoOpExecutor which fetches specs from the registry without local installation. This is useful for scenarios where you need to validate configurations but don't need to run the connector locally (e.g., deploying to Cloud).
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string (e.g. `"1_000_000"`), they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records. String values with a
            fractional part are truncated toward zero by the integer conversion.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that is not a finite number
            (e.g. `"abc"`, `"nan"`, `"inf"`).
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # InvalidOperation: the string is not numeric at all (e.g. "abc").
            # ValueError / OverflowError: Decimal parsed it as NaN / Infinity, which
            # int() cannot convert to a record count.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        # To pin a specific image instead: docker_image="airbyte/source-e2e-test:latest",
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )
Get a source for benchmarking.
This source will generate dummy records for performance benchmarking purposes.
You can specify the number of records to generate using the num_records parameter.
The num_records parameter can be an integer or a string in scientific notation.
For example, "5e6" will generate 5 million records. If underscores are provided
within a numeric string, they will be ignored.
Arguments:
- num_records: The number of records to generate. Defaults to "5e5", or
500,000 records.
Can be an integer (`1000`) or a string in scientific notation. For example, `"5e6"` will generate 5 million records.
- install_if_missing: Whether to install the source if it is not available locally.
Returns:
Source: The source object for benchmarking.