airbyte.sources.util
Utility functions for working with sources.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Utility functions for working with sources."""

from __future__ import annotations

import warnings
from decimal import Decimal, InvalidOperation
from typing import TYPE_CHECKING, Any

from airbyte._executors.util import get_connector_executor
from airbyte.exceptions import PyAirbyteInputError
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from pathlib import Path

    from airbyte.callbacks import ConfigChangeCallback


def get_connector(
    name: str,
    config: dict[str, Any] | None = None,
    *,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    install_if_missing: bool = True,
) -> Source:
    """Deprecated. Use `get_source()` instead.

    Emits a `DeprecationWarning` (attributed to the caller via `stacklevel=2`) and then
    forwards all arguments unchanged to `get_source()`.
    """
    warnings.warn(
        # The trailing space matters: adjacent string literals are concatenated verbatim.
        "The `get_connector()` function is deprecated and will be removed in a future version. "
        "Please use `get_source()` instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return get_source(
        name=name,
        config=config,
        version=version,
        pip_url=pip_url,
        local_executable=local_executable,
        install_if_missing=install_if_missing,
    )


def get_source(  # noqa: PLR0913  # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    use_python: bool | Path | str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    If an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

    Otherwise, an appropriate method will be selected based on the available connector metadata:
    1. If the connector is registered and a YAML source manifest is available, the YAML manifest
       will be downloaded and used to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
       will be executed using Docker.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        use_python: (Optional.) Python interpreter specification:
            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
            - False: Use Docker instead.
            - Path: Use interpreter at this path.
            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
                installed, it will be installed by uv. (This generally adds less than 3 seconds
                to install times.)
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to attempt to auto-download a YAML spec,
            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.

    Returns:
        Source: A source wrapping the executor selected by `get_connector_executor()`.
    """
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=get_connector_executor(
            name=name,
            version=version,
            use_python=use_python,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            source_manifest=source_manifest,
            install_if_missing=install_if_missing,
            install_root=install_root,
        ),
    )


def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be converted to an
            integer (malformed text, NaN, or Infinity).
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` raises `InvalidOperation` for malformed text, while `int()` raises
            # `ValueError` for NaN and `OverflowError` for Infinity; report all three uniformly.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )


__all__ = [
    "get_source",
    "get_benchmark_source",
]
def get_source(  # noqa: PLR0913  # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    use_python: bool | Path | str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Build a `Source` for the named connector.

    When an explicit install or execution method is requested (for example
    `local_executable`, `docker_image`, `pip_url`, or `source_manifest`), that method is used.

    Otherwise a method is chosen from the available connector metadata, in this order:
    1. A registered connector with an available YAML source manifest: the manifest is
       downloaded and used to execute the connector.
    2. A registered connector with a PyPI package: it is installed via pip.
    3. A registered connector with a Docker image, when Docker is available: it is executed
       using Docker.

    Args:
        name: Connector name.
        config: Connector config. When omitted, set it later via the `set_config` method.
        config_change_callback: Callback invoked when the connector config changes.
        streams: Stream names to select for reading. Pass "*" to select all streams. When
            omitted, select later via `select_streams()` or `select_all_streams()`.
        version: Connector version. When omitted, the currently installed version is used, or
            the latest available version if none is installed. Pass "latest" to force the
            latest available version.
        use_python: (Optional.) Python interpreter specification:
            - True: Use the current Python interpreter. (Inferred if `pip_url` is set.)
            - False: Use Docker instead.
            - Path: Use the interpreter at this path.
            - str: Use a specific Python version, e.g. "3.11" or "3.11.10". If that version is
                not yet installed, it will be installed by uv. (This generally adds less than
                3 seconds to install times.)
        pip_url: Connector pip URL. When omitted, it is inferred from the connector name.
        local_executable: If set, the connector is assumed to be installed already and is run
            using this path or executable name. Otherwise it is installed automatically in a
            virtual environment.
        docker_image: If set, the connector is executed using Docker. Pass `True` for the
            connector's default image, or a custom image name. If `version` is given and the
            image name has no tag (e.g. `my-image:latest`), the version is appended as the
            tag (e.g. `my-image:0.1.0`).
        use_host_network: If set together with `docker_image`, the connector runs on the host
            network, which helps when it must reach resources on the host machine such as a
            local database. Ignored when `docker_image` is not set.
        source_manifest: If set, the connector is executed from a declarative YAML source
            definition. Pass `True` to attempt an auto-download of the YAML spec, a `dict` to
            supply the manifest directly, a `Path` to load it from the local file system, or
            a `str` to fetch it from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally.
            Ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) Root directory in which the virtual environment is created.
            Defaults to the current working directory.
    """
    # Resolve the execution strategy first, then hand the executor to the Source wrapper.
    connector_executor = get_connector_executor(
        name=name,
        version=version,
        use_python=use_python,
        pip_url=pip_url,
        local_executable=local_executable,
        docker_image=docker_image,
        use_host_network=use_host_network,
        source_manifest=source_manifest,
        install_if_missing=install_if_missing,
        install_root=install_root,
    )
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=connector_executor,
    )
Get a connector by name and version.
If an explicit install or execution method is requested (e.g. `local_executable`, `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
Otherwise, an appropriate method will be selected based on the available connector metadata:
- If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
- Else, if the connector is registered and has a PyPI package, it will be installed via pip.
- Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- use_python: (Optional.) Python interpreter specification:
  - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
  - False: Use Docker instead.
  - Path: Use interpreter at this path.
  - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with `docker_image`, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be `True` to attempt to auto-download a YAML spec, `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from the local file system, or `str` to pull the definition from a web URL.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when `local_executable` or `source_manifest` are set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be converted to an
            integer (malformed text, NaN, or Infinity).
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` raises `InvalidOperation` for malformed text, while `int()` raises
            # `ValueError` for NaN and `OverflowError` for Infinity; report all three uniformly.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )
Get a source for benchmarking.
This source will generate dummy records for performance benchmarking purposes.
You can specify the number of records to generate using the `num_records` parameter.
The `num_records` parameter can be an integer or a string in scientific notation.
For example, `"5e6"` will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.
Arguments:
- num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (`1000`) or a string in scientific notation. For example, `"5e6"` will generate 5 million records.
- install_if_missing: Whether to install the source if it is not available locally.
Returns:
Source: The source object for benchmarking.