airbyte.experimental
Experimental features which may change.
The experimental `get_source` implementation allows you to run sources using Docker containers. This feature is still in development and may change in the future.
To use this feature, import `get_source` from this module and use it in place of the `get_source` function from the `airbyte` module.
Instead of this:
import airbyte as ab
source = ab.get_source(...)
Use this:
from airbyte.experimental import get_source
source = get_source(...)
Experimental features may change without notice between minor versions of PyAirbyte. Although rare, they may also be entirely removed or refactored in future versions of PyAirbyte. Experimental features may also be less stable than other features, and may not be as well-tested.
You can help improve this product by reporting issues and providing feedback for improvements in our GitHub issue tracker (https://github.com/airbytehq/pyairbyte/issues).
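As a rough illustration of the swap described above, the sketch below wraps the experimental `get_source` in a small helper that selects Docker execution. The helper names (`docker_source_kwargs`, `open_docker_source`) and the `source-faker` connector with its `count` setting are assumptions made for this example; the keyword arguments `docker_image`, `use_host_network`, and `version` are taken from the function signature documented on this page. Actually opening a source requires PyAirbyte and a running Docker daemon.

```python
from __future__ import annotations

from typing import Any


def docker_source_kwargs(
    version: str | None = None,
    *,
    use_host_network: bool = False,
) -> dict[str, Any]:
    """Build keyword arguments that select the Docker execution path.

    Hypothetical helper: it only assembles arguments, it does not call PyAirbyte.
    """
    kwargs: dict[str, Any] = {
        "docker_image": True,  # True means "use the connector's default image"
        "use_host_network": use_host_network,
    }
    if version is not None:
        kwargs["version"] = version
    return kwargs


def open_docker_source(name: str, config: dict[str, Any], **options: Any) -> Any:
    """Open a source through the experimental entry point (hypothetical wrapper)."""
    # Imported lazily so this sketch can be loaded without PyAirbyte installed.
    from airbyte.experimental import get_source

    return get_source(name, config=config, **docker_source_kwargs(**options))


# Example call (assumed connector name and config; needs PyAirbyte and Docker):
# source = open_docker_source("source-faker", {"count": 100}, version="latest")
```

Keeping the argument assembly separate from the call makes it easy to inspect which execution path is being requested before any container is started.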
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Experimental features which may change.

The experimental `get_source` implementation allows you to run sources
using Docker containers. This feature is still in development and may
change in the future.

To use this feature, import `get_source` from this module and use it in place of the `get_source`
function from the `airbyte` module.

Instead of this:

```python
import airbyte as ab

source = ab.get_source(...)
```

Use this:

```python
from airbyte.experimental import get_source

source = get_source(...)
```

Experimental features may change without notice between minor versions of PyAirbyte. Although rare,
they may also be entirely removed or refactored in future versions of PyAirbyte. Experimental
features may also be less stable than other features, and may not be as well-tested.

You can help improve this product by reporting issues and providing feedback for improvements in our
[GitHub issue tracker](https://github.com/airbytehq/pyairbyte/issues).
"""

from __future__ import annotations

from airbyte.sources.util import _get_source as get_source


__all__ = [
    "get_source",
]
def _get_source(  # noqa: PLR0912, PLR0913, PLR0915 # Too complex
    name: str,
    config: dict[str, Any] | None = None,
    *,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str = False,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str = False,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database.
        source_manifest: If set, the connector will be executed based on a declarative Yaml
            source definition. This input can be `True` to auto-download the yaml spec, `dict`
            to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when local_executable is set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
    """
    if (
        sum(
            [
                bool(local_executable),
                bool(docker_image),
                bool(pip_url),
                bool(source_manifest),
            ]
        )
        > 1
    ):
        raise exc.PyAirbyteInputError(
            message=(
                "You can only specify one of the settings: 'local_executable', 'docker_image', "
                "'pip_url', or 'source_manifest'."
            ),
            context={
                "local_executable": local_executable,
                "docker_image": docker_image,
                "pip_url": pip_url,
                "source_manifest": source_manifest,
            },
        )

    if local_executable:
        if version:
            raise exc.PyAirbyteInputError(
                message="Param 'version' is not supported when 'local_executable' is set."
            )

        if isinstance(local_executable, str):
            if "/" in local_executable or "\\" in local_executable:
                # Assume this is a path
                local_executable = Path(local_executable).absolute()
            else:
                which_executable: str | None = None
                which_executable = shutil.which(local_executable)
                if not which_executable and sys.platform == "win32":
                    # Try with the .exe extension
                    local_executable = f"{local_executable}.exe"
                    which_executable = shutil.which(local_executable)

                if which_executable is None:
                    raise exc.AirbyteConnectorExecutableNotFoundError(
                        connector_name=name,
                        context={
                            "executable": local_executable,
                            "working_directory": Path.cwd().absolute(),
                        },
                    ) from FileNotFoundError(local_executable)
                local_executable = Path(which_executable).absolute()

        print(f"Using local `{name}` executable: {local_executable!s}")
        return Source(
            name=name,
            config=config,
            streams=streams,
            executor=PathExecutor(
                name=name,
                path=local_executable,
            ),
        )

    if docker_image:
        if docker_image is True:
            # Use the default image name for the connector
            docker_image = f"airbyte/{name}"

        if version is not None and ":" in docker_image:
            raise exc.PyAirbyteInputError(
                message="The 'version' parameter is not supported when a tag is already set in the "
                "'docker_image' parameter.",
                context={
                    "docker_image": docker_image,
                    "version": version,
                },
            )

        if ":" not in docker_image:
            docker_image = f"{docker_image}:{version or 'latest'}"

        temp_dir = tempfile.gettempdir()

        docker_cmd = [
            "docker",
            "run",
            "--rm",
            "-i",
            "--volume",
            f"{temp_dir}:{temp_dir}",
        ]

        if use_host_network is True:
            docker_cmd.extend(["--network", "host"])

        docker_cmd.extend([docker_image])

        return Source(
            name=name,
            config=config,
            streams=streams,
            executor=DockerExecutor(
                name=name,
                executable=docker_cmd,
            ),
        )

    if source_manifest:
        if source_manifest is True:
            http_url = (
                "https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
                f"/connectors/{name}/{name.replace('-', '_')}/manifest.yaml"
            )
            print("Installing connector from YAML manifest:", http_url)
            # Download the file
            response = requests.get(http_url)
            response.raise_for_status()  # Raise an exception if the download failed

            if "class_name:" in response.text:
                raise exc.AirbyteConnectorInstallationError(
                    message=(
                        "The provided manifest requires additional code files (`class_name` key "
                        "detected). This feature is not compatible with the declarative YAML "
                        "executor. To use this executor, please try again with the Python "
                        "executor."
                    ),
                    connector_name=name,
                    context={
                        "manifest_url": http_url,
                    },
                )

            try:
                source_manifest = cast(dict, yaml.safe_load(response.text))
            except JSONDecodeError as ex:
                raise exc.AirbyteConnectorInstallationError(
                    connector_name=name,
                    context={
                        "manifest_url": http_url,
                    },
                ) from ex

        return Source(
            name=name,
            config=config,
            streams=streams,
            executor=DeclarativeExecutor(
                manifest=source_manifest,
            ),
        )
    # else: we are installing a connector in a virtual environment:

    metadata: ConnectorMetadata | None = None
    try:
        metadata = get_connector_metadata(name)
    except exc.AirbyteConnectorNotRegisteredError as ex:
        if not pip_url:
            log_install_state(name, state=EventState.FAILED, exception=ex)
            # We don't have a pip url or registry entry, so we can't install the connector
            raise

    try:
        executor = VenvExecutor(
            name=name,
            metadata=metadata,
            target_version=version,
            pip_url=pip_url,
            install_root=install_root,
        )
        if install_if_missing:
            executor.ensure_installation()

        return Source(
            name=name,
            config=config,
            streams=streams,
            executor=executor,
        )
    except Exception as e:
        log_install_state(name, state=EventState.FAILED, exception=e)
        raise
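The Docker branch of the listing above can be hard to follow inline, so here is a minimal standalone sketch of how the image name, tag, and `docker run` command appear to be derived. The function names `resolve_docker_image` and `build_docker_cmd` are hypothetical and are not part of PyAirbyte, and `ValueError` stands in for `exc.PyAirbyteInputError`; only the string handling is taken from the listing.

```python
from __future__ import annotations

import tempfile


def resolve_docker_image(
    name: str,
    docker_image: bool | str = True,
    version: str | None = None,
) -> str:
    """Return a fully tagged image reference (hypothetical helper)."""
    if docker_image is True:
        # Default image name for the connector.
        docker_image = f"airbyte/{name}"

    if version is not None and ":" in docker_image:
        # Stand-in for exc.PyAirbyteInputError in the real code.
        raise ValueError("'version' is not supported when 'docker_image' already has a tag.")

    if ":" not in docker_image:
        docker_image = f"{docker_image}:{version or 'latest'}"

    return docker_image


def build_docker_cmd(image: str, *, use_host_network: bool = False) -> list[str]:
    """Assemble the `docker run` argument list (hypothetical helper)."""
    temp_dir = tempfile.gettempdir()
    # The temp directory is mounted at the same path inside the container.
    cmd = ["docker", "run", "--rm", "-i", "--volume", f"{temp_dir}:{temp_dir}"]
    if use_host_network:
        cmd.extend(["--network", "host"])
    cmd.append(image)
    return cmd


image = resolve_docker_image("source-faker", version="0.1.0")
print(build_docker_cmd(image, use_host_network=True))
```

Note that a tag in `docker_image` and an explicit `version` are treated as conflicting, so only one of the two should carry the version.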
Get a connector by name and version.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the set_config method.
- streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the `select_streams()` or `select_all_streams()` method.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database.
- source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be `True` to auto-download the YAML spec, `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from the local file system, or `str` to pull the definition from a web URL.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
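Four of these arguments (`local_executable`, `docker_image`, `pip_url`, `source_manifest`) select how the connector is run, and the source listing rejects calls that set more than one of them. The sketch below restates that selection as a standalone function. The name `select_execution_mode` and the returned labels are hypothetical, and `ValueError` stands in for `exc.PyAirbyteInputError`.

```python
from __future__ import annotations

from pathlib import Path


def select_execution_mode(
    *,
    local_executable: Path | str | None = None,
    docker_image: bool | str = False,
    pip_url: str | None = None,
    source_manifest: bool | dict | Path | str = False,
) -> str:
    """Return a label for the execution path the arguments would select."""
    settings = {
        "local_executable": local_executable,
        "docker_image": docker_image,
        "pip_url": pip_url,
        "source_manifest": source_manifest,
    }
    chosen = [key for key, value in settings.items() if value]
    if len(chosen) > 1:
        # Stand-in for exc.PyAirbyteInputError in the real code.
        raise ValueError(f"Only one of these settings may be specified, got: {chosen}")

    if local_executable:
        return "local_executable"
    if docker_image:
        return "docker"
    if source_manifest:
        return "declarative"
    # No setting, or only pip_url: install into a virtual environment.
    return "venv"


print(select_execution_mode(docker_image=True))
print(select_execution_mode())
```

In this reading, `pip_url` does not pick a separate executor; it only changes what gets installed into the virtual environment.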