airbyte_ops_mcp.cli.cloud

CLI commands for Airbyte Cloud operations.

Commands:

  • airbyte-ops cloud connector get-version-info - Get connector version info
  • airbyte-ops cloud connector set-version-override - Set connector version override
  • airbyte-ops cloud connector clear-version-override - Clear connector version override
  • airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
  • airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file

CLI reference

The commands below are regenerated by poe docs-generate via cyclopts's programmatic docs API; see docs/generate_cli.py.

airbyte-ops cloud COMMAND

Airbyte Cloud operations.

Commands:

  • connection: Connection operations in Airbyte Cloud.
  • connector: Deployed connector operations in Airbyte Cloud.
  • db: Database operations for Airbyte Cloud Prod DB Replica.
  • logs: GCP Cloud Logging operations for Airbyte Cloud.

airbyte-ops cloud connector

Deployed connector operations in Airbyte Cloud.

airbyte-ops cloud connector get-version-info

airbyte-ops cloud connector get-version-info WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE

Get the current version information for a deployed connector.

Parameters:

  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
  • CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]

airbyte-ops cloud connector set-version-override

airbyte-ops cloud connector set-version-override VERSION REASON ISSUE-URL APPROVAL-COMMENT-URL [ARGS]

Set a version override for a deployed connector.

Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.

The --override-level flag selects the scope at which the pin is applied:

  • actor (default): pins a single deployed connector instance. Requires --workspace-id, --connector-id, and --connector-type.
  • workspace: pins ALL instances of a connector type within a workspace. Requires --workspace-id and --actor-definition-id.
  • organization: pins ALL instances of a connector type across an organization. Requires --organization-id and --actor-definition-id.

The customer_tier_filter gates the operation: the call fails if the actual tier of the target organization does not match. Use ALL to proceed regardless of tier (a warning is shown for sensitive tiers).
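The scope rules above amount to a lookup from override level to required flags. The following is a minimal sketch of that lookup, not the CLI's own validation code; `REQUIRED_FLAGS` and `missing_flags` are hypothetical names introduced only for illustration.

```python
from __future__ import annotations

# Required flags per override level, as listed in the documentation above.
# REQUIRED_FLAGS and missing_flags are illustrative names, not part of the CLI.
REQUIRED_FLAGS: dict[str, tuple[str, ...]] = {
    "actor": ("--workspace-id", "--connector-id", "--connector-type"),
    "workspace": ("--workspace-id", "--actor-definition-id"),
    "organization": ("--organization-id", "--actor-definition-id"),
}


def missing_flags(override_level: str, provided: dict[str, str | None]) -> list[str]:
    """Return the required flags that are absent or empty for the given level."""
    return [
        flag
        for flag in REQUIRED_FLAGS[override_level]
        if not provided.get(flag)
    ]


# A workspace-level pin with only the workspace ID supplied.
print(missing_flags("workspace", {"--workspace-id": "ws-123"}))
```

Running a check like this before invoking the command can surface a missing flag without a round trip to the CLI.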

Parameters:

  • VERSION, --version: The semver version string to pin to (e.g., '2.1.5-preview.abc1234'). [required]
  • REASON, --reason: Explanation for the override (min 10 characters). [required]
  • ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
  • APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where an admin authorized this deployment. [required]
  • OVERRIDE-LEVEL, --override-level: Scope at which to apply the version override: 'actor' (single deployed connector instance), 'workspace' (all instances of a connector type within a workspace), or 'organization' (all instances across an organization). Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
  • ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
  • CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
  • ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
  • AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
  • REASON-URL, --reason-url: Optional URL with more context (e.g., issue link).
  • CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]

airbyte-ops cloud connector clear-version-override

airbyte-ops cloud connector clear-version-override ISSUE-URL APPROVAL-COMMENT-URL [ARGS]

Clear a version override from a deployed connector.

Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.

The --override-level flag selects the scope at which the pin is removed:

  • actor (default): clears a single deployed connector instance pin. Requires --workspace-id, --connector-id, and --connector-type.
  • workspace: clears the workspace-level pin for a connector type. Requires --workspace-id and --actor-definition-id.
  • organization: clears the organization-level pin for a connector type. Requires --organization-id and --actor-definition-id.

Parameters:

  • ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
  • APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where an admin authorized this deployment. [required]
  • OVERRIDE-LEVEL, --override-level: Scope at which to clear the version override: 'actor', 'workspace', or 'organization'. Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
  • ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
  • CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
  • ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
  • AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
  • CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]

airbyte-ops cloud connector regression-test

airbyte-ops cloud connector regression-test [ARGS]

Run regression tests on connectors.

This command supports two modes:

Comparison mode (skip_compare=False, default): Runs the specified Airbyte protocol command against both the target (new) and control (baseline) connector versions, then compares the results. This helps identify regressions between versions.

Single-version mode (skip_compare=True): Runs the specified Airbyte protocol command against a single connector and validates the output. No comparison is performed.

Results are written to the output directory and to GitHub Actions outputs if running in CI.

You can provide the test image in three ways:

  1. --test-image: Use a pre-built image from Docker registry
  2. --connector-name: Build the image locally from source code
  3. --connection-id: Auto-detect from an Airbyte Cloud connection

You can provide config/catalog either via file paths OR via a connection_id that fetches them from Airbyte Cloud.
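To make the two modes concrete, here is a small sketch that assembles the argument list for each. It only builds the list and does not run anything. `build_regression_test_argv` is a hypothetical helper, and the `0.9.0` control tag is an illustrative value; the flags themselves are the ones documented for this command.

```python
from __future__ import annotations


def build_regression_test_argv(
    *,
    command: str = "check",
    skip_compare: bool = False,
    test_image: str | None = None,
    control_image: str | None = None,
    connection_id: str | None = None,
) -> list[str]:
    """Assemble an argument list for `regression-test` (hypothetical helper)."""
    argv = ["airbyte-ops", "cloud", "connector", "regression-test", "--command", command]
    if skip_compare:
        # Single-version mode: the control image is documented as ignored.
        argv.append("--skip-compare")
        control_image = None
    for flag, value in (
        ("--test-image", test_image),
        ("--control-image", control_image),
        ("--connection-id", connection_id),
    ):
        if value:
            argv += [flag, value]
    return argv


# Comparison mode (default): target image versus control image.
print(build_regression_test_argv(
    test_image="airbyte/source-github:1.0.0",
    control_image="airbyte/source-github:0.9.0",  # illustrative tag
))

# Single-version mode: only the test image is used.
print(build_regression_test_argv(
    skip_compare=True,
    test_image="airbyte/source-github:1.0.0",
))
```

The resulting list can be passed to `subprocess.run` when the `airbyte-ops` executable is on the PATH.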

Parameters:

  • SKIP-COMPARE, --skip-compare, --no-skip-compare: If True, skip comparison and run single-version tests only. If False (default), run comparison tests (target vs control). [default: False]
  • TEST-IMAGE, --test-image: Test connector image with tag (e.g., airbyte/source-github:1.0.0). This is the image under test - in comparison mode, it's compared against control_image.
  • CONTROL-IMAGE, --control-image: Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). Ignored if skip_compare=True.
  • CONNECTOR-NAME, --connector-name: Connector name to build image from source (e.g., 'source-pokeapi'). If provided, builds the image locally with tag 'dev'. For comparison tests (default), this builds the target image. For single-version tests (skip_compare=True), this builds the test image.
  • REPO-ROOT, --repo-root: Path to the airbyte repo root. Required if connector_name is provided and the repo cannot be auto-detected.
  • COMMAND, --command: The Airbyte command to run. [choices: spec, check, discover, read] [default: check]
  • CONNECTION-ID, --connection-id: Airbyte Cloud connection ID to fetch config/catalog from. Mutually exclusive with config-path/catalog-path. If provided, test_image/control_image can be auto-detected.
  • CONFIG-PATH, --config-path: Path to the connector config JSON file.
  • CATALOG-PATH, --catalog-path: Path to the configured catalog JSON file (required for read).
  • STATE-PATH, --state-path: Path to the state JSON file (optional for read).
  • OUTPUT-DIR, --output-dir: Directory to store test artifacts. [default: /tmp/regression_test_artifacts]
  • ENABLE-HTTP-METRICS, --enable-http-metrics, --no-enable-http-metrics: Capture HTTP traffic metrics via mitmproxy (experimental). Requires mitmdump to be installed. Only used in comparison mode. [default: False]
  • SELECTED-STREAMS, --selected-streams: Comma-separated list of stream names to include in the read. Only these streams will be included in the configured catalog. This is useful to limit data volume by testing only specific streams.
  • ENABLE-DEBUG-LOGS, --enable-debug-logs, --no-enable-debug-logs: Enable debug-level logging for regression test output. Also passed as LOG_LEVEL=DEBUG to the connector Docker container. [default: False]
  • WITH-STATE, --with-state, --no-state: Fetch and pass the connection's current state to the read command, producing a warm read instead of a cold read. Defaults to True when --connection-id is provided, False otherwise. Has no effect unless the command is read. Ignored when --state-path is explicitly provided.
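The --with-state description packs several rules into one sentence. The sketch below restates them as a single function, assuming an unset flag is represented as `None`. `should_fetch_state` is a hypothetical name, and the case of --with-state without --connection-id is not covered by the documentation, so the sketch simply returns the flag value there.

```python
from __future__ import annotations


def should_fetch_state(
    *,
    command: str,
    connection_id: str | None,
    state_path: str | None,
    with_state: bool | None = None,
) -> bool:
    """Decide whether the connection's current state would be fetched (sketch)."""
    if command != "read":
        return False  # The flag has no effect unless the command is read.
    if state_path is not None:
        return False  # An explicit --state-path takes precedence.
    if with_state is None:
        # Default: True when --connection-id is provided, False otherwise.
        return connection_id is not None
    return with_state


print(should_fetch_state(command="read", connection_id="abc", state_path=None))
```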

airbyte-ops cloud connector fetch-connection-config

airbyte-ops cloud connector fetch-connection-config CONNECTION-ID [ARGS]

Fetch connection configuration from Airbyte Cloud to a local file.

This command retrieves the source configuration for a given connection ID and writes it to a JSON file. When --output-path is omitted the file is written to the platform temp directory to avoid accidentally committing secrets to a git repository.

Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

When --with-secrets is specified, the command fetches unmasked secrets from the internal database using the connection-retriever. This additionally requires:

  • An OC issue URL for audit logging (--oc-issue-url)
  • GCP credentials via GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login
  • If CI=true: expects cloud-sql-proxy running on localhost, or direct network access to the Cloud SQL instance.

Parameters:

  • CONNECTION-ID, --connection-id: The UUID of the Airbyte Cloud connection. [required]
  • OUTPUT-PATH, --output-path: Path to output file or directory. If directory, writes connection--config.json inside it. Default: platform temp directory (e.g. /tmp/connection--config.json)
  • WITH-SECRETS, --with-secrets, --no-secrets: If set, fetches unmasked secrets from the internal database. Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login. Must be used with --oc-issue-url. [default: False]
  • OC-ISSUE-URL, --oc-issue-url: OC issue URL for audit logging. Required when using --with-secrets.
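The --output-path behavior (file, directory, or the temp-directory default) can be sketched as follows. This is an approximation of the documented rules rather than the command's actual code; `resolve_output_path` is a hypothetical name, and the file name is passed in by the caller because the real naming scheme is not reproduced here.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def resolve_output_path(output_path: str | None, filename: str) -> Path:
    """Resolve where the config file would be written (illustrative sketch)."""
    if output_path is None:
        # Default: the platform temp directory, which keeps secrets out of
        # the working tree.
        return Path(tempfile.gettempdir()) / filename
    path = Path(output_path)
    if path.is_dir():
        # A directory was given: write the file inside it.
        return path / filename
    return path


print(resolve_output_path(None, "example-config.json"))
```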

airbyte-ops cloud db

Database operations for Airbyte Cloud Prod DB Replica.

airbyte-ops cloud db start-proxy

airbyte-ops cloud db start-proxy [ARGS]

Start the Cloud SQL Proxy for database access.

This command starts the Cloud SQL Auth Proxy to enable connections to the Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.

By default, runs as a daemon (background process). Use --no-daemon to run in foreground mode where you can see logs and stop with Ctrl+C.

Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, which should contain the service account JSON credentials.

After starting the proxy, set these environment variables to use database tools:

    export USE_CLOUD_SQL_PROXY=1
    export DB_PORT={port}

Parameters:

  • PORT, --port: Port for the Cloud SQL Proxy to listen on. [default: 15432]
  • DAEMON, --daemon, --no-daemon: Run as daemon in background (default). Use --no-daemon for foreground. [default: True]
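For scripts that drive the database tools from Python, the environment variables above and a quick "is the proxy listening?" probe might look like the sketch below. `proxy_env` and `is_port_open` are hypothetical helper names. The probe uses a plain TCP connect, so it only shows that something is listening on the port, not that it is the Cloud SQL Proxy.

```python
from __future__ import annotations

import socket


def proxy_env(port: int = 15432) -> dict[str, str]:
    """Environment variables the database tools expect after start-proxy."""
    return {"USE_CLOUD_SQL_PROXY": "1", "DB_PORT": str(port)}


def is_port_open(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections and timeouts alike.
        return False


print(proxy_env())
print(is_port_open(15432))
```

Merging `proxy_env()` into `os.environ` (or into the `env` argument of `subprocess.run`) has the same effect as the two `export` lines.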

airbyte-ops cloud db stop-proxy

airbyte-ops cloud db stop-proxy

Stop the Cloud SQL Proxy daemon.

This command stops a Cloud SQL Proxy that was started with 'start-proxy'. It reads the PID from the PID file and sends a SIGTERM signal to stop the process.
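The PID-file handshake described above can be sketched in a few lines for POSIX systems. The helpers `read_pid` and `is_process_running` are hypothetical names. Unlike the real command, which reports a permission error and exits, this sketch treats a permission error as "the process exists".

```python
from __future__ import annotations

import os
from pathlib import Path


def read_pid(pid_file: Path) -> int | None:
    """Return the PID stored in the file, or None if it is missing or invalid."""
    if not pid_file.exists():
        return None
    text = pid_file.read_text().strip()
    return int(text) if text.isdigit() else None


def is_process_running(pid: int) -> bool:
    """Probe a process with signal 0, which checks existence without killing it."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user (assumption of this sketch).
        return True
    return True


print(is_process_running(os.getpid()))
```

Once the process is confirmed to exist, sending `signal.SIGTERM` with `os.kill` asks it to shut down, which matches the behavior described for this command.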

airbyte-ops cloud connection

Connection operations in Airbyte Cloud.

airbyte-ops cloud connection state

Connection state operations in Airbyte Cloud.

Commands:

  • get: Get the current state for an Airbyte Cloud connection.
  • set: Set the state for an Airbyte Cloud connection.

airbyte-ops cloud connection state get

airbyte-ops cloud connection state get CONNECTION-ID [ARGS]

Get the current state for an Airbyte Cloud connection.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch state for. [required]
  • STREAM, --stream: Optional stream name to filter state for a single stream.
  • NAMESPACE, --namespace: Optional stream namespace to narrow the stream filter.
  • OUTPUT, --output: Path to write the state JSON output to a file.

airbyte-ops cloud connection state set

airbyte-ops cloud connection state set CONNECTION-ID [ARGS]

Set the state for an Airbyte Cloud connection.

State JSON can be provided as a positional argument, via --input, or piped through STDIN.

Uses the safe variant that prevents updates while a sync is running. When --stream is provided, only that stream's state is updated within the existing connection state.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
  • STATE-JSON, --state-json: The connection state as a JSON string. When --stream is used, this is just the stream's state blob (e.g., '{"cursor": "2024-01-01"}'). Otherwise, must include 'stateType', 'connectionId', and the appropriate state field. Can also be provided via --input or STDIN.
  • STREAM, --stream: Optional stream name to update state for a single stream only.
  • NAMESPACE, --namespace: Optional stream namespace to identify the stream.
  • INPUT, --input: Path to a JSON file containing the state to set.
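The two payload shapes accepted by --state-json (a bare stream blob when --stream is used, a full connection state otherwise) can be checked locally before calling the command. This is a sketch under stated assumptions: `check_state_payload` is a hypothetical helper, and it verifies only the two keys the documentation names explicitly, not the "appropriate state field".

```python
from __future__ import annotations

import json


def check_state_payload(state_json: str, stream: str | None = None) -> dict:
    """Parse a state payload and verify the keys the documentation calls out."""
    payload = json.loads(state_json)
    if not isinstance(payload, dict):
        raise ValueError("State must be a JSON object.")
    if stream is None:
        # Full connection state: 'stateType' and 'connectionId' must be present.
        missing = [key for key in ("stateType", "connectionId") if key not in payload]
        if missing:
            raise ValueError(f"Full connection state is missing: {', '.join(missing)}")
    # With a stream name, the payload is just that stream's state blob.
    return payload


# Stream-scoped update: only the stream's blob is needed ("users" is illustrative).
print(check_state_payload('{"cursor": "2024-01-01"}', stream="users"))
```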

airbyte-ops cloud connection catalog

Connection catalog operations in Airbyte Cloud.

Commands:

  • get: Get the configured catalog for an Airbyte Cloud connection.
  • set: Set the configured catalog for an Airbyte Cloud connection.

airbyte-ops cloud connection catalog get

airbyte-ops cloud connection catalog get CONNECTION-ID [ARGS]

Get the configured catalog for an Airbyte Cloud connection.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch catalog for. [required]
  • OUTPUT, --output: Path to write the catalog JSON output to a file.

airbyte-ops cloud connection catalog set

airbyte-ops cloud connection catalog set CONNECTION-ID [ARGS]

Set the configured catalog for an Airbyte Cloud connection.

Catalog JSON can be provided as a positional argument, via --input, or piped through STDIN.

WARNING: This replaces the entire configured catalog.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update catalog for. [required]
  • CATALOG-JSON, --catalog-json: The configured catalog as a JSON string. Can also be provided via --input or STDIN.
  • INPUT, --input: Path to a JSON file containing the catalog to set.

airbyte-ops cloud logs

GCP Cloud Logging operations for Airbyte Cloud.

airbyte-ops cloud logs lookup-cloud-backend-error

airbyte-ops cloud logs lookup-cloud-backend-error ERROR-ID [ARGS]

Look up error details from GCP Cloud Logging by error ID.

When an Airbyte Cloud API returns an error response with only an error ID (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command fetches the full stack trace and error details from GCP Cloud Logging.

Requires GCP credentials with Logs Viewer role on the target project. Set up credentials with: gcloud auth application-default login

Parameters:

  • ERROR-ID, --error-id: The error ID (UUID) to search for. This is typically returned in API error responses as {'errorId': '...'} [required]
  • LOOKBACK-DAYS, --lookback-days: Number of days to look back in logs. [default: 7]
  • MIN-SEVERITY-FILTER, --min-severity-filter: Optional minimum severity level to filter logs. [choices: debug, info, notice, warning, error, critical, alert, emergency]
  • RAW, --raw, --no-raw: Output raw JSON instead of formatted text. [default: False]
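When an API response carries only an error ID, a short helper can pull it out and validate it before the lookup. The sketch below assumes the response body is the JSON shape shown above; `extract_error_id` is a hypothetical name and is not part of this CLI.

```python
from __future__ import annotations

import json
import uuid


def extract_error_id(response_body: str) -> str | None:
    """Return the errorId from an API error body if it is a valid UUID."""
    try:
        payload = json.loads(response_body)
    except json.JSONDecodeError:
        return None
    error_id = payload.get("errorId") if isinstance(payload, dict) else None
    if not isinstance(error_id, str):
        return None
    try:
        # Normalize to the canonical hyphenated lowercase form.
        return str(uuid.UUID(error_id))
    except ValueError:
        return None


body = '{"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}'
error_id = extract_error_id(body)
if error_id:
    # Argument list for the lookup command documented above.
    print(["airbyte-ops", "cloud", "logs", "lookup-cloud-backend-error", error_id])
```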
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""CLI commands for Airbyte Cloud operations.

Commands:
    airbyte-ops cloud connector get-version-info - Get connector version info
    airbyte-ops cloud connector set-version-override - Set connector version override
    airbyte-ops cloud connector clear-version-override - Clear connector version override
    airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
    airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file

## CLI reference

The commands below are regenerated by `poe docs-generate` via cyclopts's
programmatic docs API; see `docs/generate_cli.py`.

.. include:: ../../../docs/generated/cli/cloud.md
   :start-line: 2
"""

from __future__ import annotations

# Hide Python-level members from the pdoc page for this module; the rendered
# docs for this CLI group come entirely from the grafted `.. include::` in
# the module docstring above.
__all__: list[str] = []

import json
import logging
import os
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Annotated, Literal

import requests
import yaml
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.exceptions import PyAirbyteInputError
from airbyte_cdk.models.connector_metadata import MetadataFile
from airbyte_cdk.utils.connector_paths import find_connector_root_from_name
from airbyte_cdk.utils.docker import build_connector_image, verify_docker_installation
from airbyte_protocol.models import ConfiguredAirbyteCatalog
from cyclopts import Parameter

from airbyte_ops_mcp.cli._base import App, app
from airbyte_ops_mcp.cli._shared import (
    exit_with_error,
    print_error,
    print_json,
    print_success,
    print_warning,
)
from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
from airbyte_ops_mcp.cloud_admin.connection_config import fetch_connection_config
from airbyte_ops_mcp.cloud_admin.registry_lookup import (
    resolve_definition_id_to_canonical_info,
)
from airbyte_ops_mcp.cloud_admin.version_overrides import (
    ResolvedCloudAuth,
    get_connector_version_info,
    set_actor_version_override,
    set_organization_version_override,
    set_workspace_version_override,
)
from airbyte_ops_mcp.constants import (
    CLOUD_SQL_INSTANCE,
    CLOUD_SQL_PROXY_PID_FILE,
    DEFAULT_CLOUD_SQL_PROXY_PORT,
    ENV_GCP_PROD_DB_ACCESS_CREDENTIALS,
)
from airbyte_ops_mcp.gcp_logs import GCPSeverity, fetch_error_logs
from airbyte_ops_mcp.regression_tests.cdk_secrets import get_first_config_from_secrets
from airbyte_ops_mcp.regression_tests.ci_output import (
    generate_regression_report,
    generate_single_version_report,
    write_github_output,
    write_github_outputs,
    write_github_summary,
    write_json_output,
    write_test_summary,
)
from airbyte_ops_mcp.regression_tests.config_overrides import (
    filter_configured_catalog_file,
)
from airbyte_ops_mcp.regression_tests.connection_fetcher import (
    fetch_connection_data,
    fetch_connection_state,
    save_connection_data_to_files,
)
from airbyte_ops_mcp.regression_tests.connection_secret_retriever import (
    SecretRetrievalError,
    enrich_config_with_secrets,
    should_use_secret_retriever,
)
from airbyte_ops_mcp.regression_tests.connector_runner import (
    ConnectorRunner,
    ensure_image_available,
)
from airbyte_ops_mcp.regression_tests.http_metrics import (
    MitmproxyManager,
    parse_http_dump,
)
from airbyte_ops_mcp.regression_tests.models import (
    Command,
    ConnectorUnderTest,
    ExecutionInputs,
    TargetOrControl,
)
from airbyte_ops_mcp.telemetry import track_regression_test

# Path to connectors directory within the airbyte repo
CONNECTORS_SUBDIR = Path("airbyte-integrations") / "connectors"

# Create the cloud sub-app
cloud_app = App(name="cloud", help="Airbyte Cloud operations.")
app.command(cloud_app)

# Create the connector sub-app under cloud
connector_app = App(
    name="connector", help="Deployed connector operations in Airbyte Cloud."
)
cloud_app.command(connector_app)

# Create the db sub-app under cloud
db_app = App(name="db", help="Database operations for Airbyte Cloud Prod DB Replica.")
cloud_app.command(db_app)

# Create the connection sub-app under cloud
connection_app = App(name="connection", help="Connection operations in Airbyte Cloud.")
cloud_app.command(connection_app)

# Create the state sub-app under connection
state_app = App(name="state", help="Connection state operations in Airbyte Cloud.")
connection_app.command(state_app)

# Create the catalog sub-app under connection
catalog_app = App(
    name="catalog", help="Connection catalog operations in Airbyte Cloud."
)
connection_app.command(catalog_app)

# Create the logs sub-app under cloud
logs_app = App(name="logs", help="GCP Cloud Logging operations for Airbyte Cloud.")
cloud_app.command(logs_app)


@db_app.command(name="start-proxy")
def start_proxy(
    port: Annotated[
        int,
        Parameter(help="Port for the Cloud SQL Proxy to listen on."),
    ] = DEFAULT_CLOUD_SQL_PROXY_PORT,
    daemon: Annotated[
        bool,
        Parameter(
            help="Run as daemon in background (default). Use --no-daemon for foreground."
        ),
    ] = True,
) -> None:
    """Start the Cloud SQL Proxy for database access.

    This command starts the Cloud SQL Auth Proxy to enable connections to the
    Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.

    By default, runs as a daemon (background process). Use --no-daemon to run in
    foreground mode where you can see logs and stop with Ctrl+C.

    Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable,
    which should contain the service account JSON credentials.

    After starting the proxy, set these environment variables to use database tools:
        export USE_CLOUD_SQL_PROXY=1
        export DB_PORT={port}

    Example:
        airbyte-ops cloud db start-proxy
        airbyte-ops cloud db start-proxy --port 15432
        airbyte-ops cloud db start-proxy --no-daemon
    """
    # Check if proxy is already running on the requested port (idempotency)
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=0.5):
            # Something is already listening on this port
            pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
            pid_info = ""
            if pid_file.exists():
                pid_info = f" (PID: {pid_file.read_text().strip()})"
            print_success(
                f"Cloud SQL Proxy is already running on port {port}{pid_info}"
            )
            print_success("")
            print_success("To use database tools, set these environment variables:")
            print_success("  export USE_CLOUD_SQL_PROXY=1")
            print_success(f"  export DB_PORT={port}")
            return
    except (OSError, TimeoutError, ConnectionRefusedError):
        pass  # Port not in use, proceed with starting proxy

    # Check if cloud-sql-proxy is installed
    proxy_path = shutil.which("cloud-sql-proxy")
    if not proxy_path:
        exit_with_error(
            "cloud-sql-proxy not found in PATH. "
            "Install it from: https://cloud.google.com/sql/docs/mysql/sql-proxy"
        )

    # Get credentials from environment
    creds_json = os.getenv(ENV_GCP_PROD_DB_ACCESS_CREDENTIALS)
    if not creds_json:
        exit_with_error(
            f"{ENV_GCP_PROD_DB_ACCESS_CREDENTIALS} environment variable is not set. "
            "This should contain the GCP service account JSON credentials."
        )

    # Build the command using --json-credentials to avoid writing to disk
    cmd = [
        proxy_path,
        CLOUD_SQL_INSTANCE,
        f"--port={port}",
        f"--json-credentials={creds_json}",
    ]

    print_success(f"Starting Cloud SQL Proxy on port {port}...")
    print_success(f"Instance: {CLOUD_SQL_INSTANCE}")
    print_success("")
    print_success("To use database tools, set these environment variables:")
    print_success("  export USE_CLOUD_SQL_PROXY=1")
    print_success(f"  export DB_PORT={port}")
    print_success("")

    if daemon:
        # Run in background (daemon mode) with log file for diagnostics
        log_file_path = Path("/tmp/airbyte-cloud-sql-proxy.log")
        log_file = log_file_path.open("ab")
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=log_file,
            start_new_session=True,
        )

        # Brief wait to verify the process started successfully
        time.sleep(0.5)
        if process.poll() is not None:
            # Process exited immediately - read any error output
            log_file.close()
            error_output = ""
            if log_file_path.exists():
                error_output = log_file_path.read_text()[-1000:]  # Last 1000 chars
            exit_with_error(
                f"Cloud SQL Proxy failed to start (exit code: {process.returncode}).\n"
                f"Check logs at {log_file_path}\n"
                f"Recent output: {error_output}"
            )

        # Write PID to file for stop-proxy command
        pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
        pid_file.write_text(str(process.pid))
        print_success(f"Cloud SQL Proxy started as daemon (PID: {process.pid})")
        print_success(f"Logs: {log_file_path}")
        print_success("To stop: airbyte-ops cloud db stop-proxy")
    else:
        # Run in foreground - replace current process
        # Signals (Ctrl+C) will be handled directly by the cloud-sql-proxy process
        print_success("Running in foreground. Press Ctrl+C to stop the proxy.")
        print_success("")
        os.execv(proxy_path, cmd)


@db_app.command(name="stop-proxy")
def stop_proxy() -> None:
    """Stop the Cloud SQL Proxy daemon.

    This command stops a Cloud SQL Proxy that was started with 'start-proxy'.
    It reads the PID from the PID file and sends a SIGTERM signal to stop the process.

    Example:
        airbyte-ops cloud db stop-proxy
    """
    pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)

    if not pid_file.exists():
        exit_with_error(
            f"PID file not found at {CLOUD_SQL_PROXY_PID_FILE}. "
            "No Cloud SQL Proxy daemon appears to be running."
        )

    pid_str = pid_file.read_text().strip()
    if not pid_str.isdigit():
        pid_file.unlink()
        exit_with_error(f"Invalid PID in {CLOUD_SQL_PROXY_PID_FILE}: {pid_str}")

    pid = int(pid_str)

    # Check if process is still running
    try:
        os.kill(pid, 0)  # Signal 0 just checks if process exists
    except ProcessLookupError:
        pid_file.unlink()
        print_success(
            f"Cloud SQL Proxy (PID: {pid}) is not running. Cleaned up PID file."
        )
        return
    except PermissionError:
        exit_with_error(f"Permission denied to check process {pid}.")

    # Send SIGTERM to stop the process
    try:
        os.kill(pid, signal.SIGTERM)
        print_success(f"Sent SIGTERM to Cloud SQL Proxy (PID: {pid}).")
    except ProcessLookupError:
        print_success(f"Cloud SQL Proxy (PID: {pid}) already stopped.")
    except PermissionError:
        exit_with_error(f"Permission denied to stop process {pid}.")

    # Clean up PID file
    pid_file.unlink(missing_ok=True)
    print_success("Cloud SQL Proxy stopped.")


@connector_app.command(name="get-version-info")
def get_version_info(
    workspace_id: Annotated[
        str,
        Parameter(help="The Airbyte Cloud workspace ID."),
    ],
    connector_id: Annotated[
        str,
        Parameter(help="The ID of the deployed connector (source or destination)."),
    ],
    connector_type: Annotated[
        Literal["source", "destination"],
        Parameter(help="The type of connector."),
    ],
) -> None:
    """Get the current version information for a deployed connector."""
    result = get_connector_version_info(
        auth=_resolve_cli_cloud_auth(),
        workspace_id=workspace_id,
        actor_id=connector_id,
        actor_type=connector_type,
    )
    print_json(result.model_dump())


_OverrideLevel = Literal["actor", "workspace", "organization"]
_TierFilter = Literal["TIER_0", "TIER_1", "TIER_2", "ALL"]


def _resolve_cli_cloud_auth() -> ResolvedCloudAuth:
    """Resolve Airbyte Cloud auth from environment variables for CLI use.

    Mirrors the priority order the MCP server uses:

    1. `AIRBYTE_CLOUD_BEARER_TOKEN`
    2. `AIRBYTE_CLOUD_CLIENT_ID` + `AIRBYTE_CLOUD_CLIENT_SECRET`

    Raises a CLI error via `exit_with_error` if no usable credentials are present.
    """
    bearer_token = os.environ.get("AIRBYTE_CLOUD_BEARER_TOKEN")
    if bearer_token:
        return ResolvedCloudAuth(bearer_token=bearer_token)

    client_id = os.environ.get("AIRBYTE_CLOUD_CLIENT_ID")
    client_secret = os.environ.get("AIRBYTE_CLOUD_CLIENT_SECRET")
    if client_id and client_secret:
        return ResolvedCloudAuth(client_id=client_id, client_secret=client_secret)

    exit_with_error(
        "Missing Airbyte Cloud credentials. Set AIRBYTE_CLOUD_BEARER_TOKEN, or "
        "both AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET."
    )


def _validate_override_scope_args(
    *,
    override_level: _OverrideLevel,
    workspace_id: str | None,
    organization_id: str | None,
    connector_id: str | None,
    connector_type: Literal["source", "destination"] | None,
    actor_definition_id: str | None,
) -> None:
    """Validate scope-specific arguments for `set/clear-version-override`.

    Calls `exit_with_error` (which terminates the process) if the supplied
    arguments are inconsistent with the requested `override_level`.
    """
    if override_level == "actor":
        missing = [
            name
            for name, value in (
                ("--workspace-id", workspace_id),
                ("--connector-id", connector_id),
                ("--connector-type", connector_type),
            )
            if not value
        ]
        if missing:
            exit_with_error(
                "Override level 'actor' requires " + ", ".join(missing) + "."
            )
        if actor_definition_id is not None:
            exit_with_error(
                "Override level 'actor' must not be combined with "
                "--actor-definition-id."
            )
        if organization_id is not None:
            exit_with_error(
                "Override level 'actor' must not be combined with --organization-id."
            )
        return

    if override_level == "workspace":
        missing = [
            name
            for name, value in (
                ("--workspace-id", workspace_id),
                ("--actor-definition-id", actor_definition_id),
            )
            if not value
        ]
        if missing:
            exit_with_error(
                "Override level 'workspace' requires " + ", ".join(missing) + "."
            )
        if connector_id is not None or connector_type is not None:
 433            exit_with_error(
 434                "Override level 'workspace' must not be combined with "
 435                "--connector-id or --connector-type."
 436            )
 437        if organization_id is not None:
 438            exit_with_error(
 439                "Override level 'workspace' must not be combined with "
 440                "--organization-id."
 441            )
 442        return
 443
 444    # override_level == "organization"
 445    missing = [
 446        name
 447        for name, value in (
 448            ("--organization-id", organization_id),
 449            ("--actor-definition-id", actor_definition_id),
 450        )
 451        if not value
 452    ]
 453    if missing:
 454        exit_with_error(
 455            "Override level 'organization' requires " + ", ".join(missing) + "."
 456        )
 457    if connector_id is not None or connector_type is not None:
 458        exit_with_error(
 459            "Override level 'organization' must not be combined with "
 460            "--connector-id or --connector-type."
 461        )
 462    if workspace_id is not None:
 463        exit_with_error(
 464            "Override level 'organization' must not be combined with --workspace-id."
 465        )
 466
 467
 468def _dispatch_version_override(
 469    *,
 470    override_level: _OverrideLevel,
 471    workspace_id: str | None,
 472    organization_id: str | None,
 473    connector_id: str | None,
 474    connector_type: Literal["source", "destination"] | None,
 475    actor_definition_id: str | None,
 476    version: str | None,
 477    unset: bool,
 478    reason: str | None,
 479    reason_url: str | None,
 480    issue_url: str,
 481    approval_comment_url: str,
 482    ai_agent_session_url: str | None,
 483    customer_tier_filter: _TierFilter,
 484) -> None:
 485    """Route a version-override request to the helper for `override_level`.
 486
 487    Resolves `--actor-definition-id` to its canonical name and connector type
 488    via the cloud registry for `workspace`/`organization` scopes, then calls
 489    the appropriate MCP-tool function. Prints the operation result as JSON.
 490    """
 491    _validate_override_scope_args(
 492        override_level=override_level,
 493        workspace_id=workspace_id,
 494        organization_id=organization_id,
 495        connector_id=connector_id,
 496        connector_type=connector_type,
 497        actor_definition_id=actor_definition_id,
 498    )
 499
 500    auth = _resolve_cli_cloud_auth()
 501
 502    if override_level == "actor":
 503        assert workspace_id is not None
 504        assert connector_id is not None
 505        assert connector_type is not None
 506        try:
 507            actor_result = set_actor_version_override(
 508                auth=auth,
 509                workspace_id=workspace_id,
 510                actor_id=connector_id,
 511                actor_type=connector_type,
 512                approval_comment_url=approval_comment_url,
 513                version=version,
 514                unset=unset,
 515                override_reason=reason,
 516                override_reason_reference_url=reason_url,
 517                issue_url=issue_url,
 518                ai_agent_session_url=ai_agent_session_url,
 519                customer_tier_filter=customer_tier_filter,
 520            )
 521        except (PyAirbyteInputError, CloudAuthError) as e:
 522            exit_with_error(str(e))
 523        _print_override_result(actor_result.success, actor_result.message)
 524        print_json(actor_result.model_dump())
 525        return
 526
 527    assert actor_definition_id is not None
 528    try:
 529        canonical_name, resolved_connector_type = (
 530            resolve_definition_id_to_canonical_info(actor_definition_id)
 531        )
 532    except (PyAirbyteInputError, requests.RequestException) as e:
 533        exit_with_error(f"Failed to resolve --actor-definition-id: {e}")
 534    typed_connector_type: Literal["source", "destination"] = (
 535        "source" if resolved_connector_type == "source" else "destination"
 536    )
 537
 538    if override_level == "workspace":
 539        assert workspace_id is not None
 540        try:
 541            ws_result = set_workspace_version_override(
 542                auth=auth,
 543                workspace_id=workspace_id,
 544                connector_name=canonical_name,
 545                connector_type=typed_connector_type,
 546                approval_comment_url=approval_comment_url,
 547                version=version,
 548                unset=unset,
 549                override_reason=reason,
 550                override_reason_reference_url=reason_url,
 551                issue_url=issue_url,
 552                ai_agent_session_url=ai_agent_session_url,
 553                customer_tier_filter=customer_tier_filter,
 554            )
 555        except (PyAirbyteInputError, CloudAuthError) as e:
 556            exit_with_error(str(e))
 557        _print_override_result(ws_result.success, ws_result.message)
 558        print_json(ws_result.model_dump())
 559        return
 560
 561    # override_level == "organization"
 562    assert organization_id is not None
 563    try:
 564        org_result = set_organization_version_override(
 565            auth=auth,
 566            organization_id=organization_id,
 567            connector_name=canonical_name,
 568            connector_type=typed_connector_type,
 569            approval_comment_url=approval_comment_url,
 570            version=version,
 571            unset=unset,
 572            override_reason=reason,
 573            override_reason_reference_url=reason_url,
 574            issue_url=issue_url,
 575            ai_agent_session_url=ai_agent_session_url,
 576            customer_tier_filter=customer_tier_filter,
 577        )
 578    except (PyAirbyteInputError, CloudAuthError) as e:
 579        exit_with_error(str(e))
 580    _print_override_result(org_result.success, org_result.message)
 581    print_json(org_result.model_dump())
 582
 583
 584def _print_override_result(success: bool, message: str) -> None:
 585    """Emit an override-operation status message via the shared CLI helpers."""
 586    if success:
 587        print_success(message)
 588    else:
 589        print_error(message)
 590
 591
 592@connector_app.command(name="set-version-override")
 593def set_version_override(
 594    version: Annotated[
 595        str,
 596        Parameter(
 597            help="The semver version string to pin to (e.g., '2.1.5-preview.abc1234')."
 598        ),
 599    ],
 600    reason: Annotated[
 601        str,
 602        Parameter(help="Explanation for the override (min 10 characters)."),
 603    ],
 604    issue_url: Annotated[
 605        str,
 606        Parameter(help="GitHub issue URL providing context for this operation."),
 607    ],
 608    approval_comment_url: Annotated[
 609        str,
 610        Parameter(
 611            help="Slack approval record URL where admin authorized this deployment."
 612        ),
 613    ],
 614    override_level: Annotated[
 615        _OverrideLevel,
 616        Parameter(
 617            help=(
 618                "Scope at which to apply the version override: 'actor' (single "
 619                "deployed connector instance), 'workspace' (all instances of a "
 620                "connector type within a workspace), or 'organization' (all "
 621                "instances across an organization). Defaults to 'actor'."
 622            ),
 623        ),
 624    ] = "actor",
 625    workspace_id: Annotated[
 626        str | None,
 627        Parameter(
 628            help=(
 629                "The Airbyte Cloud workspace ID. Required for 'actor' and "
 630                "'workspace' override levels."
 631            )
 632        ),
 633    ] = None,
 634    organization_id: Annotated[
 635        str | None,
 636        Parameter(
 637            help=(
 638                "The Airbyte Cloud organization ID. Required for "
 639                "'organization' override level."
 640            )
 641        ),
 642    ] = None,
 643    connector_id: Annotated[
 644        str | None,
 645        Parameter(
 646            help=(
 647                "The ID of the deployed connector (source or destination). "
 648                "Required for 'actor' override level."
 649            )
 650        ),
 651    ] = None,
 652    connector_type: Annotated[
 653        Literal["source", "destination"] | None,
 654        Parameter(help="The type of connector. Required for 'actor' override level."),
 655    ] = None,
 656    actor_definition_id: Annotated[
 657        str | None,
 658        Parameter(
 659            help=(
 660                "The connector definition UUID. Required for 'workspace' and "
 661                "'organization' override levels."
 662            )
 663        ),
 664    ] = None,
 665    ai_agent_session_url: Annotated[
 666        str | None,
 667        Parameter(
 668            help="URL to AI agent session driving this operation (for auditability)."
 669        ),
 670    ] = None,
 671    reason_url: Annotated[
 672        str | None,
 673        Parameter(help="Optional URL with more context (e.g., issue link)."),
 674    ] = None,
 675    customer_tier_filter: Annotated[
 676        _TierFilter,
 677        Parameter(
 678            help=(
 679                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
 680                "The operation will be rejected if the actual customer tier does not match. "
 681                "Defaults to 'TIER_2' (non-sensitive customers)."
 682            ),
 683        ),
 684    ] = "TIER_2",
 685) -> None:
 686    """Set a version override for a deployed connector.
 687
 688    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
 689    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.
 690
 691    The `--override-level` flag selects the scope at which the pin is applied:
 692
 693    - `actor` (default): pins a single deployed connector instance. Requires
 694      `--workspace-id`, `--connector-id`, and `--connector-type`.
 695    - `workspace`: pins ALL instances of a connector type within a workspace.
 696      Requires `--workspace-id` and `--actor-definition-id`.
 697    - `organization`: pins ALL instances of a connector type across an
 698      organization. Requires `--organization-id` and `--actor-definition-id`.
 699
 700    The `customer_tier_filter` gates the operation: the call fails if the
 701    actual tier of the target organization does not match. Use `ALL` to
 702    proceed regardless of tier (a warning is shown for sensitive tiers).
 703    """
 704    _dispatch_version_override(
 705        override_level=override_level,
 706        workspace_id=workspace_id,
 707        organization_id=organization_id,
 708        connector_id=connector_id,
 709        connector_type=connector_type,
 710        actor_definition_id=actor_definition_id,
 711        version=version,
 712        unset=False,
 713        reason=reason,
 714        reason_url=reason_url,
 715        issue_url=issue_url,
 716        approval_comment_url=approval_comment_url,
 717        ai_agent_session_url=ai_agent_session_url,
 718        customer_tier_filter=customer_tier_filter,
 719    )
 720
 721
 722@connector_app.command(name="clear-version-override")
 723def clear_version_override(
 724    issue_url: Annotated[
 725        str,
 726        Parameter(help="GitHub issue URL providing context for this operation."),
 727    ],
 728    approval_comment_url: Annotated[
 729        str,
 730        Parameter(
 731            help="Slack approval record URL where admin authorized this deployment."
 732        ),
 733    ],
 734    override_level: Annotated[
 735        _OverrideLevel,
 736        Parameter(
 737            help=(
 738                "Scope at which to clear the version override: 'actor', "
 739                "'workspace', or 'organization'. Defaults to 'actor'."
 740            ),
 741        ),
 742    ] = "actor",
 743    workspace_id: Annotated[
 744        str | None,
 745        Parameter(
 746            help=(
 747                "The Airbyte Cloud workspace ID. Required for 'actor' and "
 748                "'workspace' override levels."
 749            )
 750        ),
 751    ] = None,
 752    organization_id: Annotated[
 753        str | None,
 754        Parameter(
 755            help=(
 756                "The Airbyte Cloud organization ID. Required for "
 757                "'organization' override level."
 758            )
 759        ),
 760    ] = None,
 761    connector_id: Annotated[
 762        str | None,
 763        Parameter(
 764            help=(
 765                "The ID of the deployed connector (source or destination). "
 766                "Required for 'actor' override level."
 767            )
 768        ),
 769    ] = None,
 770    connector_type: Annotated[
 771        Literal["source", "destination"] | None,
 772        Parameter(help="The type of connector. Required for 'actor' override level."),
 773    ] = None,
 774    actor_definition_id: Annotated[
 775        str | None,
 776        Parameter(
 777            help=(
 778                "The connector definition UUID. Required for 'workspace' and "
 779                "'organization' override levels."
 780            )
 781        ),
 782    ] = None,
 783    ai_agent_session_url: Annotated[
 784        str | None,
 785        Parameter(
 786            help="URL to AI agent session driving this operation (for auditability)."
 787        ),
 788    ] = None,
 789    customer_tier_filter: Annotated[
 790        _TierFilter,
 791        Parameter(
 792            help=(
 793                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
 794                "The operation will be rejected if the actual customer tier does not match. "
 795                "Defaults to 'TIER_2' (non-sensitive customers)."
 796            ),
 797        ),
 798    ] = "TIER_2",
 799) -> None:
 800    """Clear a version override from a deployed connector.
 801
 802    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
 803    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.
 804
 805    The `--override-level` flag selects the scope at which the pin is removed:
 806
 807    - `actor` (default): clears a single deployed connector instance pin.
 808      Requires `--workspace-id`, `--connector-id`, and `--connector-type`.
 809    - `workspace`: clears the workspace-level pin for a connector type.
 810      Requires `--workspace-id` and `--actor-definition-id`.
 811    - `organization`: clears the organization-level pin for a connector type.
 812      Requires `--organization-id` and `--actor-definition-id`.
 813    """
 814    _dispatch_version_override(
 815        override_level=override_level,
 816        workspace_id=workspace_id,
 817        organization_id=organization_id,
 818        connector_id=connector_id,
 819        connector_type=connector_type,
 820        actor_definition_id=actor_definition_id,
 821        version=None,
 822        unset=True,
 823        reason=None,
 824        reason_url=None,
 825        issue_url=issue_url,
 826        approval_comment_url=approval_comment_url,
 827        ai_agent_session_url=ai_agent_session_url,
 828        customer_tier_filter=customer_tier_filter,
 829    )
 830
 831
 832def _load_json_file(file_path: Path) -> dict | None:
 833    """Load a JSON file and return its contents.
 834
 835    Returns None if the file doesn't exist or contains invalid JSON.
 836    """
 837    if not file_path.exists():
 838        return None
 839    try:
 840        return json.loads(file_path.read_text())
 841    except json.JSONDecodeError as e:
 842        print_error(f"Failed to parse JSON in file: {file_path}\nError: {e}")
 843        return None
 844
 845
 846def _run_connector_command(
 847    connector_image: str,
 848    command: Command,
 849    output_dir: Path,
 850    target_or_control: TargetOrControl,
 851    config_path: Path | None = None,
 852    catalog_path: Path | None = None,
 853    state_path: Path | None = None,
 854    proxy_url: str | None = None,
 855    enable_debug_logs: bool = False,
 856) -> dict:
 857    """Run a connector command and return results as a dict.
 858
 859    Args:
 860        connector_image: Full connector image name with tag.
 861        command: The Airbyte command to run.
 862        output_dir: Directory to store output files.
 863        target_or_control: Whether this is target or control version.
 864        config_path: Path to connector config JSON file.
 865        catalog_path: Path to configured catalog JSON file.
 866        state_path: Path to state JSON file.
 867        proxy_url: Optional HTTP proxy URL for traffic capture.
 868        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.
 869
 870    Returns:
 871        Dictionary with execution results.
 872    """
 873    connector = ConnectorUnderTest.from_image_name(connector_image, target_or_control)
 874
 875    config = _load_json_file(config_path) if config_path else None
 876    state = _load_json_file(state_path) if state_path else None
 877
 878    configured_catalog = None
 879    if catalog_path and catalog_path.exists():
 880        catalog_json = catalog_path.read_text()
 881        configured_catalog = ConfiguredAirbyteCatalog.parse_raw(catalog_json)
 882
 883    # Pass log level as an environment variable to the connector container
 884    container_env: dict[str, str] = {}
 885    if enable_debug_logs:
 886        container_env["LOG_LEVEL"] = "DEBUG"
 887
 888    execution_inputs = ExecutionInputs(
 889        connector_under_test=connector,
 890        command=command,
 891        output_dir=output_dir,
 892        config=config,
 893        configured_catalog=configured_catalog,
 894        state=state,
 895        environment_variables=container_env or None,
 896    )
 897
 898    runner = ConnectorRunner(execution_inputs, proxy_url=proxy_url)
 899    result = runner.run()
 900
 901    result.save_artifacts(output_dir)
 902
 903    return {
 904        "connector": connector_image,
 905        "command": command.value,
 906        "success": result.success,
 907        "exit_code": result.exit_code,
 908        "stdout_file": str(result.stdout_file_path),
 909        "stderr_file": str(result.stderr_file_path),
 910        "message_counts": {
 911            k.value: v for k, v in result.get_message_count_per_type().items()
 912        },
 913        "record_counts_per_stream": result.get_record_count_per_stream(),
 914    }
 915
 916
 917def _build_connector_image_from_source(
 918    connector_name: str,
 919    repo_root: Path | None = None,
 920    tag: str = "dev",
 921) -> str | None:
 922    """Build a connector image from source code.
 923
 924    Args:
 925        connector_name: Name of the connector (e.g., 'source-pokeapi').
 926        repo_root: Optional path to the airbyte repo root. If not provided,
 927            will attempt to auto-detect from current directory.
 928        tag: Tag to apply to the built image (default: 'dev').
 929
 930    Returns:
 931        The full image name with tag if successful, None if build fails.
 932    """
 933    if not verify_docker_installation():
 934        print_error("Docker is not installed or not running")
 935        return None
 936
 937    try:
 938        connector_directory = find_connector_root_from_name(connector_name)
 939    except FileNotFoundError:
 940        if repo_root:
 941            connector_directory = repo_root / CONNECTORS_SUBDIR / connector_name
 942            if not connector_directory.exists():
 943                print_error(f"Connector directory not found: {connector_directory}")
 944                return None
 945        else:
 946            print_error(
 947                f"Could not find connector '{connector_name}'. "
 948                "Try providing --repo-root to specify the airbyte repo location."
 949            )
 950            return None
 951
 952    metadata_file_path = connector_directory / "metadata.yaml"
 953    if not metadata_file_path.exists():
 954        print_error(f"metadata.yaml not found at {metadata_file_path}")
 955        return None
 956
 957    metadata = MetadataFile.from_file(metadata_file_path)
 958    print_success(f"Building image for connector: {connector_name}")
 959
 960    built_image = build_connector_image(
 961        connector_name=connector_name,
 962        connector_directory=connector_directory,
 963        metadata=metadata,
 964        tag=tag,
 965        no_verify=False,
 966    )
 967    print_success(f"Successfully built image: {built_image}")
 968    return built_image
 969
 970
 971def _fetch_control_image_from_metadata(connector_name: str) -> str | None:
 972    """Fetch the current released connector image from metadata.yaml on main branch.
 973
 974    This fetches the connector's metadata.yaml from the airbyte monorepo's master branch
 975    and extracts the dockerRepository and dockerImageTag to construct the control image.
 976
 977    Args:
 978        connector_name: The connector name (e.g., 'source-github').
 979
 980    Returns:
 981        The full connector image with tag (e.g., 'airbyte/source-github:1.0.0'),
 982        or None if the metadata could not be fetched or parsed.
 983    """
 984    metadata_url = (
 985        f"https://raw.githubusercontent.com/airbytehq/airbyte/master/"
 986        f"airbyte-integrations/connectors/{connector_name}/metadata.yaml"
 987    )
 988    response = requests.get(metadata_url, timeout=30)
 989    if not response.ok:
 990        print_error(
 991            f"Failed to fetch metadata for {connector_name}: "
 992            f"HTTP {response.status_code} from {metadata_url}"
 993        )
 994        return None
 995
 996    metadata = yaml.safe_load(response.text)
 997    if not isinstance(metadata, dict):
 998        print_error(f"Invalid metadata format for {connector_name}: expected dict")
 999        return None
1000
1001    data = metadata.get("data", {})
1002    docker_repository = data.get("dockerRepository")
1003    docker_image_tag = data.get("dockerImageTag")
1004
1005    if not docker_repository or not docker_image_tag:
1006        print_error(
1007            f"Could not find dockerRepository/dockerImageTag in metadata for {connector_name}"
1008        )
1009        return None
1010
1011    return f"{docker_repository}:{docker_image_tag}"
1012
1013
1014def _run_with_optional_http_metrics(
1015    connector_image: str,
1016    command: Command,
1017    output_dir: Path,
1018    target_or_control: TargetOrControl,
1019    enable_http_metrics: bool,
1020    config_path: Path | None,
1021    catalog_path: Path | None,
1022    state_path: Path | None,
1023    enable_debug_logs: bool = False,
1024) -> dict:
1025    """Run a connector command with optional HTTP metrics capture.
1026
1027    When enable_http_metrics is True, starts mitmproxy to capture HTTP traffic.
1028    If mitmproxy fails to start, falls back to running without metrics.
1029
1030    Args:
1031        connector_image: Full connector image name with tag.
1032        command: The Airbyte command to run.
1033        output_dir: Directory to store output files.
1034        target_or_control: Whether this is target or control version.
1035        enable_http_metrics: Whether to capture HTTP metrics via mitmproxy.
1036        config_path: Path to connector config JSON file.
1037        catalog_path: Path to configured catalog JSON file.
1038        state_path: Path to state JSON file.
1039        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.
1040
1041    Returns:
1042        Dictionary with execution results, optionally including http_metrics.
1043    """
1044    if not enable_http_metrics:
1045        return _run_connector_command(
1046            connector_image=connector_image,
1047            command=command,
1048            output_dir=output_dir,
1049            target_or_control=target_or_control,
1050            config_path=config_path,
1051            catalog_path=catalog_path,
1052            state_path=state_path,
1053            enable_debug_logs=enable_debug_logs,
1054        )
1055
1056    with MitmproxyManager.start(output_dir) as session:
1057        if session is None:
1058            print_error("Mitmproxy unavailable, running without HTTP metrics")
1059            return _run_connector_command(
1060                connector_image=connector_image,
1061                command=command,
1062                output_dir=output_dir,
1063                target_or_control=target_or_control,
1064                config_path=config_path,
1065                catalog_path=catalog_path,
1066                state_path=state_path,
1067                enable_debug_logs=enable_debug_logs,
1068            )
1069
1070        print_success(f"Started mitmproxy on {session.proxy_url}")
1071        result = _run_connector_command(
1072            connector_image=connector_image,
1073            command=command,
1074            output_dir=output_dir,
1075            target_or_control=target_or_control,
1076            config_path=config_path,
1077            catalog_path=catalog_path,
1078            state_path=state_path,
1079            proxy_url=session.proxy_url,
1080            enable_debug_logs=enable_debug_logs,
1081        )
1082
1083        http_metrics = parse_http_dump(session.dump_file_path)
1084        result["http_metrics"] = {
1085            "flow_count": http_metrics.flow_count,
1086            "duplicate_flow_count": http_metrics.duplicate_flow_count,
1087        }
1088        print_success(
1089            f"Captured {http_metrics.flow_count} HTTP flows "
1090            f"({http_metrics.duplicate_flow_count} duplicates)"
1091        )
1092        return result
1093
1094
1095@connector_app.command(name="regression-test")
1096def regression_test(
1097    skip_compare: Annotated[
1098        bool,
1099        Parameter(
1100            help="If True, skip comparison and run single-version tests only. "
1101            "If False (default), run comparison tests (target vs control)."
1102        ),
1103    ] = False,
1104    test_image: Annotated[
1105        str | None,
1106        Parameter(
1107            help="Test connector image with tag (e.g., airbyte/source-github:1.0.0). "
1108            "This is the image under test - in comparison mode, it's compared against control_image."
1109        ),
1110    ] = None,
1111    control_image: Annotated[
1112        str | None,
1113        Parameter(
1114            help="Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). "
1115            "Ignored if `skip_compare=True`."
1116        ),
1117    ] = None,
1118    connector_name: Annotated[
1119        str | None,
1120        Parameter(
1121            help="Connector name to build image from source (e.g., 'source-pokeapi'). "
1122            "If provided, builds the image locally with tag 'dev'. "
1123            "For comparison tests (default), this builds the target image. "
1124            "For single-version tests (skip_compare=True), this builds the test image."
1125        ),
1126    ] = None,
1127    repo_root: Annotated[
1128        str | None,
1129        Parameter(
1130            help="Path to the airbyte repo root. Required if connector_name is provided "
1131            "and the repo cannot be auto-detected."
1132        ),
1133    ] = None,
1134    command: Annotated[
1135        Literal["spec", "check", "discover", "read"],
1136        Parameter(help="The Airbyte command to run."),
1137    ] = "check",
1138    connection_id: Annotated[
1139        str | None,
1140        Parameter(
            help="Airbyte Cloud connection ID to fetch config/catalog from. "
            "Mutually exclusive with config-path/catalog-path. "
            "If provided, test_image/control_image can be auto-detected."
        ),
    ] = None,
    config_path: Annotated[
        str | None,
        Parameter(help="Path to the connector config JSON file."),
    ] = None,
    catalog_path: Annotated[
        str | None,
        Parameter(help="Path to the configured catalog JSON file (required for read)."),
    ] = None,
    state_path: Annotated[
        str | None,
        Parameter(help="Path to the state JSON file (optional for read)."),
    ] = None,
    output_dir: Annotated[
        str,
        Parameter(help="Directory to store test artifacts."),
    ] = "/tmp/regression_test_artifacts",
    enable_http_metrics: Annotated[
        bool,
        Parameter(
            help="Capture HTTP traffic metrics via mitmproxy (experimental). "
            "Requires mitmdump to be installed. Only used in comparison mode."
        ),
    ] = False,
    selected_streams: Annotated[
        str | None,
        Parameter(
            help="Comma-separated list of stream names to include in the read. "
            "Only these streams will be included in the configured catalog. "
            "This is useful to limit data volume by testing only specific streams."
        ),
    ] = None,
    enable_debug_logs: Annotated[
        bool,
        Parameter(
            help="Enable debug-level logging for regression test output. "
            "Also passed as `LOG_LEVEL=DEBUG` to the connector Docker container."
        ),
    ] = False,
    with_state: Annotated[
        bool | None,
        Parameter(
            negative="--no-state",
            help="Fetch and pass the connection's current state to the read command, "
            "producing a warm read instead of a cold read. Defaults to `True` when "
            "`--connection-id` is provided, `False` otherwise. Has no effect unless "
            "the command is `read`. Ignored when `--state-path` is explicitly provided.",
        ),
    ] = None,
) -> None:
    """Run regression tests on connectors.

    This command supports two modes:

    Comparison mode (skip_compare=False, default):
        Runs the specified Airbyte protocol command against both the target (new)
        and control (baseline) connector versions, then compares the results.
        This helps identify regressions between versions.

    Single-version mode (skip_compare=True):
        Runs the specified Airbyte protocol command against a single connector
        and validates the output. No comparison is performed.

    Results are written to the output directory and to GitHub Actions outputs
    if running in CI.

    You can provide the test image in three ways:
    1. --test-image: Use a pre-built image from a Docker registry
    2. --connector-name: Build the image locally from source code
    3. --connection-id: Auto-detect from an Airbyte Cloud connection

    You can provide config/catalog either via file paths or via --connection-id,
    which fetches them from Airbyte Cloud. If neither is given and
    --connector-name is set, the command attempts to fetch an integration test
    config from GSM instead.
    """
    # Configure debug logging for the regression test harness when requested
    if enable_debug_logs:
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            force=True,
        )

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    cmd = Command(command)

    config_file: Path | None = None
    catalog_file: Path | None = None
    state_file = Path(state_path) if state_path else None

    # Start from the explicitly provided images; either may be filled in later by
    # building from source or by auto-detection. The test image is used in both
    # modes, the control image only in comparison mode.
    resolved_test_image: str | None = test_image
    resolved_control_image: str | None = control_image

    # Validate conflicting parameters
    # Single-version mode: reject comparison-specific parameters
    if skip_compare and control_image:
        write_github_output("success", False)
        write_github_output(
            "error", "Cannot specify control_image with skip_compare=True"
        )
        exit_with_error(
            "Cannot specify --control-image with --skip-compare. "
            "Control image is only used in comparison mode."
        )

    # If connector_name is provided, build the image from source
    if connector_name:
        if resolved_test_image:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both test_image and connector_name"
            )
            exit_with_error("Cannot specify both --test-image and --connector-name")

        repo_root_path = Path(repo_root) if repo_root else None
        built_image = _build_connector_image_from_source(
            connector_name=connector_name,
            repo_root=repo_root_path,
            tag="dev",
        )
        if not built_image:
            write_github_output("success", False)
            write_github_output("error", f"Failed to build image for {connector_name}")
            exit_with_error(f"Failed to build image for {connector_name}")
        resolved_test_image = built_image

    if connection_id:
        if config_path or catalog_path:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both connection_id and file paths"
            )
            exit_with_error(
                "Cannot specify both connection_id and config_path/catalog_path"
            )

        print_success(f"Fetching config/catalog from connection: {connection_id}")
        connection_data = fetch_connection_data(connection_id)

        # Check if we should retrieve unmasked secrets
        if should_use_secret_retriever():
            print_success(
                "USE_CONNECTION_SECRET_RETRIEVER enabled - enriching config with unmasked secrets..."
            )
            try:
                connection_data = enrich_config_with_secrets(
                    connection_data,
                    retrieval_reason="Regression test with USE_CONNECTION_SECRET_RETRIEVER=true",
                )
                print_success("Successfully retrieved unmasked secrets from database")
            except SecretRetrievalError as e:
                write_github_output("success", False)
                write_github_output("error", str(e))
                exit_with_error(
                    f"{e}\n\n"
                    f"This connection cannot be used for regression testing. "
                    f"Please use a connection from a non-EU workspace, or use GSM-based "
                    f"integration test credentials instead (by omitting --connection-id)."
                )

        # Resolve with_state default: True when connection_id is provided
        resolved_with_state = with_state if with_state is not None else True

        if resolved_with_state and not state_file and command == "read":
            print_success("Fetching connection state for warm read...")
            try:
                connection_data.state = fetch_connection_state(connection_data)
                if connection_data.state:
                    print_success(
                        f"Fetched state with {len(connection_data.state)} stream(s)"
                    )
                else:
                    connection_data.state = None
                    print_success(
                        "No state available for this connection (initial sync)"
                    )
            except Exception as exc:
                print_error(f"Failed to fetch connection state: {exc}")
                print_success("Falling back to cold read (no state)")

        config_file, catalog_file, state_file_from_conn = save_connection_data_to_files(
            connection_data, output_path / "connection_data"
        )

        if state_file_from_conn and not state_file:
            state_file = state_file_from_conn

        print_success(
            f"Fetched config for source: {connection_data.source_name} "
            f"with {len(connection_data.stream_names)} streams"
        )

        # Auto-detect test/control image from connection if not provided
        if not resolved_test_image and connection_data.connector_image:
            resolved_test_image = connection_data.connector_image
            print_success(f"Auto-detected test image: {resolved_test_image}")

        if (
            not skip_compare
            and not resolved_control_image
            and connection_data.connector_image
        ):
            resolved_control_image = connection_data.connector_image
            print_success(f"Auto-detected control image: {resolved_control_image}")
    elif config_path:
        config_file = Path(config_path)
        catalog_file = Path(catalog_path) if catalog_path else None
    elif connector_name:
        # Fallback: fetch integration test secrets from GSM using PyAirbyte API
        print_success(
            f"No connection_id or config_path provided. "
            f"Attempting to fetch integration test config from GSM for {connector_name}..."
        )
        gsm_config = get_first_config_from_secrets(connector_name)
        if gsm_config:
            # Write config to a temp file (not in output_path to avoid artifact upload)
            gsm_config_dir = Path(
                tempfile.mkdtemp(prefix=f"gsm-config-{connector_name}-")
            )
            gsm_config_dir.chmod(0o700)
            gsm_config_file = gsm_config_dir / "config.json"
            gsm_config_file.write_text(json.dumps(gsm_config, indent=2))
            gsm_config_file.chmod(0o600)
            config_file = gsm_config_file
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
            print_success(
                f"Fetched integration test config from GSM for {connector_name}"
            )
        else:
            print_error(
                f"Failed to fetch integration test config from GSM for {connector_name}."
            )
            config_file = None
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
    else:
        config_file = None
        catalog_file = Path(catalog_path) if catalog_path else None

    # Auto-detect control_image from metadata.yaml if connector_name is provided (comparison mode only)
    if not skip_compare and not resolved_control_image and connector_name:
        resolved_control_image = _fetch_control_image_from_metadata(connector_name)
        if resolved_control_image:
            print_success(
                f"Auto-detected control image from metadata.yaml: {resolved_control_image}"
            )

    # Validate that we have the required images
    if not resolved_test_image:
        write_github_output("success", False)
        write_github_output("error", "No test image specified")
        exit_with_error(
            "You must provide one of the following: a test_image, a connector_name "
            "to build the image from source, or a connection_id to auto-detect the image."
        )

    if not skip_compare and not resolved_control_image:
        write_github_output("success", False)
        write_github_output("error", "No control image specified")
        exit_with_error(
            "You must provide one of the following: a control_image, a connection_id "
            "for a connection that has an associated connector image, or a connector_name "
            "to auto-detect the control image from the airbyte repo's metadata.yaml."
        )

    # Pull images if they weren't just built locally
    # If connector_name was provided, we just built the test image locally
    if not connector_name and not ensure_image_available(resolved_test_image):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_test_image}")
        exit_with_error(f"Failed to pull test image: {resolved_test_image}")

    if (
        not skip_compare
        and resolved_control_image
        and not ensure_image_available(resolved_control_image)
    ):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_control_image}")
        exit_with_error(
            f"Failed to pull control connector image: {resolved_control_image}"
        )

    # Apply selected_streams filter to catalog if requested
    if selected_streams and catalog_file:
        streams_set = {s.strip() for s in selected_streams.split(",") if s.strip()}
        if streams_set:
            print_success(
                f"Filtering catalog to {len(streams_set)} selected streams: "
                f"{', '.join(sorted(streams_set))}"
            )
            filter_configured_catalog_file(catalog_file, streams_set)

    # Upgrade READ → READ_WITH_STATE when a state file is available
    if cmd == Command.READ and state_file:
        cmd = Command.READ_WITH_STATE
        print_success("State available: using read-with-state (warm read)")

    # Track telemetry for the regression test
    # Extract version from image tag (e.g., "airbyte/source-github:1.0.0" -> "1.0.0")
    target_version = (
        resolved_test_image.rsplit(":", 1)[-1]
        if ":" in resolved_test_image
        else "unknown"
    )
    control_version = None
    if resolved_control_image and ":" in resolved_control_image:
        control_version = resolved_control_image.rsplit(":", 1)[-1]

    # Get tester identity from environment (GitHub Actions sets GITHUB_ACTOR)
    tester = os.getenv("GITHUB_ACTOR") or os.getenv("USER")

    track_regression_test(
        user_id=tester,
        connector_image=resolved_test_image,
        command=command,
        target_version=target_version,
        control_version=control_version,
        additional_properties={
            "connection_id": connection_id,
            "skip_compare": skip_compare,
            "with_state": cmd == Command.READ_WITH_STATE,
        },
    )

    # Execute the appropriate mode
    if skip_compare:
        # Single-version mode: run only the connector image
        result = _run_connector_command(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=output_path,
            target_or_control=TargetOrControl.TARGET,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        print_json(result)

        write_github_outputs(
            {
                "success": result["success"],
                "connector": resolved_test_image,
                "command": command,
                "exit_code": result["exit_code"],
            }
        )

        write_test_summary(
            connector_image=resolved_test_image,
            test_type="regression-test",
            success=result["success"],
            results={
                "command": command,
                "exit_code": result["exit_code"],
                "output_dir": output_dir,
            },
        )

        # Generate report.md with detailed metrics
        report_path = generate_single_version_report(
            connector_image=resolved_test_image,
            command=command,
            result=result,
            output_dir=output_path,
        )
        print_success(f"Generated report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if result["success"]:
            print_success(
                f"Single-version regression test passed for {resolved_test_image}"
            )
        else:
            print_error(
                f"Single-version regression test failed for {resolved_test_image}"
            )
    else:
        # Comparison mode: run both target and control images
        target_output = output_path / "target"
        control_output = output_path / "control"

        target_result = _run_with_optional_http_metrics(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=target_output,
            target_or_control=TargetOrControl.TARGET,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        control_result = _run_with_optional_http_metrics(
            connector_image=resolved_control_image,  # type: ignore[arg-type]
            command=cmd,
            output_dir=control_output,
            target_or_control=TargetOrControl.CONTROL,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        both_succeeded = target_result["success"] and control_result["success"]
        # Any mismatch in success status is flagged as a regression, including the
        # case where the target succeeds and the control fails.
        regression_detected = target_result["success"] != control_result["success"]

        combined_result = {
            "target": target_result,
            "control": control_result,
            "both_succeeded": both_succeeded,
            "regression_detected": regression_detected,
        }

        print_json(combined_result)

        both_failed = not target_result["success"] and not control_result["success"]

        write_github_outputs(
            {
                "success": not regression_detected,
                "target_image": resolved_test_image,
                "control_image": resolved_control_image,
                "command": command,
                "target_exit_code": target_result["exit_code"],
                "control_exit_code": control_result["exit_code"],
                "regression_detected": regression_detected,
                "both_failed": both_failed,
            }
        )

        write_json_output("regression_report", combined_result)

        report_path = generate_regression_report(
            target_image=resolved_test_image,
            control_image=resolved_control_image,  # type: ignore[arg-type]
            command=command,
            target_result=target_result,
            control_result=control_result,
            output_dir=output_path,
        )
        print_success(f"Generated regression report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if regression_detected:
            print_error(
                f"Regression detected between {resolved_test_image} and {resolved_control_image}"
            )
        elif both_succeeded:
            print_success(
                f"Regression test passed for {resolved_test_image} vs {resolved_control_image}"
            )
        else:
            # Both versions failed, but no regression between them was detected; treat this as a
            # test environment issue (e.g., expired credentials, transient API errors, rate limiting).
            # Exit successfully since no regression between versions was detected.
            print_warning(
                f"Both versions failed for {resolved_test_image} vs {resolved_control_image}. "
                "No regression detected. This may indicate expired credentials or a transient API issue."
            )


@connector_app.command(name="fetch-connection-config")
def fetch_connection_config_cmd(
    connection_id: Annotated[
        str,
        Parameter(help="The UUID of the Airbyte Cloud connection."),
    ],
    output_path: Annotated[
        str | None,
        Parameter(
            help="Path to output file or directory. "
            "If directory, writes connection-<id>-config.json inside it. "
            "Default: platform temp directory (e.g. /tmp/connection-<id>-config.json)."
        ),
    ] = None,
    with_secrets: Annotated[
        bool,
        Parameter(
            name="--with-secrets",
            negative="--no-secrets",
            help="If set, fetches unmasked secrets from the internal database. "
            "Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or `gcloud auth application-default login`. "
            "Must be used with --oc-issue-url.",
        ),
    ] = False,
    oc_issue_url: Annotated[
        str | None,
        Parameter(
            help="OC issue URL for audit logging. Required when using --with-secrets."
        ),
    ] = None,
) -> None:
    """Fetch connection configuration from Airbyte Cloud to a local file.

    This command retrieves the source configuration for a given connection ID
    and writes it to a JSON file. When `--output-path` is omitted, the file is
    written to the platform temp directory to avoid accidentally committing
    secrets to a git repository.

    Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and
    AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

    When --with-secrets is specified, the command fetches unmasked secrets from
    the internal database using the connection-retriever. This additionally requires:
    - An OC issue URL for audit logging (--oc-issue-url)
    - GCP credentials via `GCP_PROD_DB_ACCESS_CREDENTIALS` env var or `gcloud auth application-default login`
    - If `CI=true`: expects `cloud-sql-proxy` running on localhost, or
      direct network access to the Cloud SQL instance.
    """
    path = Path(output_path) if output_path else None
    result = fetch_connection_config(
        connection_id=connection_id,
        output_path=path,
        with_secrets=with_secrets,
        oc_issue_url=oc_issue_url,
    )
    if result.success:
        print_success(result.message)
    else:
        print_error(result.message)
    print_json(result.model_dump())


def _read_json_input(
    json_str: str | None,
    input_file: str | None,
) -> dict:
    """Read JSON from a positional arg, --input file, or STDIN (in that priority)."""
    if json_str is not None:
        return json.loads(json_str)
    if input_file is not None:
        return json.loads(Path(input_file).read_text())
    if not sys.stdin.isatty():
        return json.loads(sys.stdin.read())
    exit_with_error(
        "No JSON input provided. Pass it as a positional argument, "
        "via --input <file>, or pipe to STDIN."
    )
    raise SystemExit(1)  # unreachable, but satisfies type checker


@state_app.command(name="get")
def get_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to filter state for a single stream.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to narrow the stream filter.",
            name="--namespace",
        ),
    ] = None,
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the state JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the current state for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        result = {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }
    else:
        result = conn.dump_raw_state()

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"State written to {output}", file=sys.stderr)
    else:
        print(json_output)


@state_app.command(name="set")
def set_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    state_json: Annotated[
        str | None,
        Parameter(
            help="The connection state as a JSON string. When --stream is used, "
            'this is just the stream\'s state blob (e.g., \'{"cursor": "2024-01-01"}\').'
            " Otherwise, must include 'stateType', 'connectionId', and the appropriate "
            "state field. Can also be provided via --input or STDIN."
        ),
    ] = None,
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to update state for a single stream only.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the state to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the state for an Airbyte Cloud connection.

    State JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    Uses the safe variant that prevents updates while a sync is running.
    When --stream is provided, only that stream's state is updated within
    the existing connection state.
    """
    parsed_state = _read_json_input(state_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        conn.set_stream_state(
            stream_name=stream_name,
            state_blob_dict=parsed_state,
            stream_namespace=stream_namespace,
        )
    else:
        conn.import_raw_state(parsed_state)

    result = conn.dump_raw_state()
    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


@catalog_app.command(name="get")
def get_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch catalog for."),
    ],
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the catalog JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the configured catalog for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("No configured catalog found for this connection.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"Catalog written to {output}", file=sys.stderr)
    else:
        print(json_output)


@catalog_app.command(name="set")
def set_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update catalog for."),
    ],
    catalog_json: Annotated[
        str | None,
        Parameter(
            help="The configured catalog as a JSON string. "
            "Can also be provided via --input or STDIN."
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the catalog to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the configured catalog for an Airbyte Cloud connection.

    Catalog JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    WARNING: This replaces the entire configured catalog.
    """
    parsed_catalog = _read_json_input(catalog_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    conn.import_raw_catalog(parsed_catalog)

    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("Failed to retrieve catalog after import.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


@logs_app.command(name="lookup-cloud-backend-error")
def lookup_cloud_backend_error(
    error_id: Annotated[
        str,
        Parameter(
            help=(
                "The error ID (UUID) to search for. This is typically returned "
                "in API error responses as {'errorId': '...'}."
            )
        ),
    ],
    lookback_days: Annotated[
        int,
        Parameter(help="Number of days to look back in logs."),
    ] = 7,
    min_severity_filter: Annotated[
        GCPSeverity | None,
        Parameter(
            help="Optional minimum severity level to filter logs.",
        ),
    ] = None,
    raw: Annotated[
        bool,
        Parameter(help="Output raw JSON instead of formatted text."),
    ] = False,
) -> None:
    """Look up error details from GCP Cloud Logging by error ID.

    When an Airbyte Cloud API returns an error response with only an error ID
    (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command
    fetches the full stack trace and error details from GCP Cloud Logging.

    Requires GCP credentials with Logs Viewer role on the target project.
    Set up credentials with: gcloud auth application-default login
    """
    print(f"Searching for error ID: {error_id}", file=sys.stderr)
    print(f"Lookback days: {lookback_days}", file=sys.stderr)
    if min_severity_filter:
        print(f"Severity filter: {min_severity_filter}", file=sys.stderr)
    print(file=sys.stderr)

    result = fetch_error_logs(
        error_id=error_id,
        lookback_days=lookback_days,
        min_severity_filter=min_severity_filter,
    )

    if raw:
        print_json(result.model_dump())
        return

    print(f"Found {result.total_entries_found} log entries", file=sys.stderr)
    print(file=sys.stderr)

    if result.payloads:
        for i, payload in enumerate(result.payloads):
            print(f"=== Log Group {i + 1} ===")
            print(f"Timestamp: {payload.timestamp}")
            print(f"Severity: {payload.severity}")
            if payload.resource.labels.pod_name:
                print(f"Pod: {payload.resource.labels.pod_name}")
            print(f"Lines: {payload.num_log_lines}")
            print()
            print(payload.message)
            print()
    elif result.entries:
        print("No grouped payloads, showing raw entries:", file=sys.stderr)
        for entry in result.entries:
            print(f"[{entry.timestamp}] {entry.severity}: {entry.payload}")
    else:
        print_error("No log entries found for this error ID.")