airbyte.experimental

Experimental features which may change.

The experimental get_source implementation allows you to run sources using Docker containers. This feature is still in development and may change in the future.

To use this feature, import get_source from this module and use it in place of the get_source function from the airbyte module.

Instead of this:

import airbyte as ab

source = ab.get_source(...)

Use this:

from airbyte.experimental import get_source

source = get_source(...)

Experimental features may change without notice between minor versions of PyAirbyte. Although rare, they may also be entirely removed or refactored in future versions of PyAirbyte. Experimental features may also be less stable than other features, and may not be as well-tested.

You can help improve this product by reporting issues and providing feedback for improvements in our GitHub issue tracker (https://github.com/airbytehq/pyairbyte/issues).
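To make the Docker behavior concrete, here is a minimal, self-contained sketch of how a `docker run` command could be assembled from the `docker_image`, `version`, and `use_host_network` arguments. It mirrors the Docker branch of the source listing on this page, but `build_docker_command` is a hypothetical helper written for illustration, not part of PyAirbyte, and it raises a plain `ValueError` where the library raises its own input error. The connector name `source-faker` and the version `0.1.0` are example values only.

```python
from __future__ import annotations

import tempfile


def build_docker_command(
    name: str,
    docker_image: bool | str = True,
    version: str | None = None,
    use_host_network: bool = False,
) -> list[str]:
    """Illustrative helper (not a PyAirbyte API): assemble a `docker run` command."""
    # `True` means "use the default image for this connector".
    image = f"airbyte/{name}" if docker_image is True else str(docker_image)

    if ":" in image:
        # The image already carries a tag, so a separate version is ambiguous.
        if version is not None:
            raise ValueError(
                "'version' is not supported when 'docker_image' already has a tag."
            )
    else:
        # No tag yet: append the requested version, or fall back to 'latest'.
        image = f"{image}:{version or 'latest'}"

    # Share the system temp directory with the container at the same path.
    temp_dir = tempfile.gettempdir()
    cmd = ["docker", "run", "--rm", "-i", "--volume", f"{temp_dir}:{temp_dir}"]

    if use_host_network:
        cmd.extend(["--network", "host"])

    cmd.append(image)
    return cmd


if __name__ == "__main__":
    # Example values only; nothing is executed, the command is just printed.
    print(build_docker_command("source-faker", version="0.1.0"))
```

The sketch only builds the argument list. Actually running the container, and passing the connector its config, is left to the library's executor.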

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Experimental features which may change.

The experimental `get_source` implementation allows you to run sources
using Docker containers. This feature is still in development and may
change in the future.

To use this feature, import `get_source` from this module and use it in place of the `get_source`
function from the `airbyte` module.

Instead of this:

```python
import airbyte as ab

source = ab.get_source(...)
```

Use this:

```python
from airbyte.experimental import get_source

source = get_source(...)
```

Experimental features may change without notice between minor versions of PyAirbyte. Although rare,
they may also be entirely removed or refactored in future versions of PyAirbyte. Experimental
features may also be less stable than other features, and may not be as well-tested.

You can help improve this product by reporting issues and providing feedback for improvements in our
[GitHub issue tracker](https://github.com/airbytehq/pyairbyte/issues).
"""

from __future__ import annotations

from airbyte.sources.util import _get_source as get_source


__all__ = [
    "get_source",
]
def get_source(
    name: str,
    config: dict[str, typing.Any] | None = None,
    *,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: pathlib.Path | str | None = None,
    docker_image: bool | str = False,
    use_host_network: bool = False,
    source_manifest: bool | dict | pathlib.Path | str = False,
    install_if_missing: bool = True,
    install_root: pathlib.Path | None = None,
) -> airbyte.sources.base.Source:
def _get_source(  # noqa: PLR0912, PLR0913, PLR0915 # Too complex
    name: str,
    config: dict[str, Any] | None = None,
    *,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str = False,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str = False,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    Args:
        name: connector name
        config: connector config. If not provided, you need to set it later via the
            `set_config()` method.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version. If not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL. If not provided, the pip URL will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to auto-download the YAML spec, `dict`
            to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when local_executable is set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
    """
    # The four ways of locating or installing a connector are mutually exclusive.
    if (
        sum(
            [
                bool(local_executable),
                bool(docker_image),
                bool(pip_url),
                bool(source_manifest),
            ]
        )
        > 1
    ):
        raise exc.PyAirbyteInputError(
            message=(
                "You can only specify one of the settings: 'local_executable', 'docker_image', "
                "'pip_url', or 'source_manifest'."
            ),
            context={
                "local_executable": local_executable,
                "docker_image": docker_image,
                "pip_url": pip_url,
                "source_manifest": source_manifest,
            },
        )

    if local_executable:
        if version:
            raise exc.PyAirbyteInputError(
                message="Param 'version' is not supported when 'local_executable' is set."
            )

        if isinstance(local_executable, str):
            if "/" in local_executable or "\\" in local_executable:
                # Assume this is a path
                local_executable = Path(local_executable).absolute()
            else:
                # Otherwise treat it as an executable name and look it up on the PATH
                which_executable: str | None = shutil.which(local_executable)
                if not which_executable and sys.platform == "win32":
                    # Try with the .exe extension
                    local_executable = f"{local_executable}.exe"
                    which_executable = shutil.which(local_executable)

                if which_executable is None:
                    raise exc.AirbyteConnectorExecutableNotFoundError(
                        connector_name=name,
                        context={
                            "executable": local_executable,
                            "working_directory": Path.cwd().absolute(),
                        },
                    ) from FileNotFoundError(local_executable)
                local_executable = Path(which_executable).absolute()

        print(f"Using local `{name}` executable: {local_executable!s}")
        return Source(
            name=name,
            config=config,
            streams=streams,
            executor=PathExecutor(
                name=name,
                path=local_executable,
            ),
        )

    if docker_image:
        if docker_image is True:
            # Use the default image name for the connector
            docker_image = f"airbyte/{name}"

        if version is not None and ":" in docker_image:
            raise exc.PyAirbyteInputError(
                message="The 'version' parameter is not supported when a tag is already set in the "
                "'docker_image' parameter.",
                context={
                    "docker_image": docker_image,
                    "version": version,
                },
            )

        if ":" not in docker_image:
            # No tag given: use the requested version, or fall back to 'latest'
            docker_image = f"{docker_image}:{version or 'latest'}"

        temp_dir = tempfile.gettempdir()

        # Mount the temp directory into the container at the same path
        docker_cmd = [
            "docker",
            "run",
            "--rm",
            "-i",
            "--volume",
            f"{temp_dir}:{temp_dir}",
        ]

        if use_host_network is True:
            docker_cmd.extend(["--network", "host"])

        docker_cmd.extend([docker_image])

        return Source(
            name=name,
            config=config,
            streams=streams,
            executor=DockerExecutor(
                name=name,
                executable=docker_cmd,
            ),
        )

    if source_manifest:
        if source_manifest is True:
            http_url = (
                "https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
                f"/connectors/{name}/{name.replace('-', '_')}/manifest.yaml"
            )
            print("Installing connector from YAML manifest:", http_url)
            # Download the file
            response = requests.get(http_url)
            response.raise_for_status()  # Raise an exception if the download failed

            if "class_name:" in response.text:
                raise exc.AirbyteConnectorInstallationError(
                    message=(
                        "The provided manifest requires additional code files (`class_name` key "
                        "detected). This feature is not compatible with the declarative YAML "
                        "executor. To use this connector, please try again with the Python "
                        "executor."
                    ),
                    connector_name=name,
                    context={
                        "manifest_url": http_url,
                    },
                )

            try:
                source_manifest = cast(dict, yaml.safe_load(response.text))
            except yaml.YAMLError as ex:
                # `yaml.safe_load` reports parse failures as `yaml.YAMLError`
                raise exc.AirbyteConnectorInstallationError(
                    connector_name=name,
                    context={
                        "manifest_url": http_url,
                    },
                ) from ex

        return Source(
            name=name,
            config=config,
            streams=streams,
            executor=DeclarativeExecutor(
                manifest=source_manifest,
            ),
        )
    # else: we are installing a connector in a virtual environment:

    metadata: ConnectorMetadata | None = None
    try:
        metadata = get_connector_metadata(name)
    except exc.AirbyteConnectorNotRegisteredError as ex:
        if not pip_url:
            log_install_state(name, state=EventState.FAILED, exception=ex)
            # We don't have a pip url or registry entry, so we can't install the connector
            raise

    try:
        executor = VenvExecutor(
            name=name,
            metadata=metadata,
            target_version=version,
            pip_url=pip_url,
            install_root=install_root,
        )
        if install_if_missing:
            executor.ensure_installation()

        return Source(
            name=name,
            config=config,
            streams=streams,
            executor=executor,
        )
    except Exception as e:
        log_install_state(name, state=EventState.FAILED, exception=e)
        raise

Get a connector by name and version.

Arguments:
  • name: connector name
  • config: connector config. If not provided, you need to set it later via the set_config() method.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL. If not provided, the pip URL will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to auto-download the YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
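
The arguments `local_executable`, `docker_image`, `pip_url`, and `source_manifest` are mutually exclusive, and the one that is set decides how the connector runs. The sketch below illustrates that selection logic in isolation. `select_execution_mode` and the mode names it returns are hypothetical and exist only for this example; the sketch raises a plain `ValueError` where the library raises its own input error.

```python
from __future__ import annotations

from pathlib import Path


def select_execution_mode(
    *,
    local_executable: Path | str | None = None,
    docker_image: bool | str = False,
    pip_url: str | None = None,
    source_manifest: bool | dict | Path | str = False,
) -> str:
    """Illustrative helper (not a PyAirbyte API): report how a connector would run."""
    options = {
        "local_executable": local_executable,
        "docker_image": docker_image,
        "pip_url": pip_url,
        "source_manifest": source_manifest,
    }
    # Collect every option that was given a truthy value.
    chosen = [key for key, value in options.items() if value]
    if len(chosen) > 1:
        raise ValueError(f"You can only specify one of {list(options)}; got {chosen}.")

    if local_executable:
        return "local_executable"
    if docker_image:
        return "docker"
    if source_manifest:
        return "declarative_manifest"
    # Default: install into a virtual environment, from `pip_url` if one was given.
    return "virtual_environment"


if __name__ == "__main__":
    print(select_execution_mode(docker_image=True))
    print(select_execution_mode(pip_url="example-package"))  # placeholder package name
```

Checking exclusivity up front keeps each branch simple: once a mode is chosen, it does not need to consider the other three arguments.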