airbyte.cli
CLI for PyAirbyte.
The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks.
After installing PyAirbyte, the CLI can be invoked with the `pyairbyte` CLI executable, or the shorter `pyab` alias.
These are equivalent:

```bash
python -m airbyte.cli --help
pyairbyte --help
pyab --help
```
You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI without pre-installing:

```bash
# Install `uv` if you haven't already:
brew install uv

# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help
```
Example `benchmark` Usage:

```bash
# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6

# Source Benchmark:
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields

# Source Benchmark from Docker Image:
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'

# Destination Benchmark:
pyab benchmark --destination=destination-dev-null --config=/path/to/config.json

# Benchmark a Local Python Source (source-s3):
pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
# Equivalent to:
LOCAL_EXECUTABLE=$(poetry run which source-s3)
CONFIG_PATH=$(realpath ./secrets/config.json)
pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
```
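The benchmark examples above pass `--source` in three forms: a connector name (`source-hardcoded-records`), a Docker image with a tag (`airbyte/source-hardcoded-records:latest`), and a path to a local executable. The following minimal, stdlib-only sketch shows how such a value can be classified, following the order of checks in `_resolve_source_job` in the module source (via the private helpers `_is_docker_image` and `_is_executable_path`). The function `classify_connector` and its `"docker"`/`"path"`/`"name"` labels are hypothetical; the real CLI passes the result straight to `get_source` or `get_destination` rather than returning it.

```python
from pathlib import Path


def classify_connector(ref: str) -> tuple[str, str]:
    """Return (kind, connector_name) for a --source/--destination value.

    Hypothetical helper that mirrors the precedence used in airbyte.cli:
    Docker image first, then local executable path, then plain name.
    """
    # A colon marks a Docker image reference such as "org/name:tag".
    if ":" in ref:
        # Drop the tag, then any registry/org prefix, to get the name.
        return "docker", ref.split(":")[0].split("/")[-1]
    # A leading "." or any "/" marks a path to a local executable.
    if ref.startswith(".") or "/" in ref:
        # The connector name is the file name without its extension.
        return "path", Path(ref).stem
    # Anything else is treated as a registry connector name.
    return "name", ref
```

Because the colon test runs first, `airbyte/source-hardcoded-records:dev` is treated as an image even though it contains `/`. For sources, the real CLI additionally rejects a plain name that does not start with `source-`.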
Example `validate` Usage:

```bash
pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
```
PyAirbyte CLI Guidance
Providing connector configuration:
When providing configuration via `--config`, you can provide any of the following:

1. A path to a configuration file, in yaml or json format.
2. An inline yaml string, e.g. `--config='{key: value}'` or `--config='{key: {nested: value}}'`.

When providing an inline yaml string, it is recommended to use single quotes to avoid shell interpolation.
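In the module source, `_resolve_config` chooses between these two forms by the first character: a value starting with `{` is parsed as inline YAML, and anything else is treated as a file path, which raises an error if the file does not exist. An inline value therefore has to be written in brace (flow) style; `--config='count: 4000'` would be looked up as a file. The sketch below reproduces that dispatch with the standard library only. As a stated substitution, it uses `json.loads` in place of the `yaml.safe_load` call PyAirbyte makes, so it accepts JSON (which is valid YAML) but not unquoted YAML keys. The function `resolve_config` and its `parse` parameter are hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Callable


def resolve_config(
    config: str,
    parse: Callable[[str], Any] = json.loads,  # stand-in for yaml.safe_load
) -> dict[str, Any]:
    """Resolve a --config value into a dict (hypothetical sketch)."""
    if config.startswith("{"):
        # Inline string: parse the value itself.
        return parse(config)
    # Otherwise the value must be a path to an existing config file.
    config_path = Path(config)
    if not config_path.exists():
        raise FileNotFoundError(f"Config file not found: {config_path}")
    return parse(config_path.read_text(encoding="utf-8"))
```

For example, `resolve_config('{"count": 4000}')` returns `{"count": 4000}`, while `resolve_config("count: 4000")` raises `FileNotFoundError` (unless a file with that name happens to exist).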
Providing secrets:
You can provide secrets in your configuration by prefixing the secret value with `SECRET:`. For example, `--config='{password: "SECRET:my_password"}'` will look for a secret named `my_password` in the secret store. By default, PyAirbyte will look for secrets in environment variables and dotenv (`.env`) files. If a secret is not found, you'll be prompted to provide the secret value interactively in the terminal.
It is highly recommended to use secrets when using inline yaml strings, in order to avoid exposing secrets in plain text in the terminal history. Secrets provided interactively will not be echoed to the terminal.
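The `SECRET:` substitution can be pictured as a recursive walk over the parsed configuration. In the module source, `_resolve_config` does this with a nested `_inject_secrets` helper that calls `airbyte.secrets.util.get_secret`. The sketch below keeps the same walk but, as a simplifying assumption, looks secrets up only in `os.environ` (no dotenv files and no interactive prompt) and raises `KeyError` when a secret is missing. `inject_env_secrets` is a hypothetical name. Like the original, it only descends into nested mappings, so values inside lists are left untouched.

```python
import os
from typing import Any

SECRET_PREFIX = "SECRET:"


def inject_env_secrets(config: dict[str, Any]) -> None:
    """Replace "SECRET:<name>" string values in place (hypothetical sketch).

    Assumption: secrets come only from environment variables; the real CLI
    also checks dotenv files and can prompt interactively.
    """
    for key, value in config.items():
        if isinstance(value, dict):
            # Recurse into nested mappings.
            inject_env_secrets(value)
        elif isinstance(value, str) and value.startswith(SECRET_PREFIX):
            secret_name = value.removeprefix(SECRET_PREFIX).strip()
            # Overwriting an existing key is safe while iterating;
            # a missing variable raises KeyError.
            config[key] = os.environ[secret_name]
```

For example, with `MY_PASSWORD` set in the environment, `{"auth": {"password": "SECRET:MY_PASSWORD"}}` has its nested `password` value replaced by the variable's contents.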
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""CLI for PyAirbyte.

The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks.

After installing PyAirbyte, the CLI can be invoked with the `pyairbyte` CLI executable, or the
shorter `pyab` alias.

These are equivalent:

```bash
python -m airbyte.cli --help
pyairbyte --help
pyab --help
```

You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI
without pre-installing:

```bash
# Install `uv` if you haven't already:
brew install uv

# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help
```

Example `benchmark` Usage:

```bash
# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6

# Source Benchmark:
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields

# Source Benchmark from Docker Image:
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'

# Destination Benchmark:
pyab benchmark --destination=destination-dev-null --config=/path/to/config.json

# Benchmark a Local Python Source (source-s3):
pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
# Equivalent to:
LOCAL_EXECUTABLE=$(poetry run which source-s3)
CONFIG_PATH=$(realpath ./secrets/config.json)
pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
```

Example `validate` Usage:

```bash
pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
```
"""

from __future__ import annotations

import re
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any

import click
import yaml

from airbyte.destinations.util import get_destination, get_noop_destination
from airbyte.exceptions import PyAirbyteInputError
from airbyte.secrets.util import get_secret
from airbyte.sources.util import get_benchmark_source, get_source


if TYPE_CHECKING:
    from airbyte.destinations.base import Destination
    from airbyte.sources.base import Source


CLI_GUIDANCE = """
----------------------

PyAirbyte CLI Guidance

Providing connector configuration:

When providing configuration via `--config`, you can provide any of the following:

1. A path to a configuration file, in yaml or json format.

2. An inline yaml string, e.g. `--config='{key: value}'`, --config='{key: {nested: value}}'.

When providing an inline yaml string, it is recommended to use single quotes to avoid shell
interpolation.

Providing secrets:

You can provide secrets in your configuration file by prefixing the secret value with `SECRET:`.
For example, --config='{password: "SECRET:my_password"}' will look for a secret named `my_password`
in the secret store. By default, PyAirbyte will look for secrets in environment variables and
dotenv (.env) files. If a secret is not found, you'll be prompted to provide the secret value
interactively in the terminal.

It is highly recommended to use secrets when using inline yaml strings, in order to avoid
exposing secrets in plain text in the terminal history. Secrets provided interactively will
not be echoed to the terminal.
"""

# Add the CLI guidance to the module docstring.
globals()["__doc__"] = globals().get("__doc__", "") + CLI_GUIDANCE

CONFIG_HELP = (
    "Either a path to a configuration file for the named source or destination, "
    "or an inline yaml string. If providing an inline yaml string, use single quotes "
    "to avoid shell interpolation. For example, --config='{key: value}' or "
    "--config='{key: {nested: value}}'. \n"
    "PyAirbyte secrets can be accessed by prefixing the secret name with 'SECRET:'. "
    """For example, --config='{password: "SECRET:MY_PASSWORD"}'."""
)

PIP_URL_HELP = (
    "This can be anything pip accepts, including: a PyPI package name, a local path, "
    "a git repository, a git branch ref, etc. Use '.' to install from the current local "
    "directory."
)


def _resolve_config(
    config: str,
) -> dict[str, Any]:
    """Resolve the configuration file into a dictionary."""

    def _inject_secrets(config_dict: dict[str, Any]) -> None:
        """Inject secrets into the configuration dictionary."""
        for key, value in config_dict.items():
            if isinstance(value, dict):
                _inject_secrets(value)
            elif isinstance(value, str) and value.startswith("SECRET:"):
                config_dict[key] = get_secret(value.removeprefix("SECRET:").strip())

    config_dict: dict[str, Any]
    if config.startswith("{"):
        # Treat this as an inline yaml string:
        config_dict = yaml.safe_load(config)
    else:
        # Treat this as a path to a config file:
        config_path = Path(config)
        if not config_path.exists():
            raise PyAirbyteInputError(
                message="Config file not found.",
                input_value=str(config_path),
            )
        config_dict = yaml.safe_load(config_path.read_text(encoding="utf-8"))

    _inject_secrets(config_dict)
    return config_dict


def _is_docker_image(image: str | None) -> bool:
    """Check if the source or destination is a docker image."""
    return image is not None and ":" in image


def _is_executable_path(connector_str: str) -> bool:
    return connector_str.startswith(".") or "/" in connector_str


def _get_connector_name(connector: str) -> str:
    if _is_docker_image(connector):
        return connector.split(":")[0].split("/")[-1]

    return connector


def _parse_use_python(use_python_str: str | None) -> bool | Path | str | None:
    r"""Parse the use_python CLI parameter.

    Args:
        use_python_str: The raw string value from CLI input.

    Returns:
        - None: No parameter provided
        - True: Use current Python interpreter ("true")
        - False: Use Docker instead ("false")
        - Path: Use interpreter at this path (paths containing / or \ or starting with .)
        - str: Use uv-managed Python version (semver patterns like "3.12", "3.11.5")
          or existing interpreter name (non-semver strings like "python3.10")
    """
    if use_python_str is None:
        return None
    if use_python_str.lower() == "true":
        return True
    if use_python_str.lower() == "false":
        return False
    if "/" in use_python_str or "\\" in use_python_str or use_python_str.startswith("."):
        return Path(use_python_str)

    semver_pattern = r"^\d+\.\d+(?:\.\d+)?$"
    if re.match(semver_pattern, use_python_str):
        return use_python_str  # Return as string for uv-managed version

    return Path(use_python_str)


def _resolve_source_job(
    *,
    source: str | None = None,
    config: str | None = None,
    streams: str | None = None,
    pip_url: str | None = None,
    use_python: str | None = None,
) -> Source:
    """Resolve the source job into a configured Source object.

    Args:
        source: The source name or source reference.
            If a path is provided, the source will be loaded from the local path.
            If the source contains a colon (':'), it will be interpreted as a docker image and tag.
        config: The path to a configuration file for the named source or destination.
        streams: A comma-separated list of stream names to select for reading. If set to "*",
            all streams will be selected. If not provided, all streams will be selected.
        pip_url: Optional. A location from which to install the connector.
        use_python: Optional. Python interpreter specification.
    """
    config_dict = _resolve_config(config) if config else None
    streams_list: str | list[str] = streams or "*"
    if isinstance(streams, str) and streams != "*":
        streams_list = [stream.strip() for stream in streams.split(",")]

    use_python_parsed = _parse_use_python(use_python)

    source_obj: Source
    if source and _is_docker_image(source):
        source_obj = get_source(
            name=_get_connector_name(source),
            docker_image=source,
            config=config_dict,
            streams=streams_list,
            pip_url=pip_url,
            use_python=use_python_parsed,
        )
        return source_obj

    if source and _is_executable_path(source):
        # Treat the source as a path.
        source_executable = Path(source)
        if not source_executable.exists():
            raise PyAirbyteInputError(
                message="Source executable not found.",
                context={
                    "source": source,
                },
            )
        source_obj = get_source(
            name=source_executable.stem,
            local_executable=source_executable,
            config=config_dict,
            streams=streams_list,
            pip_url=pip_url,
            use_python=use_python_parsed,
        )
        return source_obj

    if not source or not source.startswith("source-"):
        raise PyAirbyteInputError(
            message="Expected a source name, docker image, or path to executable.",
            input_value=source,
        )

    source_name: str = source

    return get_source(
        name=source_name,
        config=config_dict,
        streams=streams_list,
        pip_url=pip_url,
        use_python=use_python_parsed,
    )


def _get_noop_destination_config() -> dict[str, Any]:
    return {
        "test_destination": {
            "test_destination_type": "SILENT",
        }
    }


def _resolve_destination_job(
    *,
    destination: str,
    config: str | None = None,
    pip_url: str | None = None,
    use_python: str | None = None,
) -> Destination:
    """Resolve the destination job into a configured Destination object.

    Args:
        destination: The destination name or destination reference.
            If a path is provided, the destination will be loaded from the local path.
            If the destination contains a colon (':'), it will be interpreted as a docker image
            and tag.
        config: The path to a configuration file for the named source or destination.
        pip_url: Optional. A location from which to install the connector.
        use_python: Optional. Python interpreter specification.
    """
    config_dict = _resolve_config(config) if config else {}
    use_python_parsed = _parse_use_python(use_python)

    destination_name = _get_connector_name(destination)

    if destination_name == "destination-dev-null" and not config:
        config_dict = _get_noop_destination_config()

    if _is_docker_image(destination):
        return get_destination(
            name=destination_name,
            docker_image=destination,
            config=config_dict,
            pip_url=pip_url,
            use_python=use_python_parsed,
        )

    if destination and (destination.startswith(".") or "/" in destination):
        # Treat the destination as a path.
        destination_executable = Path(destination)
        if not destination_executable.exists():
            raise PyAirbyteInputError(
                message="Destination executable not found.",
                context={
                    "destination": destination,
                },
            )
        return get_destination(
            name=destination_executable.stem,
            local_executable=destination_executable,
            config=config_dict,
            pip_url=pip_url,
            use_python=use_python_parsed,
        )

    # else: # Treat the destination as a name.

    return get_destination(
        name=destination,
        config=config_dict,
        pip_url=pip_url,
        use_python=use_python_parsed,
    )


@click.command(
    help=(
        "Validate the connector has a valid CLI and is able to run `spec`. "
        "If 'config' is provided, we will also run a `check` on the connector "
        "with the provided config.\n\n" + CLI_GUIDANCE
    ),
)
@click.option(
    "--connector",
    type=str,
    help="The connector name or a path to the local executable.",
)
@click.option(
    "--pip-url",
    type=str,
    help=(
        "Optional. The location from which to install the connector. "
        "This can be anything pip accepts, including: a PyPI package name, a local path, "
        "a git repository, a git branch ref, etc."
    ),
)
@click.option(
    "--config",
    type=str,
    required=False,
    help=CONFIG_HELP,
)
@click.option(
    "--use-python",
    type=str,
    help=(
        "Python interpreter specification. Use 'true' for current Python, "
        "'false' for Docker, a path for specific interpreter, or a version "
        "string for uv-managed Python (e.g., '3.11', 'python3.12')."
    ),
)
def validate(
    connector: str | None = None,
    config: str | None = None,
    pip_url: str | None = None,
    use_python: str | None = None,
) -> None:
    """CLI command to run a `validate` operation."""
    if not connector:
        raise PyAirbyteInputError(
            message="No connector provided.",
        )

    connector_obj: Source | Destination
    if "source-" in connector:
        connector_obj = _resolve_source_job(
            source=connector,
            config=None,
            streams=None,
            pip_url=pip_url,
            use_python=use_python,
        )
    else:  # destination
        connector_obj = _resolve_destination_job(
            destination=connector,
            config=None,
            pip_url=pip_url,
            use_python=use_python,
        )

    print("Getting `spec` output from connector...", file=sys.stderr)
    connector_obj.print_config_spec(stderr=True)

    if config:
        print("Running connector check...")
        config_dict: dict[str, Any] = _resolve_config(config)
        connector_obj.set_config(config_dict)
        connector_obj.check()


@click.command()
@click.option(
    "--source",
    type=str,
    help=(
        "The source name, with an optional version declaration. "
        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
    ),
)
@click.option(
    "--streams",
    type=str,
    default="*",
    help=(
        "A comma-separated list of stream names to select for reading. If set to '*', all streams "
        "will be selected. Defaults to '*'."
    ),
)
@click.option(
    "--num-records",
    type=str,
    default="5e5",
    help=(
        "The number of records to generate for the benchmark. Ignored if a source is provided. "
        "You can specify the number of records to generate using scientific notation. "
        "For example, `5e6` will generate 5 million records. By default, 500,000 records will "
        "be generated (`5e5` records). If underscores are provided within a numeric string, "
        "they will be ignored."
    ),
)
@click.option(
    "--destination",
    type=str,
    help=(
        "The destination name, with an optional version declaration. "
        "If a path is provided, it will be interpreted as a path to the local executable. "
    ),
)
@click.option(
    "--config",
    type=str,
    help=CONFIG_HELP,
)
@click.option(
    "--use-python",
    type=str,
    help=(
        "Python interpreter specification. Use 'true' for current Python, "
        "'false' for Docker, a path for specific interpreter, or a version "
        "string for uv-managed Python (e.g., '3.11', 'python3.12')."
    ),
)
def benchmark(
    source: str | None = None,
    streams: str = "*",
    num_records: int | str = "5e5",  # 500,000 records
    destination: str | None = None,
    config: str | None = None,
    use_python: str | None = None,
) -> None:
    """CLI command to run a `benchmark` operation.

    You can provide either a source or a destination, but not both. If a destination is being
    benchmarked, you can use `--num-records` to specify the number of records to generate for the
    benchmark.

    If a source is being benchmarked, you can provide a configuration file or a job
    definition file to run the source job.
    """
    if source and destination:
        raise PyAirbyteInputError(
            message="For benchmarking, source or destination can be provided, but not both.",
        )
    destination_obj: Destination
    source_obj: Source

    source_obj = (
        _resolve_source_job(
            source=source,
            config=config,
            streams=streams,
            use_python=use_python,
        )
        if source
        else get_benchmark_source(
            num_records=num_records,
        )
    )
    destination_obj = (
        _resolve_destination_job(
            destination=destination,
            config=config,
            use_python=use_python,
        )
        if destination
        else get_noop_destination()
    )

    click.echo("Running benchmarks...", sys.stderr)
    destination_obj.write(
        source_data=source_obj,
        cache=False,
        state_cache=False,
    )


@click.command()
@click.option(
    "--source",
    type=str,
    help=(
        "The source name, with an optional version declaration. "
        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
    ),
)
@click.option(
    "--destination",
    type=str,
    help=(
        "The destination name, with an optional version declaration. "
        "If a path is provided, it will be interpreted as a path to the local executable. "
    ),
)
@click.option(
    "--streams",
    type=str,
    help=(
        "A comma-separated list of stream names to select for reading. If set to '*', all streams "
        "will be selected. Defaults to '*'."
    ),
)
@click.option(
    "--Sconfig",
    "source_config",
    type=str,
    help="The source config. " + CONFIG_HELP,
)
@click.option(
    "--Dconfig",
    "destination_config",
    type=str,
    help="The destination config. " + CONFIG_HELP,
)
@click.option(
    "--Spip-url",
    "source_pip_url",
    type=str,
    help="Optional pip URL for the source (Python connectors only). " + PIP_URL_HELP,
)
@click.option(
    "--Dpip-url",
    "destination_pip_url",
    type=str,
    help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP,
)
@click.option(
    "--use-python",
    type=str,
    help=(
        "Python interpreter specification. Use 'true' for current Python, "
        "'false' for Docker, a path for specific interpreter, or a version "
        "string for uv-managed Python (e.g., '3.11', 'python3.12')."
    ),
)
def sync(
    *,
    source: str,
    source_config: str | None = None,
    source_pip_url: str | None = None,
    destination: str,
    destination_config: str | None = None,
    destination_pip_url: str | None = None,
    streams: str | None = None,
    use_python: str | None = None,
) -> None:
    """CLI command to run a `sync` operation.

    Currently, this only supports full refresh syncs. Incremental syncs are not yet supported.
    Custom catalog syncs are not yet supported.
    """
    destination_obj: Destination
    source_obj: Source

    source_obj = _resolve_source_job(
        source=source,
        config=source_config,
        streams=streams,
        pip_url=source_pip_url,
        use_python=use_python,
    )
    destination_obj = _resolve_destination_job(
        destination=destination,
        config=destination_config,
        pip_url=destination_pip_url,
        use_python=use_python,
    )

    click.echo("Running sync...")
    destination_obj.write(
        source_data=source_obj,
        cache=False,
        state_cache=False,
    )


@click.group()
def cli() -> None:
    """@private PyAirbyte CLI."""
    pass


cli.add_command(validate)
cli.add_command(benchmark)
cli.add_command(sync)

if __name__ == "__main__":
    cli()
````
CLI command to run a `validate` operation.
CLI command to run a `benchmark` operation.

You can provide either a source or a destination, but not both. If a destination is being benchmarked, you can use `--num-records` to specify the number of records to generate for the benchmark.
If a source is being benchmarked, you can provide a configuration file or a job definition file to run the source job.
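The `--num-records` option takes a string such as `2.4e6` or `400_000`; its help text says scientific notation is accepted and underscores are ignored. The CLI passes the raw string to `get_benchmark_source`, whose parsing is not shown on this page, so the following is only a sketch of one way to turn such a value into an integer count, not PyAirbyte's implementation. `parse_num_records` is a hypothetical name.

```python
from __future__ import annotations


def parse_num_records(value: int | str) -> int:
    """Convert a --num-records value such as "2.4e6" or "400_000" to an int.

    Hypothetical sketch: underscores are dropped, plain integers are parsed
    exactly, and anything else is parsed as a float (so scientific notation
    works) and truncated to an int.
    """
    if isinstance(value, int):
        return value
    cleaned = value.replace("_", "").strip()
    try:
        # Plain integer strings avoid any float rounding.
        return int(cleaned)
    except ValueError:
        # Scientific notation such as "5e5" goes through float().
        return int(float(cleaned))
```

Trying `int()` first keeps large plain integers exact; only values that need scientific notation go through `float()`.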
CLI command to run a `sync` operation.
Currently, this only supports full refresh syncs. Incremental syncs are not yet supported. Custom catalog syncs are not yet supported.
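Both `benchmark` and `sync` accept a `--streams` option. In the module source, `_resolve_source_job` turns that value into either the wildcard `"*"` (select all streams) or a list of trimmed stream names before calling `get_source`. A small sketch of that normalization follows; `normalize_streams` is a hypothetical name, and unlike the original it also maps an empty string to `"*"`.

```python
from __future__ import annotations


def normalize_streams(streams: str | None) -> str | list[str]:
    """Normalize a --streams value (hypothetical sketch).

    None, "" or "*" select all streams; otherwise the value is split on
    commas and each stream name is stripped of surrounding whitespace.
    """
    if not streams or streams == "*":
        return "*"
    return [name.strip() for name in streams.split(",")]
```

For example, `--streams='users, orders'` becomes `["users", "orders"]`, and omitting the option selects every stream.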