airbyte.sources.util
Utility functions for working with sources.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Utility functions for working with sources."""

from __future__ import annotations

import warnings
from decimal import Decimal, InvalidOperation
from typing import TYPE_CHECKING, Any

from airbyte._executors.util import get_connector_executor
from airbyte.exceptions import PyAirbyteInputError
from airbyte.sources.base import Source


if TYPE_CHECKING:
    from pathlib import Path

    from airbyte.callbacks import ConfigChangeCallback


def get_connector(
    name: str,
    config: dict[str, Any] | None = None,
    *,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    install_if_missing: bool = True,
) -> Source:
    """Deprecated. Use `get_source()` instead.

    Emits a `DeprecationWarning` and forwards the given arguments to `get_source()`.
    """
    warnings.warn(
        # The two literals are implicitly concatenated, so the first must end with a space.
        "The `get_connector()` function is deprecated and will be removed in a future version. "
        "Please use `get_source()` instead.",
        DeprecationWarning,
        # Attribute the warning to the caller of `get_connector()`, not to this module.
        stacklevel=2,
    )
    return get_source(
        name=name,
        config=config,
        version=version,
        pip_url=pip_url,
        local_executable=local_executable,
        install_if_missing=install_if_missing,
    )


def get_source(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    If an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

    Otherwise, an appropriate method will be selected based on the available connector metadata:
    1. If the connector is registered and has a YAML source manifest available, the YAML manifest
       will be downloaded and used to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
       will be executed using Docker.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to attempt to auto-download a YAML spec,
            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
    """
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            source_manifest=source_manifest,
            install_if_missing=install_if_missing,
            install_root=install_root,
        ),
    )


def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. Underscores within a numeric
    string are ignored, so `"5_000_000"` is also accepted.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be converted to an
            integer (malformed text, NaN, or Infinity).
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` signals `InvalidOperation` for malformed text, while `int()` raises
            # `ValueError` for NaN and `OverflowError` for Infinity. Report all of them uniformly.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        # `True` selects the connector's default Docker image.
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )


__all__ = [
    "get_source",
    "get_benchmark_source",
]
def get_source(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    An explicitly requested install or execution method (`local_executable`, `docker_image`,
    `pip_url`, or `source_manifest`) is always honored.

    When none is given, the method is chosen from the available connector metadata, in order:
    1. A registered connector with a YAML source manifest runs from that downloaded manifest.
    2. Otherwise, a registered connector with a PyPI package is installed via pip.
    3. Otherwise, a registered connector with a Docker image runs in Docker, if Docker is
       available.

    Args:
        name: Connector name.
        config: Connector config. If omitted, set it later with the `set_config` method.
        config_change_callback: Callback invoked when the connector config changes.
        streams: Stream names to select for reading. The value "*" selects every stream.
            If omitted, select streams later with `select_streams()` or
            `select_all_streams()`.
        version: Connector version. If omitted, the installed version is used, or the latest
            available version when nothing is installed. Pass "latest" to force the latest
            available version.
        pip_url: Connector pip URL. If omitted, it is inferred from the connector name.
        local_executable: Path or executable name of an already-installed connector. When
            omitted, the connector is installed automatically in a virtual environment.
        docker_image: Run the connector in Docker. Pass `True` for the connector's default
            image, or a custom image name. If `version` is given and the image name has no tag
            (e.g. `my-image:latest`), the version is appended as the tag (e.g. `my-image:0.1.0`).
        use_host_network: Together with `docker_image`, run the container on the host network.
            This is useful for reaching host-local resources such as a local database. Ignored
            when `docker_image` is not set.
        source_manifest: Run the connector from a declarative YAML source definition. The value
            can be `True` to auto-download a YAML spec, a `dict` holding the manifest, a `Path`
            to a local manifest file, or a `str` URL to fetch the definition from.
        install_if_missing: Whether to install the connector when it is not available locally.
            Ignored when `local_executable` or `source_manifest` is set.
        install_root: (Optional.) Root directory for the virtual environment. Defaults to the
            current working directory.

    Returns:
        A `Source` wrapping the executor selected for this connector.
    """
    # Resolve how the connector runs first, then wrap that executor in a Source.
    executor = get_connector_executor(
        name=name,
        version=version,
        pip_url=pip_url,
        local_executable=local_executable,
        docker_image=docker_image,
        use_host_network=use_host_network,
        source_manifest=source_manifest,
        install_if_missing=install_if_missing,
        install_root=install_root,
    )
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=executor,
    )
Get a connector by name and version.
If an explicit install or execution method is requested (e.g. `local_executable`, `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
Otherwise, an appropriate method will be selected based on the available connector metadata:
- If the connector is registered and has a YAML source manifest available, the YAML manifest will be downloaded and used to execute the connector.
- Else, if the connector is registered and has a PyPI package, it will be installed via pip.
- Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- config_change_callback: callback function to be called when the connector config changes.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with `docker_image`, the connector will be executed using
the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- source_manifest: If set, the connector will be executed based on a declarative YAML
source definition. This input can be `True` to attempt to auto-download a YAML spec, `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from the local file system, or `str` to pull the definition from a web URL.
- install_if_missing: Whether to install the connector if it is not available locally. This
parameter is ignored when `local_executable` or `source_manifest` are set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. Underscores within a numeric
    string are ignored, so `"5_000_000"` is also accepted.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be converted to an
            integer (malformed text, NaN, or Infinity).
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except (InvalidOperation, ValueError, OverflowError) as ex:
            # `Decimal()` signals `InvalidOperation` for malformed text, while `int()` raises
            # `ValueError` for NaN and `OverflowError` for Infinity. Report all of them uniformly.
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        # `True` selects the connector's default Docker image.
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )
Get a source for benchmarking.
This source will generate dummy records for performance benchmarking purposes.
You can specify the number of records to generate using the `num_records` parameter.
The `num_records` parameter can be an integer or a string in scientific notation.
For example, `"5e6"` will generate 5 million records. If underscores are provided
within a numeric string, they will be ignored.
Arguments:
- num_records: The number of records to generate. Defaults to "5e5", or
500,000 records.
Can be an integer (`1000`) or a string in scientific notation. For example, `"5e6"` will generate 5 million records.
- install_if_missing: Whether to install the source if it is not available locally.
Returns:
Source: The source object for benchmarking.