airbyte.cli
CLI for PyAirbyte.
The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks.
After installing PyAirbyte, the CLI can be invoked with the pyairbyte CLI executable, or the shorter pyab alias.
These are equivalent:
python -m airbyte.cli --help
pyairbyte --help
pyab --help
You can also use pipx or the fast and powerful uv tool to run the PyAirbyte CLI without pre-installing:
# Install `uv` if you haven't already:
brew install uv
# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help
Example benchmark Usage:
# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6
# Source Benchmark:
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields
# Source Benchmark from Docker Image:
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'
# Destination Benchmark:
pyab benchmark --destination=destination-dev-null --config=/path/to/config.json
# Benchmark a Local Python Source (source-s3):
pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
# Equivalent to:
LOCAL_EXECUTABLE=$(poetry run which source-s3)
CONFIG_PATH=$(realpath ./secrets/config.json)
pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
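The examples above pass three kinds of source reference: a connector name, a Docker image with a tag, and a path to a local executable. The sketch below shows one way such a reference could be classified, following the checks visible in this module's source listing (a colon means Docker image; a leading '.' or any '/' means a path). The function names `classify` and `connector_name` are illustrative and are not part of the PyAirbyte API.

```python
from __future__ import annotations


def is_docker_image(ref: str | None) -> bool:
    """A reference containing a colon is read as `image:tag`."""
    return ref is not None and ":" in ref


def is_executable_path(ref: str) -> bool:
    """A reference starting with '.' or containing '/' is read as a path."""
    return ref.startswith(".") or "/" in ref


def classify(ref: str) -> str:
    """Return 'docker', 'executable', or 'name' (hypothetical helper)."""
    # The Docker check runs first, so "airbyte/source-x:latest" is treated
    # as an image even though it also contains a slash.
    if is_docker_image(ref):
        return "docker"
    if is_executable_path(ref):
        return "executable"
    return "name"


def connector_name(ref: str) -> str:
    """Strip the registry prefix and tag from a Docker image reference."""
    if is_docker_image(ref):
        return ref.split(":")[0].split("/")[-1]
    return ref


for ref in (
    "source-hardcoded-records",
    "airbyte/source-hardcoded-records:latest",
    "./bin/source-s3",
):
    print(f"{ref} -> {classify(ref)} ({connector_name(ref)})")
```

Because the colon check comes first, a local path that happens to contain a colon would also be read as a Docker image under this scheme.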
Example validate Usage:
pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
PyAirbyte CLI Guidance
Providing connector configuration:
When providing configuration via --config, you can provide either of the following:
1. A path to a configuration file, in yaml or json format.
2. An inline yaml string, e.g. --config='{key: value}' or --config='{key: {nested: value}}'.
A value that starts with { is treated as an inline yaml string; anything else is treated as a file path.
When providing an inline yaml string, it is recommended to use single quotes to avoid shell interpolation.
Providing secrets:
You can reference secrets in your configuration by using a value of the form SECRET:<name>. For example, --config='{password: "SECRET:my_password"}' will look for a secret named my_password in the secret store. By default, PyAirbyte will look for secrets in environment variables and dotenv (.env) files. If a secret is not found, you'll be prompted to provide the secret value interactively in the terminal.
It is highly recommended to use secrets when using inline yaml strings, in order to avoid exposing secrets in plain text in the terminal history. Secrets provided interactively will not be echoed to the terminal.
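As a rough illustration of the SECRET: convention described above, the sketch below walks an already-parsed configuration dictionary and replaces each prefixed value in place. It is a simplified stand-in, not PyAirbyte's implementation: `env_lookup` is a hypothetical resolver that reads only environment variables, whereas the real secret store also consults dotenv files and can prompt interactively.

```python
from __future__ import annotations

import os
from typing import Any, Callable

SECRET_PREFIX = "SECRET:"


def env_lookup(name: str) -> str:
    """Hypothetical resolver: look the secret up in environment variables only."""
    return os.environ[name]


def inject_secrets(
    config: dict[str, Any],
    lookup: Callable[[str], str] = env_lookup,
) -> None:
    """Replace 'SECRET:<name>' string values in place, recursing into nested dicts."""
    for key, value in config.items():
        if isinstance(value, dict):
            inject_secrets(value, lookup)
        elif isinstance(value, str) and value.startswith(SECRET_PREFIX):
            # Reassigning an existing key is safe while iterating over items().
            config[key] = lookup(value[len(SECRET_PREFIX):].strip())


# Usage: a plain dict stands in for the secret store here.
fake_store = {"MY_PASSWORD": "hunter2"}
config = {"user": "admin", "password": "SECRET:MY_PASSWORD", "ssh": {"key": "SECRET:MY_PASSWORD"}}
inject_secrets(config, fake_store.__getitem__)
print(config)
```

Only string values are inspected, so numbers, booleans, and lists pass through unchanged in this sketch.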
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""CLI for PyAirbyte.

The PyAirbyte CLI provides a command-line interface for testing connectors and running benchmarks.

After installing PyAirbyte, the CLI can be invoked with the `pyairbyte` CLI executable, or the
shorter `pyab` alias.

These are equivalent:

```bash
python -m airbyte.cli --help
pyairbyte --help
pyab --help
```

You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI
without pre-installing:

```bash
# Install `uv` if you haven't already:
brew install uv

# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help
```

Example `benchmark` Usage:

```bash
# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6

# Source Benchmark:
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields

# Source Benchmark from Docker Image:
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'

# Destination Benchmark:
pyab benchmark --destination=destination-dev-null --config=/path/to/config.json

# Benchmark a Local Python Source (source-s3):
pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
# Equivalent to:
LOCAL_EXECUTABLE=$(poetry run which source-s3)
CONFIG_PATH=$(realpath ./secrets/config.json)
pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
```

Example `validate` Usage:

```bash
pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
```
"""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any

import click
import yaml

from airbyte.destinations.util import get_destination, get_noop_destination
from airbyte.exceptions import PyAirbyteInputError
from airbyte.secrets.util import get_secret
from airbyte.sources.util import get_benchmark_source, get_source


if TYPE_CHECKING:
    from airbyte.destinations.base import Destination
    from airbyte.sources.base import Source


CLI_GUIDANCE = """
----------------------

PyAirbyte CLI Guidance

Providing connector configuration:

When providing configuration via `--config`, you can provide either of the following:

1. A path to a configuration file, in yaml or json format.

2. An inline yaml string, e.g. `--config='{key: value}'` or `--config='{key: {nested: value}}'`.

When providing an inline yaml string, it is recommended to use single quotes to avoid shell
interpolation.

Providing secrets:

You can provide secrets in your configuration file by prefixing the secret name with `SECRET:`.
For example, `--config='{password: "SECRET:my_password"}'` will look for a secret named
`my_password` in the secret store. By default, PyAirbyte will look for secrets in environment
variables and dotenv (.env) files. If a secret is not found, you'll be prompted to provide the
secret value interactively in the terminal.

It is highly recommended to use secrets when using inline yaml strings, in order to avoid
exposing secrets in plain text in the terminal history. Secrets provided interactively will
not be echoed to the terminal.
"""

# Add the CLI guidance to the module docstring.
globals()["__doc__"] = globals().get("__doc__", "") + CLI_GUIDANCE

CONFIG_HELP = (
    "Either a path to a configuration file for the named source or destination, "
    "or an inline yaml string. If providing an inline yaml string, use single quotes "
    "to avoid shell interpolation. For example, --config='{key: value}' or "
    "--config='{key: {nested: value}}'. \n"
    "PyAirbyte secrets can be accessed by prefixing the secret name with 'SECRET:'. "
    """For example, --config='{password: "SECRET:MY_PASSWORD"}'."""
)

PIP_URL_HELP = (
    "This can be anything pip accepts, including: a PyPI package name, a local path, "
    "a git repository, a git branch ref, etc. Use '.' to install from the current local "
    "directory."
)


def _resolve_config(
    config: str,
) -> dict[str, Any]:
    """Resolve the configuration file into a dictionary."""

    def _inject_secrets(config_dict: dict[str, Any]) -> None:
        """Inject secrets into the configuration dictionary."""
        for key, value in config_dict.items():
            if isinstance(value, dict):
                _inject_secrets(value)
            elif isinstance(value, str) and value.startswith("SECRET:"):
                config_dict[key] = get_secret(value.removeprefix("SECRET:").strip())

    config_dict: dict[str, Any]
    if config.startswith("{"):
        # Treat this as an inline yaml string:
        config_dict = yaml.safe_load(config)
    else:
        # Treat this as a path to a config file:
        config_path = Path(config)
        if not config_path.exists():
            raise PyAirbyteInputError(
                message="Config file not found.",
                input_value=str(config_path),
            )
        config_dict = yaml.safe_load(config_path.read_text(encoding="utf-8"))

    _inject_secrets(config_dict)
    return config_dict


def _is_docker_image(image: str | None) -> bool:
    """Check if the source or destination is a docker image."""
    return image is not None and ":" in image


def _is_executable_path(connector_str: str) -> bool:
    return connector_str.startswith(".") or "/" in connector_str


def _get_connector_name(connector: str) -> str:
    if _is_docker_image(connector):
        return connector.split(":")[0].split("/")[-1]

    return connector


def _resolve_source_job(
    *,
    source: str | None = None,
    config: str | None = None,
    streams: str | None = None,
    pip_url: str | None = None,
) -> Source:
    """Resolve the source job into a configured Source object.

    Args:
        source: The source name or source reference.
            If a path is provided, the source will be loaded from the local path.
            If the source contains a colon (':'), it will be interpreted as a docker image and tag.
        config: The path to a configuration file for the named source or destination.
        streams: A comma-separated list of stream names to select for reading. If set to "*",
            all streams will be selected. If not provided, all streams will be selected.
        pip_url: Optional. A location from which to install the connector.
    """
    config_dict = _resolve_config(config) if config else None
    streams_list: str | list[str] = streams or "*"
    if isinstance(streams, str) and streams != "*":
        streams_list = [stream.strip() for stream in streams.split(",")]

    source_obj: Source
    if source and _is_docker_image(source):
        source_obj = get_source(
            name=_get_connector_name(source),
            docker_image=source,
            config=config_dict,
            streams=streams_list,
            pip_url=pip_url,
        )
        return source_obj

    if source and _is_executable_path(source):
        # Treat the source as a path.
        source_executable = Path(source)
        if not source_executable.exists():
            raise PyAirbyteInputError(
                message="Source executable not found.",
                context={
                    "source": source,
                },
            )
        source_obj = get_source(
            name=source_executable.stem,
            local_executable=source_executable,
            config=config_dict,
            streams=streams_list,
            pip_url=pip_url,
        )
        return source_obj

    if not source or not source.startswith("source-"):
        raise PyAirbyteInputError(
            message="Expected a source name, docker image, or path to executable.",
            input_value=source,
        )

    source_name: str = source

    return get_source(
        name=source_name,
        config=config_dict,
        streams=streams_list,
        pip_url=pip_url,
    )


def _resolve_destination_job(
    *,
    destination: str,
    config: str | None = None,
    pip_url: str | None = None,
) -> Destination:
    """Resolve the destination job into a configured Destination object.

    Args:
        destination: The destination name or destination reference.
            If a path is provided, the destination will be loaded from the local path.
            If the destination contains a colon (':'), it will be interpreted as a docker image
            and tag.
        config: The path to a configuration file for the named source or destination.
        pip_url: Optional. A location from which to install the connector.
    """
    config_dict = _resolve_config(config) if config else None

    if destination and (destination.startswith(".") or "/" in destination):
        # Treat the destination as a path.
        destination_executable = Path(destination)
        if not destination_executable.exists():
            raise PyAirbyteInputError(
                message="Destination executable not found.",
                context={
                    "destination": destination,
                },
            )
        return get_destination(
            name=destination_executable.stem,
            local_executable=destination_executable,
            config=config_dict,
            pip_url=pip_url,
        )

    # else: # Treat the destination as a name.

    return get_destination(
        name=destination,
        config=config_dict,
        pip_url=pip_url,
    )


@click.command(
    help=(
        "Validate the connector has a valid CLI and is able to run `spec`. "
        "If 'config' is provided, we will also run a `check` on the connector "
        "with the provided config.\n\n" + CLI_GUIDANCE
    ),
)
@click.option(
    "--connector",
    type=str,
    help="The connector name or a path to the local executable.",
)
@click.option(
    "--pip-url",
    type=str,
    help=(
        "Optional. The location from which to install the connector. "
        "This can be anything pip accepts, including: a PyPI package name, a local path, "
        "a git repository, a git branch ref, etc."
    ),
)
@click.option(
    "--config",
    type=str,
    required=False,
    help=CONFIG_HELP,
)
def validate(
    connector: str | None = None,
    config: str | None = None,
    pip_url: str | None = None,
) -> None:
    """CLI command to run a `validate` operation."""
    if not connector:
        raise PyAirbyteInputError(
            message="No connector provided.",
        )

    connector_obj: Source | Destination
    if "source-" in connector:
        connector_obj = _resolve_source_job(
            source=connector,
            config=None,
            streams=None,
            pip_url=pip_url,
        )
    else:  # destination
        connector_obj = _resolve_destination_job(
            destination=connector,
            config=None,
            pip_url=pip_url,
        )

    print("Getting `spec` output from connector...")
    connector_obj.print_config_spec()

    if config:
        print("Running connector check...")
        config_dict: dict[str, Any] = _resolve_config(config)
        connector_obj.set_config(config_dict)
        connector_obj.check()


@click.command()
@click.option(
    "--source",
    type=str,
    help=(
        "The source name, with an optional version declaration. "
        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
    ),
)
@click.option(
    "--streams",
    type=str,
    default="*",
    help=(
        "A comma-separated list of stream names to select for reading. If set to '*', all streams "
        "will be selected. Defaults to '*'."
    ),
)
@click.option(
    "--num-records",
    type=str,
    default="5e5",
    help=(
        "The number of records to generate for the benchmark. Ignored if a source is provided. "
        "You can specify the number of records to generate using scientific notation. "
        "For example, `5e6` will generate 5 million records. By default, 500,000 records will "
        "be generated (`5e5` records). If underscores are provided within a numeric string, "
        "they will be ignored."
    ),
)
@click.option(
    "--destination",
    type=str,
    help=(
        "The destination name, with an optional version declaration. "
        "If a path is provided, it will be interpreted as a path to the local executable. "
    ),
)
@click.option(
    "--config",
    type=str,
    help=CONFIG_HELP,
)
def benchmark(
    source: str | None = None,
    streams: str = "*",
    num_records: int | str = "5e5",  # 500,000 records
    destination: str | None = None,
    config: str | None = None,
) -> None:
    """CLI command to run a `benchmark` operation.

    You can provide either a source or a destination, but not both. If a destination is being
    benchmarked, you can use `--num-records` to specify the number of records to generate for the
    benchmark.

    If a source is being benchmarked, you can provide a configuration file or a job
    definition file to run the source job.
    """
    if source and destination:
        raise PyAirbyteInputError(
            message="For benchmarking, source or destination can be provided, but not both.",
        )
    destination_obj: Destination
    source_obj: Source

    source_obj = (
        _resolve_source_job(
            source=source,
            config=config,
            streams=streams,
        )
        if source
        else get_benchmark_source(
            num_records=num_records,
        )
    )
    destination_obj = (
        _resolve_destination_job(
            destination=destination,
            config=config,
        )
        if destination
        else get_noop_destination()
    )

    click.echo("Running benchmarks...")
    destination_obj.write(
        source_data=source_obj,
        cache=False,
        state_cache=False,
    )


@click.command()
@click.option(
    "--source",
    type=str,
    help=(
        "The source name, with an optional version declaration. "
        "If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
    ),
)
@click.option(
    "--destination",
    type=str,
    help=(
        "The destination name, with an optional version declaration. "
        "If a path is provided, it will be interpreted as a path to the local executable. "
    ),
)
@click.option(
    "--streams",
    type=str,
    help=(
        "A comma-separated list of stream names to select for reading. If set to '*', all streams "
        "will be selected. Defaults to '*'."
    ),
)
@click.option(
    "--Sconfig",
    "source_config",
    type=str,
    help="The source config. " + CONFIG_HELP,
)
@click.option(
    "--Dconfig",
    "destination_config",
    type=str,
    help="The destination config. " + CONFIG_HELP,
)
@click.option(
    "--Spip-url",
    "source_pip_url",
    type=str,
    help="Optional pip URL for the source (Python connectors only). " + PIP_URL_HELP,
)
@click.option(
    "--Dpip-url",
    "destination_pip_url",
    type=str,
    help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP,
)
def sync(
    *,
    source: str,
    source_config: str | None = None,
    source_pip_url: str | None = None,
    destination: str,
    destination_config: str | None = None,
    destination_pip_url: str | None = None,
    streams: str | None = None,
) -> None:
    """CLI command to run a `sync` operation.

    Currently, this only supports full refresh syncs. Incremental syncs are not yet supported.
    Custom catalog syncs are not yet supported.
    """
    destination_obj: Destination
    source_obj: Source

    source_obj = _resolve_source_job(
        source=source,
        config=source_config,
        streams=streams,
        pip_url=source_pip_url,
    )
    destination_obj = _resolve_destination_job(
        destination=destination,
        config=destination_config,
        pip_url=destination_pip_url,
    )

    click.echo("Running sync...")
    destination_obj.write(
        source_data=source_obj,
        cache=False,
        state_cache=False,
    )


@click.group()
def cli() -> None:
    """@private PyAirbyte CLI."""
    pass


cli.add_command(validate)
cli.add_command(benchmark)
cli.add_command(sync)

if __name__ == "__main__":
    cli()
CLI command to run a validate operation.
CLI command to run a benchmark operation.
You can provide either a source or a destination, but not both. If a destination is being benchmarked, you can use --num-records to specify the number of records to generate for the benchmark.
If a source is being benchmarked, you can provide a configuration file or a job definition file to run the source job.
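The --num-records option is passed as a string and, per its help text, accepts scientific notation (5e5, 2.4e6) and ignores underscores. The sketch below shows one way such a value could be normalized to an integer. `parse_num_records` is a hypothetical helper written for illustration; how PyAirbyte itself converts the value is not shown in this module.

```python
from __future__ import annotations


def parse_num_records(value: str | int) -> int:
    """Convert values like '5e5', '2.4e6', or '400_000' to an int (hypothetical helper)."""
    if isinstance(value, int):
        return value
    # Drop underscores explicitly, then let float() handle scientific notation.
    number = float(value.replace("_", ""))
    if number < 0 or not number.is_integer():
        raise ValueError(f"Expected a non-negative whole number of records, got {value!r}")
    return int(number)


for raw in ("5e5", "2.4e6", "400_000"):
    print(raw, "->", parse_num_records(raw))
```

Going through float keeps the sketch short, but very large counts could lose precision; a stricter parser might use decimal.Decimal instead.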
CLI command to run a sync operation.
Currently, this only supports full refresh syncs. Incremental syncs are not yet supported. Custom catalog syncs are not yet supported.