airbyte.cli

CLI for PyAirbyte.

The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks.

After installing PyAirbyte, the CLI can be invoked with the pyairbyte CLI executable, or the shorter pyab alias.

These are equivalent:

python -m airbyte.cli --help
pyairbyte --help
pyab --help

You can also use pipx or the fast and powerful uv tool to run the PyAirbyte CLI without pre-installing:

# Install `uv` if you haven't already:
brew install uv

# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help

Example benchmark Usage:

# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6

# Source Benchmark:
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields

# Source Benchmark from Docker Image:
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'

# Destination Benchmark:
pyab benchmark --destination=destination-dev-null --config=/path/to/config.json

# Benchmark a Local Python Source (source-s3):
pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
# Equivalent to:
LOCAL_EXECUTABLE=$(poetry run which source-s3)
CONFIG_PATH=$(realpath ./secrets/config.json)
pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
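
The examples above pass three kinds of `--source` value: a plain connector name, a docker image with a tag, and a path to a local executable. The following is a minimal sketch of how such a value might be told apart, assuming the same ordering as the module source (colon first, then path-like, then plain name). `classify_connector_ref` is a hypothetical helper written for illustration; it is not part of PyAirbyte.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def classify_connector_ref(ref: str) -> tuple[str, str]:
    """Return (kind, connector_name) for a `--source`-style value.

    Hypothetical helper: a colon means docker image, a leading "." or any "/"
    means a local executable path, and anything else is a plain connector name.
    """
    if ":" in ref:
        # e.g. "airbyte/source-hardcoded-records:latest" -> drop namespace and tag
        return "docker", ref.split(":")[0].split("/")[-1]
    if ref.startswith(".") or "/" in ref:
        # e.g. "./.venv/bin/source-s3" -> use the file stem as the connector name
        return "executable", PurePosixPath(ref).stem
    return "name", ref


for ref in (
    "source-hardcoded-records",
    "airbyte/source-hardcoded-records:dev",
    "./.venv/bin/source-s3",
):
    print(ref, "->", classify_connector_ref(ref))
```

Because the colon check runs first, a docker reference such as `airbyte/source-hardcoded-records:dev` is not mistaken for a path even though it contains a `/`.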

Example validate Usage:

pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'

PyAirbyte CLI Guidance

Providing connector configuration:

When providing configuration via --config, you can provide any of the following:

  1. A path to a configuration file, in yaml or json format.

  2. An inline yaml string, e.g. --config='{key: value}', --config='{key: {nested: value}}'.

When providing an inline yaml string, it is recommended to use single quotes to avoid shell interpolation.
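
As a rough illustration of the two `--config` forms and of why single quotes matter, the sketch below classifies a value as inline yaml or a file path and builds a shell-safe command line with the standard library's `shlex`. `classify_config_arg` is a hypothetical helper; it assumes the rule used in the module source, where a value starting with `{` is treated as inline yaml.

```python
import shlex


def classify_config_arg(config: str) -> str:
    """Return "inline" for an inline yaml string, else "path" (hypothetical helper)."""
    return "inline" if config.startswith("{") else "path"


inline = "{key: {nested: value}}"
argv = ["pyab", "validate", "--connector=source-hardcoded-records", "--config", inline]

# shlex.join wraps the inline yaml in single quotes so the shell passes it through unchanged.
command_line = shlex.join(argv)
print(command_line)

print(classify_config_arg(inline))
print(classify_config_arg("./secrets/config.json"))
```

The sketch passes the value as a separate argument (`--config VALUE`) rather than `--config=VALUE`; both spellings reach the option with the same string.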

Providing secrets:

You can provide secrets in your configuration file by prefixing the secret value with SECRET:. For example, --config='{password: "SECRET:my_password"}' will look for a secret named my_password in the secret store. By default, PyAirbyte will look for secrets in environment variables and dotenv (.env) files. If a secret is not found, you'll be prompted to provide the secret value interactively in the terminal.

It is highly recommended to use secrets when using inline yaml strings, in order to avoid exposing secrets in plain text in the terminal history. Secrets provided interactively will not be echoed to the terminal.
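
The `SECRET:` substitution described above can be sketched as a small recursive pass over the parsed configuration. This is a simplified illustration rather than PyAirbyte's implementation: `inject_secrets`, `lookup` and `fake_store` are hypothetical names, and the in-memory dictionary stands in for the real secret sources (environment variables, `.env` files, or an interactive prompt).

```python
from __future__ import annotations

from typing import Any, Callable

SECRET_PREFIX = "SECRET:"


def inject_secrets(config: dict[str, Any], lookup: Callable[[str], str]) -> None:
    """Replace "SECRET:<name>" string values in place, recursing into nested dicts."""
    for key, value in config.items():
        if isinstance(value, dict):
            inject_secrets(value, lookup)
        elif isinstance(value, str) and value.startswith(SECRET_PREFIX):
            # Strip the prefix and resolve the remaining text as a secret name.
            config[key] = lookup(value[len(SECRET_PREFIX):].strip())


# Stand-in secret store for the example only.
fake_store = {"my_password": "example-value"}

config = {"username": "demo", "credentials": {"password": "SECRET:my_password"}}
inject_secrets(config, fake_store.__getitem__)
print(config)
```

Only string values are inspected, so non-string settings such as `count: 400000` pass through untouched.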

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""CLI for PyAirbyte.

The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks.

After installing PyAirbyte, the CLI can be invoked with the `pyairbyte` CLI executable, or the
shorter `pyab` alias.

These are equivalent:

```bash
python -m airbyte.cli --help
pyairbyte --help
pyab --help
```

You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI
without pre-installing:

```bash
# Install `uv` if you haven't already:
brew install uv

# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help
```

Example `benchmark` Usage:

```bash
# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6

# Source Benchmark:
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields

# Source Benchmark from Docker Image:
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'

# Destination Benchmark:
pyab benchmark --destination=destination-dev-null --config=/path/to/config.json

# Benchmark a Local Python Source (source-s3):
pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
# Equivalent to:
LOCAL_EXECUTABLE=$(poetry run which source-s3)
CONFIG_PATH=$(realpath ./secrets/config.json)
pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
```

Example `validate` Usage:

```bash
pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
```
"""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any

import click
import yaml

from airbyte.destinations.util import get_destination, get_noop_destination
from airbyte.exceptions import PyAirbyteInputError
from airbyte.secrets.util import get_secret
from airbyte.sources.util import get_benchmark_source, get_source


if TYPE_CHECKING:
    from airbyte.destinations.base import Destination
    from airbyte.sources.base import Source


CLI_GUIDANCE = """
----------------------

PyAirbyte CLI Guidance

Providing connector configuration:

When providing configuration via `--config`, you can provide any of the following:

1. A path to a configuration file, in yaml or json format.

2. An inline yaml string, e.g. `--config='{key: value}'`, `--config='{key: {nested: value}}'`.

When providing an inline yaml string, it is recommended to use single quotes to avoid shell
interpolation.

Providing secrets:

You can provide secrets in your configuration file by prefixing the secret value with `SECRET:`.
For example, --config='{password: "SECRET:my_password"}' will look for a secret named `my_password`
in the secret store. By default, PyAirbyte will look for secrets in environment variables and
dotenv (.env) files. If a secret is not found, you'll be prompted to provide the secret value
interactively in the terminal.

It is highly recommended to use secrets when using inline yaml strings, in order to avoid
exposing secrets in plain text in the terminal history. Secrets provided interactively will
not be echoed to the terminal.
"""

# Add the CLI guidance to the module docstring.
# `__doc__` is None when docstrings are stripped (e.g. `python -OO`), so fall back to "".
globals()["__doc__"] = (globals().get("__doc__") or "") + CLI_GUIDANCE

CONFIG_HELP = (
    "Either a path to a configuration file for the named source or destination, "
    "or an inline yaml string. If providing an inline yaml string, use single quotes "
    "to avoid shell interpolation. For example, --config='{key: value}' or "
    "--config='{key: {nested: value}}'. \n"
    "PyAirbyte secrets can be accessed by prefixing the secret name with 'SECRET:'. "
    """For example, --config='{password: "SECRET:MY_PASSWORD"}'."""
)

PIP_URL_HELP = (
    "This can be anything pip accepts, including: a PyPI package name, a local path, "
    "a git repository, a git branch ref, etc. Use '.' to install from the current local "
    "directory."
)


def _resolve_config(
    config: str,
) -> dict[str, Any]:
    """Resolve the configuration file into a dictionary."""

    def _inject_secrets(config_dict: dict[str, Any]) -> None:
        """Inject secrets into the configuration dictionary."""
        for key, value in config_dict.items():
            if isinstance(value, dict):
                _inject_secrets(value)
            elif isinstance(value, str) and value.startswith("SECRET:"):
                config_dict[key] = get_secret(value.removeprefix("SECRET:").strip())

    config_dict: dict[str, Any]
    if config.startswith("{"):
        # Treat this as an inline yaml string:
        config_dict = yaml.safe_load(config)
    else:
        # Treat this as a path to a config file:
        config_path = Path(config)
        if not config_path.exists():
            raise PyAirbyteInputError(
                message="Config file not found.",
                input_value=str(config_path),
            )
        config_dict = yaml.safe_load(config_path.read_text(encoding="utf-8"))

    _inject_secrets(config_dict)
    return config_dict


def _is_docker_image(image: str | None) -> bool:
    """Check if the source or destination is a docker image."""
    return image is not None and ":" in image


def _is_executable_path(connector_str: str) -> bool:
    """Check if the connector string looks like a path to a local executable."""
    return connector_str.startswith(".") or "/" in connector_str


def _get_connector_name(connector: str) -> str:
    """Return the bare connector name, dropping any docker namespace and tag."""
    if _is_docker_image(connector):
        return connector.split(":")[0].split("/")[-1]

    return connector


def _resolve_source_job(
    *,
    source: str | None = None,
    config: str | None = None,
    streams: str | None = None,
    pip_url: str | None = None,
) -> Source:
    """Resolve the source job into a configured Source object.

    Args:
        source: The source name or source reference.
            If a path is provided, the source will be loaded from the local path.
            If the source contains a colon (':'), it will be interpreted as a docker image and tag.
        config: A path to a configuration file for the source, or an inline yaml string.
        streams: A comma-separated list of stream names to select for reading. If set to "*",
            all streams will be selected. If not provided, all streams will be selected.
        pip_url: Optional. A location from which to install the connector.
    """
    config_dict = _resolve_config(config) if config else None
    streams_list: str | list[str] = streams or "*"
    if isinstance(streams, str) and streams != "*":
        streams_list = [stream.strip() for stream in streams.split(",")]

    source_obj: Source
    if source and _is_docker_image(source):
        source_obj = get_source(
            name=_get_connector_name(source),
            docker_image=source,
            config=config_dict,
            streams=streams_list,
            pip_url=pip_url,
        )
        return source_obj

    if source and _is_executable_path(source):
        # Treat the source as a path.
        source_executable = Path(source)
        if not source_executable.exists():
            raise PyAirbyteInputError(
                message="Source executable not found.",
                context={
                    "source": source,
                },
            )
        source_obj = get_source(
            name=source_executable.stem,
            local_executable=source_executable,
            config=config_dict,
            streams=streams_list,
            pip_url=pip_url,
        )
        return source_obj

    if not source or not source.startswith("source-"):
        raise PyAirbyteInputError(
            message="Expected a source name, docker image, or path to executable.",
            input_value=source,
        )

    source_name: str = source

    return get_source(
        name=source_name,
        config=config_dict,
        streams=streams_list,
        pip_url=pip_url,
    )


def _resolve_destination_job(
    *,
    destination: str,
    config: str | None = None,
    pip_url: str | None = None,
) -> Destination:
    """Resolve the destination job into a configured Destination object.

    Args:
        destination: The destination name or destination reference.
            If a path is provided, the destination will be loaded from the local path.
            If the destination contains a colon (':'), it will be interpreted as a docker image
            and tag.
        config: A path to a configuration file for the destination, or an inline yaml string.
        pip_url: Optional. A location from which to install the connector.
    """
    config_dict = _resolve_config(config) if config else None

    if destination and _is_executable_path(destination):
        # Treat the destination as a path.
        destination_executable = Path(destination)
        if not destination_executable.exists():
            raise PyAirbyteInputError(
                message="Destination executable not found.",
                context={
                    "destination": destination,
                },
            )
        return get_destination(
            name=destination_executable.stem,
            local_executable=destination_executable,
            config=config_dict,
            pip_url=pip_url,
        )

    # Otherwise, treat the destination as a name.

    return get_destination(
        name=destination,
        config=config_dict,
        pip_url=pip_url,
    )


@click.command(
    help=(
        "Validate the connector has a valid CLI and is able to run `spec`. "
        "If 'config' is provided, we will also run a `check` on the connector "
        "with the provided config.\n\n" + CLI_GUIDANCE
    ),
)
@click.option(
    "--connector",
    type=str,
    help="The connector name or a path to the local executable.",
)
@click.option(
    "--pip-url",
    type=str,
    help=(
        "Optional. The location from which to install the connector. "
        "This can be anything pip accepts, including: a PyPI package name, a local path, "
        "a git repository, a git branch ref, etc."
    ),
)
@click.option(
    "--config",
    type=str,
    required=False,
    help=CONFIG_HELP,
)
def validate(
    connector: str | None = None,
    config: str | None = None,
    pip_url: str | None = None,
) -> None:
    """CLI command to run a `validate` operation."""
    if not connector:
        raise PyAirbyteInputError(
            message="No connector provided.",
        )

    connector_obj: Source | Destination
    if "source-" in connector:
        connector_obj = _resolve_source_job(
            source=connector,
            config=None,
            streams=None,
            pip_url=pip_url,
        )
    else:  # destination
        connector_obj = _resolve_destination_job(
            destination=connector,
            config=None,
            pip_url=pip_url,
        )

    print("Getting `spec` output from connector...")
    connector_obj.print_config_spec()

    if config:
        print("Running connector check...")
        config_dict: dict[str, Any] = _resolve_config(config)
        connector_obj.set_config(config_dict)
        connector_obj.check()


@click.command()
@click.option(
    "--source",
    type=str,
    help=(
        "The source name, with an optional version declaration. "
        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
    ),
)
@click.option(
    "--streams",
    type=str,
    default="*",
    help=(
        "A comma-separated list of stream names to select for reading. If set to '*', all streams "
        "will be selected. Defaults to '*'."
    ),
)
@click.option(
    "--num-records",
    type=str,
    default="5e5",
    help=(
        "The number of records to generate for the benchmark. Ignored if a source is provided. "
        "You can specify the number of records to generate using scientific notation. "
        "For example, `5e6` will generate 5 million records. By default, 500,000 records will "
        "be generated (`5e5` records). If underscores are provided within a numeric string, "
        "they will be ignored."
    ),
)
@click.option(
    "--destination",
    type=str,
    help=(
        "The destination name, with an optional version declaration. "
        "If a path is provided, it will be interpreted as a path to the local executable. "
    ),
)
@click.option(
    "--config",
    type=str,
    help=CONFIG_HELP,
)
def benchmark(
    source: str | None = None,
    streams: str = "*",
    num_records: int | str = "5e5",  # 500,000 records
    destination: str | None = None,
    config: str | None = None,
) -> None:
    """CLI command to run a `benchmark` operation.

    You can provide either a source or a destination, but not both. If a destination is being
    benchmarked, you can use `--num-records` to specify the number of records to generate for the
    benchmark.

    If a source is being benchmarked, you can provide a configuration file or a job
    definition file to run the source job.
    """
    if source and destination:
        raise PyAirbyteInputError(
            message="For benchmarking, source or destination can be provided, but not both.",
        )
    destination_obj: Destination
    source_obj: Source

    source_obj = (
        _resolve_source_job(
            source=source,
            config=config,
            streams=streams,
        )
        if source
        else get_benchmark_source(
            num_records=num_records,
        )
    )
    destination_obj = (
        _resolve_destination_job(
            destination=destination,
            config=config,
        )
        if destination
        else get_noop_destination()
    )

    click.echo("Running benchmarks...")
    destination_obj.write(
        source_data=source_obj,
        cache=False,
        state_cache=False,
    )


@click.command()
@click.option(
    "--source",
    type=str,
    help=(
        "The source name, with an optional version declaration. "
        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
    ),
)
@click.option(
    "--destination",
    type=str,
    help=(
        "The destination name, with an optional version declaration. "
        "If a path is provided, it will be interpreted as a path to the local executable. "
    ),
)
@click.option(
    "--streams",
    type=str,
    help=(
        "A comma-separated list of stream names to select for reading. If set to '*', all streams "
        "will be selected. Defaults to '*'."
    ),
)
@click.option(
    "--Sconfig",
    "source_config",
    type=str,
    help="The source config. " + CONFIG_HELP,
)
@click.option(
    "--Dconfig",
    "destination_config",
    type=str,
    help="The destination config. " + CONFIG_HELP,
)
@click.option(
    "--Spip-url",
    "source_pip_url",
    type=str,
    help="Optional pip URL for the source (Python connectors only). " + PIP_URL_HELP,
)
@click.option(
    "--Dpip-url",
    "destination_pip_url",
    type=str,
    help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP,
)
def sync(
    *,
    source: str,
    source_config: str | None = None,
    source_pip_url: str | None = None,
    destination: str,
    destination_config: str | None = None,
    destination_pip_url: str | None = None,
    streams: str | None = None,
) -> None:
    """CLI command to run a `sync` operation.

    Currently, this only supports full refresh syncs. Incremental syncs are not yet supported.
    Custom catalog syncs are not yet supported.
    """
    destination_obj: Destination
    source_obj: Source

    source_obj = _resolve_source_job(
        source=source,
        config=source_config,
        streams=streams,
        pip_url=source_pip_url,
    )
    destination_obj = _resolve_destination_job(
        destination=destination,
        config=destination_config,
        pip_url=destination_pip_url,
    )

    click.echo("Running sync...")
    destination_obj.write(
        source_data=source_obj,
        cache=False,
        state_cache=False,
    )


@click.group()
def cli() -> None:
    """@private PyAirbyte CLI."""
    pass


cli.add_command(validate)
cli.add_command(benchmark)
cli.add_command(sync)

if __name__ == "__main__":
    cli()
CLI_GUIDANCE = '\n----------------------\n\nPyAirbyte CLI Guidance\n\nProviding connector configuration:\n\nWhen providing configuration via `--config`, you can provide any of the following:\n\n1. A path to a configuration file, in yaml or json format.\n\n2. An inline yaml string, e.g. `--config=\'{key: value}\'`, `--config=\'{key: {nested: value}}\'`.\n\nWhen providing an inline yaml string, it is recommended to use single quotes to avoid shell\ninterpolation.\n\nProviding secrets:\n\nYou can provide secrets in your configuration file by prefixing the secret value with `SECRET:`.\nFor example, --config=\'{password: "SECRET:my_password"}\' will look for a secret named `my_password`\nin the secret store. By default, PyAirbyte will look for secrets in environment variables and\ndotenv (.env) files. If a secret is not found, you\'ll be prompted to provide the secret value\ninteractively in the terminal.\n\nIt is highly recommended to use secrets when using inline yaml strings, in order to avoid\nexposing secrets in plain text in the terminal history. Secrets provided interactively will\nnot be echoed to the terminal.\n'
CONFIG_HELP = 'Either a path to a configuration file for the named source or destination, or an inline yaml string. If providing an inline yaml string, use single quotes to avoid shell interpolation. For example, --config=\'{key: value}\' or --config=\'{key: {nested: value}}\'. \nPyAirbyte secrets can be accessed by prefixing the secret name with \'SECRET:\'. For example, --config=\'{password: "SECRET:MY_PASSWORD"}\'.'
PIP_URL_HELP = "This can be anything pip accepts, including: a PyPI package name, a local path, a git repository, a git branch ref, etc. Use '.' to install from the current local directory."
validate = <Command validate>

CLI command to run a validate operation.

benchmark = <Command benchmark>

CLI command to run a benchmark operation.

You can provide either a source or a destination, but not both. If a destination is being benchmarked, you can use --num-records to specify the number of records to generate for the benchmark.

If a source is being benchmarked, you can provide a configuration file or a job definition file to run the source job.
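
The `--num-records` option is passed as a string so that values such as `5e5`, `2.4e6` or `400_000` can be accepted. A minimal sketch of one way such a string could be turned into an integer is shown below; `parse_num_records` is a hypothetical helper and is not claimed to be how PyAirbyte parses the value internally.

```python
from decimal import Decimal


def parse_num_records(value: str) -> int:
    """Parse a record count such as "5e5", "2.4e6" or "400_000" into an int.

    Hypothetical helper: underscores are dropped, then the text is read as a
    decimal number so that scientific notation is handled exactly.
    """
    number = Decimal(value.replace("_", ""))
    if number != number.to_integral_value():
        raise ValueError(f"Record count must be a whole number, got {value!r}")
    return int(number)


for text in ("5e5", "2.4e6", "400_000"):
    print(text, "->", parse_num_records(text))
```

Using `Decimal` rather than `float` avoids rounding surprises for large counts, and a fractional value such as `1.5` is rejected instead of being silently truncated.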

sync = <Command sync>

CLI command to run a sync operation.

Currently, this only supports full refresh syncs. Incremental syncs are not yet supported. Custom catalog syncs are not yet supported.
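
Both `sync` and `benchmark` accept `--streams` as either `*` or a comma-separated list of stream names. The sketch below shows how that string can be normalized, following the logic in the module source; `parse_streams` is a hypothetical standalone name used here for illustration.

```python
from __future__ import annotations


def parse_streams(streams: str | None) -> str | list[str]:
    """Return "*" for all streams, else a list of stripped stream names."""
    if not streams or streams == "*":
        # A missing or "*" value selects every stream.
        return "*"
    return [name.strip() for name in streams.split(",")]


print(parse_streams(None))
print(parse_streams("*"))
print(parse_streams("users, orders"))
```

Whitespace around each name is stripped, so `--streams='users, orders'` and `--streams=users,orders` select the same streams.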