airbyte_ops_mcp.cli.cloud

CLI commands for Airbyte Cloud operations.

Commands:

  • airbyte-ops cloud connector get-version-info - Get connector version info
  • airbyte-ops cloud connector set-version-override - Set connector version override
  • airbyte-ops cloud connector clear-version-override - Clear connector version override
  • airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
  • airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file

CLI reference

The commands below are regenerated by poe docs-generate via cyclopts's programmatic docs API; see docs/generate_cli.py.

airbyte-ops cloud COMMAND

Airbyte Cloud operations.

Commands:

  • connection: Connection operations in Airbyte Cloud.
  • connector: Deployed connector operations in Airbyte Cloud.
  • db: Database operations for Airbyte Cloud Prod DB Replica.
  • logs: GCP Cloud Logging operations for Airbyte Cloud.

airbyte-ops cloud connector

Deployed connector operations in Airbyte Cloud.

airbyte-ops cloud connector get-version-info

airbyte-ops cloud connector get-version-info WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE

Get the current version information for a deployed connector.

Parameters:

  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
  • CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]

airbyte-ops cloud connector set-version-override

airbyte-ops cloud connector set-version-override VERSION REASON ISSUE-URL APPROVAL-COMMENT-URL [ARGS]

Set a version override for a deployed connector.

Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.

The --override-level flag selects the scope at which the pin is applied:

  • actor (default): pins a single deployed connector instance. Requires --workspace-id, --connector-id, and --connector-type.
  • workspace: pins ALL instances of a connector type within a workspace. Requires --workspace-id and --actor-definition-id.
  • organization: pins ALL instances of a connector type across an organization. Requires --organization-id and --actor-definition-id.

The customer_tier_filter gates the operation: the call fails if the actual tier of the target organization does not match. Use ALL to proceed regardless of tier (a warning is shown for sensitive tiers).
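The scope rules and tier gate described above can be illustrated with a small Python sketch. The names `REQUIRED_FLAGS`, `missing_flags`, and `tier_allows` are hypothetical and not part of `airbyte_ops_mcp`; the real CLI may apply further checks (for example, rejecting flags that do not belong to the chosen level).

```python
from __future__ import annotations

# Flags each --override-level requires, as listed in the text above.
REQUIRED_FLAGS: dict[str, tuple[str, ...]] = {
    "actor": ("--workspace-id", "--connector-id", "--connector-type"),
    "workspace": ("--workspace-id", "--actor-definition-id"),
    "organization": ("--organization-id", "--actor-definition-id"),
}


def missing_flags(override_level: str, supplied: dict[str, str | None]) -> list[str]:
    """Return the required flags that are absent or empty for the given level."""
    return [flag for flag in REQUIRED_FLAGS[override_level] if not supplied.get(flag)]


def tier_allows(customer_tier_filter: str, actual_tier: str) -> bool:
    """Hypothetical gate: 'ALL' always passes, otherwise the tiers must match."""
    return customer_tier_filter == "ALL" or customer_tier_filter == actual_tier


print(missing_flags("workspace", {"--workspace-id": "ws-1"}))
print(tier_allows("TIER_2", "TIER_0"))
```

Under these assumptions, a workspace-level pin supplied with only `--workspace-id` would be reported as missing `--actor-definition-id`, and the default `TIER_2` filter would not match a `TIER_0` organization.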

Parameters:

  • VERSION, --version: The semver version string to pin to (e.g., '2.1.5-preview.abc1234'). [required]
  • REASON, --reason: Explanation for the override (min 10 characters). [required]
  • ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
  • APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
  • OVERRIDE-LEVEL, --override-level: Scope at which to apply the version override: 'actor' (single deployed connector instance), 'workspace' (all instances of a connector type within a workspace), or 'organization' (all instances across an organization). Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
  • ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
  • CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
  • ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
  • AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
  • REASON-URL, --reason-url: Optional URL with more context (e.g., issue link).
  • CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]

airbyte-ops cloud connector clear-version-override

airbyte-ops cloud connector clear-version-override ISSUE-URL APPROVAL-COMMENT-URL [ARGS]

Clear a version override from a deployed connector.

Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.

The --override-level flag selects the scope at which the pin is removed:

  • actor (default): clears a single deployed connector instance pin. Requires --workspace-id, --connector-id, and --connector-type.
  • workspace: clears the workspace-level pin for a connector type. Requires --workspace-id and --actor-definition-id.
  • organization: clears the organization-level pin for a connector type. Requires --organization-id and --actor-definition-id.

Parameters:

  • ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
  • APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
  • OVERRIDE-LEVEL, --override-level: Scope at which to clear the version override: 'actor', 'workspace', or 'organization'. Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
  • ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
  • CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
  • ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
  • AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
  • CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]

airbyte-ops cloud connector regression-test

airbyte-ops cloud connector regression-test [ARGS]

Run regression tests on connectors.

This command supports two modes:

Comparison mode (skip_compare=False, default): Runs the specified Airbyte protocol command against both the target (new) and control (baseline) connector versions, then compares the results. This helps identify regressions between versions.

Single-version mode (skip_compare=True): Runs the specified Airbyte protocol command against a single connector and validates the output. No comparison is performed.

Results are written to the output directory and to GitHub Actions outputs if running in CI.

You can provide the test image in three ways:

  1. --test-image: Use a pre-built image from Docker registry
  2. --connector-name: Build the image locally from source code
  3. --connection-id: Auto-detect from an Airbyte Cloud connection

You can provide config/catalog either via file paths OR via a connection_id that fetches them from Airbyte Cloud.
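To make the two modes concrete, here is a minimal Python sketch that assembles an argument list for this command using only the flags documented below. The helper `regression_test_argv`, the baseline image tag, and the connection UUID are illustrative placeholders, not values taken from this project.

```python
from __future__ import annotations

import shlex


def regression_test_argv(
    *,
    command: str = "check",
    skip_compare: bool = False,
    test_image: str | None = None,
    control_image: str | None = None,
    connection_id: str | None = None,
) -> list[str]:
    """Assemble an argv list for `airbyte-ops cloud connector regression-test`."""
    argv = ["airbyte-ops", "cloud", "connector", "regression-test", "--command", command]
    if skip_compare:
        argv.append("--skip-compare")  # single-version mode
    if test_image:
        argv += ["--test-image", test_image]
    if control_image and not skip_compare:
        # --control-image is ignored in single-version mode, so leave it out.
        argv += ["--control-image", control_image]
    if connection_id:
        argv += ["--connection-id", connection_id]
    return argv


comparison = regression_test_argv(
    command="read",
    test_image="airbyte/source-github:1.0.0",
    control_image="airbyte/source-github:0.9.0",  # placeholder baseline tag
    connection_id="00000000-0000-0000-0000-000000000000",  # placeholder UUID
)
print(shlex.join(comparison))
```

The resulting list could be passed to `subprocess.run`; building it as a list rather than a shell string avoids quoting problems with image tags and paths.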

Parameters:

  • SKIP-COMPARE, --skip-compare, --no-skip-compare: If True, skip comparison and run single-version tests only. If False (default), run comparison tests (target vs control). [default: False]
  • TEST-IMAGE, --test-image: Test connector image with tag (e.g., airbyte/source-github:1.0.0). This is the image under test - in comparison mode, it's compared against control_image.
  • CONTROL-IMAGE, --control-image: Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). Ignored if skip_compare=True.
  • CONNECTOR-NAME, --connector-name: Connector name to build image from source (e.g., 'source-pokeapi'). If provided, builds the image locally with tag 'dev'. For comparison tests (default), this builds the target image. For single-version tests (skip_compare=True), this builds the test image.
  • REPO-ROOT, --repo-root: Path to the airbyte repo root. Required if connector_name is provided and the repo cannot be auto-detected.
  • COMMAND, --command: The Airbyte command to run. [choices: spec, check, discover, read] [default: check]
  • CONNECTION-ID, --connection-id: Airbyte Cloud connection ID to fetch config/catalog from. Mutually exclusive with config-path/catalog-path. If provided, test_image/control_image can be auto-detected.
  • CONFIG-PATH, --config-path: Path to the connector config JSON file.
  • CATALOG-PATH, --catalog-path: Path to the configured catalog JSON file (required for read).
  • STATE-PATH, --state-path: Path to the state JSON file (optional for read).
  • OUTPUT-DIR, --output-dir: Directory to store test artifacts. [default: /tmp/regression_test_artifacts]
  • ENABLE-HTTP-METRICS, --enable-http-metrics, --no-enable-http-metrics: Capture HTTP traffic metrics via mitmproxy (experimental). Requires mitmdump to be installed. Only used in comparison mode. [default: False]
  • SELECTED-STREAMS, --selected-streams: Comma-separated list of stream names to include in the read. Only these streams will be included in the configured catalog. This is useful to limit data volume by testing only specific streams.
  • ENABLE-DEBUG-LOGS, --enable-debug-logs, --no-enable-debug-logs: Enable debug-level logging for regression test output. Also passed as LOG_LEVEL=DEBUG to the connector Docker container. [default: False]
  • WITH-STATE, --with-state, --no-state: Fetch and pass the connection's current state to the read command, producing a warm read instead of a cold read. Defaults to True when --connection-id is provided, False otherwise. Has no effect unless the command is read. Ignored when --state-path is explicitly provided.

airbyte-ops cloud connector fetch-connection-config

airbyte-ops cloud connector fetch-connection-config CONNECTION-ID [ARGS]

Fetch connection configuration from Airbyte Cloud to a local file.

This command retrieves the source configuration for a given connection ID and writes it to a JSON file. When --output-path is omitted the file is written to the platform temp directory to avoid accidentally committing secrets to a git repository.

Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

When --with-secrets is specified, the command fetches unmasked secrets from the internal database using the connection-retriever. This additionally requires:

  • An OC issue URL for audit logging (--oc-issue-url)
  • GCP credentials via GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login
  • Cloud SQL Python Connector access to the Prod DB replica.
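A pre-flight check along the following lines can catch missing prerequisites before invoking the command. This is only a sketch based on the requirements listed above: `fetch_config_problems` is a hypothetical helper, and it deliberately does not check GCP credentials, since those may come from `gcloud auth application-default login` rather than an environment variable.

```python
from __future__ import annotations

from typing import Mapping


def fetch_config_problems(
    env: Mapping[str, str],
    *,
    with_secrets: bool = False,
    oc_issue_url: str | None = None,
) -> list[str]:
    """List the documented prerequisites that appear to be unmet."""
    problems: list[str] = []
    for name in ("AIRBYTE_CLOUD_CLIENT_ID", "AIRBYTE_CLOUD_CLIENT_SECRET"):
        if not env.get(name):
            problems.append(f"{name} is not set")
    if with_secrets and not oc_issue_url:
        problems.append("--with-secrets requires --oc-issue-url")
    return problems


print(fetch_config_problems({"AIRBYTE_CLOUD_CLIENT_ID": "abc"}, with_secrets=True))
```

In practice `env` would be `os.environ`; passing it as a parameter keeps the check easy to test.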

Parameters:

  • CONNECTION-ID, --connection-id: The UUID of the Airbyte Cloud connection. [required]
  • OUTPUT-PATH, --output-path: Path to output file or directory. If directory, writes connection--config.json inside it. Default: platform temp directory (e.g. /tmp/connection--config.json)
  • WITH-SECRETS, --with-secrets, --no-secrets: If set, fetches unmasked secrets from the internal database. Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login. Must be used with --oc-issue-url. [default: False]
  • OC-ISSUE-URL, --oc-issue-url: OC issue URL for audit logging. Required when using --with-secrets.

airbyte-ops cloud db

Database operations for Airbyte Cloud Prod DB Replica.

airbyte-ops cloud db start-proxy

airbyte-ops cloud db start-proxy [ARGS]

Start the Cloud SQL Proxy for database access.

This command starts the Cloud SQL Auth Proxy to enable connections to the Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.

By default, runs as a daemon (background process). Use --no-daemon to run in foreground mode where you can see logs and stop with Ctrl+C.

Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, which should contain the service account JSON credentials.

After starting the proxy, set these environment variables to use database tools:

    export USE_CLOUD_SQL_PROXY=1
    export DB_PORT={port}
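When database tools are launched from Python rather than from an interactive shell, the same two variables can be set on a copy of the environment. The helper `proxy_env` below is a hypothetical name used for illustration; the default port matches the documented default of 15432.

```python
from __future__ import annotations

import os
from typing import Mapping


def proxy_env(port: int = 15432, base: Mapping[str, str] | None = None) -> dict[str, str]:
    """Return a copy of the environment with the proxy variables set."""
    env = dict(os.environ if base is None else base)
    env["USE_CLOUD_SQL_PROXY"] = "1"
    env["DB_PORT"] = str(port)  # environment values must be strings
    return env


env = proxy_env(15432, base={})
print(env)
```

The returned mapping can be passed as `env=` to `subprocess.run`, which leaves the parent process environment untouched.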

Parameters:

  • PORT, --port: Port for the Cloud SQL Proxy to listen on. [default: 15432]
  • DAEMON, --daemon, --no-daemon: Run as daemon in background (default). Use --no-daemon for foreground. [default: True]

airbyte-ops cloud db stop-proxy

airbyte-ops cloud db stop-proxy

Stop the Cloud SQL Proxy daemon.

This command stops a Cloud SQL Proxy that was started with 'start-proxy'. It reads the PID from the PID file and sends a SIGTERM signal to stop the process.

airbyte-ops cloud connection

Connection operations in Airbyte Cloud.

airbyte-ops cloud connection state

Connection state operations in Airbyte Cloud.

Commands:

  • get: Get the current state for an Airbyte Cloud connection.
  • reset: Reset a configured stream's state so the next sync full-refreshes it.
  • set: Set the state for an Airbyte Cloud connection.

airbyte-ops cloud connection state get

airbyte-ops cloud connection state get CONNECTION-ID [ARGS]

Get the current state for an Airbyte Cloud connection.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch state for. [required]
  • STREAM, --stream: Optional stream name to filter state for a single stream.
  • NAMESPACE, --namespace: Optional stream namespace to narrow the stream filter.
  • OUTPUT, --output: Path to write the state JSON output to a file.

airbyte-ops cloud connection state set

airbyte-ops cloud connection state set CONNECTION-ID [ARGS]

Set the state for an Airbyte Cloud connection.

State JSON can be provided as a positional argument, via --input, or piped through STDIN.

Uses the safe variant that prevents updates while a sync is running. When --stream is provided, only that stream's state is updated within the existing connection state.
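For the stream-scoped case, the value passed as state JSON is just that stream's state blob. The sketch below serializes such a blob and assembles an argument list with the documented `--connection-id`, `--stream`, `--state-json`, and `--namespace` flags. The helper `stream_state_argv`, the stream name `users`, and the UUID are placeholders chosen for illustration.

```python
from __future__ import annotations

import json


def stream_state_argv(
    connection_id: str,
    stream: str,
    stream_state: dict,
    namespace: str | None = None,
) -> list[str]:
    """Assemble argv for a single-stream `connection state set` call."""
    argv = [
        "airbyte-ops", "cloud", "connection", "state", "set",
        "--connection-id", connection_id,
        "--stream", stream,
        "--state-json", json.dumps(stream_state),  # only the stream's own blob
    ]
    if namespace:
        argv += ["--namespace", namespace]
    return argv


argv = stream_state_argv(
    "00000000-0000-0000-0000-000000000000",  # placeholder connection UUID
    "users",  # placeholder stream name
    {"cursor": "2024-01-01"},
)
print(argv[-1])
```

Serializing with `json.dumps` rather than hand-writing the string avoids quoting mistakes when the state contains nested objects.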

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
  • STATE-JSON, --state-json: The connection state as a JSON string. When --stream is used, this is just the stream's state blob (e.g., '{"cursor": "2024-01-01"}'). Otherwise, must include 'stateType', 'connectionId', and the appropriate state field. Can also be provided via --input or STDIN.
  • STREAM, --stream: Optional stream name to update state for a single stream only.
  • NAMESPACE, --namespace: Optional stream namespace to identify the stream.
  • INPUT, --input: Path to a JSON file containing the state to set.

airbyte-ops cloud connection state reset

airbyte-ops cloud connection state reset CONNECTION-ID STREAM [ARGS]

Reset a configured stream's state so the next sync full-refreshes it.

Uses the safe variant that prevents updates while a sync is running. Returns a restorable previous_state_backup in raw Config API format.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
  • STREAM, --stream: The configured stream name whose state should be reset. [required]
  • NAMESPACE, --namespace: Optional stream namespace to identify the stream.

airbyte-ops cloud connection catalog

Connection catalog operations in Airbyte Cloud.

Commands:

  • get: Get the configured catalog for an Airbyte Cloud connection.
  • set: Set the configured catalog for an Airbyte Cloud connection.

airbyte-ops cloud connection catalog get

airbyte-ops cloud connection catalog get CONNECTION-ID [ARGS]

Get the configured catalog for an Airbyte Cloud connection.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch catalog for. [required]
  • OUTPUT, --output: Path to write the catalog JSON output to a file.

airbyte-ops cloud connection catalog set

airbyte-ops cloud connection catalog set CONNECTION-ID [ARGS]

Set the configured catalog for an Airbyte Cloud connection.

Catalog JSON can be provided as a positional argument, via --input, or piped through STDIN.

WARNING: This replaces the entire configured catalog.
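Because the whole catalog is replaced, a cautious workflow is to fetch the current catalog with `catalog get --output`, edit that copy, and send the full edited document back. The sketch below shows only the editing step. It assumes the catalog JSON has a top-level `streams` list whose entries carry the stream name at `stream.name`, as in the Airbyte protocol's configured catalog; the exact shape returned by this CLI is not confirmed here, and `keep_streams` is a hypothetical helper.

```python
from __future__ import annotations

import json


def keep_streams(catalog: dict, names: set[str]) -> dict:
    """Return a copy of the catalog containing only the named streams."""
    kept = [
        entry
        for entry in catalog.get("streams", [])
        if entry.get("stream", {}).get("name") in names
    ]
    return {**catalog, "streams": kept}


# Illustrative catalog; a real one would be loaded from the `catalog get` output file.
catalog = {
    "streams": [
        {"stream": {"name": "users"}, "sync_mode": "incremental"},
        {"stream": {"name": "orders"}, "sync_mode": "full_refresh"},
    ]
}
trimmed = keep_streams(catalog, {"users"})
print(json.dumps(trimmed, indent=2))
```

The original dictionary is left unmodified, so it can be kept as a backup to restore the previous catalog if needed.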

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update catalog for. [required]
  • CATALOG-JSON, --catalog-json: The configured catalog as a JSON string. Can also be provided via --input or STDIN.
  • INPUT, --input: Path to a JSON file containing the catalog to set.

airbyte-ops cloud logs

GCP Cloud Logging operations for Airbyte Cloud.

airbyte-ops cloud logs lookup-cloud-backend-error

airbyte-ops cloud logs lookup-cloud-backend-error ERROR-ID [ARGS]

Look up error details from GCP Cloud Logging by error ID.

When an Airbyte Cloud API returns an error response with only an error ID (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command fetches the full stack trace and error details from GCP Cloud Logging.

Requires GCP credentials with Logs Viewer role on the target project. Set up credentials with: gcloud auth application-default login
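As an illustration of the flow described above, the following sketch pulls the `errorId` out of an API error body and assembles the corresponding lookup command from the documented `--error-id` and `--lookback-days` flags. The helper name `lookup_argv` is hypothetical.

```python
from __future__ import annotations

import json


def lookup_argv(api_error_body: str, lookback_days: int = 7) -> list[str]:
    """Build argv for `lookup-cloud-backend-error` from an API error response body."""
    error_id = json.loads(api_error_body)["errorId"]
    return [
        "airbyte-ops", "cloud", "logs", "lookup-cloud-backend-error",
        "--error-id", error_id,
        "--lookback-days", str(lookback_days),
    ]


body = '{"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}'
print(lookup_argv(body))
```

A `KeyError` from this helper indicates the response did not contain an `errorId` field, in which case there is nothing to look up.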

Parameters:

  • ERROR-ID, --error-id: The error ID (UUID) to search for. This is typically returned in API error responses as {'errorId': '...'} [required]
  • LOOKBACK-DAYS, --lookback-days: Number of days to look back in logs. [default: 7]
  • MIN-SEVERITY-FILTER, --min-severity-filter: Optional minimum severity level to filter logs. [choices: debug, info, notice, warning, error, critical, alert, emergency]
  • RAW, --raw, --no-raw: Output raw JSON instead of formatted text. [default: False]
   1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
   2"""CLI commands for Airbyte Cloud operations.
   3
   4Commands:
   5    airbyte-ops cloud connector get-version-info - Get connector version info
   6    airbyte-ops cloud connector set-version-override - Set connector version override
   7    airbyte-ops cloud connector clear-version-override - Clear connector version override
   8    airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
   9    airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file
  10
  11## CLI reference
  12
  13The commands below are regenerated by `poe docs-generate` via cyclopts's
  14programmatic docs API; see `docs/generate_cli.py`.
  15
  16.. include:: ../../../docs/generated/cli/cloud.md
  17   :start-line: 2
  18"""
  19
  20from __future__ import annotations
  21
  22# Hide Python-level members from the pdoc page for this module; the rendered
  23# docs for this CLI group come entirely from the grafted `.. include::` in
  24# the module docstring above.
  25__all__: list[str] = []
  26
  27import json
  28import logging
  29import os
  30import shutil
  31import signal
  32import socket
  33import subprocess
  34import sys
  35import tempfile
  36import time
  37from pathlib import Path
  38from typing import Annotated, Literal
  39
  40import requests
  41import yaml
  42from airbyte.cloud.workspaces import CloudWorkspace
  43from airbyte.exceptions import PyAirbyteInputError
  44from airbyte_cdk.models.connector_metadata import MetadataFile
  45from airbyte_cdk.utils.connector_paths import find_connector_root_from_name
  46from airbyte_cdk.utils.docker import build_connector_image, verify_docker_installation
  47from airbyte_protocol.models import ConfiguredAirbyteCatalog
  48from cyclopts import Parameter
  49
  50from airbyte_ops_mcp.cli._base import App, app
  51from airbyte_ops_mcp.cli._shared import (
  52    exit_with_error,
  53    print_error,
  54    print_json,
  55    print_success,
  56    print_warning,
  57)
  58from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
  59from airbyte_ops_mcp.cloud_admin.connection_config import fetch_connection_config
  60from airbyte_ops_mcp.cloud_admin.connection_state import (
  61    reset_stream_state as reset_cloud_stream_state,
  62)
  63from airbyte_ops_mcp.cloud_admin.registry_lookup import (
  64    resolve_definition_id_to_canonical_info,
  65)
  66from airbyte_ops_mcp.cloud_admin.version_overrides import (
  67    ResolvedCloudAuth,
  68    VersionOverrideTarget,
  69    get_connector_version_info,
  70)
  71from airbyte_ops_mcp.cloud_admin.version_overrides import (
  72    set_version_override as set_cloud_version_override,
  73)
  74from airbyte_ops_mcp.constants import (
  75    CLOUD_SQL_INSTANCE,
  76    CLOUD_SQL_PROXY_PID_FILE,
  77    DEFAULT_CLOUD_SQL_PROXY_PORT,
  78    ENV_GCP_PROD_DB_ACCESS_CREDENTIALS,
  79)
  80from airbyte_ops_mcp.gcp_logs import GCPSeverity, fetch_error_logs
  81from airbyte_ops_mcp.regression_tests.cdk_secrets import get_first_config_from_secrets
  82from airbyte_ops_mcp.regression_tests.ci_output import (
  83    generate_regression_report,
  84    generate_single_version_report,
  85    write_github_output,
  86    write_github_outputs,
  87    write_github_summary,
  88    write_json_output,
  89    write_test_summary,
  90)
  91from airbyte_ops_mcp.regression_tests.config_overrides import (
  92    filter_configured_catalog_file,
  93)
  94from airbyte_ops_mcp.regression_tests.connection_fetcher import (
  95    fetch_connection_data,
  96    fetch_connection_state,
  97    save_connection_data_to_files,
  98)
  99from airbyte_ops_mcp.regression_tests.connection_secret_retriever import (
 100    SecretRetrievalError,
 101    enrich_config_with_secrets,
 102    should_use_secret_retriever,
 103)
 104from airbyte_ops_mcp.regression_tests.connector_runner import (
 105    ConnectorRunner,
 106    ensure_image_available,
 107)
 108from airbyte_ops_mcp.regression_tests.http_metrics import (
 109    MitmproxyManager,
 110    parse_http_dump,
 111)
 112from airbyte_ops_mcp.regression_tests.models import (
 113    Command,
 114    ConnectorUnderTest,
 115    ExecutionInputs,
 116    TargetOrControl,
 117)
 118from airbyte_ops_mcp.telemetry import track_regression_test
 119from airbyte_ops_mcp.tier_cache import resolve_workspace
 120
 121# Path to connectors directory within the airbyte repo
 122CONNECTORS_SUBDIR = Path("airbyte-integrations") / "connectors"
 123
 124# Create the cloud sub-app
 125cloud_app = App(name="cloud", help="Airbyte Cloud operations.")
 126app.command(cloud_app)
 127
 128# Create the connector sub-app under cloud
 129connector_app = App(
 130    name="connector", help="Deployed connector operations in Airbyte Cloud."
 131)
 132cloud_app.command(connector_app)
 133
 134# Create the db sub-app under cloud
 135db_app = App(name="db", help="Database operations for Airbyte Cloud Prod DB Replica.")
 136cloud_app.command(db_app)
 137
 138# Create the connection sub-app under cloud
 139connection_app = App(name="connection", help="Connection operations in Airbyte Cloud.")
 140cloud_app.command(connection_app)
 141
 142# Create the state sub-app under connection
 143state_app = App(name="state", help="Connection state operations in Airbyte Cloud.")
 144connection_app.command(state_app)
 145
 146# Create the catalog sub-app under connection
 147catalog_app = App(
 148    name="catalog", help="Connection catalog operations in Airbyte Cloud."
 149)
 150connection_app.command(catalog_app)
 151
 152# Create the logs sub-app under cloud
 153logs_app = App(name="logs", help="GCP Cloud Logging operations for Airbyte Cloud.")
 154cloud_app.command(logs_app)
 155
 156
 157@db_app.command(name="start-proxy")
 158def start_proxy(
 159    port: Annotated[
 160        int,
 161        Parameter(help="Port for the Cloud SQL Proxy to listen on."),
 162    ] = DEFAULT_CLOUD_SQL_PROXY_PORT,
 163    daemon: Annotated[
 164        bool,
 165        Parameter(
 166            help="Run as daemon in background (default). Use --no-daemon for foreground."
 167        ),
 168    ] = True,
 169) -> None:
 170    """Start the Cloud SQL Proxy for database access.
 171
 172    This command starts the Cloud SQL Auth Proxy to enable connections to the
 173    Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.
 174
 175    By default, runs as a daemon (background process). Use --no-daemon to run in
 176    foreground mode where you can see logs and stop with Ctrl+C.
 177
 178    Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable,
 179    which should contain the service account JSON credentials.
 180
 181    After starting the proxy, set these environment variables to use database tools:
 182        export USE_CLOUD_SQL_PROXY=1
 183        export DB_PORT={port}
 184
 185    Example:
 186        airbyte-ops cloud db start-proxy
 187        airbyte-ops cloud db start-proxy --port 15432
 188        airbyte-ops cloud db start-proxy --no-daemon
 189    """
 190    # Check if proxy is already running on the requested port (idempotency)
 191    try:
 192        with socket.create_connection(("127.0.0.1", port), timeout=0.5):
 193            # Something is already listening on this port
 194            pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
 195            pid_info = ""
 196            if pid_file.exists():
 197                pid_info = f" (PID: {pid_file.read_text().strip()})"
 198            print_success(
 199                f"Cloud SQL Proxy is already running on port {port}{pid_info}"
 200            )
 201            print_success("")
 202            print_success("To use database tools, set these environment variables:")
 203            print_success("  export USE_CLOUD_SQL_PROXY=1")
 204            print_success(f"  export DB_PORT={port}")
 205            return
 206    except (OSError, TimeoutError, ConnectionRefusedError):
 207        pass  # Port not in use, proceed with starting proxy
 208
 209    # Check if cloud-sql-proxy is installed
 210    proxy_path = shutil.which("cloud-sql-proxy")
 211    if not proxy_path:
 212        exit_with_error(
 213            "cloud-sql-proxy not found in PATH. "
 214            "Install it from: https://cloud.google.com/sql/docs/mysql/sql-proxy"
 215        )
 216
 217    # Get credentials from environment
 218    creds_json = os.getenv(ENV_GCP_PROD_DB_ACCESS_CREDENTIALS)
 219    if not creds_json:
 220        exit_with_error(
 221            f"{ENV_GCP_PROD_DB_ACCESS_CREDENTIALS} environment variable is not set. "
 222            "This should contain the GCP service account JSON credentials."
 223        )
 224
 225    # Build the command using --json-credentials to avoid writing to disk
 226    cmd = [
 227        proxy_path,
 228        CLOUD_SQL_INSTANCE,
 229        f"--port={port}",
 230        f"--json-credentials={creds_json}",
 231    ]
 232
 233    print_success(f"Starting Cloud SQL Proxy on port {port}...")
 234    print_success(f"Instance: {CLOUD_SQL_INSTANCE}")
 235    print_success("")
 236    print_success("To use database tools, set these environment variables:")
 237    print_success("  export USE_CLOUD_SQL_PROXY=1")
 238    print_success(f"  export DB_PORT={port}")
 239    print_success("")
 240
 241    if daemon:
 242        # Run in background (daemon mode) with log file for diagnostics
 243        log_file_path = Path("/tmp/airbyte-cloud-sql-proxy.log")
 244        log_file = log_file_path.open("ab")
 245        process = subprocess.Popen(
 246            cmd,
 247            stdout=subprocess.DEVNULL,
 248            stderr=log_file,
 249            start_new_session=True,
 250        )
 251
 252        # Brief wait to verify the process started successfully
 253        time.sleep(0.5)
 254        if process.poll() is not None:
 255            # Process exited immediately - read any error output
 256            log_file.close()
 257            error_output = ""
 258            if log_file_path.exists():
 259                error_output = log_file_path.read_text()[-1000:]  # Last 1000 chars
 260            exit_with_error(
 261                f"Cloud SQL Proxy failed to start (exit code: {process.returncode}).\n"
 262                f"Check logs at {log_file_path}\n"
 263                f"Recent output: {error_output}"
 264            )
 265
 266        # Write PID to file for stop-proxy command
 267        pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
 268        pid_file.write_text(str(process.pid))
 269        print_success(f"Cloud SQL Proxy started as daemon (PID: {process.pid})")
 270        print_success(f"Logs: {log_file_path}")
 271        print_success("To stop: airbyte-ops cloud db stop-proxy")
 272    else:
 273        # Run in foreground - replace current process
 274        # Signals (Ctrl+C) will be handled directly by the cloud-sql-proxy process
 275        print_success("Running in foreground. Press Ctrl+C to stop the proxy.")
 276        print_success("")
 277        os.execv(proxy_path, cmd)
 278
 279
 280@db_app.command(name="stop-proxy")
 281def stop_proxy() -> None:
 282    """Stop the Cloud SQL Proxy daemon.
 283
 284    This command stops a Cloud SQL Proxy that was started with 'start-proxy'.
 285    It reads the PID from the PID file and sends a SIGTERM signal to stop the process.
 286
 287    Example:
 288        airbyte-ops cloud db stop-proxy
 289    """
 290    pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
 291
 292    if not pid_file.exists():
 293        exit_with_error(
 294            f"PID file not found at {CLOUD_SQL_PROXY_PID_FILE}. "
 295            "No Cloud SQL Proxy daemon appears to be running."
 296        )
 297
 298    pid_str = pid_file.read_text().strip()
 299    if not pid_str.isdigit():
 300        pid_file.unlink()
 301        exit_with_error(f"Invalid PID in {CLOUD_SQL_PROXY_PID_FILE}: {pid_str}")
 302
 303    pid = int(pid_str)
 304
 305    # Check if process is still running
 306    try:
 307        os.kill(pid, 0)  # Signal 0 just checks if process exists
 308    except ProcessLookupError:
 309        pid_file.unlink()
 310        print_success(
 311            f"Cloud SQL Proxy (PID: {pid}) is not running. Cleaned up PID file."
 312        )
 313        return
 314    except PermissionError:
 315        exit_with_error(f"Permission denied to check process {pid}.")
 316
 317    # Send SIGTERM to stop the process
 318    try:
 319        os.kill(pid, signal.SIGTERM)
 320        print_success(f"Sent SIGTERM to Cloud SQL Proxy (PID: {pid}).")
 321    except ProcessLookupError:
 322        print_success(f"Cloud SQL Proxy (PID: {pid}) already stopped.")
 323    except PermissionError:
 324        exit_with_error(f"Permission denied to stop process {pid}.")
 325
 326    # Clean up PID file
 327    pid_file.unlink(missing_ok=True)
 328    print_success("Cloud SQL Proxy stopped.")
 329
 330
 331@connector_app.command(name="get-version-info")
 332def get_version_info(
 333    workspace_id: Annotated[
 334        str,
 335        Parameter(help="The Airbyte Cloud workspace ID."),
 336    ],
 337    connector_id: Annotated[
 338        str,
 339        Parameter(help="The ID of the deployed connector (source or destination)."),
 340    ],
 341    connector_type: Annotated[
 342        Literal["source", "destination"],
 343        Parameter(help="The type of connector."),
 344    ],
 345) -> None:
 346    """Get the current version information for a deployed connector."""
 347    result = get_connector_version_info(
 348        auth=_resolve_cli_cloud_auth(),
 349        workspace_id=workspace_id,
 350        actor_id=connector_id,
 351        actor_type=connector_type,
 352    )
 353    print_json(result.model_dump())
 354
 355
 356_OverrideLevel = Literal["actor", "workspace", "organization"]
 357_TierFilter = Literal["TIER_0", "TIER_1", "TIER_2", "ALL"]
 358
 359
 360def _resolve_cli_cloud_auth() -> ResolvedCloudAuth:
 361    """Resolve Airbyte Cloud auth from environment variables for CLI use.
 362
 363    Mirrors the priority order the MCP server uses:
 364
 365    1. `AIRBYTE_CLOUD_BEARER_TOKEN`
 366    2. `AIRBYTE_CLOUD_CLIENT_ID` + `AIRBYTE_CLOUD_CLIENT_SECRET`
 367
 368    Raises a CLI error via `exit_with_error` if no usable credentials are present.
 369    """
 370    bearer_token = os.environ.get("AIRBYTE_CLOUD_BEARER_TOKEN")
 371    if bearer_token:
 372        return ResolvedCloudAuth(bearer_token=bearer_token)
 373
 374    client_id = os.environ.get("AIRBYTE_CLOUD_CLIENT_ID")
 375    client_secret = os.environ.get("AIRBYTE_CLOUD_CLIENT_SECRET")
 376    if client_id and client_secret:
 377        return ResolvedCloudAuth(client_id=client_id, client_secret=client_secret)
 378
 379    exit_with_error(
 380        "Missing Airbyte Cloud credentials. Set AIRBYTE_CLOUD_BEARER_TOKEN, or "
 381        "both AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET."
 382    )
 383
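The credential priority order described in the docstring above can be exercised without touching the real environment. The following is a sketch only: `DemoAuth` is a hypothetical stand-in for `ResolvedCloudAuth`, and `resolve_auth` takes the environment as a mapping and returns `None` instead of calling `exit_with_error`.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class DemoAuth:
    """Hypothetical stand-in for ResolvedCloudAuth (illustration only)."""

    bearer_token: str | None = None
    client_id: str | None = None
    client_secret: str | None = None


def resolve_auth(env: Mapping[str, str]) -> DemoAuth | None:
    """Apply the same priority order: bearer token first, then client credentials."""
    bearer_token = env.get("AIRBYTE_CLOUD_BEARER_TOKEN")
    if bearer_token:
        return DemoAuth(bearer_token=bearer_token)

    client_id = env.get("AIRBYTE_CLOUD_CLIENT_ID")
    client_secret = env.get("AIRBYTE_CLOUD_CLIENT_SECRET")
    if client_id and client_secret:
        return DemoAuth(client_id=client_id, client_secret=client_secret)

    # Neither a token nor a complete ID/secret pair is available.
    return None
```

When both kinds of credentials are set, the bearer token wins and the client ID and secret are ignored. An empty-string variable counts as unset, because the checks test truthiness rather than presence.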
 384
 385def _validate_override_scope_args(
 386    *,
 387    override_level: _OverrideLevel,
 388    workspace_id: str | None,
 389    organization_id: str | None,
 390    connector_id: str | None,
 391    connector_type: Literal["source", "destination"] | None,
 392    actor_definition_id: str | None,
 393) -> None:
 394    """Validate scope-specific arguments for `set/clear-version-override`.
 395
 396    Calls `exit_with_error` (which terminates the process) if the supplied
 397    arguments are inconsistent with the requested `override_level`.
 398    """
 399    if override_level == "actor":
 400        missing = [
 401            name
 402            for name, value in (
 403                ("--workspace-id", workspace_id),
 404                ("--connector-id", connector_id),
 405                ("--connector-type", connector_type),
 406            )
 407            if not value
 408        ]
 409        if missing:
 410            exit_with_error(
 411                "Override level 'actor' requires " + ", ".join(missing) + "."
 412            )
 413        if actor_definition_id is not None:
 414            exit_with_error(
 415                "Override level 'actor' must not be combined with "
 416                "--actor-definition-id."
 417            )
 418        if organization_id is not None:
 419            exit_with_error(
 420                "Override level 'actor' must not be combined with --organization-id."
 421            )
 422        return
 423
 424    if override_level == "workspace":
 425        missing = [
 426            name
 427            for name, value in (
 428                ("--workspace-id", workspace_id),
 429                ("--actor-definition-id", actor_definition_id),
 430            )
 431            if not value
 432        ]
 433        if missing:
 434            exit_with_error(
 435                "Override level 'workspace' requires " + ", ".join(missing) + "."
 436            )
 437        if connector_id is not None or connector_type is not None:
 438            exit_with_error(
 439                "Override level 'workspace' must not be combined with "
 440                "--connector-id or --connector-type."
 441            )
 442        if organization_id is not None:
 443            exit_with_error(
 444                "Override level 'workspace' must not be combined with "
 445                "--organization-id."
 446            )
 447        return
 448
 449    # override_level == "organization"
 450    missing = [
 451        name
 452        for name, value in (
 453            ("--organization-id", organization_id),
 454            ("--actor-definition-id", actor_definition_id),
 455        )
 456        if not value
 457    ]
 458    if missing:
 459        exit_with_error(
 460            "Override level 'organization' requires " + ", ".join(missing) + "."
 461        )
 462    if connector_id is not None or connector_type is not None:
 463        exit_with_error(
 464            "Override level 'organization' must not be combined with "
 465            "--connector-id or --connector-type."
 466        )
 467    if workspace_id is not None:
 468        exit_with_error(
 469            "Override level 'organization' must not be combined with --workspace-id."
 470        )
 471
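The three branches above all follow one pattern: each override level has a set of required flags and a set of forbidden flags. A table-driven sketch of that rule set is shown below. `SCOPE_RULES` and `scope_errors` are hypothetical names, and the sketch differs from the function above in that it collects every problem into a list instead of exiting on the first one.

```python
from __future__ import annotations

# Required and forbidden flags per override level, transcribed from the checks above.
SCOPE_RULES: dict[str, tuple[tuple[str, ...], tuple[str, ...]]] = {
    "actor": (
        ("--workspace-id", "--connector-id", "--connector-type"),
        ("--actor-definition-id", "--organization-id"),
    ),
    "workspace": (
        ("--workspace-id", "--actor-definition-id"),
        ("--connector-id", "--connector-type", "--organization-id"),
    ),
    "organization": (
        ("--organization-id", "--actor-definition-id"),
        ("--connector-id", "--connector-type", "--workspace-id"),
    ),
}


def scope_errors(override_level: str, flags: dict[str, str | None]) -> list[str]:
    """Return human-readable problems for the given level; empty means valid."""
    required, forbidden = SCOPE_RULES[override_level]
    errors: list[str] = []

    # A required flag is missing when it is absent, None, or an empty string.
    missing = [name for name in required if not flags.get(name)]
    if missing:
        errors.append(
            f"Override level '{override_level}' requires " + ", ".join(missing) + "."
        )

    # A forbidden flag is a problem as soon as it is supplied at all.
    unexpected = [name for name in forbidden if flags.get(name) is not None]
    if unexpected:
        errors.append(
            f"Override level '{override_level}' must not be combined with "
            + ", ".join(unexpected)
            + "."
        )
    return errors
```

For example, `scope_errors("workspace", {"--workspace-id": "w"})` reports that `--actor-definition-id` is missing.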
 472
 473def _dispatch_version_override(
 474    *,
 475    override_level: _OverrideLevel,
 476    workspace_id: str | None,
 477    organization_id: str | None,
 478    connector_id: str | None,
 479    connector_type: Literal["source", "destination"] | None,
 480    actor_definition_id: str | None,
 481    version: str | None,
 482    unset: bool,
 483    reason: str | None,
 484    reason_url: str | None,
 485    issue_url: str,
 486    approval_comment_url: str,
 487    ai_agent_session_url: str | None,
 488    customer_tier_filter: _TierFilter,
 489) -> None:
 490    """Route a version-override request through the normalized core helper.
 491
 492    Resolves `--actor-definition-id` to its canonical name and connector type
 493    via the cloud registry for `workspace`/`organization` scopes. Prints the
 494    operation result as JSON.
 495    """
 496    _validate_override_scope_args(
 497        override_level=override_level,
 498        workspace_id=workspace_id,
 499        organization_id=organization_id,
 500        connector_id=connector_id,
 501        connector_type=connector_type,
 502        actor_definition_id=actor_definition_id,
 503    )
 504
 505    auth = _resolve_cli_cloud_auth()
 506
 507    if override_level == "actor":
 508        assert workspace_id is not None
 509        assert connector_id is not None
 510        assert connector_type is not None
 511        assert organization_id is None
 512        try:
 513            ws_resolution = resolve_workspace(workspace_id)
 514            if not ws_resolution.organization_id:
 515                exit_with_error("Could not resolve organization for workspace.")
 516            result = set_cloud_version_override(
 517                auth=auth,
 518                target=VersionOverrideTarget(
 519                    scope="actor",
 520                    organization_id=ws_resolution.organization_id,
 521                    workspace_id=workspace_id,
 522                    actor_id=connector_id,
 523                    connector_type=connector_type,
 524                ),
 525                approval_comment_url=approval_comment_url,
 526                version=version,
 527                unset=unset,
 528                override_reason=reason,
 529                override_reason_reference_url=reason_url,
 530                issue_url=issue_url,
 531                ai_agent_session_url=ai_agent_session_url,
 532                customer_tier_filter=customer_tier_filter,
 533            )
 534        except (PyAirbyteInputError, CloudAuthError) as e:
 535            exit_with_error(str(e))
 536        _print_override_result(result.success, result.message)
 537        print_json(result.model_dump())
 538        return
 539
 540    try:
 541        assert actor_definition_id is not None
 542        canonical_name, resolved_connector_type = (
 543            resolve_definition_id_to_canonical_info(actor_definition_id)
 544        )
 545    except (PyAirbyteInputError, requests.RequestException) as e:
 546        exit_with_error(f"Failed to resolve --actor-definition-id: {e}")
 547    typed_connector_type: Literal["source", "destination"] = (
 548        "source" if resolved_connector_type == "source" else "destination"
 549    )
 550
 551    if override_level == "workspace":
 552        assert workspace_id is not None
 553        try:
 554            ws_resolution = resolve_workspace(workspace_id)
 555            if not ws_resolution.organization_id:
 556                exit_with_error("Could not resolve organization for workspace.")
 557            target = VersionOverrideTarget(
 558                scope="workspace",
 559                organization_id=ws_resolution.organization_id,
 560                workspace_id=workspace_id,
 561                connector_name=canonical_name,
 562                connector_type=typed_connector_type,
 563            )
 564        except PyAirbyteInputError as e:
 565            exit_with_error(str(e))
 566    else:
 567        assert organization_id is not None
 568        target = VersionOverrideTarget(
 569            scope="organization",
 570            organization_id=organization_id,
 571            connector_name=canonical_name,
 572            connector_type=typed_connector_type,
 573        )
 574
 575    try:
 576        result = set_cloud_version_override(
 577            auth=auth,
 578            target=target,
 579            approval_comment_url=approval_comment_url,
 580            version=version,
 581            unset=unset,
 582            override_reason=reason,
 583            override_reason_reference_url=reason_url,
 584            issue_url=issue_url,
 585            ai_agent_session_url=ai_agent_session_url,
 586            customer_tier_filter=customer_tier_filter,
 587        )
 588    except (PyAirbyteInputError, CloudAuthError) as e:
 589        exit_with_error(str(e))
 590    _print_override_result(result.success, result.message)
 591    print_json(result.model_dump())
 592
 593
 594def _print_override_result(success: bool, message: str) -> None:
 595    """Emit an override-operation status message via the shared CLI helpers."""
 596    if success:
 597        print_success(message)
 598    else:
 599        print_error(message)
 600
 601
 602@connector_app.command(name="set-version-override")
 603def set_version_override(
 604    version: Annotated[
 605        str,
 606        Parameter(
 607            help="The semver version string to pin to (e.g., '2.1.5-preview.abc1234')."
 608        ),
 609    ],
 610    reason: Annotated[
 611        str,
 612        Parameter(help="Explanation for the override (min 10 characters)."),
 613    ],
 614    issue_url: Annotated[
 615        str,
 616        Parameter(help="GitHub issue URL providing context for this operation."),
 617    ],
 618    approval_comment_url: Annotated[
 619        str,
 620        Parameter(
 621            help="Slack approval record URL where an admin authorized this deployment."
 622        ),
 623    ],
 624    override_level: Annotated[
 625        _OverrideLevel,
 626        Parameter(
 627            help=(
 628                "Scope at which to apply the version override: 'actor' (single "
 629                "deployed connector instance), 'workspace' (all instances of a "
 630                "connector type within a workspace), or 'organization' (all "
 631                "instances across an organization). Defaults to 'actor'."
 632            ),
 633        ),
 634    ] = "actor",
 635    workspace_id: Annotated[
 636        str | None,
 637        Parameter(
 638            help=(
 639                "The Airbyte Cloud workspace ID. Required for 'actor' and "
 640                "'workspace' override levels."
 641            )
 642        ),
 643    ] = None,
 644    organization_id: Annotated[
 645        str | None,
 646        Parameter(
 647            help=(
 648                "The Airbyte Cloud organization ID. Required for "
 649                "'organization' override level."
 650            )
 651        ),
 652    ] = None,
 653    connector_id: Annotated[
 654        str | None,
 655        Parameter(
 656            help=(
 657                "The ID of the deployed connector (source or destination). "
 658                "Required for 'actor' override level."
 659            )
 660        ),
 661    ] = None,
 662    connector_type: Annotated[
 663        Literal["source", "destination"] | None,
 664        Parameter(help="The type of connector. Required for 'actor' override level."),
 665    ] = None,
 666    actor_definition_id: Annotated[
 667        str | None,
 668        Parameter(
 669            help=(
 670                "The connector definition UUID. Required for 'workspace' and "
 671                "'organization' override levels."
 672            )
 673        ),
 674    ] = None,
 675    ai_agent_session_url: Annotated[
 676        str | None,
 677        Parameter(
 678            help="URL to AI agent session driving this operation (for auditability)."
 679        ),
 680    ] = None,
 681    reason_url: Annotated[
 682        str | None,
 683        Parameter(help="Optional URL with more context (e.g., issue link)."),
 684    ] = None,
 685    customer_tier_filter: Annotated[
 686        _TierFilter,
 687        Parameter(
 688            help=(
 689                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
 690                "The operation will be rejected if the actual customer tier does not match. "
 691                "Defaults to 'TIER_2' (non-sensitive customers)."
 692            ),
 693        ),
 694    ] = "TIER_2",
 695) -> None:
 696    """Set a version override for a deployed connector.
 697
 698    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
 699    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.
 700
 701    The `--override-level` flag selects the scope at which the pin is applied:
 702
 703    - `actor` (default): pins a single deployed connector instance. Requires
 704      `--workspace-id`, `--connector-id`, and `--connector-type`.
 705    - `workspace`: pins ALL instances of a connector type within a workspace.
 706      Requires `--workspace-id` and `--actor-definition-id`.
 707    - `organization`: pins ALL instances of a connector type across an
 708      organization. Requires `--organization-id` and `--actor-definition-id`.
 709
 710    The `--customer-tier-filter` flag gates the operation: the call fails if the
 711    actual tier of the target organization does not match. Use `ALL` to
 712    proceed regardless of tier (a warning is shown for sensitive tiers).
 713    """
 714    _dispatch_version_override(
 715        override_level=override_level,
 716        workspace_id=workspace_id,
 717        organization_id=organization_id,
 718        connector_id=connector_id,
 719        connector_type=connector_type,
 720        actor_definition_id=actor_definition_id,
 721        version=version,
 722        unset=False,
 723        reason=reason,
 724        reason_url=reason_url,
 725        issue_url=issue_url,
 726        approval_comment_url=approval_comment_url,
 727        ai_agent_session_url=ai_agent_session_url,
 728        customer_tier_filter=customer_tier_filter,
 729    )
 730
 731
 732@connector_app.command(name="clear-version-override")
 733def clear_version_override(
 734    issue_url: Annotated[
 735        str,
 736        Parameter(help="GitHub issue URL providing context for this operation."),
 737    ],
 738    approval_comment_url: Annotated[
 739        str,
 740        Parameter(
 741            help="Slack approval record URL where an admin authorized this deployment."
 742        ),
 743    ],
 744    override_level: Annotated[
 745        _OverrideLevel,
 746        Parameter(
 747            help=(
 748                "Scope at which to clear the version override: 'actor', "
 749                "'workspace', or 'organization'. Defaults to 'actor'."
 750            ),
 751        ),
 752    ] = "actor",
 753    workspace_id: Annotated[
 754        str | None,
 755        Parameter(
 756            help=(
 757                "The Airbyte Cloud workspace ID. Required for 'actor' and "
 758                "'workspace' override levels."
 759            )
 760        ),
 761    ] = None,
 762    organization_id: Annotated[
 763        str | None,
 764        Parameter(
 765            help=(
 766                "The Airbyte Cloud organization ID. Required for "
 767                "'organization' override level."
 768            )
 769        ),
 770    ] = None,
 771    connector_id: Annotated[
 772        str | None,
 773        Parameter(
 774            help=(
 775                "The ID of the deployed connector (source or destination). "
 776                "Required for 'actor' override level."
 777            )
 778        ),
 779    ] = None,
 780    connector_type: Annotated[
 781        Literal["source", "destination"] | None,
 782        Parameter(help="The type of connector. Required for 'actor' override level."),
 783    ] = None,
 784    actor_definition_id: Annotated[
 785        str | None,
 786        Parameter(
 787            help=(
 788                "The connector definition UUID. Required for 'workspace' and "
 789                "'organization' override levels."
 790            )
 791        ),
 792    ] = None,
 793    ai_agent_session_url: Annotated[
 794        str | None,
 795        Parameter(
 796            help="URL to AI agent session driving this operation (for auditability)."
 797        ),
 798    ] = None,
 799    customer_tier_filter: Annotated[
 800        _TierFilter,
 801        Parameter(
 802            help=(
 803                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
 804                "The operation will be rejected if the actual customer tier does not match. "
 805                "Defaults to 'TIER_2' (non-sensitive customers)."
 806            ),
 807        ),
 808    ] = "TIER_2",
 809) -> None:
 810    """Clear a version override from a deployed connector.
 811
 812    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
 813    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.
 814
 815    The `--override-level` flag selects the scope at which the pin is removed:
 816
 817    - `actor` (default): clears a single deployed connector instance pin.
 818      Requires `--workspace-id`, `--connector-id`, and `--connector-type`.
 819    - `workspace`: clears the workspace-level pin for a connector type.
 820      Requires `--workspace-id` and `--actor-definition-id`.
 821    - `organization`: clears the organization-level pin for a connector type.
 822      Requires `--organization-id` and `--actor-definition-id`.
 823    """
 824    _dispatch_version_override(
 825        override_level=override_level,
 826        workspace_id=workspace_id,
 827        organization_id=organization_id,
 828        connector_id=connector_id,
 829        connector_type=connector_type,
 830        actor_definition_id=actor_definition_id,
 831        version=None,
 832        unset=True,
 833        reason=None,
 834        reason_url=None,
 835        issue_url=issue_url,
 836        approval_comment_url=approval_comment_url,
 837        ai_agent_session_url=ai_agent_session_url,
 838        customer_tier_filter=customer_tier_filter,
 839    )
 840
 841
 842def _load_json_file(file_path: Path) -> dict | None:
 843    """Load a JSON file and return its contents.
 844
 845    Returns None if the file doesn't exist or contains invalid JSON.
 846    """
 847    if not file_path.exists():
 848        return None
 849    try:
 850        return json.loads(file_path.read_text())
 851    except json.JSONDecodeError as e:
 852        print_error(f"Failed to parse JSON in file: {file_path}\nError: {e}")
 853        return None
 854
 855
 856def _run_connector_command(
 857    connector_image: str,
 858    command: Command,
 859    output_dir: Path,
 860    target_or_control: TargetOrControl,
 861    config_path: Path | None = None,
 862    catalog_path: Path | None = None,
 863    state_path: Path | None = None,
 864    proxy_url: str | None = None,
 865    enable_debug_logs: bool = False,
 866) -> dict:
 867    """Run a connector command and return results as a dict.
 868
 869    Args:
 870        connector_image: Full connector image name with tag.
 871        command: The Airbyte command to run.
 872        output_dir: Directory to store output files.
 873        target_or_control: Whether this is target or control version.
 874        config_path: Path to connector config JSON file.
 875        catalog_path: Path to configured catalog JSON file.
 876        state_path: Path to state JSON file.
 877        proxy_url: Optional HTTP proxy URL for traffic capture.
 878        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.
 879
 880    Returns:
 881        Dictionary with execution results.
 882    """
 883    connector = ConnectorUnderTest.from_image_name(connector_image, target_or_control)
 884
 885    config = _load_json_file(config_path) if config_path else None
 886    state = _load_json_file(state_path) if state_path else None
 887
 888    configured_catalog = None
 889    if catalog_path and catalog_path.exists():
 890        catalog_json = catalog_path.read_text()
 891        configured_catalog = ConfiguredAirbyteCatalog.parse_raw(catalog_json)
 892
 893    # Pass log level as an environment variable to the connector container
 894    container_env: dict[str, str] = {}
 895    if enable_debug_logs:
 896        container_env["LOG_LEVEL"] = "DEBUG"
 897
 898    execution_inputs = ExecutionInputs(
 899        connector_under_test=connector,
 900        command=command,
 901        output_dir=output_dir,
 902        config=config,
 903        configured_catalog=configured_catalog,
 904        state=state,
 905        environment_variables=container_env or None,
 906    )
 907
 908    runner = ConnectorRunner(execution_inputs, proxy_url=proxy_url)
 909    result = runner.run()
 910
 911    result.save_artifacts(output_dir)
 912
 913    return {
 914        "connector": connector_image,
 915        "command": command.value,
 916        "success": result.success,
 917        "exit_code": result.exit_code,
 918        "stdout_file": str(result.stdout_file_path),
 919        "stderr_file": str(result.stderr_file_path),
 920        "message_counts": {
 921            k.value: v for k, v in result.get_message_count_per_type().items()
 922        },
 923        "record_counts_per_stream": result.get_record_count_per_stream(),
 924    }
 925
 926
 927def _build_connector_image_from_source(
 928    connector_name: str,
 929    repo_root: Path | None = None,
 930    tag: str = "dev",
 931) -> str | None:
 932    """Build a connector image from source code.
 933
 934    Args:
 935        connector_name: Name of the connector (e.g., 'source-pokeapi').
 936        repo_root: Optional path to the airbyte repo root. If not provided,
 937            will attempt to auto-detect from current directory.
 938        tag: Tag to apply to the built image (default: 'dev').
 939
 940    Returns:
 941        The full image name with tag if successful, None if build fails.
 942    """
 943    if not verify_docker_installation():
 944        print_error("Docker is not installed or not running")
 945        return None
 946
 947    try:
 948        connector_directory = find_connector_root_from_name(connector_name)
 949    except FileNotFoundError:
 950        if repo_root:
 951            connector_directory = repo_root / CONNECTORS_SUBDIR / connector_name
 952            if not connector_directory.exists():
 953                print_error(f"Connector directory not found: {connector_directory}")
 954                return None
 955        else:
 956            print_error(
 957                f"Could not find connector '{connector_name}'. "
 958                "Try providing --repo-root to specify the airbyte repo location."
 959            )
 960            return None
 961
 962    metadata_file_path = connector_directory / "metadata.yaml"
 963    if not metadata_file_path.exists():
 964        print_error(f"metadata.yaml not found at {metadata_file_path}")
 965        return None
 966
 967    metadata = MetadataFile.from_file(metadata_file_path)
 968    print_success(f"Building image for connector: {connector_name}")
 969
 970    built_image = build_connector_image(
 971        connector_name=connector_name,
 972        connector_directory=connector_directory,
 973        metadata=metadata,
 974        tag=tag,
 975        no_verify=False,
 976    )
 977    print_success(f"Successfully built image: {built_image}")
 978    return built_image
 979
 980
 981def _fetch_control_image_from_metadata(connector_name: str) -> str | None:
 982    """Fetch the current released connector image from metadata.yaml on the master branch.
 983
 984    This fetches the connector's metadata.yaml from the airbyte monorepo's master branch
 985    and extracts the dockerRepository and dockerImageTag to construct the control image.
 986
 987    Args:
 988        connector_name: The connector name (e.g., 'source-github').
 989
 990    Returns:
 991        The full connector image with tag (e.g., 'airbyte/source-github:1.0.0'),
 992        or None if the metadata could not be fetched or parsed.
 993    """
 994    metadata_url = (
 995        f"https://raw.githubusercontent.com/airbytehq/airbyte/master/"
 996        f"airbyte-integrations/connectors/{connector_name}/metadata.yaml"
 997    )
 998    response = requests.get(metadata_url, timeout=30)
 999    if not response.ok:
1000        print_error(
1001            f"Failed to fetch metadata for {connector_name}: "
1002            f"HTTP {response.status_code} from {metadata_url}"
1003        )
1004        return None
1005
1006    metadata = yaml.safe_load(response.text)
1007    if not isinstance(metadata, dict):
1008        print_error(f"Invalid metadata format for {connector_name}: expected dict")
1009        return None
1010
1011    data = metadata.get("data") or {}
1012    docker_repository = data.get("dockerRepository")
1013    docker_image_tag = data.get("dockerImageTag")
1014
1015    if not docker_repository or not docker_image_tag:
1016        print_error(
1017            f"Could not find dockerRepository/dockerImageTag in metadata for {connector_name}"
1018        )
1019        return None
1020
1021    return f"{docker_repository}:{docker_image_tag}"
1022
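The parsing half of the function above can be looked at in isolation by working on an already-parsed mapping, so that no network call or YAML library is needed. This is a sketch under that assumption; `image_from_metadata` is a hypothetical name. It also returns `None` when `data` is not a mapping, which is an extra guard added for this sketch.

```python
from __future__ import annotations

from typing import Any


def image_from_metadata(metadata: Any) -> str | None:
    """Build 'repository:tag' from parsed metadata, or return None if incomplete."""
    if not isinstance(metadata, dict):
        return None

    data = metadata.get("data")
    if not isinstance(data, dict):
        return None  # Extra guard: 'data' is missing, null, or not a mapping.

    repository = data.get("dockerRepository")
    tag = data.get("dockerImageTag")
    if not repository or not tag:
        return None

    return f"{repository}:{tag}"
```

With `{"data": {"dockerRepository": "airbyte/source-github", "dockerImageTag": "1.0.0"}}` as input, the result is `airbyte/source-github:1.0.0`, matching the format described in the docstring above.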
1023
1024def _run_with_optional_http_metrics(
1025    connector_image: str,
1026    command: Command,
1027    output_dir: Path,
1028    target_or_control: TargetOrControl,
1029    enable_http_metrics: bool,
1030    config_path: Path | None,
1031    catalog_path: Path | None,
1032    state_path: Path | None,
1033    enable_debug_logs: bool = False,
1034) -> dict:
1035    """Run a connector command with optional HTTP metrics capture.
1036
1037    When enable_http_metrics is True, starts mitmproxy to capture HTTP traffic.
1038    If mitmproxy fails to start, falls back to running without metrics.
1039
1040    Args:
1041        connector_image: Full connector image name with tag.
1042        command: The Airbyte command to run.
1043        output_dir: Directory to store output files.
1044        target_or_control: Whether this is target or control version.
1045        enable_http_metrics: Whether to capture HTTP metrics via mitmproxy.
1046        config_path: Path to connector config JSON file.
1047        catalog_path: Path to configured catalog JSON file.
1048        state_path: Path to state JSON file.
1049        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.
1050
1051    Returns:
1052        Dictionary with execution results, optionally including http_metrics.
1053    """
1054    if not enable_http_metrics:
1055        return _run_connector_command(
1056            connector_image=connector_image,
1057            command=command,
1058            output_dir=output_dir,
1059            target_or_control=target_or_control,
1060            config_path=config_path,
1061            catalog_path=catalog_path,
1062            state_path=state_path,
1063            enable_debug_logs=enable_debug_logs,
1064        )
1065
1066    with MitmproxyManager.start(output_dir) as session:
1067        if session is None:
1068            print_error("Mitmproxy unavailable, running without HTTP metrics")
1069            return _run_connector_command(
1070                connector_image=connector_image,
1071                command=command,
1072                output_dir=output_dir,
1073                target_or_control=target_or_control,
1074                config_path=config_path,
1075                catalog_path=catalog_path,
1076                state_path=state_path,
1077                enable_debug_logs=enable_debug_logs,
1078            )
1079
1080        print_success(f"Started mitmproxy on {session.proxy_url}")
1081        result = _run_connector_command(
1082            connector_image=connector_image,
1083            command=command,
1084            output_dir=output_dir,
1085            target_or_control=target_or_control,
1086            config_path=config_path,
1087            catalog_path=catalog_path,
1088            state_path=state_path,
1089            proxy_url=session.proxy_url,
1090            enable_debug_logs=enable_debug_logs,
1091        )
1092
1093        http_metrics = parse_http_dump(session.dump_file_path)
1094        result["http_metrics"] = {
1095            "flow_count": http_metrics.flow_count,
1096            "duplicate_flow_count": http_metrics.duplicate_flow_count,
1097        }
1098        print_success(
1099            f"Captured {http_metrics.flow_count} HTTP flows "
1100            f"({http_metrics.duplicate_flow_count} duplicates)"
1101        )
1102        return result
1103
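The function above relies on a context manager that yields `None` when the proxy cannot start, so the caller can fall back instead of handling an exception. A generic sketch of that pattern follows. It does not reproduce `MitmproxyManager`; `optional_tool` and `run_with_optional_capture` are hypothetical names, and availability is approximated here by a simple `PATH` lookup.

```python
from __future__ import annotations

import shutil
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def optional_tool(executable: str) -> Iterator[str | None]:
    """Yield the executable's path, or None when it is not on PATH."""
    # Yielding None (rather than raising) lets callers branch inside the
    # 'with' block and degrade gracefully.
    yield shutil.which(executable)


def run_with_optional_capture(executable: str) -> str:
    """Use the tool when available; otherwise run without it."""
    with optional_tool(executable) as path:
        if path is None:
            return "ran without capture"
        return f"ran with capture via {path}"
```

The design keeps the fallback and the instrumented run in one place, at the cost of requiring every caller to check for `None`.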
1104
1105@connector_app.command(name="regression-test")
1106def regression_test(
1107    skip_compare: Annotated[
1108        bool,
1109        Parameter(
1110            help="If True, skip comparison and run single-version tests only. "
1111            "If False (default), run comparison tests (target vs control)."
1112        ),
1113    ] = False,
1114    test_image: Annotated[
1115        str | None,
1116        Parameter(
1117            help="Test connector image with tag (e.g., airbyte/source-github:1.0.0). "
1118            "This is the image under test - in comparison mode, it's compared against control_image."
1119        ),
1120    ] = None,
1121    control_image: Annotated[
1122        str | None,
1123        Parameter(
            help="Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). "
            "Only used in comparison mode; cannot be combined with `skip_compare=True`."
        ),
    ] = None,
    connector_name: Annotated[
        str | None,
        Parameter(
            help="Connector name to build image from source (e.g., 'source-pokeapi'). "
            "If provided, builds the image locally with tag 'dev'. "
            "For comparison tests (default), this builds the target image. "
            "For single-version tests (skip_compare=True), this builds the test image."
        ),
    ] = None,
    repo_root: Annotated[
        str | None,
        Parameter(
            help="Path to the airbyte repo root. Required if connector_name is provided "
            "and the repo cannot be auto-detected."
        ),
    ] = None,
    command: Annotated[
        Literal["spec", "check", "discover", "read"],
        Parameter(help="The Airbyte command to run."),
    ] = "check",
    connection_id: Annotated[
        str | None,
        Parameter(
            help="Airbyte Cloud connection ID to fetch config/catalog from. "
            "Mutually exclusive with config-path/catalog-path. "
            "If provided, test_image/control_image can be auto-detected."
        ),
    ] = None,
    config_path: Annotated[
        str | None,
        Parameter(help="Path to the connector config JSON file."),
    ] = None,
    catalog_path: Annotated[
        str | None,
        Parameter(help="Path to the configured catalog JSON file (required for read)."),
    ] = None,
    state_path: Annotated[
        str | None,
        Parameter(help="Path to the state JSON file (optional for read)."),
    ] = None,
    output_dir: Annotated[
        str,
        Parameter(help="Directory to store test artifacts."),
    ] = "/tmp/regression_test_artifacts",
    enable_http_metrics: Annotated[
        bool,
        Parameter(
            help="Capture HTTP traffic metrics via mitmproxy (experimental). "
            "Requires mitmdump to be installed. Only used in comparison mode."
        ),
    ] = False,
    selected_streams: Annotated[
        str | None,
        Parameter(
            help="Comma-separated list of stream names to include in the read. "
            "Only these streams will be included in the configured catalog. "
            "This is useful to limit data volume by testing only specific streams."
        ),
    ] = None,
    enable_debug_logs: Annotated[
        bool,
        Parameter(
            help="Enable debug-level logging for regression test output. "
            "Also passed as `LOG_LEVEL=DEBUG` to the connector Docker container."
        ),
    ] = False,
    with_state: Annotated[
        bool | None,
        Parameter(
            negative="--no-state",
            help="Fetch and pass the connection's current state to the read command, "
            "producing a warm read instead of a cold read. Defaults to `True` when "
            "`--connection-id` is provided, `False` otherwise. Has no effect unless "
            "the command is `read`. Ignored when `--state-path` is explicitly provided.",
        ),
    ] = None,
) -> None:
    """Run regression tests on connectors.

    This command supports two modes:

    Comparison mode (skip_compare=False, default):
        Runs the specified Airbyte protocol command against both the target (new)
        and control (baseline) connector versions, then compares the results.
        This helps identify regressions between versions.

    Single-version mode (skip_compare=True):
        Runs the specified Airbyte protocol command against a single connector
        and validates the output. No comparison is performed.

    Results are written to the output directory and to GitHub Actions outputs
    if running in CI.

    You can provide the test image in three ways:
    1. --test-image: Use a pre-built image from Docker registry
    2. --connector-name: Build the image locally from source code
    3. --connection-id: Auto-detect from an Airbyte Cloud connection

    You can provide config/catalog either via file paths OR via a connection_id
    that fetches them from Airbyte Cloud.
    """
    # Configure debug logging for the regression test harness when requested
    if enable_debug_logs:
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            force=True,
        )

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    cmd = Command(command)

    config_file: Path | None = None
    catalog_file: Path | None = None
    state_file = Path(state_path) if state_path else None

    # Resolve the test image (used in both single-version and comparison modes)
    resolved_test_image: str | None = test_image
    resolved_control_image: str | None = control_image

    # Validate conflicting parameters
    # Single-version mode: reject comparison-specific parameters
    if skip_compare and control_image:
        write_github_output("success", False)
        write_github_output(
            "error", "Cannot specify control_image with skip_compare=True"
        )
        exit_with_error(
            "Cannot specify --control-image with --skip-compare. "
            "Control image is only used in comparison mode."
        )

    # If connector_name is provided, build the image from source
    if connector_name:
        if resolved_test_image:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both test_image and connector_name"
            )
            exit_with_error("Cannot specify both --test-image and --connector-name")

        repo_root_path = Path(repo_root) if repo_root else None
        built_image = _build_connector_image_from_source(
            connector_name=connector_name,
            repo_root=repo_root_path,
            tag="dev",
        )
        if not built_image:
            write_github_output("success", False)
            write_github_output("error", f"Failed to build image for {connector_name}")
            exit_with_error(f"Failed to build image for {connector_name}")
        resolved_test_image = built_image

    if connection_id:
        if config_path or catalog_path:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both connection_id and file paths"
            )
            exit_with_error(
                "Cannot specify both connection_id and config_path/catalog_path"
            )

        print_success(f"Fetching config/catalog from connection: {connection_id}")
        connection_data = fetch_connection_data(connection_id)

        # Check if we should retrieve unmasked secrets
        if should_use_secret_retriever():
            print_success(
                "USE_CONNECTION_SECRET_RETRIEVER enabled - enriching config with unmasked secrets..."
            )
            try:
                connection_data = enrich_config_with_secrets(
                    connection_data,
                    retrieval_reason="Regression test with USE_CONNECTION_SECRET_RETRIEVER=true",
                )
                print_success("Successfully retrieved unmasked secrets from database")
            except SecretRetrievalError as e:
                write_github_output("success", False)
                write_github_output("error", str(e))
                exit_with_error(
                    f"{e}\n\n"
                    f"This connection cannot be used for regression testing. "
                    f"Please use a connection from a non-EU workspace, or use GSM-based "
                    f"integration test credentials instead (by omitting --connection-id)."
                )

        # Resolve with_state default: True when connection_id is provided
        resolved_with_state = with_state if with_state is not None else True

        if resolved_with_state and not state_file and command == "read":
            print_success("Fetching connection state for warm read...")
            try:
                connection_data.state = fetch_connection_state(connection_data)
                if connection_data.state:
                    print_success(
                        f"Fetched state with {len(connection_data.state)} stream(s)"
                    )
                else:
                    connection_data.state = None
                    print_success(
                        "No state available for this connection (initial sync)"
                    )
            except Exception as exc:
                print_error(f"Failed to fetch connection state: {exc}")
                print_success("Falling back to cold read (no state)")

        config_file, catalog_file, state_file_from_conn = save_connection_data_to_files(
            connection_data, output_path / "connection_data"
        )

        if state_file_from_conn and not state_file:
            state_file = state_file_from_conn

        print_success(
            f"Fetched config for source: {connection_data.source_name} "
            f"with {len(connection_data.stream_names)} streams"
        )

        # Auto-detect test/control image from connection if not provided
        if not resolved_test_image and connection_data.connector_image:
            resolved_test_image = connection_data.connector_image
            print_success(f"Auto-detected test image: {resolved_test_image}")

        if (
            not skip_compare
            and not resolved_control_image
            and connection_data.connector_image
        ):
            resolved_control_image = connection_data.connector_image
            print_success(f"Auto-detected control image: {resolved_control_image}")
    elif config_path:
        config_file = Path(config_path)
        catalog_file = Path(catalog_path) if catalog_path else None
    elif connector_name:
        # Fallback: fetch integration test secrets from GSM using PyAirbyte API
        print_success(
            f"No connection_id or config_path provided. "
            f"Attempting to fetch integration test config from GSM for {connector_name}..."
        )
        gsm_config = get_first_config_from_secrets(connector_name)
        if gsm_config:
            # Write config to a temp file (not in output_path to avoid artifact upload)
            gsm_config_dir = Path(
                tempfile.mkdtemp(prefix=f"gsm-config-{connector_name}-")
            )
            gsm_config_dir.chmod(0o700)
            gsm_config_file = gsm_config_dir / "config.json"
            gsm_config_file.write_text(json.dumps(gsm_config, indent=2))
            gsm_config_file.chmod(0o600)
            config_file = gsm_config_file
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
            print_success(
                f"Fetched integration test config from GSM for {connector_name}"
            )
        else:
            print_error(
                f"Failed to fetch integration test config from GSM for {connector_name}."
            )
            config_file = None
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
    else:
        config_file = None
        catalog_file = Path(catalog_path) if catalog_path else None

    # Auto-detect control_image from metadata.yaml if connector_name is provided (comparison mode only)
    if not skip_compare and not resolved_control_image and connector_name:
        resolved_control_image = _fetch_control_image_from_metadata(connector_name)
        if resolved_control_image:
            print_success(
                f"Auto-detected control image from metadata.yaml: {resolved_control_image}"
            )

    # Validate that we have the required images
    if not resolved_test_image:
        write_github_output("success", False)
        write_github_output("error", "No test image specified")
        exit_with_error(
            "You must provide one of the following: a test_image, a connector_name "
            "to build the image from source, or a connection_id to auto-detect the image."
        )

    if not skip_compare and not resolved_control_image:
        write_github_output("success", False)
        write_github_output("error", "No control image specified")
        exit_with_error(
            "You must provide one of the following: a control_image, a connection_id "
            "for a connection that has an associated connector image, or a connector_name "
            "to auto-detect the control image from the airbyte repo's metadata.yaml."
        )

    # Pull images if they weren't just built locally
    # If connector_name was provided, we just built the test image locally
    if not connector_name and not ensure_image_available(resolved_test_image):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_test_image}")
        exit_with_error(f"Failed to pull test image: {resolved_test_image}")

    if (
        not skip_compare
        and resolved_control_image
        and not ensure_image_available(resolved_control_image)
    ):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_control_image}")
        exit_with_error(
            f"Failed to pull control connector image: {resolved_control_image}"
        )

    # Apply selected_streams filter to catalog if requested
    if selected_streams and catalog_file:
        streams_set = {s.strip() for s in selected_streams.split(",") if s.strip()}
        if streams_set:
            print_success(
                f"Filtering catalog to {len(streams_set)} selected streams: "
                f"{', '.join(sorted(streams_set))}"
            )
            filter_configured_catalog_file(catalog_file, streams_set)

    # Upgrade READ → READ_WITH_STATE when a state file is available
    if cmd == Command.READ and state_file:
        cmd = Command.READ_WITH_STATE
        print_success("State available — using read-with-state (warm read)")

    # Track telemetry for the regression test
    # Extract version from image tag (e.g., "airbyte/source-github:1.0.0" -> "1.0.0")
    target_version = (
        resolved_test_image.rsplit(":", 1)[-1]
        if ":" in resolved_test_image
        else "unknown"
    )
    control_version = None
    if resolved_control_image and ":" in resolved_control_image:
        control_version = resolved_control_image.rsplit(":", 1)[-1]

    # Get tester identity from environment (GitHub Actions sets GITHUB_ACTOR)
    tester = os.getenv("GITHUB_ACTOR") or os.getenv("USER")

    track_regression_test(
        user_id=tester,
        connector_image=resolved_test_image,
        command=command,
        target_version=target_version,
        control_version=control_version,
        additional_properties={
            "connection_id": connection_id,
            "skip_compare": skip_compare,
            "with_state": cmd == Command.READ_WITH_STATE,
        },
    )

    # Execute the appropriate mode
    if skip_compare:
        # Single-version mode: run only the connector image
        result = _run_connector_command(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=output_path,
            target_or_control=TargetOrControl.TARGET,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        print_json(result)

        write_github_outputs(
            {
                "success": result["success"],
                "connector": resolved_test_image,
                "command": command,
                "exit_code": result["exit_code"],
            }
        )

        write_test_summary(
            connector_image=resolved_test_image,
            test_type="regression-test",
            success=result["success"],
            results={
                "command": command,
                "exit_code": result["exit_code"],
                "output_dir": output_dir,
            },
        )

        # Generate report.md with detailed metrics
        report_path = generate_single_version_report(
            connector_image=resolved_test_image,
            command=command,
            result=result,
            output_dir=output_path,
        )
        print_success(f"Generated report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if result["success"]:
            print_success(
                f"Single-version regression test passed for {resolved_test_image}"
            )
        else:
            print_error(
                f"Single-version regression test failed for {resolved_test_image}"
            )
    else:
        # Comparison mode: run both target and control images
        target_output = output_path / "target"
        control_output = output_path / "control"

        target_result = _run_with_optional_http_metrics(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=target_output,
            target_or_control=TargetOrControl.TARGET,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        control_result = _run_with_optional_http_metrics(
            connector_image=resolved_control_image,  # type: ignore[arg-type]
            command=cmd,
            output_dir=control_output,
            target_or_control=TargetOrControl.CONTROL,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        both_succeeded = target_result["success"] and control_result["success"]
        # Any divergence in success status is flagged, including the case where
        # the target succeeds and the control fails.
        regression_detected = target_result["success"] != control_result["success"]

        combined_result = {
            "target": target_result,
            "control": control_result,
            "both_succeeded": both_succeeded,
            "regression_detected": regression_detected,
        }

        print_json(combined_result)

        both_failed = not target_result["success"] and not control_result["success"]

        write_github_outputs(
            {
                "success": not regression_detected,
                "target_image": resolved_test_image,
                "control_image": resolved_control_image,
                "command": command,
                "target_exit_code": target_result["exit_code"],
                "control_exit_code": control_result["exit_code"],
                "regression_detected": regression_detected,
                "both_failed": both_failed,
            }
        )

        write_json_output("regression_report", combined_result)

        report_path = generate_regression_report(
            target_image=resolved_test_image,
            control_image=resolved_control_image,  # type: ignore[arg-type]
            command=command,
            target_result=target_result,
            control_result=control_result,
            output_dir=output_path,
        )
        print_success(f"Generated regression report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if regression_detected:
            print_error(
                f"Regression detected between {resolved_test_image} and {resolved_control_image}"
            )
        elif both_succeeded:
            print_success(
                f"Regression test passed for {resolved_test_image} vs {resolved_control_image}"
            )
        else:
            # Both versions failed, but no regression between them was detected; treat this as a
            # test environment issue (e.g., expired credentials, transient API errors, rate limiting).
            # Exit successfully since no regression between versions was detected.
            print_warning(
                f"Both versions failed for {resolved_test_image} vs {resolved_control_image}. "
                "No regression detected. This may indicate expired credentials or a transient API issue."
            )
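
The comparison-mode outcome logic at the end of `regression_test_cmd` can be read in isolation. The following is a minimal sketch and not part of the module: `classify_comparison` is a hypothetical helper that derives the same three flags from the two `success` values. It shows that `regression_detected` is true whenever the two runs disagree, in either direction, and that a double failure is reported separately.

```python
def classify_comparison(target_success: bool, control_success: bool) -> dict:
    """Hypothetical helper mirroring the flags computed in comparison mode."""
    return {
        # Both images ran the command successfully.
        "both_succeeded": target_success and control_success,
        # The two runs disagree on success, in either direction.
        "regression_detected": target_success != control_success,
        # Neither image succeeded; reported as a warning rather than a regression.
        "both_failed": not target_success and not control_success,
    }


if __name__ == "__main__":
    for target_ok, control_ok in [(True, True), (False, True), (True, False), (False, False)]:
        print(target_ok, control_ok, classify_comparison(target_ok, control_ok))
```

At most one of the three flags is true for any input, which is why the final `if`/`elif`/`else` chain in the command can treat the remaining case as "both failed".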


@connector_app.command(name="fetch-connection-config")
def fetch_connection_config_cmd(
    connection_id: Annotated[
        str,
        Parameter(help="The UUID of the Airbyte Cloud connection."),
    ],
    output_path: Annotated[
        str | None,
        Parameter(
            help="Path to output file or directory. "
            "If directory, writes connection-<id>-config.json inside it. "
            "Default: platform temp directory (e.g. /tmp/connection-<id>-config.json)"
        ),
    ] = None,
    with_secrets: Annotated[
        bool,
        Parameter(
            name="--with-secrets",
            negative="--no-secrets",
            help="If set, fetches unmasked secrets from the internal database. "
            "Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or `gcloud auth application-default login`. "
            "Must be used with --oc-issue-url.",
        ),
    ] = False,
    oc_issue_url: Annotated[
        str | None,
        Parameter(
            help="OC issue URL for audit logging. Required when using --with-secrets."
        ),
    ] = None,
) -> None:
    """Fetch connection configuration from Airbyte Cloud to a local file.

    This command retrieves the source configuration for a given connection ID
    and writes it to a JSON file. When `--output-path` is omitted the file is
    written to the platform temp directory to avoid accidentally committing
    secrets to a git repository.

    Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and
    AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

    When --with-secrets is specified, the command fetches unmasked secrets from
    the internal database using the connection-retriever. This additionally requires:
    - An OC issue URL for audit logging (--oc-issue-url)
    - GCP credentials via `GCP_PROD_DB_ACCESS_CREDENTIALS` env var or `gcloud auth application-default login`
    - Cloud SQL Python Connector access to the Prod DB replica.
    """
    path = Path(output_path) if output_path else None
    result = fetch_connection_config(
        connection_id=connection_id,
        output_path=path,
        with_secrets=with_secrets,
        oc_issue_url=oc_issue_url,
    )
    if result.success:
        print_success(result.message)
    else:
        print_error(result.message)
    print_json(result.model_dump())
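
The `--output-path` help text describes three cases: an explicit file, a directory, or the platform temp directory by default. A rough sketch of that resolution follows. `resolve_config_output_path` is a hypothetical name used only for illustration; the actual logic lives in `fetch_connection_config`, which is not shown here and may differ.

```python
import tempfile
from pathlib import Path
from typing import Optional


def resolve_config_output_path(connection_id: str, output_path: Optional[Path]) -> Path:
    """Hypothetical sketch of the path rules described in the help text."""
    default_name = f"connection-{connection_id}-config.json"
    if output_path is None:
        # Default: the platform temp directory, away from any git checkout.
        return Path(tempfile.gettempdir()) / default_name
    if output_path.is_dir():
        # A directory was given: write the default file name inside it.
        return output_path / default_name
    # Otherwise treat the value as the target file itself.
    return output_path
```

Only an existing directory is treated as a directory in this sketch; a path that does not exist yet is assumed to name the output file.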


def _read_json_input(
    json_str: str | None,
    input_file: str | None,
) -> dict:
    """Read JSON from a positional arg, --input file, or STDIN (in that priority)."""
    if json_str is not None:
        return json.loads(json_str)
    if input_file is not None:
        return json.loads(Path(input_file).read_text())
    if not sys.stdin.isatty():
        return json.loads(sys.stdin.read())
    exit_with_error(
        "No JSON input provided. Pass it as a positional argument, "
        "via --input <file>, or pipe to STDIN."
    )
    raise SystemExit(1)  # unreachable, but satisfies type checker
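
The priority order in `_read_json_input` (positional argument, then `--input` file, then piped STDIN) is easier to exercise when the stream is passed in explicitly. The variant below is a sketch for illustration only: the `stdin` parameter and the `ValueError` are assumptions made here so the example runs without the module's `exit_with_error` helper.

```python
import io
import json
from pathlib import Path
from typing import Optional, TextIO


def read_json_input(
    json_str: Optional[str],
    input_file: Optional[str],
    stdin: TextIO,
) -> dict:
    """Same priority order: argument, then file, then piped STDIN."""
    if json_str is not None:
        return json.loads(json_str)
    if input_file is not None:
        return json.loads(Path(input_file).read_text())
    if not stdin.isatty():
        return json.loads(stdin.read())
    raise ValueError("No JSON input provided.")


# io.StringIO.isatty() returns False, so it stands in for piped input.
piped = io.StringIO('{"cursor": "2024-01-01"}')
print(read_json_input(None, None, piped))

# The positional argument wins even when STDIN also carries data.
print(read_json_input('{"a": 1}', None, io.StringIO('{"b": 2}')))
```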


@state_app.command(name="get")
def get_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to filter state for a single stream.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to narrow the stream filter.",
            name="--namespace",
        ),
    ] = None,
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the state JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the current state for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        result = {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }
    else:
        result = conn.dump_raw_state()

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"State written to {output}", file=sys.stderr)
    else:
        print(json_output)


@state_app.command(name="set")
def set_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    state_json: Annotated[
        str | None,
        Parameter(
            help="The connection state as a JSON string. When --stream is used, "
            'this is just the stream\'s state blob (e.g., \'{"cursor": "2024-01-01"}\').'
            " Otherwise, must include 'stateType', 'connectionId', and the appropriate "
            "state field. Can also be provided via --input or STDIN."
        ),
    ] = None,
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to update state for a single stream only.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the state to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the state for an Airbyte Cloud connection.

    State JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    Uses the safe variant that prevents updates while a sync is running.
    When --stream is provided, only that stream's state is updated within
    the existing connection state.
    """
    parsed_state = _read_json_input(state_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        conn.set_stream_state(
            stream_name=stream_name,
            state_blob_dict=parsed_state,
            stream_namespace=stream_namespace,
        )
    else:
        conn.import_raw_state(parsed_state)

    result = conn.dump_raw_state()
    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)
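
The `state_json` help text distinguishes a per-stream blob (used with `--stream`) from a full connection state that must carry `stateType`, `connectionId`, and a matching state field. The sketch below shows one plausible shape for the full payload and a pre-flight check. It is illustrative only: `missing_state_keys` is a hypothetical helper, and the mapping from `stateType` to field name is an assumption about the raw Config API format, not something this module confirms.

```python
# Assumed mapping from stateType to the field that carries the payload.
STATE_FIELD_BY_TYPE = {
    "stream": "streamState",
    "global": "globalState",
    "legacy": "state",
}


def missing_state_keys(state: dict) -> list:
    """Return the keys a full connection-state payload appears to lack."""
    missing = [key for key in ("stateType", "connectionId") if key not in state]
    field = STATE_FIELD_BY_TYPE.get(state.get("stateType", ""))
    if field is not None and field not in state:
        missing.append(field)
    return missing


# Example full payload (placeholder connection ID, assumed field layout).
full_state = {
    "stateType": "stream",
    "connectionId": "00000000-0000-0000-0000-000000000000",
    "streamState": [
        {
            "streamDescriptor": {"name": "users"},
            "streamState": {"cursor": "2024-01-01"},
        }
    ],
}

print(missing_state_keys(full_state))
print(missing_state_keys({"stateType": "global"}))
```

With `--stream`, none of this wrapping applies: the input is just the inner blob, such as `{"cursor": "2024-01-01"}`.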


@state_app.command(name="reset")
def reset_stream_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    stream_name: Annotated[
        str,
        Parameter(
            help="The configured stream name whose state should be reset.",
            name="--stream",
        ),
    ],
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
) -> None:
    """Reset a configured stream's state so the next sync full-refreshes it.

    Uses the safe variant that prevents updates while a sync is running.
    Returns a restorable `previous_state_backup` in raw Config API format.
    """
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    try:
        result = reset_cloud_stream_state(
            conn,
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
    except PyAirbyteInputError as e:
        exit_with_error(str(e))
        raise SystemExit(1)  # unreachable, but satisfies type checker

    print(json.dumps(result.model_dump(), indent=2, default=str))


@catalog_app.command(name="get")
def get_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch catalog for."),
    ],
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the catalog JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the configured catalog for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("No configured catalog found for this connection.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"Catalog written to {output}", file=sys.stderr)
    else:
        print(json_output)


@catalog_app.command(name="set")
def set_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update catalog for."),
    ],
    catalog_json: Annotated[
        str | None,
        Parameter(
            help="The configured catalog as a JSON string. "
            "Can also be provided via --input or STDIN."
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the catalog to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the configured catalog for an Airbyte Cloud connection.

    Catalog JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    WARNING: This replaces the entire configured catalog.
    """
    parsed_catalog = _read_json_input(catalog_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    conn.import_raw_catalog(parsed_catalog)

    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("Failed to retrieve catalog after import.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


@logs_app.command(name="lookup-cloud-backend-error")
def lookup_cloud_backend_error(
    error_id: Annotated[
        str,
        Parameter(
            help=(
                "The error ID (UUID) to search for. This is typically returned "
                "in API error responses as {'errorId': '...'}"
            )
        ),
    ],
    lookback_days: Annotated[
        int,
        Parameter(help="Number of days to look back in logs."),
    ] = 7,
    min_severity_filter: Annotated[
        GCPSeverity | None,
        Parameter(
            help="Optional minimum severity level to filter logs.",
        ),
    ] = None,
    raw: Annotated[
        bool,
        Parameter(help="Output raw JSON instead of formatted text."),
    ] = False,
) -> None:
    """Look up error details from GCP Cloud Logging by error ID.

    When an Airbyte Cloud API returns an error response with only an error ID
    (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command
    fetches the full stack trace and error details from GCP Cloud Logging.

    Requires GCP credentials with Logs Viewer role on the target project.
    Set up credentials with: gcloud auth application-default login
    """
    print(f"Searching for error ID: {error_id}", file=sys.stderr)
    print(f"Lookback days: {lookback_days}", file=sys.stderr)
    if min_severity_filter:
        print(f"Severity filter: {min_severity_filter}", file=sys.stderr)
    print(file=sys.stderr)

    result = fetch_error_logs(
        error_id=error_id,
        lookback_days=lookback_days,
        min_severity_filter=min_severity_filter,
    )

    if raw:
        print_json(result.model_dump())
        return

    print(f"Found {result.total_entries_found} log entries", file=sys.stderr)
    print(file=sys.stderr)

    if result.payloads:
        for i, payload in enumerate(result.payloads):
1994            print(f"=== Log Group {i + 1} ===")
1995            print(f"Timestamp: {payload.timestamp}")
1996            print(f"Severity: {payload.severity}")
1997            if payload.resource.labels.pod_name:
1998                print(f"Pod: {payload.resource.labels.pod_name}")
1999            print(f"Lines: {payload.num_log_lines}")
2000            print()
2001            print(payload.message)
2002            print()
2003    elif result.entries:
2004        print("No grouped payloads, showing raw entries:", file=sys.stderr)
2005        for entry in result.entries:
2006            print(f"[{entry.timestamp}] {entry.severity}: {entry.payload}")
2007    else:
2008        print_error("No log entries found for this error ID.")