airbyte_ops_mcp.cli.cloud

CLI commands for Airbyte Cloud operations.

Commands:

  • airbyte-ops cloud connector get-version-info: Get connector version info
  • airbyte-ops cloud connector set-version-override: Set connector version override
  • airbyte-ops cloud connector clear-version-override: Clear connector version override
  • airbyte-ops cloud connector regression-test: Run regression tests (single-version or comparison)
  • airbyte-ops cloud connector fetch-connection-config: Fetch connection config to local file

CLI reference

The commands below are regenerated by poe docs-generate via cyclopts's programmatic docs API; see docs/generate_cli.py.

airbyte-ops cloud COMMAND

Airbyte Cloud operations.

Commands:

  • connection: Connection operations in Airbyte Cloud.
  • connector: Deployed connector operations in Airbyte Cloud.
  • db: Database operations for Airbyte Cloud Prod DB Replica.
  • logs: GCP Cloud Logging operations for Airbyte Cloud.

airbyte-ops cloud connector

Deployed connector operations in Airbyte Cloud.

airbyte-ops cloud connector get-version-info

airbyte-ops cloud connector get-version-info WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE

Get the current version information for a deployed connector.

Parameters:

  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
  • CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]

airbyte-ops cloud connector set-version-override

airbyte-ops cloud connector set-version-override WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE VERSION REASON ISSUE-URL APPROVAL-COMMENT-URL [ARGS]

Set a version override for a deployed connector.

Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.

The customer_tier_filter gates the operation: the call fails if the actual tier of the workspace's organization does not match. Use ALL to proceed regardless of tier (a warning is shown for sensitive tiers).

Parameters:

  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
  • CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]
  • VERSION, --version: The semver version string to pin to (e.g., '2.1.5-preview.abc1234'). [required]
  • REASON, --reason: Explanation for the override (min 10 characters). [required]
  • ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
  • APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
  • AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
  • REASON-URL, --reason-url: Optional URL with more context (e.g., issue link).
  • CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]

airbyte-ops cloud connector clear-version-override

airbyte-ops cloud connector clear-version-override WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE ISSUE-URL APPROVAL-COMMENT-URL [ARGS]

Clear a version override from a deployed connector.

Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.

The customer_tier_filter gates the operation: the call fails if the actual tier of the workspace's organization does not match. Use ALL to proceed regardless of tier (a warning is shown for sensitive tiers).

Parameters:

  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
  • CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]
  • ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
  • APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
  • AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
  • CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]

airbyte-ops cloud connector regression-test

airbyte-ops cloud connector regression-test [ARGS]

Run regression tests on connectors.

This command supports two modes:

Comparison mode (skip_compare=False, default): Runs the specified Airbyte protocol command against both the target (new) and control (baseline) connector versions, then compares the results. This helps identify regressions between versions.

Single-version mode (skip_compare=True): Runs the specified Airbyte protocol command against a single connector and validates the output. No comparison is performed.

Results are written to the output directory and to GitHub Actions outputs if running in CI.

You can provide the test image in three ways:

  1. --test-image: Use a pre-built image from Docker registry
  2. --connector-name: Build the image locally from source code
  3. --connection-id: Auto-detect from an Airbyte Cloud connection

You can provide config/catalog either via file paths OR via a connection_id that fetches them from Airbyte Cloud.

Parameters:

  • SKIP-COMPARE, --skip-compare, --no-skip-compare: If True, skip comparison and run single-version tests only. If False (default), run comparison tests (target vs control). [default: False]
  • TEST-IMAGE, --test-image: Test connector image with tag (e.g., airbyte/source-github:1.0.0). This is the image under test - in comparison mode, it's compared against control_image.
  • CONTROL-IMAGE, --control-image: Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). Ignored if skip_compare=True.
  • CONNECTOR-NAME, --connector-name: Connector name to build image from source (e.g., 'source-pokeapi'). If provided, builds the image locally with tag 'dev'. For comparison tests (default), this builds the target image. For single-version tests (skip_compare=True), this builds the test image.
  • REPO-ROOT, --repo-root: Path to the airbyte repo root. Required if connector_name is provided and the repo cannot be auto-detected.
  • COMMAND, --command: The Airbyte command to run. [choices: spec, check, discover, read] [default: check]
  • CONNECTION-ID, --connection-id: Airbyte Cloud connection ID to fetch config/catalog from. Mutually exclusive with config-path/catalog-path. If provided, test_image/control_image can be auto-detected.
  • CONFIG-PATH, --config-path: Path to the connector config JSON file.
  • CATALOG-PATH, --catalog-path: Path to the configured catalog JSON file (required for read).
  • STATE-PATH, --state-path: Path to the state JSON file (optional for read).
  • OUTPUT-DIR, --output-dir: Directory to store test artifacts. [default: /tmp/regression_test_artifacts]
  • ENABLE-HTTP-METRICS, --enable-http-metrics, --no-enable-http-metrics: Capture HTTP traffic metrics via mitmproxy (experimental). Requires mitmdump to be installed. Only used in comparison mode. [default: False]
  • SELECTED-STREAMS, --selected-streams: Comma-separated list of stream names to include in the read. Only these streams will be included in the configured catalog. This is useful to limit data volume by testing only specific streams.
  • ENABLE-DEBUG-LOGS, --enable-debug-logs, --no-enable-debug-logs: Enable debug-level logging for regression test output. Also passed as LOG_LEVEL=DEBUG to the connector Docker container. [default: False]
  • WITH-STATE, --with-state, --no-state: Fetch and pass the connection's current state to the read command, producing a warm read instead of a cold read. Defaults to True when --connection-id is provided, False otherwise. Has no effect unless the command is read. Ignored when --state-path is explicitly provided.

airbyte-ops cloud connector fetch-connection-config

airbyte-ops cloud connector fetch-connection-config CONNECTION-ID [ARGS]

Fetch connection configuration from Airbyte Cloud to a local file.

This command retrieves the source configuration for a given connection ID and writes it to a JSON file. When --output-path is omitted, the file is written to the platform temp directory to avoid accidentally committing secrets to a git repository.

Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

When --with-secrets is specified, the command fetches unmasked secrets from the internal database using the connection-retriever. This additionally requires:

  • An OC issue URL for audit logging (--oc-issue-url)
  • GCP credentials via GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login
  • If CI=true: expects cloud-sql-proxy running on localhost, or direct network access to the Cloud SQL instance.

Parameters:

  • CONNECTION-ID, --connection-id: The UUID of the Airbyte Cloud connection. [required]
  • OUTPUT-PATH, --output-path: Path to output file or directory. If directory, writes connection--config.json inside it. Default: platform temp directory (e.g. /tmp/connection--config.json)
  • WITH-SECRETS, --with-secrets, --no-secrets: If set, fetches unmasked secrets from the internal database. Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login. Must be used with --oc-issue-url. [default: False]
  • OC-ISSUE-URL, --oc-issue-url: OC issue URL for audit logging. Required when using --with-secrets.

airbyte-ops cloud db

Database operations for Airbyte Cloud Prod DB Replica.

airbyte-ops cloud db start-proxy

airbyte-ops cloud db start-proxy [ARGS]

Start the Cloud SQL Proxy for database access.

This command starts the Cloud SQL Auth Proxy to enable connections to the Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.

By default, runs as a daemon (background process). Use --no-daemon to run in foreground mode where you can see logs and stop with Ctrl+C.

Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, which should contain the service account JSON credentials.

After starting the proxy, set these environment variables to use database tools:

  export USE_CLOUD_SQL_PROXY=1
  export DB_PORT={port}

Parameters:

  • PORT, --port: Port for the Cloud SQL Proxy to listen on. [default: 15432]
  • DAEMON, --daemon, --no-daemon: Run as daemon in background (default). Use --no-daemon for foreground. [default: True]

airbyte-ops cloud db stop-proxy

airbyte-ops cloud db stop-proxy

Stop the Cloud SQL Proxy daemon.

This command stops a Cloud SQL Proxy that was started with 'start-proxy'. It reads the PID from the PID file and sends a SIGTERM signal to stop the process.

airbyte-ops cloud connection

Connection operations in Airbyte Cloud.

airbyte-ops cloud connection state

Connection state operations in Airbyte Cloud.

Commands:

  • get: Get the current state for an Airbyte Cloud connection.
  • set: Set the state for an Airbyte Cloud connection.

airbyte-ops cloud connection state get

airbyte-ops cloud connection state get CONNECTION-ID [ARGS]

Get the current state for an Airbyte Cloud connection.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch state for. [required]
  • STREAM, --stream: Optional stream name to filter state for a single stream.
  • NAMESPACE, --namespace: Optional stream namespace to narrow the stream filter.
  • OUTPUT, --output: Path to write the state JSON output to a file.

airbyte-ops cloud connection state set

airbyte-ops cloud connection state set CONNECTION-ID [ARGS]

Set the state for an Airbyte Cloud connection.

State JSON can be provided as a positional argument, via --input, or piped through STDIN.

Uses the safe variant that prevents updates while a sync is running. When --stream is provided, only that stream's state is updated within the existing connection state.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
  • STATE-JSON, --state-json: The connection state as a JSON string. When --stream is used, this is just the stream's state blob (e.g., '{"cursor": "2024-01-01"}'). Otherwise, must include 'stateType', 'connectionId', and the appropriate state field. Can also be provided via --input or STDIN.
  • STREAM, --stream: Optional stream name to update state for a single stream only.
  • NAMESPACE, --namespace: Optional stream namespace to identify the stream.
  • INPUT, --input: Path to a JSON file containing the state to set.

airbyte-ops cloud connection catalog

Connection catalog operations in Airbyte Cloud.

Commands:

  • get: Get the configured catalog for an Airbyte Cloud connection.
  • set: Set the configured catalog for an Airbyte Cloud connection.

airbyte-ops cloud connection catalog get

airbyte-ops cloud connection catalog get CONNECTION-ID [ARGS]

Get the configured catalog for an Airbyte Cloud connection.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch catalog for. [required]
  • OUTPUT, --output: Path to write the catalog JSON output to a file.

airbyte-ops cloud connection catalog set

airbyte-ops cloud connection catalog set CONNECTION-ID [ARGS]

Set the configured catalog for an Airbyte Cloud connection.

Catalog JSON can be provided as a positional argument, via --input, or piped through STDIN.

WARNING: This replaces the entire configured catalog.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update catalog for. [required]
  • CATALOG-JSON, --catalog-json: The configured catalog as a JSON string. Can also be provided via --input or STDIN.
  • INPUT, --input: Path to a JSON file containing the catalog to set.

airbyte-ops cloud logs

GCP Cloud Logging operations for Airbyte Cloud.

airbyte-ops cloud logs lookup-cloud-backend-error

airbyte-ops cloud logs lookup-cloud-backend-error ERROR-ID [ARGS]

Look up error details from GCP Cloud Logging by error ID.

When an Airbyte Cloud API returns an error response with only an error ID (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command fetches the full stack trace and error details from GCP Cloud Logging.

Requires GCP credentials with Logs Viewer role on the target project. Set up credentials with: gcloud auth application-default login

Parameters:

  • ERROR-ID, --error-id: The error ID (UUID) to search for. This is typically returned in API error responses as {'errorId': '...'}. [required]
  • LOOKBACK-DAYS, --lookback-days: Number of days to look back in logs. [default: 7]
  • MIN-SEVERITY-FILTER, --min-severity-filter: Optional minimum severity level to filter logs. [choices: debug, info, notice, warning, error, critical, alert, emergency]
  • RAW, --raw, --no-raw: Output raw JSON instead of formatted text. [default: False]
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""CLI commands for Airbyte Cloud operations.

Commands:
    airbyte-ops cloud connector get-version-info - Get connector version info
    airbyte-ops cloud connector set-version-override - Set connector version override
    airbyte-ops cloud connector clear-version-override - Clear connector version override
    airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
    airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file

## CLI reference

The commands below are regenerated by `poe docs-generate` via cyclopts's
programmatic docs API; see `docs/generate_cli.py`.

.. include:: ../../../docs/generated/cli/cloud.md
   :start-line: 2
"""

from __future__ import annotations

# Hide Python-level members from the pdoc page for this module; the rendered
# docs for this CLI group come entirely from the grafted `.. include::` in
# the module docstring above.
__all__: list[str] = []

import json
import logging
import os
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Annotated, Literal

import requests
import yaml
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte_cdk.models.connector_metadata import MetadataFile
from airbyte_cdk.utils.connector_paths import find_connector_root_from_name
from airbyte_cdk.utils.docker import build_connector_image, verify_docker_installation
from airbyte_protocol.models import ConfiguredAirbyteCatalog
from cyclopts import Parameter

from airbyte_ops_mcp.cli._base import App, app
from airbyte_ops_mcp.cli._shared import (
    exit_with_error,
    print_error,
    print_json,
    print_success,
    print_warning,
)
from airbyte_ops_mcp.cloud_admin.connection_config import fetch_connection_config
from airbyte_ops_mcp.constants import (
    CLOUD_SQL_INSTANCE,
    CLOUD_SQL_PROXY_PID_FILE,
    DEFAULT_CLOUD_SQL_PROXY_PORT,
    ENV_GCP_PROD_DB_ACCESS_CREDENTIALS,
)
from airbyte_ops_mcp.gcp_logs import GCPSeverity, fetch_error_logs
from airbyte_ops_mcp.mcp.cloud_connector_versions import (
    get_cloud_connector_version,
    set_cloud_connector_version_override,
)
from airbyte_ops_mcp.regression_tests.cdk_secrets import get_first_config_from_secrets
from airbyte_ops_mcp.regression_tests.ci_output import (
    generate_regression_report,
    generate_single_version_report,
    write_github_output,
    write_github_outputs,
    write_github_summary,
    write_json_output,
    write_test_summary,
)
from airbyte_ops_mcp.regression_tests.config_overrides import (
    filter_configured_catalog_file,
)
from airbyte_ops_mcp.regression_tests.connection_fetcher import (
    fetch_connection_data,
    fetch_connection_state,
    save_connection_data_to_files,
)
from airbyte_ops_mcp.regression_tests.connection_secret_retriever import (
    SecretRetrievalError,
    enrich_config_with_secrets,
    should_use_secret_retriever,
)
from airbyte_ops_mcp.regression_tests.connector_runner import (
    ConnectorRunner,
    ensure_image_available,
)
from airbyte_ops_mcp.regression_tests.http_metrics import (
    MitmproxyManager,
    parse_http_dump,
)
from airbyte_ops_mcp.regression_tests.models import (
    Command,
    ConnectorUnderTest,
    ExecutionInputs,
    TargetOrControl,
)
from airbyte_ops_mcp.telemetry import track_regression_test

# Path to connectors directory within the airbyte repo
CONNECTORS_SUBDIR = Path("airbyte-integrations") / "connectors"

# Create the cloud sub-app
cloud_app = App(name="cloud", help="Airbyte Cloud operations.")
app.command(cloud_app)

# Create the connector sub-app under cloud
connector_app = App(
    name="connector", help="Deployed connector operations in Airbyte Cloud."
)
cloud_app.command(connector_app)

# Create the db sub-app under cloud
db_app = App(name="db", help="Database operations for Airbyte Cloud Prod DB Replica.")
cloud_app.command(db_app)

# Create the connection sub-app under cloud
connection_app = App(name="connection", help="Connection operations in Airbyte Cloud.")
cloud_app.command(connection_app)

# Create the state sub-app under connection
state_app = App(name="state", help="Connection state operations in Airbyte Cloud.")
connection_app.command(state_app)

# Create the catalog sub-app under connection
catalog_app = App(
    name="catalog", help="Connection catalog operations in Airbyte Cloud."
)
connection_app.command(catalog_app)

# Create the logs sub-app under cloud
logs_app = App(name="logs", help="GCP Cloud Logging operations for Airbyte Cloud.")
cloud_app.command(logs_app)


@db_app.command(name="start-proxy")
def start_proxy(
    port: Annotated[
        int,
        Parameter(help="Port for the Cloud SQL Proxy to listen on."),
    ] = DEFAULT_CLOUD_SQL_PROXY_PORT,
    daemon: Annotated[
        bool,
        Parameter(
            help="Run as daemon in background (default). Use --no-daemon for foreground."
        ),
    ] = True,
) -> None:
    """Start the Cloud SQL Proxy for database access.

    This command starts the Cloud SQL Auth Proxy to enable connections to the
    Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.

    By default, runs as a daemon (background process). Use --no-daemon to run in
    foreground mode where you can see logs and stop with Ctrl+C.

    Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable,
    which should contain the service account JSON credentials.

    After starting the proxy, set these environment variables to use database tools:
        export USE_CLOUD_SQL_PROXY=1
        export DB_PORT={port}

    Example:
        airbyte-ops cloud db start-proxy
        airbyte-ops cloud db start-proxy --port 15432
        airbyte-ops cloud db start-proxy --no-daemon
    """
    # Check if proxy is already running on the requested port (idempotency)
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=0.5):
            # Something is already listening on this port
            pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
            pid_info = ""
            if pid_file.exists():
                pid_info = f" (PID: {pid_file.read_text().strip()})"
            print_success(
                f"Cloud SQL Proxy is already running on port {port}{pid_info}"
            )
            print_success("")
            print_success("To use database tools, set these environment variables:")
            print_success("  export USE_CLOUD_SQL_PROXY=1")
            print_success(f"  export DB_PORT={port}")
            return
    except (OSError, TimeoutError, ConnectionRefusedError):
        pass  # Port not in use, proceed with starting proxy

    # Check if cloud-sql-proxy is installed
    proxy_path = shutil.which("cloud-sql-proxy")
    if not proxy_path:
        exit_with_error(
            "cloud-sql-proxy not found in PATH. "
            "Install it from: https://cloud.google.com/sql/docs/mysql/sql-proxy"
        )

    # Get credentials from environment
    creds_json = os.getenv(ENV_GCP_PROD_DB_ACCESS_CREDENTIALS)
    if not creds_json:
        exit_with_error(
            f"{ENV_GCP_PROD_DB_ACCESS_CREDENTIALS} environment variable is not set. "
            "This should contain the GCP service account JSON credentials."
        )

    # Build the command using --json-credentials to avoid writing to disk
    cmd = [
        proxy_path,
        CLOUD_SQL_INSTANCE,
        f"--port={port}",
        f"--json-credentials={creds_json}",
    ]

    print_success(f"Starting Cloud SQL Proxy on port {port}...")
    print_success(f"Instance: {CLOUD_SQL_INSTANCE}")
    print_success("")
    print_success("To use database tools, set these environment variables:")
    print_success("  export USE_CLOUD_SQL_PROXY=1")
    print_success(f"  export DB_PORT={port}")
    print_success("")

    if daemon:
        # Run in background (daemon mode) with log file for diagnostics
        log_file_path = Path("/tmp/airbyte-cloud-sql-proxy.log")
        log_file = log_file_path.open("ab")
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=log_file,
            start_new_session=True,
        )

        # Brief wait to verify the process started successfully
        time.sleep(0.5)
        if process.poll() is not None:
            # Process exited immediately - read any error output
            log_file.close()
            error_output = ""
            if log_file_path.exists():
                error_output = log_file_path.read_text()[-1000:]  # Last 1000 chars
            exit_with_error(
                f"Cloud SQL Proxy failed to start (exit code: {process.returncode}).\n"
                f"Check logs at {log_file_path}\n"
                f"Recent output: {error_output}"
            )

        # Write PID to file for stop-proxy command
        pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
        pid_file.write_text(str(process.pid))
        print_success(f"Cloud SQL Proxy started as daemon (PID: {process.pid})")
        print_success(f"Logs: {log_file_path}")
        print_success("To stop: airbyte-ops cloud db stop-proxy")
    else:
        # Run in foreground - replace current process
        # Signals (Ctrl+C) will be handled directly by the cloud-sql-proxy process
        print_success("Running in foreground. Press Ctrl+C to stop the proxy.")
        print_success("")
        os.execv(proxy_path, cmd)

@db_app.command(name="stop-proxy")
def stop_proxy() -> None:
    """Stop the Cloud SQL Proxy daemon.

    This command stops a Cloud SQL Proxy that was started with 'start-proxy'.
    It reads the PID from the PID file and sends a SIGTERM signal to stop the process.

    Example:
        airbyte-ops cloud db stop-proxy
    """
    pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)

    if not pid_file.exists():
        exit_with_error(
            f"PID file not found at {CLOUD_SQL_PROXY_PID_FILE}. "
            "No Cloud SQL Proxy daemon appears to be running."
        )

    pid_str = pid_file.read_text().strip()
    if not pid_str.isdigit():
        pid_file.unlink()
        exit_with_error(f"Invalid PID in {CLOUD_SQL_PROXY_PID_FILE}: {pid_str}")

    pid = int(pid_str)

    # Check if process is still running
    try:
        os.kill(pid, 0)  # Signal 0 just checks if process exists
    except ProcessLookupError:
        pid_file.unlink()
        print_success(
            f"Cloud SQL Proxy (PID: {pid}) is not running. Cleaned up PID file."
        )
        return
    except PermissionError:
        exit_with_error(f"Permission denied to check process {pid}.")

    # Send SIGTERM to stop the process
    try:
        os.kill(pid, signal.SIGTERM)
        print_success(f"Sent SIGTERM to Cloud SQL Proxy (PID: {pid}).")
    except ProcessLookupError:
        print_success(f"Cloud SQL Proxy (PID: {pid}) already stopped.")
    except PermissionError:
        exit_with_error(f"Permission denied to stop process {pid}.")

    # Clean up PID file
    pid_file.unlink(missing_ok=True)
    print_success("Cloud SQL Proxy stopped.")

@connector_app.command(name="get-version-info")
def get_version_info(
    workspace_id: Annotated[
        str,
        Parameter(help="The Airbyte Cloud workspace ID."),
    ],
    connector_id: Annotated[
        str,
        Parameter(help="The ID of the deployed connector (source or destination)."),
    ],
    connector_type: Annotated[
        Literal["source", "destination"],
        Parameter(help="The type of connector."),
    ],
) -> None:
    """Get the current version information for a deployed connector."""
    result = get_cloud_connector_version(
        workspace_id=workspace_id,
        actor_id=connector_id,
        actor_type=connector_type,
    )
    print_json(result.model_dump())


@connector_app.command(name="set-version-override")
def set_version_override(
    workspace_id: Annotated[
        str,
        Parameter(help="The Airbyte Cloud workspace ID."),
    ],
    connector_id: Annotated[
        str,
        Parameter(help="The ID of the deployed connector (source or destination)."),
    ],
    connector_type: Annotated[
        Literal["source", "destination"],
        Parameter(help="The type of connector."),
    ],
    version: Annotated[
        str,
        Parameter(
            help="The semver version string to pin to (e.g., '2.1.5-preview.abc1234')."
        ),
    ],
    reason: Annotated[
        str,
        Parameter(help="Explanation for the override (min 10 characters)."),
    ],
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    reason_url: Annotated[
        str | None,
        Parameter(help="Optional URL with more context (e.g., issue link)."),
    ] = None,
    customer_tier_filter: Annotated[
        Literal["TIER_0", "TIER_1", "TIER_2", "ALL"],
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Set a version override for a deployed connector.

    Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and
    AIRBYTE_INTERNAL_ADMIN_USER environment variables.

    The `customer_tier_filter` gates the operation: the call fails if
    the actual tier of the workspace's organization does not match.
    Use `ALL` to proceed regardless of tier (a warning is shown for sensitive tiers).
    """
    admin_user_email = os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER")
    result = set_cloud_connector_version_override(
        workspace_id=workspace_id,
        actor_id=connector_id,
        actor_type=connector_type,
        version=version,
        unset=False,
        override_reason=reason,
        override_reason_reference_url=reason_url,
        admin_user_email=admin_user_email,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )
    if result.success:
        print_success(result.message)
    else:
        print_error(result.message)
    print_json(result.model_dump())


 428@connector_app.command(name="clear-version-override")
 429def clear_version_override(
 430    workspace_id: Annotated[
 431        str,
 432        Parameter(help="The Airbyte Cloud workspace ID."),
 433    ],
 434    connector_id: Annotated[
 435        str,
 436        Parameter(help="The ID of the deployed connector (source or destination)."),
 437    ],
 438    connector_type: Annotated[
 439        Literal["source", "destination"],
 440        Parameter(help="The type of connector."),
 441    ],
 442    issue_url: Annotated[
 443        str,
 444        Parameter(help="GitHub issue URL providing context for this operation."),
 445    ],
 446    approval_comment_url: Annotated[
 447        str,
 448        Parameter(
 449            help="Slack approval record URL where admin authorized this deployment."
 450        ),
 451    ],
 452    ai_agent_session_url: Annotated[
 453        str | None,
 454        Parameter(
 455            help="URL to AI agent session driving this operation (for auditability)."
 456        ),
 457    ] = None,
 458    customer_tier_filter: Annotated[
 459        Literal["TIER_0", "TIER_1", "TIER_2", "ALL"],
 460        Parameter(
 461            help=(
 462                "Customer tier gate: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
 463                "The operation is rejected if the workspace organization's actual tier "
 464                "does not match; 'ALL' skips the check. Defaults to 'TIER_2' (non-sensitive customers)."
 465            ),
 466        ),
 467    ] = "TIER_2",
 468) -> None:
 469    """Clear a version override from a deployed connector.
 470
 471    Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and
 472    AIRBYTE_INTERNAL_ADMIN_USER environment variables.
 473
 474    The `customer_tier_filter` gates the operation: the call fails if
 475    the actual tier of the workspace's organization does not match.
 476    Use `ALL` to proceed regardless of tier (a warning is shown for sensitive tiers).
 477    """
 478    admin_user_email = os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER")
 479    result = set_cloud_connector_version_override(
 480        workspace_id=workspace_id,
 481        actor_id=connector_id,
 482        actor_type=connector_type,
 483        version=None,
 484        unset=True,
 485        override_reason=None,
 486        override_reason_reference_url=None,
 487        admin_user_email=admin_user_email,
 488        issue_url=issue_url,
 489        approval_comment_url=approval_comment_url,
 490        ai_agent_session_url=ai_agent_session_url,
 491        customer_tier_filter=customer_tier_filter,
 492    )
 493    if result.success:
 494        print_success(result.message)
 495    else:
 496        print_error(result.message)
 497    print_json(result.model_dump())
 498
 499
 500def _load_json_file(file_path: Path) -> dict | None:
 501    """Load a JSON file and return its contents.
 502
 503    Returns None if the file doesn't exist; on invalid JSON, prints an error and returns None.
 504    """
 505    if not file_path.exists():
 506        return None
 507    try:
 508        return json.loads(file_path.read_text())
 509    except json.JSONDecodeError as e:
 510        print_error(f"Failed to parse JSON in file: {file_path}\nError: {e}")
 511        return None
 512
 513
 514def _run_connector_command(
 515    connector_image: str,
 516    command: Command,
 517    output_dir: Path,
 518    target_or_control: TargetOrControl,
 519    config_path: Path | None = None,
 520    catalog_path: Path | None = None,
 521    state_path: Path | None = None,
 522    proxy_url: str | None = None,
 523    enable_debug_logs: bool = False,
 524) -> dict:
 525    """Run a connector command and return results as a dict.
 526
 527    Args:
 528        connector_image: Full connector image name with tag.
 529        command: The Airbyte command to run.
 530        output_dir: Directory to store output files.
 531        target_or_control: Whether this is target or control version.
 532        config_path: Path to connector config JSON file.
 533        catalog_path: Path to configured catalog JSON file.
 534        state_path: Path to state JSON file.
 535        proxy_url: Optional HTTP proxy URL for traffic capture.
 536        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.
 537
 538    Returns:
 539        Dictionary with execution results.
 540    """
 541    connector = ConnectorUnderTest.from_image_name(connector_image, target_or_control)
 542
 543    config = _load_json_file(config_path) if config_path else None
 544    state = _load_json_file(state_path) if state_path else None
 545
 546    configured_catalog = None
 547    if catalog_path and catalog_path.exists():
 548        catalog_json = catalog_path.read_text()
 549        configured_catalog = ConfiguredAirbyteCatalog.parse_raw(catalog_json)
 550
 551    # Pass log level as an environment variable to the connector container
 552    container_env: dict[str, str] = {}
 553    if enable_debug_logs:
 554        container_env["LOG_LEVEL"] = "DEBUG"
 555
 556    execution_inputs = ExecutionInputs(
 557        connector_under_test=connector,
 558        command=command,
 559        output_dir=output_dir,
 560        config=config,
 561        configured_catalog=configured_catalog,
 562        state=state,
 563        environment_variables=container_env or None,
 564    )
 565
 566    runner = ConnectorRunner(execution_inputs, proxy_url=proxy_url)
 567    result = runner.run()
 568
 569    result.save_artifacts(output_dir)
 570
 571    return {
 572        "connector": connector_image,
 573        "command": command.value,
 574        "success": result.success,
 575        "exit_code": result.exit_code,
 576        "stdout_file": str(result.stdout_file_path),
 577        "stderr_file": str(result.stderr_file_path),
 578        "message_counts": {
 579            k.value: v for k, v in result.get_message_count_per_type().items()
 580        },
 581        "record_counts_per_stream": result.get_record_count_per_stream(),
 582    }
 583
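The results dict above converts the Enum-keyed message counts into plain string keys so the mapping is JSON-serializable. A sketch of that transformation, with a hypothetical `MessageType` enum standing in for the real Airbyte protocol message types:

```python
from collections import Counter
from enum import Enum


# Hypothetical stand-in for the protocol message types; the real
# result.get_message_count_per_type() returns an Enum-keyed mapping.
class MessageType(Enum):
    RECORD = "RECORD"
    STATE = "STATE"
    LOG = "LOG"


counts = Counter([MessageType.RECORD, MessageType.RECORD, MessageType.STATE])

# Same shape as the results dict: Enum keys -> their string values.
serializable = {k.value: v for k, v in counts.items()}
```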
 584
 585def _build_connector_image_from_source(
 586    connector_name: str,
 587    repo_root: Path | None = None,
 588    tag: str = "dev",
 589) -> str | None:
 590    """Build a connector image from source code.
 591
 592    Args:
 593        connector_name: Name of the connector (e.g., 'source-pokeapi').
 594        repo_root: Optional path to the airbyte repo root. If not provided,
 595            will attempt to auto-detect from current directory.
 596        tag: Tag to apply to the built image (default: 'dev').
 597
 598    Returns:
 599        The full image name with tag if successful, None if build fails.
 600    """
 601    if not verify_docker_installation():
 602        print_error("Docker is not installed or not running")
 603        return None
 604
 605    try:
 606        connector_directory = find_connector_root_from_name(connector_name)
 607    except FileNotFoundError:
 608        if repo_root:
 609            connector_directory = repo_root / CONNECTORS_SUBDIR / connector_name
 610            if not connector_directory.exists():
 611                print_error(f"Connector directory not found: {connector_directory}")
 612                return None
 613        else:
 614            print_error(
 615                f"Could not find connector '{connector_name}'. "
 616                "Try providing --repo-root to specify the airbyte repo location."
 617            )
 618            return None
 619
 620    metadata_file_path = connector_directory / "metadata.yaml"
 621    if not metadata_file_path.exists():
 622        print_error(f"metadata.yaml not found at {metadata_file_path}")
 623        return None
 624
 625    metadata = MetadataFile.from_file(metadata_file_path)
 626    print_success(f"Building image for connector: {connector_name}")
 627
 628    built_image = build_connector_image(
 629        connector_name=connector_name,
 630        connector_directory=connector_directory,
 631        metadata=metadata,
 632        tag=tag,
 633        no_verify=False,
 634    )
 635    print_success(f"Successfully built image: {built_image}")
 636    return built_image
 637
 638
 639def _fetch_control_image_from_metadata(connector_name: str) -> str | None:
 640    """Fetch the currently released connector image from metadata.yaml on the master branch.
 641
 642    This fetches the connector's metadata.yaml from the airbyte monorepo's master branch
 643    and extracts the dockerRepository and dockerImageTag to construct the control image.
 644
 645    Args:
 646        connector_name: The connector name (e.g., 'source-github').
 647
 648    Returns:
 649        The full connector image with tag (e.g., 'airbyte/source-github:1.0.0'),
 650        or None if the metadata could not be fetched or parsed.
 651    """
 652    metadata_url = (
 653        f"https://raw.githubusercontent.com/airbytehq/airbyte/master/"
 654        f"airbyte-integrations/connectors/{connector_name}/metadata.yaml"
 655    )
 656    response = requests.get(metadata_url, timeout=30)
 657    if not response.ok:
 658        print_error(
 659            f"Failed to fetch metadata for {connector_name}: "
 660            f"HTTP {response.status_code} from {metadata_url}"
 661        )
 662        return None
 663
 664    metadata = yaml.safe_load(response.text)
 665    if not isinstance(metadata, dict):
 666        print_error(f"Invalid metadata format for {connector_name}: expected dict")
 667        return None
 668
 669    data = metadata.get("data", {})
 670    docker_repository = data.get("dockerRepository")
 671    docker_image_tag = data.get("dockerImageTag")
 672
 673    if not docker_repository or not docker_image_tag:
 674        print_error(
 675            f"Could not find dockerRepository/dockerImageTag in metadata for {connector_name}"
 676        )
 677        return None
 678
 679    return f"{docker_repository}:{docker_image_tag}"
 680
 681
 682def _run_with_optional_http_metrics(
 683    connector_image: str,
 684    command: Command,
 685    output_dir: Path,
 686    target_or_control: TargetOrControl,
 687    enable_http_metrics: bool,
 688    config_path: Path | None,
 689    catalog_path: Path | None,
 690    state_path: Path | None,
 691    enable_debug_logs: bool = False,
 692) -> dict:
 693    """Run a connector command with optional HTTP metrics capture.
 694
 695    When enable_http_metrics is True, starts mitmproxy to capture HTTP traffic.
 696    If mitmproxy fails to start, falls back to running without metrics.
 697
 698    Args:
 699        connector_image: Full connector image name with tag.
 700        command: The Airbyte command to run.
 701        output_dir: Directory to store output files.
 702        target_or_control: Whether this is target or control version.
 703        enable_http_metrics: Whether to capture HTTP metrics via mitmproxy.
 704        config_path: Path to connector config JSON file.
 705        catalog_path: Path to configured catalog JSON file.
 706        state_path: Path to state JSON file.
 707        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.
 708
 709    Returns:
 710        Dictionary with execution results, optionally including http_metrics.
 711    """
 712    if not enable_http_metrics:
 713        return _run_connector_command(
 714            connector_image=connector_image,
 715            command=command,
 716            output_dir=output_dir,
 717            target_or_control=target_or_control,
 718            config_path=config_path,
 719            catalog_path=catalog_path,
 720            state_path=state_path,
 721            enable_debug_logs=enable_debug_logs,
 722        )
 723
 724    with MitmproxyManager.start(output_dir) as session:
 725        if session is None:
 726            print_error("Mitmproxy unavailable, running without HTTP metrics")
 727            return _run_connector_command(
 728                connector_image=connector_image,
 729                command=command,
 730                output_dir=output_dir,
 731                target_or_control=target_or_control,
 732                config_path=config_path,
 733                catalog_path=catalog_path,
 734                state_path=state_path,
 735                enable_debug_logs=enable_debug_logs,
 736            )
 737
 738        print_success(f"Started mitmproxy on {session.proxy_url}")
 739        result = _run_connector_command(
 740            connector_image=connector_image,
 741            command=command,
 742            output_dir=output_dir,
 743            target_or_control=target_or_control,
 744            config_path=config_path,
 745            catalog_path=catalog_path,
 746            state_path=state_path,
 747            proxy_url=session.proxy_url,
 748            enable_debug_logs=enable_debug_logs,
 749        )
 750
 751        http_metrics = parse_http_dump(session.dump_file_path)
 752        result["http_metrics"] = {
 753            "flow_count": http_metrics.flow_count,
 754            "duplicate_flow_count": http_metrics.duplicate_flow_count,
 755        }
 756        print_success(
 757            f"Captured {http_metrics.flow_count} HTTP flows "
 758            f"({http_metrics.duplicate_flow_count} duplicates)"
 759        )
 760        return result
 761
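The fallback above relies on a context manager that can yield None when the proxy fails to start. The shape of that pattern can be sketched with a hypothetical availability flag in place of a real mitmdump startup attempt:

```python
from contextlib import contextmanager


@contextmanager
def optional_proxy(available: bool):
    # Stand-in for MitmproxyManager.start(); 'available' is a hypothetical
    # knob replacing a real proxy startup attempt.
    if not available:
        yield None
        return
    yield {"proxy_url": "http://127.0.0.1:8080"}


def run(available: bool) -> str:
    # Same shape as the caller above: a None session means "fall back".
    with optional_proxy(available) as session:
        if session is None:
            return "no-metrics"
        return f"metrics via {session['proxy_url']}"


cold = run(False)
warm = run(True)
```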
 762
 763@connector_app.command(name="regression-test")
 764def regression_test(
 765    skip_compare: Annotated[
 766        bool,
 767        Parameter(
 768            help="If True, skip comparison and run single-version tests only. "
 769            "If False (default), run comparison tests (target vs control)."
 770        ),
 771    ] = False,
 772    test_image: Annotated[
 773        str | None,
 774        Parameter(
 775            help="Test connector image with tag (e.g., airbyte/source-github:1.0.0). "
 776            "This is the image under test; in comparison mode it is compared against control_image."
 777        ),
 778    ] = None,
 779    control_image: Annotated[
 780        str | None,
 781        Parameter(
 782            help="Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). "
 783            "Rejected with an error if skip_compare=True."
 784        ),
 785    ] = None,
 786    connector_name: Annotated[
 787        str | None,
 788        Parameter(
 789            help="Connector name to build image from source (e.g., 'source-pokeapi'). "
 790            "If provided, builds the image locally with tag 'dev'. "
 791            "For comparison tests (default), this builds the target image. "
 792            "For single-version tests (skip_compare=True), this builds the test image."
 793        ),
 794    ] = None,
 795    repo_root: Annotated[
 796        str | None,
 797        Parameter(
 798            help="Path to the airbyte repo root. Required if connector_name is provided "
 799            "and the repo cannot be auto-detected."
 800        ),
 801    ] = None,
 802    command: Annotated[
 803        Literal["spec", "check", "discover", "read"],
 804        Parameter(help="The Airbyte command to run."),
 805    ] = "check",
 806    connection_id: Annotated[
 807        str | None,
 808        Parameter(
 809            help="Airbyte Cloud connection ID to fetch config/catalog from. "
 810            "Mutually exclusive with config-path/catalog-path. "
 811            "If provided, test_image/control_image can be auto-detected."
 812        ),
 813    ] = None,
 814    config_path: Annotated[
 815        str | None,
 816        Parameter(help="Path to the connector config JSON file."),
 817    ] = None,
 818    catalog_path: Annotated[
 819        str | None,
 820        Parameter(help="Path to the configured catalog JSON file (required for read)."),
 821    ] = None,
 822    state_path: Annotated[
 823        str | None,
 824        Parameter(help="Path to the state JSON file (optional for read)."),
 825    ] = None,
 826    output_dir: Annotated[
 827        str,
 828        Parameter(help="Directory to store test artifacts."),
 829    ] = "/tmp/regression_test_artifacts",
 830    enable_http_metrics: Annotated[
 831        bool,
 832        Parameter(
 833            help="Capture HTTP traffic metrics via mitmproxy (experimental). "
 834            "Requires mitmdump to be installed. Only used in comparison mode."
 835        ),
 836    ] = False,
 837    selected_streams: Annotated[
 838        str | None,
 839        Parameter(
 840            help="Comma-separated list of stream names to include in the read. "
 841            "Only these streams will be included in the configured catalog. "
 842            "This is useful to limit data volume by testing only specific streams."
 843        ),
 844    ] = None,
 845    enable_debug_logs: Annotated[
 846        bool,
 847        Parameter(
 848            help="Enable debug-level logging for regression test output. "
 849            "Also passed as `LOG_LEVEL=DEBUG` to the connector Docker container."
 850        ),
 851    ] = False,
 852    with_state: Annotated[
 853        bool | None,
 854        Parameter(
 855            negative="--no-state",
 856            help="Fetch and pass the connection's current state to the read command, "
 857            "producing a warm read instead of a cold read. Defaults to `True` when "
 858            "`--connection-id` is provided, `False` otherwise. Has no effect unless "
 859            "the command is `read`. Ignored when `--state-path` is explicitly provided.",
 860        ),
 861    ] = None,
 862) -> None:
 863    """Run regression tests on connectors.
 864
 865    This command supports two modes:
 866
 867    Comparison mode (skip_compare=False, default):
 868        Runs the specified Airbyte protocol command against both the target (new)
 869        and control (baseline) connector versions, then compares the results.
 870        This helps identify regressions between versions.
 871
 872    Single-version mode (skip_compare=True):
 873        Runs the specified Airbyte protocol command against a single connector
 874        and validates the output. No comparison is performed.
 875
 876    Results are written to the output directory and to GitHub Actions outputs
 877    if running in CI.
 878
 879    You can provide the test image in three ways:
 880    1. --test-image: Use a pre-built image from Docker registry
 881    2. --connector-name: Build the image locally from source code
 882    3. --connection-id: Auto-detect from an Airbyte Cloud connection
 883
 884    You can provide config/catalog either via file paths OR via a connection_id
 885    that fetches them from Airbyte Cloud.
 886    """
 887    # Configure debug logging for the regression test harness when requested
 888    if enable_debug_logs:
 889        logging.basicConfig(
 890            level=logging.DEBUG,
 891            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 892            force=True,
 893        )
 894
 895    output_path = Path(output_dir)
 896    output_path.mkdir(parents=True, exist_ok=True)
 897
 898    cmd = Command(command)
 899
 900    config_file: Path | None = None
 901    catalog_file: Path | None = None
 902    state_file = Path(state_path) if state_path else None
 903
 904    # Resolve the test image (used in both single-version and comparison modes)
 905    resolved_test_image: str | None = test_image
 906    resolved_control_image: str | None = control_image
 907
 908    # Validate conflicting parameters
 909    # Single-version mode: reject comparison-specific parameters
 910    if skip_compare and control_image:
 911        write_github_output("success", False)
 912        write_github_output(
 913            "error", "Cannot specify control_image with skip_compare=True"
 914        )
 915        exit_with_error(
 916            "Cannot specify --control-image with --skip-compare. "
 917            "Control image is only used in comparison mode."
 918        )
 919
 920    # If connector_name is provided, build the image from source
 921    if connector_name:
 922        if resolved_test_image:
 923            write_github_output("success", False)
 924            write_github_output(
 925                "error", "Cannot specify both test_image and connector_name"
 926            )
 927            exit_with_error("Cannot specify both --test-image and --connector-name")
 928
 929        repo_root_path = Path(repo_root) if repo_root else None
 930        built_image = _build_connector_image_from_source(
 931            connector_name=connector_name,
 932            repo_root=repo_root_path,
 933            tag="dev",
 934        )
 935        if not built_image:
 936            write_github_output("success", False)
 937            write_github_output("error", f"Failed to build image for {connector_name}")
 938            exit_with_error(f"Failed to build image for {connector_name}")
 939        resolved_test_image = built_image
 940
 941    if connection_id:
 942        if config_path or catalog_path:
 943            write_github_output("success", False)
 944            write_github_output(
 945                "error", "Cannot specify both connection_id and file paths"
 946            )
 947            exit_with_error(
 948                "Cannot specify both connection_id and config_path/catalog_path"
 949            )
 950
 951        print_success(f"Fetching config/catalog from connection: {connection_id}")
 952        connection_data = fetch_connection_data(connection_id)
 953
 954        # Check if we should retrieve unmasked secrets
 955        if should_use_secret_retriever():
 956            print_success(
 957                "USE_CONNECTION_SECRET_RETRIEVER enabled - enriching config with unmasked secrets..."
 958            )
 959            try:
 960                connection_data = enrich_config_with_secrets(
 961                    connection_data,
 962                    retrieval_reason="Regression test with USE_CONNECTION_SECRET_RETRIEVER=true",
 963                )
 964                print_success("Successfully retrieved unmasked secrets from database")
 965            except SecretRetrievalError as e:
 966                write_github_output("success", False)
 967                write_github_output("error", str(e))
 968                exit_with_error(
 969                    f"{e}\n\n"
 970                    f"This connection cannot be used for regression testing. "
 971                    f"Please use a connection from a non-EU workspace, or use GSM-based "
 972                    f"integration test credentials instead (by omitting --connection-id)."
 973                )
 974
 975        # Resolve with_state default: True when connection_id is provided
 976        resolved_with_state = with_state if with_state is not None else True
 977
 978        if resolved_with_state and not state_file and command == "read":
 979            print_success("Fetching connection state for warm read...")
 980            try:
 981                connection_data.state = fetch_connection_state(connection_data)
 982                if connection_data.state:
 983                    print_success(
 984                        f"Fetched state with {len(connection_data.state)} stream(s)"
 985                    )
 986                else:
 987                    connection_data.state = None
 988                    print_success(
 989                        "No state available for this connection (initial sync)"
 990                    )
 991            except Exception as exc:
 992                print_error(f"Failed to fetch connection state: {exc}")
 993                print_success("Falling back to cold read (no state)")
 994
 995        config_file, catalog_file, state_file_from_conn = save_connection_data_to_files(
 996            connection_data, output_path / "connection_data"
 997        )
 998
 999        if state_file_from_conn and not state_file:
1000            state_file = state_file_from_conn
1001
1002        print_success(
1003            f"Fetched config for source: {connection_data.source_name} "
1004            f"with {len(connection_data.stream_names)} streams"
1005        )
1006
1007        # Auto-detect test/control image from connection if not provided
1008        if not resolved_test_image and connection_data.connector_image:
1009            resolved_test_image = connection_data.connector_image
1010            print_success(f"Auto-detected test image: {resolved_test_image}")
1011
1012        if (
1013            not skip_compare
1014            and not resolved_control_image
1015            and connection_data.connector_image
1016        ):
1017            resolved_control_image = connection_data.connector_image
1018            print_success(f"Auto-detected control image: {resolved_control_image}")
1019    elif config_path:
1020        config_file = Path(config_path)
1021        catalog_file = Path(catalog_path) if catalog_path else None
1022    elif connector_name:
1023        # Fallback: fetch integration test secrets from GSM using PyAirbyte API
1024        print_success(
1025            f"No connection_id or config_path provided. "
1026            f"Attempting to fetch integration test config from GSM for {connector_name}..."
1027        )
1028        gsm_config = get_first_config_from_secrets(connector_name)
1029        if gsm_config:
1030            # Write config to a temp file (not in output_path to avoid artifact upload)
1031            gsm_config_dir = Path(
1032                tempfile.mkdtemp(prefix=f"gsm-config-{connector_name}-")
1033            )
1034            gsm_config_dir.chmod(0o700)
1035            gsm_config_file = gsm_config_dir / "config.json"
1036            gsm_config_file.write_text(json.dumps(gsm_config, indent=2))
1037            gsm_config_file.chmod(0o600)
1038            config_file = gsm_config_file
1039            # Use catalog_path if provided (e.g., generated from discover output)
1040            catalog_file = Path(catalog_path) if catalog_path else None
1041            print_success(
1042                f"Fetched integration test config from GSM for {connector_name}"
1043            )
1044        else:
1045            print_error(
1046                f"Failed to fetch integration test config from GSM for {connector_name}."
1047            )
1048            config_file = None
1049            # Use catalog_path if provided (e.g., generated from discover output)
1050            catalog_file = Path(catalog_path) if catalog_path else None
1051    else:
1052        config_file = None
1053        catalog_file = Path(catalog_path) if catalog_path else None
1054
1055    # Auto-detect control_image from metadata.yaml if connector_name is provided (comparison mode only)
1056    if not skip_compare and not resolved_control_image and connector_name:
1057        resolved_control_image = _fetch_control_image_from_metadata(connector_name)
1058        if resolved_control_image:
1059            print_success(
1060                f"Auto-detected control image from metadata.yaml: {resolved_control_image}"
1061            )
1062
1063    # Validate that we have the required images
1064    if not resolved_test_image:
1065        write_github_output("success", False)
1066        write_github_output("error", "No test image specified")
1067        exit_with_error(
1068            "You must provide one of the following: a test_image, a connector_name "
1069            "to build the image from source, or a connection_id to auto-detect the image."
1070        )
1071
1072    if not skip_compare and not resolved_control_image:
1073        write_github_output("success", False)
1074        write_github_output("error", "No control image specified")
1075        exit_with_error(
1076            "You must provide one of the following: a control_image, a connection_id "
1077            "for a connection that has an associated connector image, or a connector_name "
1078            "to auto-detect the control image from the airbyte repo's metadata.yaml."
1079        )
1080
1081    # Pull images if they weren't just built locally
1082    # If connector_name was provided, we just built the test image locally
1083    if not connector_name and not ensure_image_available(resolved_test_image):
1084        write_github_output("success", False)
1085        write_github_output("error", f"Failed to pull image: {resolved_test_image}")
1086        exit_with_error(f"Failed to pull test image: {resolved_test_image}")
1087
1088    if (
1089        not skip_compare
1090        and resolved_control_image
1091        and not ensure_image_available(resolved_control_image)
1092    ):
1093        write_github_output("success", False)
1094        write_github_output("error", f"Failed to pull image: {resolved_control_image}")
1095        exit_with_error(
1096            f"Failed to pull control connector image: {resolved_control_image}"
1097        )
1098
1099    # Apply selected_streams filter to catalog if requested
1100    if selected_streams and catalog_file:
1101        streams_set = {s.strip() for s in selected_streams.split(",") if s.strip()}
1102        if streams_set:
1103            print_success(
1104                f"Filtering catalog to {len(streams_set)} selected streams: "
1105                f"{', '.join(sorted(streams_set))}"
1106            )
1107            filter_configured_catalog_file(catalog_file, streams_set)
1108
1109    # Upgrade READ → READ_WITH_STATE when a state file is available
1110    if cmd == Command.READ and state_file:
1111        cmd = Command.READ_WITH_STATE
1112        print_success("State available — using read-with-state (warm read)")
1113
1114    # Track telemetry for the regression test
1115    # Extract version from image tag (e.g., "airbyte/source-github:1.0.0" -> "1.0.0")
1116    target_version = (
1117        resolved_test_image.rsplit(":", 1)[-1]
1118        if ":" in resolved_test_image
1119        else "unknown"
1120    )
1121    control_version = None
1122    if resolved_control_image and ":" in resolved_control_image:
        control_version = resolved_control_image.rsplit(":", 1)[-1]

    # Get tester identity from environment (GitHub Actions sets GITHUB_ACTOR)
    tester = os.getenv("GITHUB_ACTOR") or os.getenv("USER")

    track_regression_test(
        user_id=tester,
        connector_image=resolved_test_image,
        command=command,
        target_version=target_version,
        control_version=control_version,
        additional_properties={
            "connection_id": connection_id,
            "skip_compare": skip_compare,
            "with_state": cmd == Command.READ_WITH_STATE,
        },
    )

    # Execute the appropriate mode
    if skip_compare:
        # Single-version mode: run only the connector image
        result = _run_connector_command(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=output_path,
            target_or_control=TargetOrControl.TARGET,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        print_json(result)

        write_github_outputs(
            {
                "success": result["success"],
                "connector": resolved_test_image,
                "command": command,
                "exit_code": result["exit_code"],
            }
        )

        write_test_summary(
            connector_image=resolved_test_image,
            test_type="regression-test",
            success=result["success"],
            results={
                "command": command,
                "exit_code": result["exit_code"],
                "output_dir": output_dir,
            },
        )

        # Generate report.md with detailed metrics
        report_path = generate_single_version_report(
            connector_image=resolved_test_image,
            command=command,
            result=result,
            output_dir=output_path,
        )
        print_success(f"Generated report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if result["success"]:
            print_success(
                f"Single-version regression test passed for {resolved_test_image}"
            )
        else:
            print_error(
                f"Single-version regression test failed for {resolved_test_image}"
            )
    else:
        # Comparison mode: run both target and control images
        target_output = output_path / "target"
        control_output = output_path / "control"

        target_result = _run_with_optional_http_metrics(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=target_output,
            target_or_control=TargetOrControl.TARGET,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        control_result = _run_with_optional_http_metrics(
            connector_image=resolved_control_image,  # type: ignore[arg-type]
            command=cmd,
            output_dir=control_output,
            target_or_control=TargetOrControl.CONTROL,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        both_succeeded = target_result["success"] and control_result["success"]
        regression_detected = target_result["success"] != control_result["success"]

        combined_result = {
            "target": target_result,
            "control": control_result,
            "both_succeeded": both_succeeded,
            "regression_detected": regression_detected,
        }

        print_json(combined_result)

        both_failed = not target_result["success"] and not control_result["success"]

        write_github_outputs(
            {
                "success": not regression_detected,
                "target_image": resolved_test_image,
                "control_image": resolved_control_image,
                "command": command,
                "target_exit_code": target_result["exit_code"],
                "control_exit_code": control_result["exit_code"],
                "regression_detected": regression_detected,
                "both_failed": both_failed,
            }
        )

        write_json_output("regression_report", combined_result)

        report_path = generate_regression_report(
            target_image=resolved_test_image,
            control_image=resolved_control_image,  # type: ignore[arg-type]
            command=command,
            target_result=target_result,
            control_result=control_result,
            output_dir=output_path,
        )
        print_success(f"Generated regression report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if regression_detected:
            print_error(
                f"Regression detected between {resolved_test_image} and {resolved_control_image}"
            )
        elif both_succeeded:
            print_success(
                f"Regression test passed for {resolved_test_image} vs {resolved_control_image}"
            )
        else:
            # Both versions failed with no regression between them; treat this as a test
            # environment issue (e.g., expired credentials, transient API errors, rate
            # limiting) and exit successfully.
            print_warning(
                f"Both versions failed for {resolved_test_image} vs {resolved_control_image}. "
                "No regression detected. This may indicate expired credentials or a transient API issue."
            )


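# Illustrative note (not part of the CLI logic): in comparison mode above,
# `regression_detected` is the XOR of the two success flags, so the four
# possible outcomes map to:
#
#   target OK,   control OK   -> success (both_succeeded)
#   target OK,   control fail -> regression_detected (a fix-in-target also flags here)
#   target fail, control OK   -> regression_detected
#   target fail, control fail -> warning only, exit 0 (likely environment issue)

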
@connector_app.command(name="fetch-connection-config")
def fetch_connection_config_cmd(
    connection_id: Annotated[
        str,
        Parameter(help="The UUID of the Airbyte Cloud connection."),
    ],
    output_path: Annotated[
        str | None,
        Parameter(
            help="Path to output file or directory. "
            "If directory, writes connection-<id>-config.json inside it. "
            "Default: platform temp directory (e.g. /tmp/connection-<id>-config.json)"
        ),
    ] = None,
    with_secrets: Annotated[
        bool,
        Parameter(
            name="--with-secrets",
            negative="--no-secrets",
            help="If set, fetches unmasked secrets from the internal database. "
            "Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or `gcloud auth application-default login`. "
            "Must be used with --oc-issue-url.",
        ),
    ] = False,
    oc_issue_url: Annotated[
        str | None,
        Parameter(
            help="OC issue URL for audit logging. Required when using --with-secrets."
        ),
    ] = None,
) -> None:
    """Fetch connection configuration from Airbyte Cloud to a local file.

    This command retrieves the source configuration for a given connection ID
    and writes it to a JSON file. When `--output-path` is omitted the file is
    written to the platform temp directory to avoid accidentally committing
    secrets to a git repository.

    Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and
    AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

    When --with-secrets is specified, the command fetches unmasked secrets from
    the internal database using the connection-retriever. This additionally requires:
    - An OC issue URL for audit logging (--oc-issue-url)
    - GCP credentials via `GCP_PROD_DB_ACCESS_CREDENTIALS` env var or `gcloud auth application-default login`
    - If `CI=true`: expects `cloud-sql-proxy` running on localhost, or
      direct network access to the Cloud SQL instance.
    """
    path = Path(output_path) if output_path else None
    result = fetch_connection_config(
        connection_id=connection_id,
        output_path=path,
        with_secrets=with_secrets,
        oc_issue_url=oc_issue_url,
    )
    if result.success:
        print_success(result.message)
    else:
        print_error(result.message)
    print_json(result.model_dump())


def _read_json_input(
    json_str: str | None,
    input_file: str | None,
) -> dict:
    """Read JSON from a positional arg, --input file, or STDIN (in that order of precedence)."""
    if json_str is not None:
        return json.loads(json_str)
    if input_file is not None:
        return json.loads(Path(input_file).read_text())
    if not sys.stdin.isatty():
        return json.loads(sys.stdin.read())
    exit_with_error(
        "No JSON input provided. Pass it as a positional argument, "
        "via --input <file>, or pipe to STDIN."
    )
    raise SystemExit(1)  # unreachable, but satisfies type checker


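# Illustrative sketch (comments only) of the precedence in `_read_json_input`
# above; `state.json` is a hypothetical file path:
#
#   _read_json_input('{"a": 1}', "state.json")  -> {"a": 1}   (positional wins)
#   _read_json_input(None, "state.json")        -> parsed contents of state.json
#   _read_json_input(None, None)                -> parsed STDIN, if piped
#   _read_json_input(None, None)                -> error exit, if STDIN is a TTY

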
@state_app.command(name="get")
def get_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to filter state for a single stream.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to narrow the stream filter.",
            name="--namespace",
        ),
    ] = None,
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the state JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the current state for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        result = {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }
    else:
        result = conn.dump_raw_state()

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"State written to {output}", file=sys.stderr)
    else:
        print(json_output)


@state_app.command(name="set")
def set_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    state_json: Annotated[
        str | None,
        Parameter(
            help="The connection state as a JSON string. When --stream is used, "
            'this is just the stream\'s state blob (e.g., \'{"cursor": "2024-01-01"}\').'
            " Otherwise, must include 'stateType', 'connectionId', and the appropriate "
            "state field. Can also be provided via --input or STDIN."
        ),
    ] = None,
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to update state for a single stream only.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the state to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the state for an Airbyte Cloud connection.

    State JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    Uses the safe variant that prevents updates while a sync is running.
    When --stream is provided, only that stream's state is updated within
    the existing connection state.
    """
    parsed_state = _read_json_input(state_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        conn.set_stream_state(
            stream_name=stream_name,
            state_blob_dict=parsed_state,
            stream_namespace=stream_namespace,
        )
    else:
        conn.import_raw_state(parsed_state)

    result = conn.dump_raw_state()
    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


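# Example invocations (hypothetical command path, IDs, and file names), showing
# the input channels accepted by the state commands above:
#
#   airbyte-ops cloud connection state get <connection-id> --stream users
#   airbyte-ops cloud connection state set <connection-id> --stream users '{"cursor": "2024-01-01"}'
#   airbyte-ops cloud connection state set <connection-id> --input state.json
#   cat state.json | airbyte-ops cloud connection state set <connection-id>

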
@catalog_app.command(name="get")
def get_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch catalog for."),
    ],
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the catalog JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the configured catalog for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("No configured catalog found for this connection.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"Catalog written to {output}", file=sys.stderr)
    else:
        print(json_output)


@catalog_app.command(name="set")
def set_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update catalog for."),
    ],
    catalog_json: Annotated[
        str | None,
        Parameter(
            help="The configured catalog as a JSON string. "
            "Can also be provided via --input or STDIN."
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the catalog to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the configured catalog for an Airbyte Cloud connection.

    Catalog JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    WARNING: This replaces the entire configured catalog.
    """
    parsed_catalog = _read_json_input(catalog_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    conn.import_raw_catalog(parsed_catalog)

    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("Failed to retrieve catalog after import.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


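# Example round-trip (hypothetical command path and connection ID). Because
# `catalog set` replaces the entire configured catalog, edit a fresh `get`:
#
#   airbyte-ops cloud connection catalog get <connection-id> --output catalog.json
#   # ... edit catalog.json locally ...
#   airbyte-ops cloud connection catalog set <connection-id> --input catalog.json

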
@logs_app.command(name="lookup-cloud-backend-error")
def lookup_cloud_backend_error(
    error_id: Annotated[
        str,
        Parameter(
            help=(
                "The error ID (UUID) to search for. This is typically returned "
                "in API error responses as {'errorId': '...'}"
            )
        ),
    ],
    lookback_days: Annotated[
        int,
        Parameter(help="Number of days to look back in logs."),
    ] = 7,
    min_severity_filter: Annotated[
        GCPSeverity | None,
        Parameter(
            help="Optional minimum severity level to filter logs.",
        ),
    ] = None,
    raw: Annotated[
        bool,
        Parameter(help="Output raw JSON instead of formatted text."),
    ] = False,
) -> None:
    """Look up error details from GCP Cloud Logging by error ID.

    When an Airbyte Cloud API returns an error response with only an error ID
    (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command
    fetches the full stack trace and error details from GCP Cloud Logging.

    Requires GCP credentials with the Logs Viewer role on the target project.
    Set up credentials with: gcloud auth application-default login
    """
    print(f"Searching for error ID: {error_id}", file=sys.stderr)
    print(f"Lookback days: {lookback_days}", file=sys.stderr)
    if min_severity_filter:
        print(f"Severity filter: {min_severity_filter}", file=sys.stderr)
    print(file=sys.stderr)

    result = fetch_error_logs(
        error_id=error_id,
        lookback_days=lookback_days,
        min_severity_filter=min_severity_filter,
    )

    if raw:
        print_json(result.model_dump())
        return

    print(f"Found {result.total_entries_found} log entries", file=sys.stderr)
    print(file=sys.stderr)

    if result.payloads:
        for i, payload in enumerate(result.payloads):
            print(f"=== Log Group {i + 1} ===")
            print(f"Timestamp: {payload.timestamp}")
            print(f"Severity: {payload.severity}")
            if payload.resource.labels.pod_name:
                print(f"Pod: {payload.resource.labels.pod_name}")
            print(f"Lines: {payload.num_log_lines}")
            print()
            print(payload.message)
            print()
    elif result.entries:
        print("No grouped payloads, showing raw entries:", file=sys.stderr)
        for entry in result.entries:
            print(f"[{entry.timestamp}] {entry.severity}: {entry.payload}")
    else:
        print_error("No log entries found for this error ID.")