airbyte.cli

CLI for PyAirbyte.

The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks.

After installing PyAirbyte, the CLI can be invoked with the pyairbyte CLI executable, or the shorter pyab alias.

These are equivalent:

python -m airbyte.cli --help
pyairbyte --help
pyab --help

You can also use pipx or the fast and powerful uv tool to run the PyAirbyte CLI without pre-installing:

# Install `uv` if you haven't already:
brew install uv

# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help

Example benchmark Usage:

# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6

# Source Benchmark:
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields

# Source Benchmark from Docker Image:
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'

# Destination Benchmark:
pyab benchmark --destination=destination-dev-null --config=/path/to/config.json

# Benchmark a Local Python Source (source-s3):
pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
# Equivalent to:
LOCAL_EXECUTABLE=$(poetry run which source-s3)
CONFIG_PATH=$(realpath ./secrets/config.json)
pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH

Example validate Usage:

pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'

PyAirbyte CLI Guidance

Providing connector configuration:

When providing configuration via --config, you can provide any of the following:

  1. A path to a configuration file, in yaml or json format.

  2. An inline yaml string, e.g. --config='{key: value}', --config='{key: {nested: value}}'.

When providing an inline yaml string, it is recommended to use single quotes to avoid shell interpolation.
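The two accepted forms can be told apart with a simple prefix check. The sketch below is illustrative only: `classify_config_arg` is a hypothetical helper, not part of PyAirbyte, and it only classifies the value without parsing it. It follows the rule visible in the module source, where a value starting with `{` is treated as inline yaml and anything else as a file path.

```python
from pathlib import Path


def classify_config_arg(value: str) -> str:
    """Classify a --config value as inline yaml, an existing file, or a missing file.

    Hypothetical helper for illustration; the CLI parses the value as well.
    """
    if value.startswith("{"):
        # Inline yaml mappings always begin with an opening brace.
        return "inline"
    if Path(value).exists():
        return "file"
    # The CLI raises an input error in this case.
    return "missing-file"
```

Because an unquoted `{key: value}` may be expanded or split by the shell before it reaches the CLI, the single quotes are what keep the whole mapping in one argument.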

Providing secrets:

You can provide secrets in your configuration file by prefixing the secret value with SECRET:. For example, --config='{password: "SECRET:my_password"}' will look for a secret named my_password in the secret store. By default, PyAirbyte will look for secrets in environment variables and dotenv (.env) files. If a secret is not found, you'll be prompted to provide the secret value interactively in the terminal.

It is highly recommended to use secrets when using inline yaml strings, in order to avoid exposing secrets in plain text in the terminal history. Secrets provided interactively will not be echoed to the terminal.
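As a rough illustration of the `SECRET:` convention, the sketch below replaces prefixed values with entries from a mapping such as the process environment. It is a simplified stand-in, not PyAirbyte's implementation: `inject_secrets` and its `env` parameter are hypothetical, only environment-style lookups are modeled, and a missing secret raises `KeyError` here instead of prompting interactively.

```python
import os
from typing import Any, Mapping, Optional

SECRET_PREFIX = "SECRET:"


def inject_secrets(
    config: dict[str, Any],
    env: Optional[Mapping[str, str]] = None,
) -> dict[str, Any]:
    """Return a copy of `config` with `SECRET:<name>` values replaced.

    Hypothetical helper: secrets are looked up in `env`, which defaults
    to the process environment.
    """
    lookup = os.environ if env is None else env
    resolved: dict[str, Any] = {}
    for key, value in config.items():
        if isinstance(value, dict):
            # Recurse into nested mappings.
            resolved[key] = inject_secrets(value, lookup)
        elif isinstance(value, str) and value.startswith(SECRET_PREFIX):
            secret_name = value.removeprefix(SECRET_PREFIX).strip()
            resolved[key] = lookup[secret_name]  # KeyError if the secret is absent
        else:
            resolved[key] = value
    return resolved
```

With `my_password` set in the environment, `inject_secrets({"password": "SECRET:my_password"})` would return the mapping with the real value substituted, so the secret itself never appears on the command line.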

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""CLI for PyAirbyte.
  3
  4The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks.
  5
  6After installing PyAirbyte, the CLI can be invoked with the `pyairbyte` CLI executable, or the
  7shorter `pyab` alias.
  8
  9These are equivalent:
 10
 11```bash
 12python -m airbyte.cli --help
 13pyairbyte --help
 14pyab --help
 15```
 16
 17You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI
 18without pre-installing:
 19
 20```bash
 21# Install `uv` if you haven't already:
 22brew install uv
 23
 24# Run the PyAirbyte CLI using `uvx`:
 25uvx --from=airbyte pyab --help
 26```
 27
 28Example `benchmark` Usage:
 29
 30```bash
 31# PyAirbyte System Benchmark (no-op):
 32pyab benchmark --num-records=2.4e6
 33
 34# Source Benchmark:
 35pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
 36pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
 37pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields
 38
 39# Source Benchmark from Docker Image:
 40pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
 41pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'
 42
 43# Destination Benchmark:
 44pyab benchmark --destination=destination-dev-null --config=/path/to/config.json
 45
 46# Benchmark a Local Python Source (source-s3):
 47pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
 48# Equivalent to:
 49LOCAL_EXECUTABLE=$(poetry run which source-s3)
 50CONFIG_PATH=$(realpath ./secrets/config.json)
 51pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
 52```
 53
 54Example `validate` Usage:
 55
 56```bash
 57pyab validate --connector=source-hardcoded-records
 58pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
 59```
 60"""
 61
 62from __future__ import annotations
 63
 64import re
 65import sys
 66from pathlib import Path
 67from typing import TYPE_CHECKING, Any
 68
 69import click
 70import yaml
 71
 72from airbyte.destinations.util import get_destination, get_noop_destination
 73from airbyte.exceptions import PyAirbyteInputError
 74from airbyte.secrets.util import get_secret
 75from airbyte.sources.util import get_benchmark_source, get_source
 76
 77
 78if TYPE_CHECKING:
 79    from airbyte.destinations.base import Destination
 80    from airbyte.sources.base import Source
 81
 82
 83CLI_GUIDANCE = """
 84----------------------
 85
 86PyAirbyte CLI Guidance
 87
 88Providing connector configuration:
 89
 90When providing configuration via `--config`, you can provide any of the following:
 91
 921. A path to a configuration file, in yaml or json format.
 93
 942. An inline yaml string, e.g. `--config='{key: value}'`, --config='{key: {nested: value}}'.
 95
 96When providing an inline yaml string, it is recommended to use single quotes to avoid shell
 97interpolation.
 98
 99Providing secrets:
100
101You can provide secrets in your configuration file by prefixing the secret value with `SECRET:`.
102For example, --config='{password: "SECRET:my_password"}' will look for a secret named `my_password`
103in the secret store. By default, PyAirbyte will look for secrets in environment variables and
104dotenv (.env) files. If a secret is not found, you'll be prompted to provide the secret value
105interactively in the terminal.
106
107It is highly recommended to use secrets when using inline yaml strings, in order to avoid
108exposing secrets in plain text in the terminal history. Secrets provided interactively will
109not be echoed to the terminal.
110"""
111
112# Add the CLI guidance to the module docstring.
113globals()["__doc__"] = globals().get("__doc__", "") + CLI_GUIDANCE
114
115CONFIG_HELP = (
116    "Either a path to a configuration file for the named source or destination, "
117    "or an inline yaml string. If providing an inline yaml string, use single quotes "
118    "to avoid shell interpolation. For example, --config='{key: value}' or "
119    "--config='{key: {nested: value}}'. \n"
120    "PyAirbyte secrets can be accessed by prefixing the secret name with 'SECRET:'. "
121    """For example, --config='{password: "SECRET:MY_PASSWORD"}'."""
122)
123
124PIP_URL_HELP = (
125    "This can be anything pip accepts, including: a PyPI package name, a local path, "
126    "a git repository, a git branch ref, etc. Use '.' to install from the current local "
127    "directory."
128)
129
130
131def _resolve_config(
132    config: str,
133) -> dict[str, Any]:
134    """Resolve the configuration file into a dictionary."""
135
136    def _inject_secrets(config_dict: dict[str, Any]) -> None:
137        """Inject secrets into the configuration dictionary."""
138        for key, value in config_dict.items():
139            if isinstance(value, dict):
140                _inject_secrets(value)
141            elif isinstance(value, str) and value.startswith("SECRET:"):
142                config_dict[key] = get_secret(value.removeprefix("SECRET:").strip())
143
144    config_dict: dict[str, Any]
145    if config.startswith("{"):
146        # Treat this as an inline yaml string:
147        config_dict = yaml.safe_load(config)
148    else:
149        # Treat this as a path to a config file:
150        config_path = Path(config)
151        if not config_path.exists():
152            raise PyAirbyteInputError(
153                message="Config file not found.",
154                input_value=str(config_path),
155            )
156        config_dict = yaml.safe_load(config_path.read_text(encoding="utf-8"))
157
158    _inject_secrets(config_dict)
159    return config_dict
160
161
162def _is_docker_image(image: str | None) -> bool:
163    """Check if the source or destination is a docker image."""
164    return image is not None and ":" in image
165
166
167def _is_executable_path(connector_str: str) -> bool:
168    return connector_str.startswith(".") or "/" in connector_str
169
170
171def _get_connector_name(connector: str) -> str:
172    if _is_docker_image(connector):
173        return connector.split(":")[0].split("/")[-1]
174
175    return connector
176
177
178def _parse_use_python(use_python_str: str | None) -> bool | Path | str | None:
179    r"""Parse the use_python CLI parameter.
180
181    Args:
182        use_python_str: The raw string value from CLI input.
183
184    Returns:
185        - None: No parameter provided
186        - True: Use current Python interpreter ("true")
187        - False: Use Docker instead ("false")
188        - Path: Use interpreter at this path (paths containing / or \ or starting with .),
189               or an existing interpreter name (non-semver strings like "python3.10")
190        - str: Use uv-managed Python version (semver patterns like "3.12", "3.11.5")
191    """
192    if use_python_str is None:
193        return None
194    if use_python_str.lower() == "true":
195        return True
196    if use_python_str.lower() == "false":
197        return False
198    if "/" in use_python_str or "\\" in use_python_str or use_python_str.startswith("."):
199        return Path(use_python_str)
200
201    semver_pattern = r"^\d+\.\d+(?:\.\d+)?$"
202    if re.match(semver_pattern, use_python_str):
203        return use_python_str  # Return as string for uv-managed version
204
205    return Path(use_python_str)
206
207
208def _resolve_source_job(
209    *,
210    source: str | None = None,
211    config: str | None = None,
212    streams: str | None = None,
213    pip_url: str | None = None,
214    use_python: str | None = None,
215) -> Source:
216    """Resolve the source job into a configured Source object.
217
218    Args:
219        source: The source name or source reference.
220            If a path is provided, the source will be loaded from the local path.
221            If the source contains a colon (':'), it will be interpreted as a docker image and tag.
222        config: The path to a configuration file for the named source or destination.
223        streams: A comma-separated list of stream names to select for reading. If set to "*",
224            all streams will be selected. If not provided, all streams will be selected.
225        pip_url: Optional. A location from which to install the connector.
226        use_python: Optional. Python interpreter specification.
227    """
228    config_dict = _resolve_config(config) if config else None
229    streams_list: str | list[str] = streams or "*"
230    if isinstance(streams, str) and streams != "*":
231        streams_list = [stream.strip() for stream in streams.split(",")]
232
233    use_python_parsed = _parse_use_python(use_python)
234
235    source_obj: Source
236    if source and _is_docker_image(source):
237        source_obj = get_source(
238            name=_get_connector_name(source),
239            docker_image=source,
240            config=config_dict,
241            streams=streams_list,
242            pip_url=pip_url,
243            use_python=use_python_parsed,
244        )
245        return source_obj
246
247    if source and _is_executable_path(source):
248        # Treat the source as a path.
249        source_executable = Path(source)
250        if not source_executable.exists():
251            raise PyAirbyteInputError(
252                message="Source executable not found.",
253                context={
254                    "source": source,
255                },
256            )
257        source_obj = get_source(
258            name=source_executable.stem,
259            local_executable=source_executable,
260            config=config_dict,
261            streams=streams_list,
262            pip_url=pip_url,
263            use_python=use_python_parsed,
264        )
265        return source_obj
266
267    if not source or not source.startswith("source-"):
268        raise PyAirbyteInputError(
269            message="Expected a source name, docker image, or path to executable.",
270            input_value=source,
271        )
272
273    source_name: str = source
274
275    return get_source(
276        name=source_name,
277        config=config_dict,
278        streams=streams_list,
279        pip_url=pip_url,
280        use_python=use_python_parsed,
281    )
282
283
284def _get_noop_destination_config() -> dict[str, Any]:
285    return {
286        "test_destination": {
287            "test_destination_type": "SILENT",
288        }
289    }
290
291
292def _resolve_destination_job(
293    *,
294    destination: str,
295    config: str | None = None,
296    pip_url: str | None = None,
297    use_python: str | None = None,
298) -> Destination:
299    """Resolve the destination job into a configured Destination object.
300
301    Args:
302        destination: The destination name or destination reference.
303            If a path is provided, the destination will be loaded from the local path.
304            If the destination contains a colon (':'), it will be interpreted as a docker image
305            and tag.
306        config: The path to a configuration file for the named source or destination.
307        pip_url: Optional. A location from which to install the connector.
308        use_python: Optional. Python interpreter specification.
309    """
310    config_dict = _resolve_config(config) if config else {}
311    use_python_parsed = _parse_use_python(use_python)
312
313    destination_name = _get_connector_name(destination)
314
315    if destination_name == "destination-dev-null" and not config:
316        config_dict = _get_noop_destination_config()
317
318    if _is_docker_image(destination):
319        return get_destination(
320            name=destination_name,
321            docker_image=destination,
322            config=config_dict,
323            pip_url=pip_url,
324            use_python=use_python_parsed,
325        )
326
327    if destination and (destination.startswith(".") or "/" in destination):
328        # Treat the destination as a path.
329        destination_executable = Path(destination)
330        if not destination_executable.exists():
331            raise PyAirbyteInputError(
332                message="Destination executable not found.",
333                context={
334                    "destination": destination,
335                },
336            )
337        return get_destination(
338            name=destination_executable.stem,
339            local_executable=destination_executable,
340            config=config_dict,
341            pip_url=pip_url,
342            use_python=use_python_parsed,
343        )
344
345    # else: # Treat the destination as a name.
346
347    return get_destination(
348        name=destination,
349        config=config_dict,
350        pip_url=pip_url,
351        use_python=use_python_parsed,
352    )
353
354
355@click.command(
356    help=(
357        "Validate the connector has a valid CLI and is able to run `spec`. "
358        "If 'config' is provided, we will also run a `check` on the connector "
359        "with the provided config.\n\n" + CLI_GUIDANCE
360    ),
361)
362@click.option(
363    "--connector",
364    type=str,
365    help="The connector name or a path to the local executable.",
366)
367@click.option(
368    "--pip-url",
369    type=str,
370    help=(
371        "Optional. The location from which to install the connector. "
372        "This can be anything pip accepts, including: a PyPI package name, a local path, "
373        "a git repository, a git branch ref, etc."
374    ),
375)
376@click.option(
377    "--config",
378    type=str,
379    required=False,
380    help=CONFIG_HELP,
381)
382@click.option(
383    "--use-python",
384    type=str,
385    help=(
386        "Python interpreter specification. Use 'true' for current Python, "
387        "'false' for Docker, a path for specific interpreter, or a version "
388        "string for uv-managed Python (e.g., '3.11', 'python3.12')."
389    ),
390)
391def validate(
392    connector: str | None = None,
393    config: str | None = None,
394    pip_url: str | None = None,
395    use_python: str | None = None,
396) -> None:
397    """CLI command to run a `validate` operation."""
398    if not connector:
399        raise PyAirbyteInputError(
400            message="No connector provided.",
401        )
402
403    connector_obj: Source | Destination
404    if "source-" in connector:
405        connector_obj = _resolve_source_job(
406            source=connector,
407            config=None,
408            streams=None,
409            pip_url=pip_url,
410            use_python=use_python,
411        )
412    else:  # destination
413        connector_obj = _resolve_destination_job(
414            destination=connector,
415            config=None,
416            pip_url=pip_url,
417            use_python=use_python,
418        )
419
420    print("Getting `spec` output from connector...", file=sys.stderr)
421    connector_obj.print_config_spec(stderr=True)
422
423    if config:
424        print("Running connector check...")
425        config_dict: dict[str, Any] = _resolve_config(config)
426        connector_obj.set_config(config_dict)
427        connector_obj.check()
428
429
430@click.command()
431@click.option(
432    "--source",
433    type=str,
434    help=(
435        "The source name, with an optional version declaration. "
436        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
437    ),
438)
439@click.option(
440    "--streams",
441    type=str,
442    default="*",
443    help=(
444        "A comma-separated list of stream names to select for reading. If set to '*', all streams "
445        "will be selected. Defaults to '*'."
446    ),
447)
448@click.option(
449    "--num-records",
450    type=str,
451    default="5e5",
452    help=(
453        "The number of records to generate for the benchmark. Ignored if a source is provided. "
454        "You can specify the number of records to generate using scientific notation. "
455        "For example, `5e6` will generate 5 million records. By default, 500,000 records will "
456        "be generated (`5e5` records). If underscores are provided within a numeric string, "
457        "they will be ignored."
458    ),
459)
460@click.option(
461    "--destination",
462    type=str,
463    help=(
464        "The destination name, with an optional version declaration. "
465        "If a path is provided, it will be interpreted as a path to the local executable. "
466    ),
467)
468@click.option(
469    "--config",
470    type=str,
471    help=CONFIG_HELP,
472)
473@click.option(
474    "--use-python",
475    type=str,
476    help=(
477        "Python interpreter specification. Use 'true' for current Python, "
478        "'false' for Docker, a path for specific interpreter, or a version "
479        "string for uv-managed Python (e.g., '3.11', 'python3.12')."
480    ),
481)
482def benchmark(
483    source: str | None = None,
484    streams: str = "*",
485    num_records: int | str = "5e5",  # 500,000 records
486    destination: str | None = None,
487    config: str | None = None,
488    use_python: str | None = None,
489) -> None:
490    """CLI command to run a `benchmark` operation.
491
492    You can provide either a source or a destination, but not both. If a destination is being
493    benchmarked, you can use `--num-records` to specify the number of records to generate for the
494    benchmark.
495
496    If a source is being benchmarked, you can provide a configuration file or a job
497    definition file to run the source job.
498    """
499    if source and destination:
500        raise PyAirbyteInputError(
501            message="For benchmarking, source or destination can be provided, but not both.",
502        )
503    destination_obj: Destination
504    source_obj: Source
505
506    source_obj = (
507        _resolve_source_job(
508            source=source,
509            config=config,
510            streams=streams,
511            use_python=use_python,
512        )
513        if source
514        else get_benchmark_source(
515            num_records=num_records,
516        )
517    )
518    destination_obj = (
519        _resolve_destination_job(
520            destination=destination,
521            config=config,
522            use_python=use_python,
523        )
524        if destination
525        else get_noop_destination()
526    )
527
528    click.echo("Running benchmarks...", sys.stderr)
529    destination_obj.write(
530        source_data=source_obj,
531        cache=False,
532        state_cache=False,
533    )
534
535
536@click.command()
537@click.option(
538    "--source",
539    type=str,
540    help=(
541        "The source name, with an optional version declaration. "
542        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
543    ),
544)
545@click.option(
546    "--destination",
547    type=str,
548    help=(
549        "The destination name, with an optional version declaration. "
550        "If a path is provided, it will be interpreted as a path to the local executable. "
551    ),
552)
553@click.option(
554    "--streams",
555    type=str,
556    help=(
557        "A comma-separated list of stream names to select for reading. If set to '*', all streams "
558        "will be selected. Defaults to '*'."
559    ),
560)
561@click.option(
562    "--Sconfig",
563    "source_config",
564    type=str,
565    help="The source config. " + CONFIG_HELP,
566)
567@click.option(
568    "--Dconfig",
569    "destination_config",
570    type=str,
571    help="The destination config. " + CONFIG_HELP,
572)
573@click.option(
574    "--Spip-url",
575    "source_pip_url",
576    type=str,
577    help="Optional pip URL for the source (Python connectors only). " + PIP_URL_HELP,
578)
579@click.option(
580    "--Dpip-url",
581    "destination_pip_url",
582    type=str,
583    help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP,
584)
585@click.option(
586    "--use-python",
587    type=str,
588    help=(
589        "Python interpreter specification. Use 'true' for current Python, "
590        "'false' for Docker, a path for specific interpreter, or a version "
591        "string for uv-managed Python (e.g., '3.11', 'python3.12')."
592    ),
593)
594def sync(
595    *,
596    source: str,
597    source_config: str | None = None,
598    source_pip_url: str | None = None,
599    destination: str,
600    destination_config: str | None = None,
601    destination_pip_url: str | None = None,
602    streams: str | None = None,
603    use_python: str | None = None,
604) -> None:
605    """CLI command to run a `sync` operation.
606
607    Currently, this only supports full refresh syncs. Incremental syncs are not yet supported.
608    Custom catalog syncs are not yet supported.
609    """
610    destination_obj: Destination
611    source_obj: Source
612
613    source_obj = _resolve_source_job(
614        source=source,
615        config=source_config,
616        streams=streams,
617        pip_url=source_pip_url,
618        use_python=use_python,
619    )
620    destination_obj = _resolve_destination_job(
621        destination=destination,
622        config=destination_config,
623        pip_url=destination_pip_url,
624        use_python=use_python,
625    )
626
627    click.echo("Running sync...")
628    destination_obj.write(
629        source_data=source_obj,
630        cache=False,
631        state_cache=False,
632    )
633
634
635@click.group()
636def cli() -> None:
637    """@private PyAirbyte CLI."""
638    pass
639
640
641cli.add_command(validate)
642cli.add_command(benchmark)
643cli.add_command(sync)
644
645if __name__ == "__main__":
646    cli()
CLI_GUIDANCE = '\n----------------------\n\nPyAirbyte CLI Guidance\n\nProviding connector configuration:\n\nWhen providing configuration via `--config`, you can provide any of the following:\n\n1. A path to a configuration file, in yaml or json format.\n\n2. An inline yaml string, e.g. `--config=\'{key: value}\'`, --config=\'{key: {nested: value}}\'.\n\nWhen providing an inline yaml string, it is recommended to use single quotes to avoid shell\ninterpolation.\n\nProviding secrets:\n\nYou can provide secrets in your configuration file by prefixing the secret value with `SECRET:`.\nFor example, --config=\'{password: "SECRET:my_password"}\' will look for a secret named `my_password`\nin the secret store. By default, PyAirbyte will look for secrets in environment variables and\ndotenv (.env) files. If a secret is not found, you\'ll be prompted to provide the secret value\ninteractively in the terminal.\n\nIt is highly recommended to use secrets when using inline yaml strings, in order to avoid\nexposing secrets in plain text in the terminal history. Secrets provided interactively will\nnot be echoed to the terminal.\n'
CONFIG_HELP = 'Either a path to a configuration file for the named source or destination, or an inline yaml string. If providing an inline yaml string, use single quotes to avoid shell interpolation. For example, --config=\'{key: value}\' or --config=\'{key: {nested: value}}\'. \nPyAirbyte secrets can be accessed by prefixing the secret name with \'SECRET:\'. For example, --config=\'{password: "SECRET:MY_PASSWORD"}\'.'
PIP_URL_HELP = "This can be anything pip accepts, including: a PyPI package name, a local path, a git repository, a git branch ref, etc. Use '.' to install from the current local directory."
validate = <Command validate>

CLI command to run a validate operation.

benchmark = <Command benchmark>

CLI command to run a benchmark operation.

You can provide either a source or a destination, but not both. If a destination is being benchmarked, you can use --num-records to specify the number of records to generate for the benchmark.

If a source is being benchmarked, you can provide a configuration file or a job definition file to run the source job.
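The `--num-records` option is declared as a string so that values such as `5e5`, `2.4e6`, or `400_000` can be passed. One way such a value could be normalized to an integer is sketched below. `parse_num_records` is a hypothetical helper written for illustration; the actual conversion happens inside PyAirbyte's benchmark source and may differ.

```python
from typing import Union


def parse_num_records(value: Union[int, str]) -> int:
    """Convert a record count such as "5e5" or "400_000" to an int.

    Hypothetical helper: underscores are dropped and scientific
    notation is accepted by going through float().
    """
    if isinstance(value, int):
        return value
    cleaned = value.replace("_", "")
    return int(float(cleaned))
```

Under these assumptions, `parse_num_records("5e5")` gives 500000, matching the documented default of 500,000 records.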

sync = <Command sync>

CLI command to run a sync operation.

Currently, this only supports full refresh syncs. Incremental syncs are not yet supported. Custom catalog syncs are not yet supported.