airbyte_ops_mcp.cli.cloud

CLI commands for Airbyte Cloud operations.

Commands:

  • airbyte-ops cloud connector get-version-info - Get connector version info
  • airbyte-ops cloud connector set-version-override - Set connector version override
  • airbyte-ops cloud connector clear-version-override - Clear connector version override
  • airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
  • airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file
  • airbyte-ops cloud organization search - Search organizations by name/email
  • airbyte-ops cloud organization info - Get organization info
  • airbyte-ops cloud workspace search - Search workspaces by name/slug or email domain
  • airbyte-ops cloud workspace info - Get workspace info

CLI reference

The commands below are regenerated by poe docs-generate via cyclopts's programmatic docs API; see docs/generate_cli.py.

airbyte-ops cloud COMMAND

Airbyte Cloud operations.

Commands:

  • connection: Connection operations in Airbyte Cloud.
  • connector: Deployed connector operations in Airbyte Cloud.
  • db: Database operations for Airbyte Cloud Prod DB Replica.
  • logs: GCP Cloud Logging operations for Airbyte Cloud.
  • organization: Organization operations in Airbyte Cloud.
  • workspace: Workspace operations in Airbyte Cloud.

airbyte-ops cloud connector

Deployed connector operations in Airbyte Cloud.

airbyte-ops cloud connector get-version-info

airbyte-ops cloud connector get-version-info WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE

Get the current version information for a deployed connector.

Parameters:

  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
  • CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]

airbyte-ops cloud connector set-version-override

airbyte-ops cloud connector set-version-override VERSION REASON ISSUE-URL APPROVAL-COMMENT-URL [ARGS]

Set a version override for a deployed connector.

Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.

The --override-level flag selects the scope at which the pin is applied:

  • actor (default): pins a single deployed connector instance. Requires --workspace-id, --connector-id, and --connector-type.
  • workspace: pins ALL instances of a connector type within a workspace. Requires --workspace-id and --actor-definition-id.
  • organization: pins ALL instances of a connector type across an organization. Requires --organization-id and --actor-definition-id.

The customer_tier_filter gates the operation: the call fails if the actual tier of the target organization does not match. Use ALL to proceed regardless of tier (a warning is shown for sensitive tiers).

Parameters:

  • VERSION, --version: The semver version string to pin to (e.g., '2.1.5-preview.abc1234'). [required]
  • REASON, --reason: Explanation for the override (min 10 characters). [required]
  • ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
  • APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
  • OVERRIDE-LEVEL, --override-level: Scope at which to apply the version override: 'actor' (single deployed connector instance), 'workspace' (all instances of a connector type within a workspace), or 'organization' (all instances across an organization). Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
  • ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
  • CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
  • ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
  • AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
  • REASON-URL, --reason-url: Optional URL with more context (e.g., issue link).
  • CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
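
The per-level requirements listed above can be read as a lookup table. The sketch below is illustrative only and is not the CLI's actual validation code; `REQUIRED_BY_LEVEL` and `missing_flags` are hypothetical names introduced for this example.

```python
from typing import Dict, List, Optional, Tuple

# Flags each --override-level requires, transcribed from the list above.
# REQUIRED_BY_LEVEL and missing_flags are hypothetical names for illustration.
REQUIRED_BY_LEVEL: Dict[str, Tuple[str, ...]] = {
    "actor": ("workspace_id", "connector_id", "connector_type"),
    "workspace": ("workspace_id", "actor_definition_id"),
    "organization": ("organization_id", "actor_definition_id"),
}


def missing_flags(override_level: str, **provided: Optional[str]) -> List[str]:
    """Return the CLI flags still missing for the given override level."""
    if override_level not in REQUIRED_BY_LEVEL:
        raise ValueError(f"Unknown override level: {override_level!r}")
    return [
        "--" + name.replace("_", "-")
        for name in REQUIRED_BY_LEVEL[override_level]
        if not provided.get(name)
    ]
```

For example, `missing_flags("actor", workspace_id="w")` reports that `--connector-id` and `--connector-type` are still needed.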

airbyte-ops cloud connector clear-version-override

airbyte-ops cloud connector clear-version-override ISSUE-URL APPROVAL-COMMENT-URL [ARGS]

Clear a version override from a deployed connector.

Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.

The --override-level flag selects the scope at which the pin is removed:

  • actor (default): clears a single deployed connector instance pin. Requires --workspace-id, --connector-id, and --connector-type.
  • workspace: clears the workspace-level pin for a connector type. Requires --workspace-id and --actor-definition-id.
  • organization: clears the organization-level pin for a connector type. Requires --organization-id and --actor-definition-id.

Parameters:

  • ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
  • APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
  • OVERRIDE-LEVEL, --override-level: Scope at which to clear the version override: 'actor', 'workspace', or 'organization'. Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
  • WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
  • ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
  • CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
  • CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
  • ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
  • AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
  • CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
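
The `--customer-tier-filter` gate can be pictured as a simple comparison. This is a minimal sketch under stated assumptions, not the tool's implementation: `check_tier_gate` and `TierGateResult` are hypothetical names, and treating TIER_0 and TIER_1 as the "sensitive" tiers is an assumption inferred from the note that the TIER_2 default covers non-sensitive customers.

```python
from typing import NamedTuple

# Assumption: the text only says TIER_2 means non-sensitive customers, so the
# remaining tiers are treated as sensitive here.
ASSUMED_SENSITIVE_TIERS = frozenset({"TIER_0", "TIER_1"})


class TierGateResult(NamedTuple):
    allowed: bool  # whether the operation may proceed
    warn: bool     # whether a sensitive-tier warning would be shown


def check_tier_gate(actual_tier: str, customer_tier_filter: str = "TIER_2") -> TierGateResult:
    """Apply the tier filter to the target organization's actual tier."""
    if customer_tier_filter == "ALL":
        # ALL proceeds regardless of tier, warning for sensitive tiers.
        return TierGateResult(True, actual_tier in ASSUMED_SENSITIVE_TIERS)
    # Any other filter must match the actual tier exactly.
    return TierGateResult(customer_tier_filter == actual_tier, False)
```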

airbyte-ops cloud connector regression-test

airbyte-ops cloud connector regression-test [ARGS]

Run regression tests on connectors.

This command supports two modes:

Comparison mode (skip_compare=False, default): Runs the specified Airbyte protocol command against both the target (new) and control (baseline) connector versions, then compares the results. This helps identify regressions between versions.

Single-version mode (skip_compare=True): Runs the specified Airbyte protocol command against a single connector and validates the output. No comparison is performed.

Results are written to the output directory and to GitHub Actions outputs if running in CI.

You can provide the test image in three ways:

  1. --test-image: Use a pre-built image from Docker registry
  2. --connector-name: Build the image locally from source code
  3. --connection-id: Auto-detect from an Airbyte Cloud connection

You can provide config/catalog either via file paths OR via a connection_id that fetches them from Airbyte Cloud.

Parameters:

  • SKIP-COMPARE, --skip-compare, --no-skip-compare: If True, skip comparison and run single-version tests only. If False (default), run comparison tests (target vs control). [default: False]
  • TEST-IMAGE, --test-image: Test connector image with tag (e.g., airbyte/source-github:1.0.0). This is the image under test - in comparison mode, it's compared against control_image.
  • CONTROL-IMAGE, --control-image: Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). Ignored if skip_compare=True.
  • CONNECTOR-NAME, --connector-name: Connector name to build image from source (e.g., 'source-pokeapi'). If provided, builds the image locally with tag 'dev'. For comparison tests (default), this builds the target image. For single-version tests (skip_compare=True), this builds the test image.
  • REPO-ROOT, --repo-root: Path to the airbyte repo root. Required if connector_name is provided and the repo cannot be auto-detected.
  • COMMAND, --command: The Airbyte command to run. [choices: spec, check, discover, read] [default: check]
  • CONNECTION-ID, --connection-id: Airbyte Cloud connection ID to fetch config/catalog from. Mutually exclusive with config-path/catalog-path. If provided, test_image/control_image can be auto-detected.
  • CONFIG-PATH, --config-path: Path to the connector config JSON file.
  • CATALOG-PATH, --catalog-path: Path to the configured catalog JSON file (required for read).
  • STATE-PATH, --state-path: Path to the state JSON file (optional for read).
  • OUTPUT-DIR, --output-dir: Directory to store test artifacts. [default: /tmp/regression_test_artifacts]
  • ENABLE-HTTP-METRICS, --enable-http-metrics, --no-enable-http-metrics: Capture HTTP traffic metrics via mitmproxy (experimental). Requires mitmdump to be installed. Only used in comparison mode. [default: False]
  • SELECTED-STREAMS, --selected-streams: Comma-separated list of stream names to include in the read. Only these streams will be included in the configured catalog. This is useful to limit data volume by testing only specific streams.
  • ENABLE-DEBUG-LOGS, --enable-debug-logs, --no-enable-debug-logs: Enable debug-level logging for regression test output. Also passed as LOG_LEVEL=DEBUG to the connector Docker container. [default: False]
  • WITH-STATE, --with-state, --no-state: Fetch and pass the connection's current state to the read command, producing a warm read instead of a cold read. Defaults to True when --connection-id is provided, False otherwise. Has no effect unless the command is read. Ignored when --state-path is explicitly provided.
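
The interaction between `--with-state`, `--state-path`, `--connection-id`, and the chosen command is easier to follow as code. The function below is a hypothetical sketch of the rules stated above (`resolve_with_state` is not a name from the tool); what happens when `--with-state` is passed without a connection ID is not specified in this text, so the sketch simply returns the flag as given.

```python
from typing import Optional


def resolve_with_state(
    command: str,
    connection_id: Optional[str],
    state_path: Optional[str],
    with_state: Optional[bool] = None,
) -> bool:
    """Decide whether the connection's current state should be fetched."""
    if command != "read":
        return False  # the flag has no effect unless the command is read
    if state_path is not None:
        return False  # an explicit --state-path takes over; the flag is ignored
    if with_state is None:
        # Default: True when --connection-id is provided, False otherwise.
        return connection_id is not None
    return with_state
```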

airbyte-ops cloud connector fetch-connection-config

airbyte-ops cloud connector fetch-connection-config CONNECTION-ID [ARGS]

Fetch connection configuration from Airbyte Cloud to a local file.

This command retrieves the source configuration for a given connection ID and writes it to a JSON file. When --output-path is omitted the file is written to the platform temp directory to avoid accidentally committing secrets to a git repository.

Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

When --with-secrets is specified, the command fetches unmasked secrets from the internal database using the connection-retriever. This additionally requires:

  • An OC issue URL for audit logging (--oc-issue-url)
  • GCP credentials via GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login
  • Cloud SQL Python Connector access to the Prod DB replica.

Parameters:

  • CONNECTION-ID, --connection-id: The UUID of the Airbyte Cloud connection. [required]
  • OUTPUT-PATH, --output-path: Path to output file or directory. If directory, writes connection--config.json inside it. Default: platform temp directory (e.g. /tmp/connection--config.json)
  • WITH-SECRETS, --with-secrets, --no-secrets: If set, fetches unmasked secrets from the internal database. Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login. Must be used with --oc-issue-url. [default: False]
  • OC-ISSUE-URL, --oc-issue-url: OC issue URL for audit logging. Required when using --with-secrets.
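
As a rough illustration of the output-path and secrets rules above, the sketch below resolves where the file would be written and checks the `--with-secrets` precondition. Both function names are hypothetical, and the default filename is passed in as a parameter because its exact pattern is not recoverable from this page.

```python
import tempfile
from pathlib import Path
from typing import Optional


def resolve_output_file(output_path: Optional[str], default_filename: str) -> Path:
    """Pick the file to write, following the --output-path rules above."""
    if output_path is None:
        # No path given: fall back to the platform temp directory.
        return Path(tempfile.gettempdir()) / default_filename
    path = Path(output_path)
    if path.is_dir():
        # A directory was given: write the default filename inside it.
        return path / default_filename
    return path


def check_secrets_args(with_secrets: bool, oc_issue_url: Optional[str]) -> None:
    """Raise if --with-secrets is used without --oc-issue-url."""
    if with_secrets and not oc_issue_url:
        raise ValueError("--oc-issue-url is required when using --with-secrets")
```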

airbyte-ops cloud connector rollout

Connector rollout operations.

Commands:

  • autopilot: AutoPilot rollout orchestration commands.
  • list: List connector rollouts from the production database.

airbyte-ops cloud connector rollout autopilot

AutoPilot rollout orchestration commands.

airbyte-ops cloud connector rollout autopilot auto-start

airbyte-ops cloud connector rollout autopilot auto-start [ARGS]

Start INITIALIZED rollouts that have autopilot auto-start enabled.

Queries the prod DB for rollouts in initialized state, checks the connector's autopilotConfig.autoStart gate, and starts eligible rollouts via the Cloud Config API.

Parameters:

  • CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
  • DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]

airbyte-ops cloud connector rollout autopilot auto-advance

airbyte-ops cloud connector rollout autopilot auto-advance [ARGS]

Advance IN_PROGRESS rollouts within their current tier.

Finds rollouts with automated strategy that haven't reached their target percentage and advances them based on the configured strategy pacing (fast/slow/default).

Parameters:

  • CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
  • DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]

airbyte-ops cloud connector rollout autopilot auto-promote

airbyte-ops cloud connector rollout autopilot auto-promote [ARGS]

Promote rollouts when at 100% of current tier.

Checks the autopilotConfig.autoPromoteStages gate. Currently finalizes rollouts at ALL tier as succeeded (GA promotion). Cross-tier promotion (TIER_2 -> TIER_1 -> ALL) is not yet implemented.

Parameters:

  • CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
  • DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]

airbyte-ops cloud connector rollout autopilot auto-triage-failed

airbyte-ops cloud connector rollout autopilot auto-triage-failed [ARGS]

Triage failed rollouts: log all failures, check unpin eligibility.

Finds rollouts in errored or paused state. Logs every failure unconditionally. Checks safe-to-downgrade eligibility per unsafeDowngrades. Actor-level unpinning is not yet implemented.

Parameters:

  • CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
  • DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]

airbyte-ops cloud connector rollout autopilot auto-supersede

airbyte-ops cloud connector rollout autopilot auto-supersede [ARGS]

Cancel older rollouts when a newer RC exists for the same connector.

Detects connectors with multiple active rollouts at different RC versions. Cancels older rollouts with retain_pins_on_cancellation=True so pinned actors remain on the old version until the new rollout progressively advances to include them.

Parameters:

  • CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
  • DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]
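
The supersede rule described above (cancel every active rollout for a connector except the one at the newest RC) can be sketched as a grouping step. This is a hypothetical illustration, not the autopilot's code: the `(connector, version)` pair shape, the function names, and the assumed `X.Y.Z-rc.N` version format are all assumptions made for the example.

```python
import re
from collections import defaultdict
from typing import Dict, List, Tuple

# Assumed RC version format for this sketch: MAJOR.MINOR.PATCH-rc.N
_RC_PATTERN = re.compile(r"^(\d+)\.(\d+)\.(\d+)-rc\.(\d+)$")


def rc_sort_key(version: str) -> Tuple[int, ...]:
    """Turn '2.0.0-rc.10' into (2, 0, 0, 10) so versions compare numerically."""
    match = _RC_PATTERN.match(version)
    if match is None:
        raise ValueError(f"Not an RC version in the assumed format: {version!r}")
    return tuple(int(part) for part in match.groups())


def rollouts_to_supersede(active: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Return (connector, version) pairs that are older than that connector's newest RC."""
    by_connector: Dict[str, List[str]] = defaultdict(list)
    for connector, version in active:
        by_connector[connector].append(version)

    superseded: List[Tuple[str, str]] = []
    for connector, versions in by_connector.items():
        newest = max(versions, key=rc_sort_key)
        superseded.extend(
            (connector, version)
            for version in versions
            if rc_sort_key(version) < rc_sort_key(newest)
        )
    return superseded
```

Comparing parsed integer tuples rather than raw strings keeps `rc.10` ordered after `rc.2`.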

airbyte-ops cloud connector rollout autopilot auto-rollback-failed

airbyte-ops cloud connector rollout autopilot auto-rollback-failed [ARGS]

Full rollback/cancel of failed rollouts when safe to downgrade.

Finds rollouts in errored state. Checks the unsafeDowngrades gate. If the version is safe to downgrade, calls finalize_connector_rollout with state failed_rolled_back to cancel the entire rollout.

Parameters:

  • CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
  • DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]

airbyte-ops cloud connector rollout list

airbyte-ops cloud connector rollout list [OPTIONS]

List connector rollouts from the production database.

Parameters:

  • --include-inactive, --no-include-inactive: Include terminal (canceled/succeeded/errored) rollouts. Requires --limit. [default: False]
  • --limit: Maximum number of rollouts to return. Required when --include-inactive is set. [default: 0]

airbyte-ops cloud db

Database operations for Airbyte Cloud Prod DB Replica.

airbyte-ops cloud db start-proxy

airbyte-ops cloud db start-proxy [ARGS]

Start the Cloud SQL Proxy for database access.

This command starts the Cloud SQL Auth Proxy to enable connections to the Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.

By default, runs as a daemon (background process). Use --no-daemon to run in foreground mode where you can see logs and stop with Ctrl+C.

Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, which should contain the service account JSON credentials.

After starting the proxy, set these environment variables to use database tools:

    export USE_CLOUD_SQL_PROXY=1
    export DB_PORT={port}

Parameters:

  • PORT, --port: Port for the Cloud SQL Proxy to listen on. [default: 15432]
  • DAEMON, --daemon, --no-daemon: Run as daemon in background (default). Use --no-daemon for foreground. [default: True]
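
Before launching a proxy, this module's `start_proxy` probes the port with `socket.create_connection` so that a second invocation is a no-op. The sketch below isolates that probe and the two environment variables named above; `is_port_listening` and `proxy_env` are hypothetical helper names, not part of the module.

```python
import socket
from typing import Dict


def is_port_listening(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections and timeouts (both are OSError subclasses).
        return False


def proxy_env(port: int) -> Dict[str, str]:
    """Environment variables the database tools expect once the proxy is up."""
    return {"USE_CLOUD_SQL_PROXY": "1", "DB_PORT": str(port)}
```

A caller could merge `proxy_env(15432)` into `os.environ` before invoking the database tools.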

airbyte-ops cloud db stop-proxy

airbyte-ops cloud db stop-proxy

Stop the Cloud SQL Proxy daemon.

This command stops a Cloud SQL Proxy that was started with 'start-proxy'. It reads the PID from the PID file and sends a SIGTERM signal to stop the process.
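
The PID-file mechanism described above can be sketched in a few lines. This is an assumption-laden illustration rather than the command's implementation: `stop_from_pid_file` is a hypothetical name, and removing the PID file afterwards is a choice made for this example that the text does not confirm.

```python
import os
import signal
from pathlib import Path


def stop_from_pid_file(pid_file: Path) -> bool:
    """Send SIGTERM to the process recorded in pid_file; return True if it was signalled."""
    if not pid_file.exists():
        return False  # nothing recorded, so nothing to stop
    pid = int(pid_file.read_text().strip())
    try:
        os.kill(pid, signal.SIGTERM)
        signalled = True
    except ProcessLookupError:
        signalled = False  # stale PID file: the process is already gone
    # Assumption for this sketch: clean up the PID file either way.
    pid_file.unlink()
    return signalled
```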

airbyte-ops cloud connection

Connection operations in Airbyte Cloud.

airbyte-ops cloud connection state

Connection state operations in Airbyte Cloud.

Commands:

  • get: Get the current state for an Airbyte Cloud connection.
  • reset: Reset a configured stream's state so the next sync full-refreshes it.
  • set: Set the state for an Airbyte Cloud connection.

airbyte-ops cloud connection state get

airbyte-ops cloud connection state get CONNECTION-ID [ARGS]

Get the current state for an Airbyte Cloud connection.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch state for. [required]
  • STREAM, --stream: Optional stream name to filter state for a single stream.
  • NAMESPACE, --namespace: Optional stream namespace to narrow the stream filter.
  • OUTPUT, --output: Path to write the state JSON output to a file.

airbyte-ops cloud connection state set

airbyte-ops cloud connection state set CONNECTION-ID [ARGS]

Set the state for an Airbyte Cloud connection.

State JSON can be provided as a positional argument, via --input, or piped through STDIN.

Uses the safe variant that prevents updates while a sync is running. When --stream is provided, only that stream's state is updated within the existing connection state.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
  • STATE-JSON, --state-json: The connection state as a JSON string. When --stream is used, this is just the stream's state blob (e.g., '{"cursor": "2024-01-01"}'). Otherwise, must include 'stateType', 'connectionId', and the appropriate state field. Can also be provided via --input or STDIN.
  • STREAM, --stream: Optional stream name to update state for a single stream only.
  • NAMESPACE, --namespace: Optional stream namespace to identify the stream.
  • INPUT, --input: Path to a JSON file containing the state to set.
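
The three input channels and the two payload shapes can be illustrated as follows. The helper names are hypothetical, and the precedence shown (positional JSON, then `--input`, then STDIN) is an assumption; the text lists the three sources without stating which wins if several are given.

```python
import json
from pathlib import Path
from typing import List, Optional, TextIO


def load_json_input(inline: Optional[str], input_path: Optional[str], stdin: TextIO) -> dict:
    """Load JSON from the positional string, a file path, or STDIN (assumed precedence)."""
    if inline is not None:
        raw = inline
    elif input_path is not None:
        raw = Path(input_path).read_text()
    else:
        raw = stdin.read()
    return json.loads(raw)


def missing_full_state_keys(state: dict) -> List[str]:
    """Keys a full (non --stream) state payload must carry but does not."""
    return [key for key in ("stateType", "connectionId") if key not in state]
```

With `--stream`, the payload is just the stream's state blob, so the key check above would not apply.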

airbyte-ops cloud connection state reset

airbyte-ops cloud connection state reset CONNECTION-ID STREAM [ARGS]

Reset a configured stream's state so the next sync full-refreshes it.

Uses the safe variant that prevents updates while a sync is running. Returns a restorable previous_state_backup in raw Config API format.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
  • STREAM, --stream: The configured stream name whose state should be reset. [required]
  • NAMESPACE, --namespace: Optional stream namespace to identify the stream.

airbyte-ops cloud connection catalog

Connection catalog operations in Airbyte Cloud.

Commands:

  • get: Get the configured catalog for an Airbyte Cloud connection.
  • set: Set the configured catalog for an Airbyte Cloud connection.

airbyte-ops cloud connection catalog get

airbyte-ops cloud connection catalog get CONNECTION-ID [ARGS]

Get the configured catalog for an Airbyte Cloud connection.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch catalog for. [required]
  • OUTPUT, --output: Path to write the catalog JSON output to a file.

airbyte-ops cloud connection catalog set

airbyte-ops cloud connection catalog set CONNECTION-ID [ARGS]

Set the configured catalog for an Airbyte Cloud connection.

Catalog JSON can be provided as a positional argument, via --input, or piped through STDIN.

WARNING: This replaces the entire configured catalog.

Parameters:

  • CONNECTION-ID, --connection-id: The connection ID (UUID) to update catalog for. [required]
  • CATALOG-JSON, --catalog-json: The configured catalog as a JSON string. Can also be provided via --input or STDIN.
  • INPUT, --input: Path to a JSON file containing the catalog to set.

airbyte-ops cloud logs

GCP Cloud Logging operations for Airbyte Cloud.

airbyte-ops cloud logs lookup-cloud-backend-error

airbyte-ops cloud logs lookup-cloud-backend-error ERROR-ID [ARGS]

Look up error details from GCP Cloud Logging by error ID.

When an Airbyte Cloud API returns an error response with only an error ID (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command fetches the full stack trace and error details from GCP Cloud Logging.

Requires GCP credentials with Logs Viewer role on the target project. Set up credentials with: gcloud auth application-default login

Parameters:

  • ERROR-ID, --error-id: The error ID (UUID) to search for. This is typically returned in API error responses as {'errorId': '...'} [required]
  • LOOKBACK-DAYS, --lookback-days: Number of days to look back in logs. [default: 7]
  • MIN-SEVERITY-FILTER, --min-severity-filter: Optional minimum severity level to filter logs. [choices: debug, info, notice, warning, error, critical, alert, emergency]
  • RAW, --raw, --no-raw: Output raw JSON instead of formatted text. [default: False]
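
To make the `--lookback-days` and `--min-severity-filter` options concrete, the sketch below computes the start of the search window and the severities a minimum filter would admit. It assumes the choices are ordered from least to most severe, as listed above; the function names are hypothetical and the real query construction is not shown on this page.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Severity choices in the order listed above, least to most severe.
SEVERITIES: Tuple[str, ...] = (
    "debug", "info", "notice", "warning", "error", "critical", "alert", "emergency",
)


def severities_at_or_above(min_severity: str) -> Tuple[str, ...]:
    """Return every severity that passes a minimum-severity filter."""
    return SEVERITIES[SEVERITIES.index(min_severity.lower()):]


def lookback_start(lookback_days: int = 7, now: Optional[datetime] = None) -> datetime:
    """Earliest timestamp covered by the lookback window (UTC)."""
    current = now if now is not None else datetime.now(timezone.utc)
    return current - timedelta(days=lookback_days)
```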

airbyte-ops cloud organization

Organization operations in Airbyte Cloud.

airbyte-ops cloud organization search

airbyte-ops cloud organization search NAME-CONTAINS [ARGS]

Search organizations by name or email substring.

Parameters:

  • NAME-CONTAINS, --name-contains: Case-insensitive substring to match against organization name or email. [required]
  • LIMIT, --limit: Maximum number of results to return. [default: 20]

airbyte-ops cloud organization info

airbyte-ops cloud organization info ORGANIZATION-ID

Get information about an organization (stub — not yet implemented).

Parameters:

  • ORGANIZATION-ID, --organization-id: The organization UUID. [required]

airbyte-ops cloud workspace

Workspace operations in Airbyte Cloud.

airbyte-ops cloud workspace search

airbyte-ops cloud workspace search [ARGS]

Search workspaces by name substring or email domain.

Parameters:

  • NAME-CONTAINS, --name-contains: Case-insensitive substring to match against workspace name or slug.
  • EMAIL-DOMAIN, --email-domain: Email domain to search for (e.g. 'motherduck.com').
  • LIMIT, --limit: Maximum number of results to return. [default: 100]

airbyte-ops cloud workspace info

airbyte-ops cloud workspace info WORKSPACE-ID

Get information about a workspace.

Parameters:

  • WORKSPACE-ID, --workspace-id: The workspace UUID. [required]
   1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
   2"""CLI commands for Airbyte Cloud operations.
   3
   4Commands:
   5    airbyte-ops cloud connector get-version-info - Get connector version info
   6    airbyte-ops cloud connector set-version-override - Set connector version override
   7    airbyte-ops cloud connector clear-version-override - Clear connector version override
   8    airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
   9    airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file
  10    airbyte-ops cloud organization search - Search organizations by name/email
  11    airbyte-ops cloud organization info - Get organization info
  12    airbyte-ops cloud workspace search - Search workspaces by name/slug or email domain
  13    airbyte-ops cloud workspace info - Get workspace info
  14
  15## CLI reference
  16
  17The commands below are regenerated by `poe docs-generate` via cyclopts's
  18programmatic docs API; see `docs/generate_cli.py`.
  19
  20.. include:: ../../../docs/generated/cli/cloud.md
  21   :start-line: 2
  22"""
  23
  24from __future__ import annotations
  25
  26# Hide Python-level members from the pdoc page for this module; the rendered
  27# docs for this CLI group come entirely from the grafted `.. include::` in
  28# the module docstring above.
  29__all__: list[str] = []
  30
  31import json
  32import logging
  33import os
  34import shutil
  35import signal
  36import socket
  37import subprocess
  38import sys
  39import tempfile
  40import time
  41from pathlib import Path
  42from typing import Annotated, Literal
  43
  44import requests
  45import yaml
  46from airbyte.cloud.workspaces import CloudWorkspace
  47from airbyte.exceptions import PyAirbyteInputError
  48from airbyte_cdk.models.connector_metadata import MetadataFile
  49from airbyte_cdk.utils.connector_paths import find_connector_root_from_name
  50from airbyte_cdk.utils.docker import build_connector_image, verify_docker_installation
  51from airbyte_protocol.models import ConfiguredAirbyteCatalog
  52from cyclopts import Parameter
  53from fastmcp_extensions.cli import (
  54    exit_with_error,
  55    print_error,
  56    print_json,
  57    print_success,
  58    print_warning,
  59)
  60
  61from airbyte_ops_mcp.cli._base import App, app
  62from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError
  63from airbyte_ops_mcp.cloud_admin.connection_config import fetch_connection_config
  64from airbyte_ops_mcp.cloud_admin.connection_state import (
  65    reset_stream_state as reset_cloud_stream_state,
  66)
  67from airbyte_ops_mcp.cloud_admin.registry_lookup import (
  68    resolve_definition_id_to_canonical_info,
  69)
  70from airbyte_ops_mcp.cloud_admin.version_overrides import (
  71    ResolvedCloudAuth,
  72    VersionOverrideTarget,
  73    get_connector_version_info,
  74)
  75from airbyte_ops_mcp.cloud_admin.version_overrides import (
  76    set_version_override as set_cloud_version_override,
  77)
  78from airbyte_ops_mcp.constants import (
  79    CLOUD_SQL_INSTANCE,
  80    CLOUD_SQL_PROXY_PID_FILE,
  81    DEFAULT_CLOUD_SQL_PROXY_PORT,
  82    ENV_GCP_PROD_DB_ACCESS_CREDENTIALS,
  83)
  84from airbyte_ops_mcp.gcp_logs import GCPSeverity, fetch_error_logs
  85from airbyte_ops_mcp.prod_db_access.queries import (
  86    query_workspace_info,
  87    query_workspaces_by_email_domain,
  88)
  89from airbyte_ops_mcp.prod_db_access.queries import (
  90    search_organizations as _search_organizations,
  91)
  92from airbyte_ops_mcp.prod_db_access.queries import (
  93    search_workspaces as _search_workspaces,
  94)
  95from airbyte_ops_mcp.regression_tests.cdk_secrets import get_first_config_from_secrets
  96from airbyte_ops_mcp.regression_tests.ci_output import (
  97    generate_regression_report,
  98    generate_single_version_report,
  99    write_github_output,
 100    write_github_outputs,
 101    write_github_summary,
 102    write_json_output,
 103    write_test_summary,
 104)
 105from airbyte_ops_mcp.regression_tests.config_overrides import (
 106    filter_configured_catalog_file,
 107)
 108from airbyte_ops_mcp.regression_tests.connection_fetcher import (
 109    fetch_connection_data,
 110    fetch_connection_state,
 111    save_connection_data_to_files,
 112)
 113from airbyte_ops_mcp.regression_tests.connection_secret_retriever import (
 114    SecretRetrievalError,
 115    enrich_config_with_secrets,
 116    should_use_secret_retriever,
 117)
 118from airbyte_ops_mcp.regression_tests.connector_runner import (
 119    ConnectorRunner,
 120    ensure_image_available,
 121)
 122from airbyte_ops_mcp.regression_tests.http_metrics import (
 123    MitmproxyManager,
 124    parse_http_dump,
 125)
 126from airbyte_ops_mcp.regression_tests.models import (
 127    Command,
 128    ConnectorUnderTest,
 129    ExecutionInputs,
 130    TargetOrControl,
 131)
 132from airbyte_ops_mcp.telemetry import track_regression_test
 133from airbyte_ops_mcp.tier_cache import resolve_workspace
 134
 135# Path to connectors directory within the airbyte repo
 136CONNECTORS_SUBDIR = Path("airbyte-integrations") / "connectors"
 137
 138# Create the cloud sub-app
 139cloud_app = App(name="cloud", help="Airbyte Cloud operations.")
 140app.command(cloud_app)
 141
 142# Create the connector sub-app under cloud
 143connector_app = App(
 144    name="connector", help="Deployed connector operations in Airbyte Cloud."
 145)
 146cloud_app.command(connector_app)
 147
 148# Create the db sub-app under cloud
 149db_app = App(name="db", help="Database operations for Airbyte Cloud Prod DB Replica.")
 150cloud_app.command(db_app)
 151
 152# Create the connection sub-app under cloud
 153connection_app = App(name="connection", help="Connection operations in Airbyte Cloud.")
 154cloud_app.command(connection_app)
 155
 156# Create the state sub-app under connection
 157state_app = App(name="state", help="Connection state operations in Airbyte Cloud.")
 158connection_app.command(state_app)
 159
 160# Create the catalog sub-app under connection
 161catalog_app = App(
 162    name="catalog", help="Connection catalog operations in Airbyte Cloud."
 163)
 164connection_app.command(catalog_app)
 165
 166# Create the logs sub-app under cloud
 167logs_app = App(name="logs", help="GCP Cloud Logging operations for Airbyte Cloud.")
 168cloud_app.command(logs_app)
 169
 170# Create the organization sub-app under cloud
 171organization_app = App(
 172    name="organization", help="Organization operations in Airbyte Cloud."
 173)
 174cloud_app.command(organization_app)
 175
 176# Create the workspace sub-app under cloud
 177workspace_app = App(name="workspace", help="Workspace operations in Airbyte Cloud.")
 178cloud_app.command(workspace_app)
 179
 180
@db_app.command(name="start-proxy")
def start_proxy(
    port: Annotated[
        int,
        Parameter(help="Port for the Cloud SQL Proxy to listen on."),
    ] = DEFAULT_CLOUD_SQL_PROXY_PORT,
    daemon: Annotated[
        bool,
        Parameter(
            help="Run as daemon in background (default). Use --no-daemon for foreground."
        ),
    ] = True,
) -> None:
    """Start the Cloud SQL Proxy for database access.

    This command starts the Cloud SQL Auth Proxy to enable connections to the
    Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.

    By default, runs as a daemon (background process). Use --no-daemon to run in
    foreground mode where you can see logs and stop with Ctrl+C.

    Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable,
    which should contain the service account JSON credentials.

    After starting the proxy, set these environment variables to use database tools:
        export USE_CLOUD_SQL_PROXY=1
        export DB_PORT={port}

    Example:
        airbyte-ops cloud db start-proxy
        airbyte-ops cloud db start-proxy --port 15432
        airbyte-ops cloud db start-proxy --no-daemon
    """
    # Check if proxy is already running on the requested port (idempotency)
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=0.5):
            # Something is already listening on this port
            pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
            pid_info = ""
            if pid_file.exists():
                pid_info = f" (PID: {pid_file.read_text().strip()})"
            print_success(
                f"Cloud SQL Proxy is already running on port {port}{pid_info}"
            )
            print_success("")
            print_success("To use database tools, set these environment variables:")
            print_success("  export USE_CLOUD_SQL_PROXY=1")
            print_success(f"  export DB_PORT={port}")
            return
    except (OSError, TimeoutError, ConnectionRefusedError):
        pass  # Port not in use, proceed with starting proxy

    # Check if cloud-sql-proxy is installed
    proxy_path = shutil.which("cloud-sql-proxy")
    if not proxy_path:
        exit_with_error(
            "cloud-sql-proxy not found in PATH. "
            "Install it from: https://cloud.google.com/sql/docs/mysql/sql-proxy"
        )

    # Get credentials from environment
    creds_json = os.getenv(ENV_GCP_PROD_DB_ACCESS_CREDENTIALS)
    if not creds_json:
        exit_with_error(
            f"{ENV_GCP_PROD_DB_ACCESS_CREDENTIALS} environment variable is not set. "
            "This should contain the GCP service account JSON credentials."
        )

    # Build the command using --json-credentials to avoid writing to disk
    cmd = [
        proxy_path,
        CLOUD_SQL_INSTANCE,
        f"--port={port}",
        f"--json-credentials={creds_json}",
    ]

    print_success(f"Starting Cloud SQL Proxy on port {port}...")
    print_success(f"Instance: {CLOUD_SQL_INSTANCE}")
    print_success("")
    print_success("To use database tools, set these environment variables:")
    print_success("  export USE_CLOUD_SQL_PROXY=1")
    print_success(f"  export DB_PORT={port}")
    print_success("")

    if daemon:
        # Run in background (daemon mode) with log file for diagnostics
        log_file_path = Path("/tmp/airbyte-cloud-sql-proxy.log")
        # The child inherits the log file descriptor, so the parent closes its
        # own handle as soon as the process has been spawned.
        with log_file_path.open("ab") as log_file:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=log_file,
                start_new_session=True,
            )

        # Brief wait to verify the process started successfully
        time.sleep(0.5)
        if process.poll() is not None:
            # Process exited immediately - read any error output
            error_output = ""
            if log_file_path.exists():
                error_output = log_file_path.read_text()[-1000:]  # Last 1000 chars
            exit_with_error(
                f"Cloud SQL Proxy failed to start (exit code: {process.returncode}).\n"
                f"Check logs at {log_file_path}\n"
                f"Recent output: {error_output}"
            )

        # Write PID to file for stop-proxy command
        pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
        pid_file.write_text(str(process.pid))
        print_success(f"Cloud SQL Proxy started as daemon (PID: {process.pid})")
        print_success(f"Logs: {log_file_path}")
        print_success("To stop: airbyte-ops cloud db stop-proxy")
    else:
        # Run in foreground - replace current process
        # Signals (Ctrl+C) will be handled directly by the cloud-sql-proxy process
        print_success("Running in foreground. Press Ctrl+C to stop the proxy.")
        print_success("")
        os.execv(proxy_path, cmd)


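# Illustrative sketch, not part of the original module: the idempotency check at
# the top of `start_proxy` boils down to a TCP connect probe. The helper name
# `_example_port_is_listening` is hypothetical and exists only for this example.
# It assumes that "something accepts a TCP connection" is a good enough signal;
# like the original check, it cannot tell whether the listener is really the proxy.
import socket


def _example_port_is_listening(
    port: int, host: str = "127.0.0.1", timeout: float = 0.5
) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # ConnectionRefusedError and TimeoutError are both subclasses of OSError
        return False

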
@db_app.command(name="stop-proxy")
def stop_proxy() -> None:
    """Stop the Cloud SQL Proxy daemon.

    This command stops a Cloud SQL Proxy that was started with 'start-proxy'.
    It reads the PID from the PID file and sends a SIGTERM signal to stop the process.

    Example:
        airbyte-ops cloud db stop-proxy
    """
    pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)

    if not pid_file.exists():
        exit_with_error(
            f"PID file not found at {CLOUD_SQL_PROXY_PID_FILE}. "
            "No Cloud SQL Proxy daemon appears to be running."
        )

    pid_str = pid_file.read_text().strip()
    if not pid_str.isdigit():
        pid_file.unlink()
        exit_with_error(f"Invalid PID in {CLOUD_SQL_PROXY_PID_FILE}: {pid_str}")

    pid = int(pid_str)

    # Check if process is still running
    try:
        os.kill(pid, 0)  # Signal 0 just checks if process exists
    except ProcessLookupError:
        pid_file.unlink()
        print_success(
            f"Cloud SQL Proxy (PID: {pid}) is not running. Cleaned up PID file."
        )
        return
    except PermissionError:
        exit_with_error(f"Permission denied to check process {pid}.")

    # Send SIGTERM to stop the process
    try:
        os.kill(pid, signal.SIGTERM)
        print_success(f"Sent SIGTERM to Cloud SQL Proxy (PID: {pid}).")
    except ProcessLookupError:
        print_success(f"Cloud SQL Proxy (PID: {pid}) already stopped.")
    except PermissionError:
        exit_with_error(f"Permission denied to stop process {pid}.")

    # Clean up PID file
    pid_file.unlink(missing_ok=True)
    print_success("Cloud SQL Proxy stopped.")


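# Illustrative sketch, not part of the original module: the liveness check used
# by `stop_proxy` relies on signal 0, which asks the kernel to validate the PID
# without delivering anything. `_example_pid_is_alive` is a hypothetical helper
# name. This assumes POSIX semantics; on Windows, `os.kill(pid, 0)` behaves
# differently and should not be used as a probe.
import os


def _example_pid_is_alive(pid: int) -> bool:
    """Return True if a process with this PID currently exists (POSIX only)."""
    try:
        os.kill(pid, 0)  # Signal 0: error checking only, no signal is sent
    except ProcessLookupError:
        return False  # No such process
    except PermissionError:
        return True  # The process exists but is owned by another user
    return True

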
@connector_app.command(name="get-version-info")
def get_version_info(
    workspace_id: Annotated[
        str,
        Parameter(help="The Airbyte Cloud workspace ID."),
    ],
    connector_id: Annotated[
        str,
        Parameter(help="The ID of the deployed connector (source or destination)."),
    ],
    connector_type: Annotated[
        Literal["source", "destination"],
        Parameter(help="The type of connector."),
    ],
) -> None:
    """Get the current version information for a deployed connector."""
    result = get_connector_version_info(
        auth=_resolve_cli_cloud_auth(),
        workspace_id=workspace_id,
        actor_id=connector_id,
        actor_type=connector_type,
    )
    print_json(result.model_dump())


_OverrideLevel = Literal["actor", "workspace", "organization"]
_TierFilter = Literal["TIER_0", "TIER_1", "TIER_2", "ALL"]


def _resolve_cli_cloud_auth() -> ResolvedCloudAuth:
    """Resolve Airbyte Cloud auth from environment variables for CLI use.

    Mirrors the priority order the MCP server uses:

    1. `AIRBYTE_CLOUD_BEARER_TOKEN`
    2. `AIRBYTE_CLOUD_CLIENT_ID` + `AIRBYTE_CLOUD_CLIENT_SECRET`

    Raises a CLI error via `exit_with_error` if no usable credentials are present.
    """
    bearer_token = os.environ.get("AIRBYTE_CLOUD_BEARER_TOKEN")
    if bearer_token:
        return ResolvedCloudAuth(bearer_token=bearer_token)

    client_id = os.environ.get("AIRBYTE_CLOUD_CLIENT_ID")
    client_secret = os.environ.get("AIRBYTE_CLOUD_CLIENT_SECRET")
    if client_id and client_secret:
        return ResolvedCloudAuth(client_id=client_id, client_secret=client_secret)

    exit_with_error(
        "Missing Airbyte Cloud credentials. Set AIRBYTE_CLOUD_BEARER_TOKEN, or "
        "both AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET."
    )


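# Illustrative sketch, not part of the original module: the priority order used
# by `_resolve_cli_cloud_auth`, expressed against a plain mapping so it can be
# exercised without touching `os.environ`. `_example_pick_auth_mode` and its
# string return values are hypothetical; the real function returns a
# `ResolvedCloudAuth` or exits the process.
from collections.abc import Mapping
from typing import Optional


def _example_pick_auth_mode(env: Mapping[str, str]) -> Optional[str]:
    """Return which credential style would be selected, or None if none is usable."""
    # 1. A non-empty bearer token always wins
    if env.get("AIRBYTE_CLOUD_BEARER_TOKEN"):
        return "bearer_token"
    # 2. Client credentials only count when both halves are present
    if env.get("AIRBYTE_CLOUD_CLIENT_ID") and env.get("AIRBYTE_CLOUD_CLIENT_SECRET"):
        return "client_credentials"
    return None

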
def _validate_override_scope_args(
    *,
    override_level: _OverrideLevel,
    workspace_id: str | None,
    organization_id: str | None,
    connector_id: str | None,
    connector_type: Literal["source", "destination"] | None,
    actor_definition_id: str | None,
) -> None:
    """Validate scope-specific arguments for `set/clear-version-override`.

    Calls `exit_with_error` (which terminates the process) if the supplied
    arguments are inconsistent with the requested `override_level`.
    """
    if override_level == "actor":
        missing = [
            name
            for name, value in (
                ("--workspace-id", workspace_id),
                ("--connector-id", connector_id),
                ("--connector-type", connector_type),
            )
            if not value
        ]
        if missing:
            exit_with_error(
                "Override level 'actor' requires " + ", ".join(missing) + "."
            )
        if actor_definition_id is not None:
            exit_with_error(
                "Override level 'actor' must not be combined with "
                "--actor-definition-id."
            )
        if organization_id is not None:
            exit_with_error(
                "Override level 'actor' must not be combined with --organization-id."
            )
        return

    if override_level == "workspace":
        missing = [
            name
            for name, value in (
                ("--workspace-id", workspace_id),
                ("--actor-definition-id", actor_definition_id),
            )
            if not value
        ]
        if missing:
            exit_with_error(
                "Override level 'workspace' requires " + ", ".join(missing) + "."
            )
        if connector_id is not None or connector_type is not None:
            exit_with_error(
                "Override level 'workspace' must not be combined with "
                "--connector-id or --connector-type."
            )
        if organization_id is not None:
            exit_with_error(
                "Override level 'workspace' must not be combined with "
                "--organization-id."
            )
        return

    # override_level == "organization"
    missing = [
        name
        for name, value in (
            ("--organization-id", organization_id),
            ("--actor-definition-id", actor_definition_id),
        )
        if not value
    ]
    if missing:
        exit_with_error(
            "Override level 'organization' requires " + ", ".join(missing) + "."
        )
    if connector_id is not None or connector_type is not None:
        exit_with_error(
            "Override level 'organization' must not be combined with "
            "--connector-id or --connector-type."
        )
    if workspace_id is not None:
        exit_with_error(
            "Override level 'organization' must not be combined with --workspace-id."
        )


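# Illustrative sketch, not part of the original module: the three branches of
# `_validate_override_scope_args` share one shape (required flags, plus flags
# that must be absent), so the same rules can be written as a lookup table.
# All names prefixed `_example_`/`_EXAMPLE_` are hypothetical. Unlike the
# original, which exits on the first problem, this variant returns every
# violation as a list of messages; that is a deliberate design difference.
_EXAMPLE_REQUIRED_FLAGS = {
    "actor": ("--workspace-id", "--connector-id", "--connector-type"),
    "workspace": ("--workspace-id", "--actor-definition-id"),
    "organization": ("--organization-id", "--actor-definition-id"),
}
_EXAMPLE_ALL_SCOPE_FLAGS = (
    "--workspace-id",
    "--organization-id",
    "--connector-id",
    "--connector-type",
    "--actor-definition-id",
)


def _example_scope_errors(override_level: str, supplied: dict) -> list:
    """Return error messages for missing or disallowed scope flags."""
    required = _EXAMPLE_REQUIRED_FLAGS[override_level]
    # Required flags must be present and non-empty
    missing = [flag for flag in required if not supplied.get(flag)]
    # Every scope flag that is not required for this level must be left unset
    forbidden = [
        flag
        for flag in _EXAMPLE_ALL_SCOPE_FLAGS
        if flag not in required and supplied.get(flag) is not None
    ]
    errors = []
    if missing:
        errors.append(
            f"Override level '{override_level}' requires " + ", ".join(missing) + "."
        )
    if forbidden:
        errors.append(
            f"Override level '{override_level}' must not be combined with "
            + ", ".join(forbidden)
            + "."
        )
    return errors

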
def _dispatch_version_override(
    *,
    override_level: _OverrideLevel,
    workspace_id: str | None,
    organization_id: str | None,
    connector_id: str | None,
    connector_type: Literal["source", "destination"] | None,
    actor_definition_id: str | None,
    version: str | None,
    unset: bool,
    reason: str | None,
    reason_url: str | None,
    issue_url: str,
    approval_comment_url: str,
    ai_agent_session_url: str | None,
    customer_tier_filter: _TierFilter,
) -> None:
    """Route a version-override request through the normalized core helper.

    Resolves `--actor-definition-id` to its canonical name and connector type
    via the cloud registry for `workspace`/`organization` scopes. Prints the
    operation result as JSON.
    """
    _validate_override_scope_args(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
    )

    auth = _resolve_cli_cloud_auth()

    if override_level == "actor":
        assert workspace_id is not None
        assert connector_id is not None
        assert connector_type is not None
        assert organization_id is None
        try:
            ws_resolution = resolve_workspace(workspace_id)
            if not ws_resolution.organization_id:
                exit_with_error("Could not resolve organization for workspace.")
            result = set_cloud_version_override(
                auth=auth,
                target=VersionOverrideTarget(
                    scope="actor",
                    organization_id=ws_resolution.organization_id,
                    workspace_id=workspace_id,
                    actor_id=connector_id,
                    connector_type=connector_type,
                ),
                approval_comment_url=approval_comment_url,
                version=version,
                unset=unset,
                override_reason=reason,
                override_reason_reference_url=reason_url,
                issue_url=issue_url,
                ai_agent_session_url=ai_agent_session_url,
                customer_tier_filter=customer_tier_filter,
            )
        except (PyAirbyteInputError, CloudAuthError) as e:
            exit_with_error(str(e))
        _print_override_result(result.success, result.message)
        print_json(result.model_dump())
        return

    try:
        assert actor_definition_id is not None
        canonical_name, resolved_connector_type = (
            resolve_definition_id_to_canonical_info(actor_definition_id)
        )
    except (PyAirbyteInputError, requests.RequestException) as e:
        exit_with_error(f"Failed to resolve --actor-definition-id: {e}")
    typed_connector_type: Literal["source", "destination"] = (
        "source" if resolved_connector_type == "source" else "destination"
    )

    if override_level == "workspace":
        assert workspace_id is not None
        try:
            ws_resolution = resolve_workspace(workspace_id)
            if not ws_resolution.organization_id:
                exit_with_error("Could not resolve organization for workspace.")
            target = VersionOverrideTarget(
                scope="workspace",
                organization_id=ws_resolution.organization_id,
                workspace_id=workspace_id,
                connector_name=canonical_name,
                connector_type=typed_connector_type,
            )
        except PyAirbyteInputError as e:
            exit_with_error(str(e))
    else:
        assert organization_id is not None
        target = VersionOverrideTarget(
            scope="organization",
            organization_id=organization_id,
            connector_name=canonical_name,
            connector_type=typed_connector_type,
        )

    try:
        result = set_cloud_version_override(
            auth=auth,
            target=target,
            approval_comment_url=approval_comment_url,
            version=version,
            unset=unset,
            override_reason=reason,
            override_reason_reference_url=reason_url,
            issue_url=issue_url,
            ai_agent_session_url=ai_agent_session_url,
            customer_tier_filter=customer_tier_filter,
        )
    except (PyAirbyteInputError, CloudAuthError) as e:
        exit_with_error(str(e))
    _print_override_result(result.success, result.message)
    print_json(result.model_dump())


def _print_override_result(success: bool, message: str) -> None:
    """Emit an override-operation status message via the shared CLI helpers."""
    if success:
        print_success(message)
    else:
        print_error(message)


@connector_app.command(name="set-version-override")
def set_version_override(
    version: Annotated[
        str,
        Parameter(
            help="The semver version string to pin to (e.g., '2.1.5-preview.abc1234')."
        ),
    ],
    reason: Annotated[
        str,
        Parameter(help="Explanation for the override (min 10 characters)."),
    ],
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    override_level: Annotated[
        _OverrideLevel,
        Parameter(
            help=(
                "Scope at which to apply the version override: 'actor' (single "
                "deployed connector instance), 'workspace' (all instances of a "
                "connector type within a workspace), or 'organization' (all "
                "instances across an organization). Defaults to 'actor'."
            ),
        ),
    ] = "actor",
    workspace_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud workspace ID. Required for 'actor' and "
                "'workspace' override levels."
            )
        ),
    ] = None,
    organization_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud organization ID. Required for "
                "'organization' override level."
            )
        ),
    ] = None,
    connector_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The ID of the deployed connector (source or destination). "
                "Required for 'actor' override level."
            )
        ),
    ] = None,
    connector_type: Annotated[
        Literal["source", "destination"] | None,
        Parameter(help="The type of connector. Required for 'actor' override level."),
    ] = None,
    actor_definition_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The connector definition UUID. Required for 'workspace' and "
                "'organization' override levels."
            )
        ),
    ] = None,
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    reason_url: Annotated[
        str | None,
        Parameter(help="Optional URL with more context (e.g., issue link)."),
    ] = None,
    customer_tier_filter: Annotated[
        _TierFilter,
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Set a version override for a deployed connector.

    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.

    The `--override-level` flag selects the scope at which the pin is applied:

    - `actor` (default): pins a single deployed connector instance. Requires
      `--workspace-id`, `--connector-id`, and `--connector-type`.
    - `workspace`: pins ALL instances of a connector type within a workspace.
      Requires `--workspace-id` and `--actor-definition-id`.
    - `organization`: pins ALL instances of a connector type across an
      organization. Requires `--organization-id` and `--actor-definition-id`.

    The `customer_tier_filter` gates the operation: the call fails if the
    actual tier of the target organization does not match. Use `ALL` to
    proceed regardless of tier (a warning is shown for sensitive tiers).
    """
    _dispatch_version_override(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
        version=version,
        unset=False,
        reason=reason,
        reason_url=reason_url,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )


@connector_app.command(name="clear-version-override")
def clear_version_override(
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    override_level: Annotated[
        _OverrideLevel,
        Parameter(
            help=(
                "Scope at which to clear the version override: 'actor', "
                "'workspace', or 'organization'. Defaults to 'actor'."
            ),
        ),
    ] = "actor",
    workspace_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud workspace ID. Required for 'actor' and "
                "'workspace' override levels."
            )
        ),
    ] = None,
    organization_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud organization ID. Required for "
                "'organization' override level."
            )
        ),
    ] = None,
    connector_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The ID of the deployed connector (source or destination). "
                "Required for 'actor' override level."
            )
        ),
    ] = None,
    connector_type: Annotated[
        Literal["source", "destination"] | None,
        Parameter(help="The type of connector. Required for 'actor' override level."),
    ] = None,
    actor_definition_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The connector definition UUID. Required for 'workspace' and "
                "'organization' override levels."
            )
        ),
    ] = None,
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    customer_tier_filter: Annotated[
        _TierFilter,
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Clear a version override from a deployed connector.

    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.

    The `--override-level` flag selects the scope at which the pin is removed:

    - `actor` (default): clears a single deployed connector instance pin.
      Requires `--workspace-id`, `--connector-id`, and `--connector-type`.
    - `workspace`: clears the workspace-level pin for a connector type.
      Requires `--workspace-id` and `--actor-definition-id`.
    - `organization`: clears the organization-level pin for a connector type.
      Requires `--organization-id` and `--actor-definition-id`.
    """
    _dispatch_version_override(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
        version=None,
        unset=True,
        reason=None,
        reason_url=None,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )


def _load_json_file(file_path: Path) -> dict | None:
    """Load a JSON file and return its contents.

    Returns None if the file doesn't exist or contains invalid JSON.
    """
    if not file_path.exists():
        return None
    try:
        return json.loads(file_path.read_text())
    except json.JSONDecodeError as e:
        print_error(f"Failed to parse JSON in file: {file_path}\nError: {e}")
        return None


def _run_connector_command(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    config_path: Path | None = None,
    catalog_path: Path | None = None,
    state_path: Path | None = None,
    proxy_url: str | None = None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command and return results as a dict.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        proxy_url: Optional HTTP proxy URL for traffic capture.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results.
    """
    connector = ConnectorUnderTest.from_image_name(connector_image, target_or_control)

    config = _load_json_file(config_path) if config_path else None
    state = _load_json_file(state_path) if state_path else None

    configured_catalog = None
    if catalog_path and catalog_path.exists():
        catalog_json = catalog_path.read_text()
        configured_catalog = ConfiguredAirbyteCatalog.parse_raw(catalog_json)

    # Pass log level as an environment variable to the connector container
    container_env: dict[str, str] = {}
    if enable_debug_logs:
        container_env["LOG_LEVEL"] = "DEBUG"

    execution_inputs = ExecutionInputs(
        connector_under_test=connector,
        command=command,
        output_dir=output_dir,
        config=config,
        configured_catalog=configured_catalog,
        state=state,
        environment_variables=container_env or None,
    )

    runner = ConnectorRunner(execution_inputs, proxy_url=proxy_url)
    result = runner.run()

    result.save_artifacts(output_dir)

    result_dict: dict[str, object] = {
        "connector": connector_image,
        "command": command.value,
        "success": result.success,
        "exit_code": result.exit_code,
        "stdout_file": str(result.stdout_file_path),
        "stderr_file": str(result.stderr_file_path),
        "message_counts": {
            k.value: v for k, v in result.get_message_count_per_type().items()
        },
        "record_counts_per_stream": result.get_record_count_per_stream(),
    }

    # For CHECK, include the parsed connectionStatus from the Airbyte protocol.
    # The CDK exits 0 regardless of check outcome; the real pass/fail signal
    # lives in the CONNECTION_STATUS message (status=SUCCEEDED or FAILED).
    if command == Command.CHECK:
        conn_status = result.get_connection_status()
        if conn_status is not None:
            result_dict["connection_status"] = conn_status.status.value
            result_dict["connection_status_message"] = conn_status.message or ""
        else:
            result_dict["connection_status"] = None
            result_dict["connection_status_message"] = ""

    return result_dict


def _build_connector_image_from_source(
    connector_name: str,
    repo_root: Path | None = None,
    tag: str = "dev",
) -> str | None:
    """Build a connector image from source code.

    Args:
        connector_name: Name of the connector (e.g., 'source-pokeapi').
        repo_root: Optional path to the airbyte repo root. If not provided,
            will attempt to auto-detect from current directory.
        tag: Tag to apply to the built image (default: 'dev').

    Returns:
        The full image name with tag if successful, None if build fails.
    """
    if not verify_docker_installation():
        print_error("Docker is not installed or not running")
        return None

    try:
        connector_directory = find_connector_root_from_name(connector_name)
    except FileNotFoundError:
        if repo_root:
            connector_directory = repo_root / CONNECTORS_SUBDIR / connector_name
            if not connector_directory.exists():
                print_error(f"Connector directory not found: {connector_directory}")
                return None
        else:
            print_error(
                f"Could not find connector '{connector_name}'. "
                "Try providing --repo-root to specify the airbyte repo location."
            )
            return None

    metadata_file_path = connector_directory / "metadata.yaml"
    if not metadata_file_path.exists():
        print_error(f"metadata.yaml not found at {metadata_file_path}")
        return None

    metadata = MetadataFile.from_file(metadata_file_path)
    print_success(f"Building image for connector: {connector_name}")

    built_image = build_connector_image(
        connector_name=connector_name,
        connector_directory=connector_directory,
        metadata=metadata,
        tag=tag,
        no_verify=False,
    )
    print_success(f"Successfully built image: {built_image}")
    return built_image


1019def _fetch_control_image_from_metadata(connector_name: str) -> str | None:
1020    """Fetch the current released connector image from metadata.yaml on main branch.
1021
1022    This fetches the connector's metadata.yaml from the airbyte monorepo's master branch
1023    and extracts the dockerRepository and dockerImageTag to construct the control image.
1024
1025    Args:
1026        connector_name: The connector name (e.g., 'source-github').
1027
1028    Returns:
1029        The full connector image with tag (e.g., 'airbyte/source-github:1.0.0'),
1030        or None if the metadata could not be fetched or parsed.
1031    """
1032    metadata_url = (
1033        f"https://raw.githubusercontent.com/airbytehq/airbyte/master/"
1034        f"airbyte-integrations/connectors/{connector_name}/metadata.yaml"
1035    )
1036    response = requests.get(metadata_url, timeout=30)
1037    if not response.ok:
1038        print_error(
1039            f"Failed to fetch metadata for {connector_name}: "
1040            f"HTTP {response.status_code} from {metadata_url}"
1041        )
1042        return None
1043
1044    metadata = yaml.safe_load(response.text)
1045    if not isinstance(metadata, dict):
1046        print_error(f"Invalid metadata format for {connector_name}: expected dict")
1047        return None
1048
1049    data = metadata.get("data", {})
1050    docker_repository = data.get("dockerRepository")
1051    docker_image_tag = data.get("dockerImageTag")
1052
1053    if not docker_repository or not docker_image_tag:
1054        print_error(
1055            f"Could not find dockerRepository/dockerImageTag in metadata for {connector_name}"
1056        )
1057        return None
1058
1059    return f"{docker_repository}:{docker_image_tag}"
1060
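The extraction step of the function above can be exercised without network access. The following is a rough sketch that takes an already-parsed mapping; the helper name `image_from_metadata` and the sample values are made up for illustration.

```python
from __future__ import annotations


def image_from_metadata(metadata: object) -> str | None:
    """Return 'repository:tag' from a parsed metadata.yaml mapping, or None."""
    if not isinstance(metadata, dict):
        return None
    data = metadata.get("data") or {}
    if not isinstance(data, dict):
        return None
    repository = data.get("dockerRepository")
    tag = data.get("dockerImageTag")
    if not repository or not tag:
        return None
    return f"{repository}:{tag}"


# Sample values are illustrative only.
sample = {
    "data": {
        "dockerRepository": "airbyte/source-github",
        "dockerImageTag": "1.0.0",
    }
}
print(image_from_metadata(sample))
print(image_from_metadata({"data": None}))  # missing fields yield None
```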

def _run_with_optional_http_metrics(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    enable_http_metrics: bool,
    config_path: Path | None,
    catalog_path: Path | None,
    state_path: Path | None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command with optional HTTP metrics capture.

    When enable_http_metrics is True, starts mitmproxy to capture HTTP traffic.
    If mitmproxy fails to start, falls back to running without metrics.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        enable_http_metrics: Whether to capture HTTP metrics via mitmproxy.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results, optionally including http_metrics.
    """
    if not enable_http_metrics:
        return _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            enable_debug_logs=enable_debug_logs,
        )

    with MitmproxyManager.start(output_dir) as session:
        if session is None:
            print_error("Mitmproxy unavailable, running without HTTP metrics")
            return _run_connector_command(
                connector_image=connector_image,
                command=command,
                output_dir=output_dir,
                target_or_control=target_or_control,
                config_path=config_path,
                catalog_path=catalog_path,
                state_path=state_path,
                enable_debug_logs=enable_debug_logs,
            )

        print_success(f"Started mitmproxy on {session.proxy_url}")
        result = _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            proxy_url=session.proxy_url,
            enable_debug_logs=enable_debug_logs,
        )

        http_metrics = parse_http_dump(session.dump_file_path)
        result["http_metrics"] = {
            "flow_count": http_metrics.flow_count,
            "duplicate_flow_count": http_metrics.duplicate_flow_count,
        }
        print_success(
            f"Captured {http_metrics.flow_count} HTTP flows "
            f"({http_metrics.duplicate_flow_count} duplicates)"
        )
        return result

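The fallback behavior described in the docstring above (a context manager that yields `None` when the proxy cannot start) can be illustrated with stand-ins. In this sketch `ProxySession`, `start_proxy`, and `run_command` are hypothetical replacements for `MitmproxyManager.start` and `_run_connector_command`; the proxy URL and metric values are placeholders.

```python
from __future__ import annotations

from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator


@dataclass
class ProxySession:
    """Hypothetical stand-in for the session object yielded by the proxy manager."""

    proxy_url: str


@contextmanager
def start_proxy(available: bool) -> Iterator[ProxySession | None]:
    """Yield a session, or None when the proxy cannot be started."""
    yield ProxySession("http://127.0.0.1:8080") if available else None


def run_command(proxy_url: str | None = None) -> dict:
    """Hypothetical connector run; records whether traffic went through a proxy."""
    return {"success": True, "exit_code": 0, "proxied": proxy_url is not None}


def run_with_optional_metrics(enable_http_metrics: bool, proxy_available: bool) -> dict:
    if not enable_http_metrics:
        return run_command()
    with start_proxy(proxy_available) as session:
        if session is None:
            # Degrade gracefully: same run, just without metrics.
            return run_command()
        result = run_command(proxy_url=session.proxy_url)
        # Placeholder counts; real values would come from parsing the dump file.
        result["http_metrics"] = {"flow_count": 0, "duplicate_flow_count": 0}
        return result


print(run_with_optional_metrics(enable_http_metrics=True, proxy_available=False))
print(run_with_optional_metrics(enable_http_metrics=True, proxy_available=True))
```

Yielding `None` instead of raising keeps the caller's control flow to a single `with` block, at the cost of an explicit `None` check.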

@connector_app.command(name="regression-test")
def regression_test(
    skip_compare: Annotated[
        bool,
        Parameter(
            help="If True, skip comparison and run single-version tests only. "
            "If False (default), run comparison tests (target vs control)."
        ),
    ] = False,
    test_image: Annotated[
        str | None,
        Parameter(
            help="Test connector image with tag (e.g., airbyte/source-github:1.0.0). "
            "This is the image under test - in comparison mode, it's compared against control_image."
        ),
    ] = None,
    control_image: Annotated[
        str | None,
        Parameter(
            help="Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). "
            "Ignored if `skip_compare=True`."
        ),
    ] = None,
    connector_name: Annotated[
        str | None,
        Parameter(
            help="Connector name to build image from source (e.g., 'source-pokeapi'). "
            "If provided, builds the image locally with tag 'dev'. "
            "For comparison tests (default), this builds the target image. "
            "For single-version tests (skip_compare=True), this builds the test image."
        ),
    ] = None,
    repo_root: Annotated[
        str | None,
        Parameter(
            help="Path to the airbyte repo root. Required if connector_name is provided "
            "and the repo cannot be auto-detected."
        ),
    ] = None,
    command: Annotated[
        Literal["spec", "check", "discover", "read"],
        Parameter(help="The Airbyte command to run."),
    ] = "check",
    connection_id: Annotated[
        str | None,
        Parameter(
            help="Airbyte Cloud connection ID to fetch config/catalog from. "
            "Mutually exclusive with config-path/catalog-path. "
            "If provided, test_image/control_image can be auto-detected."
        ),
    ] = None,
    config_path: Annotated[
        str | None,
        Parameter(help="Path to the connector config JSON file."),
    ] = None,
    catalog_path: Annotated[
        str | None,
        Parameter(help="Path to the configured catalog JSON file (required for read)."),
    ] = None,
    state_path: Annotated[
        str | None,
        Parameter(help="Path to the state JSON file (optional for read)."),
    ] = None,
    output_dir: Annotated[
        str,
        Parameter(help="Directory to store test artifacts."),
    ] = "/tmp/regression_test_artifacts",
    enable_http_metrics: Annotated[
        bool,
        Parameter(
            help="Capture HTTP traffic metrics via mitmproxy (experimental). "
            "Requires mitmdump to be installed. Only used in comparison mode."
        ),
    ] = False,
    selected_streams: Annotated[
        str | None,
        Parameter(
            help="Comma-separated list of stream names to include in the read. "
            "Only these streams will be included in the configured catalog. "
            "This is useful to limit data volume by testing only specific streams."
        ),
    ] = None,
    enable_debug_logs: Annotated[
        bool,
        Parameter(
            help="Enable debug-level logging for regression test output. "
            "Also passed as `LOG_LEVEL=DEBUG` to the connector Docker container."
        ),
    ] = False,
    with_state: Annotated[
        bool | None,
        Parameter(
            negative="--no-state",
            help="Fetch and pass the connection's current state to the read command, "
            "producing a warm read instead of a cold read. Defaults to `True` when "
            "`--connection-id` is provided, `False` otherwise. Has no effect unless "
            "the command is `read`. Ignored when `--state-path` is explicitly provided.",
        ),
    ] = None,
) -> None:
    """Run regression tests on connectors.

    This command supports two modes:

    Comparison mode (skip_compare=False, default):
        Runs the specified Airbyte protocol command against both the target (new)
        and control (baseline) connector versions, then compares the results.
        This helps identify regressions between versions.

    Single-version mode (skip_compare=True):
        Runs the specified Airbyte protocol command against a single connector
        and validates the output. No comparison is performed.

    Results are written to the output directory and to GitHub Actions outputs
    if running in CI.

    You can provide the test image in three ways:
    1. --test-image: Use a pre-built image from Docker registry
    2. --connector-name: Build the image locally from source code
    3. --connection-id: Auto-detect from an Airbyte Cloud connection

    You can provide config/catalog either via file paths OR via a connection_id
    that fetches them from Airbyte Cloud.
    """
    # Configure debug logging for the regression test harness when requested
    if enable_debug_logs:
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            force=True,
        )

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    cmd = Command(command)

    config_file: Path | None = None
    catalog_file: Path | None = None
    state_file = Path(state_path) if state_path else None

    # Resolve the test image (used in both single-version and comparison modes)
    resolved_test_image: str | None = test_image
    resolved_control_image: str | None = control_image

    # Validate conflicting parameters
    # Single-version mode: reject comparison-specific parameters
    if skip_compare and control_image:
        write_github_output("success", False)
        write_github_output(
            "error", "Cannot specify control_image with skip_compare=True"
        )
        exit_with_error(
            "Cannot specify --control-image with --skip-compare. "
            "Control image is only used in comparison mode."
        )

    # If connector_name is provided, build the image from source
    if connector_name:
        if resolved_test_image:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both test_image and connector_name"
            )
            exit_with_error("Cannot specify both --test-image and --connector-name")

        repo_root_path = Path(repo_root) if repo_root else None
        built_image = _build_connector_image_from_source(
            connector_name=connector_name,
            repo_root=repo_root_path,
            tag="dev",
        )
        if not built_image:
            write_github_output("success", False)
            write_github_output("error", f"Failed to build image for {connector_name}")
            exit_with_error(f"Failed to build image for {connector_name}")
        resolved_test_image = built_image

    if connection_id:
        if config_path or catalog_path:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both connection_id and file paths"
            )
            exit_with_error(
                "Cannot specify both connection_id and config_path/catalog_path"
            )

        print_success(f"Fetching config/catalog from connection: {connection_id}")
        connection_data = fetch_connection_data(connection_id)

        # Check if we should retrieve unmasked secrets
        if should_use_secret_retriever():
            print_success(
                "USE_CONNECTION_SECRET_RETRIEVER enabled - enriching config with unmasked secrets..."
            )
            try:
                connection_data = enrich_config_with_secrets(
                    connection_data,
                    retrieval_reason="Regression test with USE_CONNECTION_SECRET_RETRIEVER=true",
                )
                print_success("Successfully retrieved unmasked secrets from database")
            except SecretRetrievalError as e:
                write_github_output("success", False)
                write_github_output("error", str(e))
                exit_with_error(
                    f"{e}\n\n"
                    f"This connection cannot be used for regression testing. "
                    f"Please use a connection from a non-EU workspace, or use GSM-based "
                    f"integration test credentials instead (by omitting --connection-id)."
                )

        # Resolve with_state default: True when connection_id is provided
        resolved_with_state = with_state if with_state is not None else True

        if resolved_with_state and not state_file and command == "read":
            print_success("Fetching connection state for warm read...")
            try:
                connection_data.state = fetch_connection_state(connection_data)
                if connection_data.state:
                    print_success(
                        f"Fetched state with {len(connection_data.state)} stream(s)"
                    )
                else:
                    connection_data.state = None
                    print_success(
                        "No state available for this connection (initial sync)"
                    )
            except Exception as exc:
                print_error(f"Failed to fetch connection state: {exc}")
                print_success("Falling back to cold read (no state)")

        config_file, catalog_file, state_file_from_conn = save_connection_data_to_files(
            connection_data, output_path / "connection_data"
        )

        if state_file_from_conn and not state_file:
            state_file = state_file_from_conn

        print_success(
            f"Fetched config for source: {connection_data.source_name} "
            f"with {len(connection_data.stream_names)} streams"
        )

        # Auto-detect test/control image from connection if not provided
        if not resolved_test_image and connection_data.connector_image:
            resolved_test_image = connection_data.connector_image
            print_success(f"Auto-detected test image: {resolved_test_image}")

        if (
            not skip_compare
            and not resolved_control_image
            and connection_data.connector_image
        ):
            resolved_control_image = connection_data.connector_image
            print_success(f"Auto-detected control image: {resolved_control_image}")
    elif config_path:
        config_file = Path(config_path)
        catalog_file = Path(catalog_path) if catalog_path else None
    elif connector_name:
        # Fallback: fetch integration test secrets from GSM using PyAirbyte API
        print_success(
            f"No connection_id or config_path provided. "
            f"Attempting to fetch integration test config from GSM for {connector_name}..."
        )
        gsm_config = get_first_config_from_secrets(connector_name)
        if gsm_config:
            # Write config to a temp file (not in output_path to avoid artifact upload)
            gsm_config_dir = Path(
                tempfile.mkdtemp(prefix=f"gsm-config-{connector_name}-")
            )
            gsm_config_dir.chmod(0o700)
            gsm_config_file = gsm_config_dir / "config.json"
            gsm_config_file.write_text(json.dumps(gsm_config, indent=2))
            gsm_config_file.chmod(0o600)
            config_file = gsm_config_file
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
            print_success(
                f"Fetched integration test config from GSM for {connector_name}"
            )
        else:
            print_error(
                f"Failed to fetch integration test config from GSM for {connector_name}."
            )
            config_file = None
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
    else:
        config_file = None
        catalog_file = Path(catalog_path) if catalog_path else None

    # Auto-detect control_image from metadata.yaml if connector_name is provided (comparison mode only)
    if not skip_compare and not resolved_control_image and connector_name:
        resolved_control_image = _fetch_control_image_from_metadata(connector_name)
        if resolved_control_image:
            print_success(
                f"Auto-detected control image from metadata.yaml: {resolved_control_image}"
            )

    # Validate that we have the required images
    if not resolved_test_image:
        write_github_output("success", False)
        write_github_output("error", "No test image specified")
        exit_with_error(
            "You must provide one of the following: a test_image, a connector_name "
            "to build the image from source, or a connection_id to auto-detect the image."
        )

    if not skip_compare and not resolved_control_image:
        write_github_output("success", False)
        write_github_output("error", "No control image specified")
        exit_with_error(
            "You must provide one of the following: a control_image, a connection_id "
            "for a connection that has an associated connector image, or a connector_name "
            "to auto-detect the control image from the airbyte repo's metadata.yaml."
        )

    # Pull images if they weren't just built locally
    # If connector_name was provided, we just built the test image locally
    if not connector_name and not ensure_image_available(resolved_test_image):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_test_image}")
        exit_with_error(f"Failed to pull test image: {resolved_test_image}")

    if (
        not skip_compare
        and resolved_control_image
        and not ensure_image_available(resolved_control_image)
    ):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_control_image}")
        exit_with_error(
            f"Failed to pull control connector image: {resolved_control_image}"
        )

    # Apply selected_streams filter to catalog if requested
    if selected_streams and catalog_file:
        streams_set = {s.strip() for s in selected_streams.split(",") if s.strip()}
        if streams_set:
            print_success(
                f"Filtering catalog to {len(streams_set)} selected streams: "
                f"{', '.join(sorted(streams_set))}"
            )
            filter_configured_catalog_file(catalog_file, streams_set)

    # Upgrade READ → READ_WITH_STATE when a state file is available
    if cmd == Command.READ and state_file:
        cmd = Command.READ_WITH_STATE
        print_success("State available — using read-with-state (warm read)")

    # Track telemetry for the regression test
    # Extract version from image tag (e.g., "airbyte/source-github:1.0.0" -> "1.0.0")
    target_version = (
        resolved_test_image.rsplit(":", 1)[-1]
        if ":" in resolved_test_image
        else "unknown"
    )
    control_version = None
    if resolved_control_image and ":" in resolved_control_image:
        control_version = resolved_control_image.rsplit(":", 1)[-1]

    # Get tester identity from environment (GitHub Actions sets GITHUB_ACTOR)
    tester = os.getenv("GITHUB_ACTOR") or os.getenv("USER")

    track_regression_test(
        user_id=tester,
        connector_image=resolved_test_image,
        command=command,
        target_version=target_version,
        control_version=control_version,
        additional_properties={
            "connection_id": connection_id,
            "skip_compare": skip_compare,
            "with_state": cmd == Command.READ_WITH_STATE,
        },
    )

    # Execute the appropriate mode
    if skip_compare:
        # Single-version mode: run only the connector image
        result = _run_connector_command(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=output_path,
            target_or_control=TargetOrControl.TARGET,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        print_json(result)

        write_github_outputs(
            {
                "success": result["success"],
                "connector": resolved_test_image,
                "command": command,
                "exit_code": result["exit_code"],
            }
        )

        write_test_summary(
            connector_image=resolved_test_image,
            test_type="regression-test",
            success=result["success"],
            results={
                "command": command,
                "exit_code": result["exit_code"],
                "output_dir": output_dir,
            },
        )

        # Generate report.md with detailed metrics
        report_path = generate_single_version_report(
            connector_image=resolved_test_image,
            command=command,
            result=result,
            output_dir=output_path,
        )
        print_success(f"Generated report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if result["success"]:
            print_success(
                f"Single-version regression test passed for {resolved_test_image}"
            )
        else:
            print_error(
                f"Single-version regression test failed for {resolved_test_image}"
            )
    else:
        # Comparison mode: run both target and control images
        target_output = output_path / "target"
        control_output = output_path / "control"

        target_result = _run_with_optional_http_metrics(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=target_output,
            target_or_control=TargetOrControl.TARGET,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        control_result = _run_with_optional_http_metrics(
            connector_image=resolved_control_image,  # type: ignore[arg-type]
            command=cmd,
            output_dir=control_output,
            target_or_control=TargetOrControl.CONTROL,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        both_succeeded = target_result["success"] and control_result["success"]
        regression_detected = target_result["success"] != control_result["success"]

        # For CHECK commands, override pass/fail using connectionStatus instead
        # of exit codes.  The CDK exits 0 even when a check fails; the real
        # signal is in CONNECTION_STATUS.status.
        if command == "check":
            target_conn = target_result.get("connection_status")
            control_conn = control_result.get("connection_status")
            if target_conn is not None and control_conn is not None:
                both_succeeded = (
                    target_conn == "SUCCEEDED" and control_conn == "SUCCEEDED"
                )
                # Only flag a regression when the target got worse.
                # An improvement (target SUCCEEDED, control FAILED) is not a
                # regression.
                regression_detected = (
                    target_conn == "FAILED" and control_conn == "SUCCEEDED"
                )

        combined_result = {
            "target": target_result,
            "control": control_result,
            "both_succeeded": both_succeeded,
            "regression_detected": regression_detected,
        }

        print_json(combined_result)

        both_failed = not target_result["success"] and not control_result["success"]

        # For CHECK, also flag both-failed when both have FAILED connectionStatus
        if command == "check":
            target_conn = target_result.get("connection_status")
            control_conn = control_result.get("connection_status")
            if target_conn == "FAILED" and control_conn == "FAILED":
                both_failed = True

        gh_outputs: dict[str, object] = {
            "success": not regression_detected,
            "target_image": resolved_test_image,
            "control_image": resolved_control_image,
            "command": command,
            "target_exit_code": target_result["exit_code"],
            "control_exit_code": control_result["exit_code"],
            "regression_detected": regression_detected,
            "both_failed": both_failed,
        }

        if command == "check":
            gh_outputs["target_connection_status"] = target_result.get(
                "connection_status"
            )
            gh_outputs["control_connection_status"] = control_result.get(
                "connection_status"
            )

        write_github_outputs(gh_outputs)

        write_json_output("regression_report", combined_result)

        report_path = generate_regression_report(
            target_image=resolved_test_image,
            control_image=resolved_control_image,  # type: ignore[arg-type]
            command=command,
            target_result=target_result,
            control_result=control_result,
            output_dir=output_path,
        )
        print_success(f"Generated regression report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if regression_detected:
            print_error(
                f"Regression detected between {resolved_test_image} and {resolved_control_image}"
            )
        elif both_succeeded:
            print_success(
                f"Regression test passed for {resolved_test_image} vs {resolved_control_image}"
            )
        else:
            # Both versions failed, but no regression between them was detected; treat this as a
            # test environment issue (e.g., expired credentials, transient API errors, rate limiting).
            # Exit successfully since no regression between versions was detected.
            print_warning(
                f"Both versions failed for {resolved_test_image} vs {resolved_control_image}. "
                "No regression detected. This may indicate expired credentials or a transient API issue."
            )

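The comparison rules in `regression_test` are easier to reason about as a pure function. The sketch below mirrors the flag computation as it reads in the listing; the function name `compare_outcomes` and the sample result dictionaries are assumptions for illustration, and the real run results carry more keys.

```python
from __future__ import annotations


def compare_outcomes(command: str, target: dict, control: dict) -> dict:
    """Derive comparison flags from two run results."""
    both_succeeded = target["success"] and control["success"]
    both_failed = not target["success"] and not control["success"]
    # Default rule: any difference in exit-code success counts as a regression.
    regression_detected = target["success"] != control["success"]

    if command == "check":
        # Exit codes are unreliable for check, so prefer the reported status.
        target_conn = target.get("connection_status")
        control_conn = control.get("connection_status")
        if target_conn is not None and control_conn is not None:
            both_succeeded = target_conn == "SUCCEEDED" and control_conn == "SUCCEEDED"
            # Only a target that got worse is a regression.
            regression_detected = target_conn == "FAILED" and control_conn == "SUCCEEDED"
        if target_conn == "FAILED" and control_conn == "FAILED":
            both_failed = True

    return {
        "both_succeeded": both_succeeded,
        "both_failed": both_failed,
        "regression_detected": regression_detected,
    }


ok = {"success": True, "exit_code": 0}
# Exit codes agree, but the target's check reported FAILED.
print(
    compare_outcomes(
        "check",
        {**ok, "connection_status": "FAILED"},
        {**ok, "connection_status": "SUCCEEDED"},
    )
)
```

Note that for commands other than `check`, a target that succeeds where the control fails is still counted as a regression under the default rule.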

@connector_app.command(name="fetch-connection-config")
def fetch_connection_config_cmd(
    connection_id: Annotated[
        str,
        Parameter(help="The UUID of the Airbyte Cloud connection."),
    ],
    output_path: Annotated[
        str | None,
        Parameter(
            help="Path to output file or directory. "
            "If directory, writes connection-<id>-config.json inside it. "
            "Default: platform temp directory (e.g. /tmp/connection-<id>-config.json)"
        ),
    ] = None,
    with_secrets: Annotated[
        bool,
        Parameter(
            name="--with-secrets",
            negative="--no-secrets",
            help="If set, fetches unmasked secrets from the internal database. "
            "Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or `gcloud auth application-default login`. "
            "Must be used with --oc-issue-url.",
        ),
    ] = False,
    oc_issue_url: Annotated[
        str | None,
        Parameter(
            help="OC issue URL for audit logging. Required when using --with-secrets."
        ),
    ] = None,
) -> None:
    """Fetch connection configuration from Airbyte Cloud to a local file.

    This command retrieves the source configuration for a given connection ID
    and writes it to a JSON file. When `--output-path` is omitted the file is
    written to the platform temp directory to avoid accidentally committing
    secrets to a git repository.

    Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and
    AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

    When --with-secrets is specified, the command fetches unmasked secrets from
    the internal database using the connection-retriever. This additionally requires:
    - An OC issue URL for audit logging (--oc-issue-url)
    - GCP credentials via `GCP_PROD_DB_ACCESS_CREDENTIALS` env var or `gcloud auth application-default login`
    - Cloud SQL Python Connector access to the Prod DB replica.
    """
    path = Path(output_path) if output_path else None
    result = fetch_connection_config(
        connection_id=connection_id,
        output_path=path,
        with_secrets=with_secrets,
        oc_issue_url=oc_issue_url,
    )
    if result.success:
        print_success(result.message)
    else:
        print_error(result.message)
    print_json(result.model_dump())

1758
def _read_json_input(
    json_str: str | None,
    input_file: str | None,
) -> dict:
    """Read JSON from a positional arg, --input file, or STDIN (in that priority)."""
    if json_str is not None:
        return json.loads(json_str)
    if input_file is not None:
        return json.loads(Path(input_file).read_text())
    if not sys.stdin.isatty():
        return json.loads(sys.stdin.read())
    exit_with_error(
        "No JSON input provided. Pass it as a positional argument, "
        "via --input <file>, or pipe to STDIN."
    )
    raise SystemExit(1)  # unreachable, but satisfies type checker


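# Illustrative sketch (not part of the CLI): the source priority used by
# `_read_json_input` above, restated with a hypothetical helper named
# `pick_json_source`. It takes the file and STDIN contents as plain strings
# (an assumption made so the ordering can be exercised without a real file
# or pipe) and returns the first source that is present.
import json
from typing import Optional


def pick_json_source(
    json_str: Optional[str],
    file_text: Optional[str],
    stdin_text: Optional[str],
) -> dict:
    """Parse the first available source: positional arg, then file, then STDIN."""
    for candidate in (json_str, file_text, stdin_text):
        if candidate is not None:
            return json.loads(candidate)
    raise ValueError("No JSON input provided.")

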
@state_app.command(name="get")
def get_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to filter state for a single stream.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to narrow the stream filter.",
            name="--namespace",
        ),
    ] = None,
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the state JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the current state for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        result = {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }
    else:
        result = conn.dump_raw_state()

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"State written to {output}", file=sys.stderr)
    else:
        print(json_output)


@state_app.command(name="set")
def set_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    state_json: Annotated[
        str | None,
        Parameter(
            help="The connection state as a JSON string. When --stream is used, "
            'this is just the stream\'s state blob (e.g., \'{"cursor": "2024-01-01"}\').'
            " Otherwise, must include 'stateType', 'connectionId', and the appropriate "
            "state field. Can also be provided via --input or STDIN."
        ),
    ] = None,
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to update state for a single stream only.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the state to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the state for an Airbyte Cloud connection.

    State JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    Uses the safe variant that prevents updates while a sync is running.
    When --stream is provided, only that stream's state is updated within
    the existing connection state.
    """
    parsed_state = _read_json_input(state_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        conn.set_stream_state(
            stream_name=stream_name,
            state_blob_dict=parsed_state,
            stream_namespace=stream_namespace,
        )
    else:
        conn.import_raw_state(parsed_state)

    result = conn.dump_raw_state()
    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


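# Illustrative sketch (not part of the CLI): the two JSON shapes that the
# `set` command above accepts, per its help text. With --stream, only the
# stream's state blob is passed. Without it, the payload must carry
# 'stateType', 'connectionId', and the matching state field. The nested field
# names used below ('streamState', 'streamDescriptor') are an assumption based
# on the Airbyte Config API; the output of the `get` command for the target
# connection is the authoritative reference. The helper name, connection ID,
# stream name, and cursor value are all placeholders.
import json


def build_example_state_payloads() -> tuple:
    """Return (stream_blob_json, full_state_json) as JSON strings."""
    # Per-stream blob, for use together with --stream.
    stream_blob = {"cursor": "2024-01-01"}

    # Full connection state, for use without --stream (assumed shape).
    full_state = {
        "stateType": "stream",
        "connectionId": "00000000-0000-0000-0000-000000000000",
        "streamState": [
            {
                "streamDescriptor": {"name": "users"},
                "streamState": stream_blob,
            }
        ],
    }
    return json.dumps(stream_blob), json.dumps(full_state, indent=2)

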
@state_app.command(name="reset")
def reset_stream_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    stream_name: Annotated[
        str,
        Parameter(
            help="The configured stream name whose state should be reset.",
            name="--stream",
        ),
    ],
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
) -> None:
    """Reset a configured stream's state so the next sync full-refreshes it.

    Uses the safe variant that prevents updates while a sync is running.
    Returns a restorable `previous_state_backup` in raw Config API format.
    """
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    try:
        result = reset_cloud_stream_state(
            conn,
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
    except PyAirbyteInputError as e:
        exit_with_error(str(e))
        raise SystemExit(1)  # unreachable, but satisfies type checker

    print(json.dumps(result.model_dump(), indent=2, default=str))


@catalog_app.command(name="get")
def get_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch catalog for."),
    ],
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the catalog JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the configured catalog for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("No configured catalog found for this connection.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"Catalog written to {output}", file=sys.stderr)
    else:
        print(json_output)


@catalog_app.command(name="set")
def set_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update catalog for."),
    ],
    catalog_json: Annotated[
        str | None,
        Parameter(
            help="The configured catalog as a JSON string. "
            "Can also be provided via --input or STDIN."
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the catalog to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the configured catalog for an Airbyte Cloud connection.

    Catalog JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    WARNING: This replaces the entire configured catalog.
    """
    parsed_catalog = _read_json_input(catalog_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    conn.import_raw_catalog(parsed_catalog)

    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("Failed to retrieve catalog after import.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


@logs_app.command(name="lookup-cloud-backend-error")
def lookup_cloud_backend_error(
    error_id: Annotated[
        str,
        Parameter(
            help=(
                "The error ID (UUID) to search for. This is typically returned "
                "in API error responses as {'errorId': '...'}"
            )
        ),
    ],
    lookback_days: Annotated[
        int,
        Parameter(help="Number of days to look back in logs."),
    ] = 7,
    min_severity_filter: Annotated[
        GCPSeverity | None,
        Parameter(
            help="Optional minimum severity level to filter logs.",
        ),
    ] = None,
    raw: Annotated[
        bool,
        Parameter(help="Output raw JSON instead of formatted text."),
    ] = False,
) -> None:
    """Look up error details from GCP Cloud Logging by error ID.

    When an Airbyte Cloud API returns an error response with only an error ID
    (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command
    fetches the full stack trace and error details from GCP Cloud Logging.

    Requires GCP credentials with Logs Viewer role on the target project.
    Set up credentials with: gcloud auth application-default login
    """
    print(f"Searching for error ID: {error_id}", file=sys.stderr)
    print(f"Lookback days: {lookback_days}", file=sys.stderr)
    if min_severity_filter:
        print(f"Severity filter: {min_severity_filter}", file=sys.stderr)
    print(file=sys.stderr)

    result = fetch_error_logs(
        error_id=error_id,
        lookback_days=lookback_days,
        min_severity_filter=min_severity_filter,
    )

    if raw:
        print_json(result.model_dump())
        return

    print(f"Found {result.total_entries_found} log entries", file=sys.stderr)
    print(file=sys.stderr)

    if result.payloads:
        for i, payload in enumerate(result.payloads):
            print(f"=== Log Group {i + 1} ===")
            print(f"Timestamp: {payload.timestamp}")
            print(f"Severity: {payload.severity}")
            if payload.resource.labels.pod_name:
                print(f"Pod: {payload.resource.labels.pod_name}")
            print(f"Lines: {payload.num_log_lines}")
            print()
            print(payload.message)
            print()
    elif result.entries:
        print("No grouped payloads, showing raw entries:", file=sys.stderr)
        for entry in result.entries:
            print(f"[{entry.timestamp}] {entry.severity}: {entry.payload}")
    else:
        print_error("No log entries found for this error ID.")


# =============================================================================
# Organization commands
# =============================================================================


@organization_app.command(name="search")
def organization_search(
    name_contains: Annotated[
        str,
        Parameter(
            name="--name-contains",
            help="Case-insensitive substring to match against organization name or email.",
        ),
    ],
    limit: Annotated[
        int,
        Parameter(
            name="--limit",
            help="Maximum number of results to return.",
        ),
    ] = 20,
) -> None:
    """Search organizations by name or email substring."""
    rows = _search_organizations(name_contains=name_contains, limit=limit)
    if not rows:
        print_error(f"No organizations found matching '{name_contains}'.")
        return

    print_json([dict(row) for row in rows])


@organization_app.command(name="info")
def organization_info(
    organization_id: Annotated[
        str,
        Parameter(help="The organization UUID."),
    ],
) -> None:
    """Get information about an organization (stub, not yet implemented)."""
    exit_with_error(
        "organization info is not yet implemented. "
        "Use 'airbyte-ops cloud organization search' to find organizations by name."
    )


# =============================================================================
# Workspace commands
# =============================================================================


@workspace_app.command(name="search")
def workspace_search(
    name_contains: Annotated[
        str | None,
        Parameter(
            name="--name-contains",
            help="Case-insensitive substring to match against workspace name or slug.",
        ),
    ] = None,
    email_domain: Annotated[
        str | None,
        Parameter(
            name="--email-domain",
            help="Email domain to search for (e.g. 'motherduck.com').",
        ),
    ] = None,
    limit: Annotated[
        int,
        Parameter(
            name="--limit",
            help="Maximum number of results to return.",
        ),
    ] = 100,
) -> None:
    """Search workspaces by name substring or email domain.

    If both --name-contains and --email-domain are given, --name-contains
    takes precedence and --email-domain is ignored.
    """
    if not name_contains and not email_domain:
        exit_with_error(
            "At least one of --name-contains or --email-domain must be provided."
        )
        raise SystemExit(1)  # unreachable, but satisfies type checker

    if name_contains:
        rows = _search_workspaces(name_contains=name_contains, limit=limit)
    else:
        rows = query_workspaces_by_email_domain(
            email_domain=email_domain.lstrip("@"),  # type: ignore[union-attr]
            limit=limit,
        )

    if not rows:
        search_term = name_contains or email_domain
        print_error(f"No workspaces found matching '{search_term}'.")
        return

    print_json([dict(row) for row in rows])


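# Illustrative sketch (not part of the CLI): how the --email-domain value is
# normalized before the lookup above. `str.lstrip("@")` removes every leading
# "@" and leaves the rest of the string untouched, so "@example.com" and
# "example.com" are equivalent inputs. `normalize_email_domain` is a
# hypothetical helper name used only for this example.
def normalize_email_domain(email_domain: str) -> str:
    """Strip leading '@' characters from an email domain."""
    return email_domain.lstrip("@")

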
@workspace_app.command(name="info")
def workspace_info_cmd(
    workspace_id: Annotated[
        str,
        Parameter(help="The workspace UUID."),
    ],
) -> None:
    """Get information about a workspace."""
    result = query_workspace_info(workspace_id=workspace_id)
    if not result:
        print_error(f"No workspace found with ID '{workspace_id}'.")
        return

    print_json(dict(result))