airbyte_ops_mcp.cli.cloud
CLI commands for Airbyte Cloud operations.
Commands:
  airbyte-ops cloud connector get-version-info - Get connector version info
  airbyte-ops cloud connector set-version-override - Set connector version override
  airbyte-ops cloud connector clear-version-override - Clear connector version override
  airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
  airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file
  airbyte-ops cloud organization search - Search organizations by name/email
  airbyte-ops cloud organization info - Get organization info
  airbyte-ops cloud workspace search - Search workspaces by name/slug or email domain
  airbyte-ops cloud workspace info - Get workspace info
CLI reference
The commands below are regenerated by poe docs-generate via cyclopts's
programmatic docs API; see docs/generate_cli.py.
airbyte-ops cloud COMMAND
Airbyte Cloud operations.
Commands:
- connection: Connection operations in Airbyte Cloud.
- connector: Deployed connector operations in Airbyte Cloud.
- db: Database operations for Airbyte Cloud Prod DB Replica.
- logs: GCP Cloud Logging operations for Airbyte Cloud.
- organization: Organization operations in Airbyte Cloud.
- workspace: Workspace operations in Airbyte Cloud.
airbyte-ops cloud connector
Deployed connector operations in Airbyte Cloud.
airbyte-ops cloud connector get-version-info
airbyte-ops cloud connector get-version-info WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE
Get the current version information for a deployed connector.
Parameters:
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
- CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]
airbyte-ops cloud connector set-version-override
airbyte-ops cloud connector set-version-override VERSION REASON ISSUE-URL APPROVAL-COMMENT-URL [ARGS]
Set a version override for a deployed connector.
Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and
AIRBYTE_INTERNAL_ADMIN_USER environment variables.
The --override-level flag selects the scope at which the pin is applied:
- actor (default): pins a single deployed connector instance. Requires --workspace-id, --connector-id, and --connector-type.
- workspace: pins ALL instances of a connector type within a workspace. Requires --workspace-id and --actor-definition-id.
- organization: pins ALL instances of a connector type across an organization. Requires --organization-id and --actor-definition-id.
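The per-level requirements above amount to a small lookup table. The sketch below shows one way a caller could pre-check its flags before invoking the command; `REQUIRED_BY_LEVEL` and `missing_options` are hypothetical names for illustration, not the tool's own validation code.

```python
from typing import Dict, Literal, Optional, Tuple

# Hypothetical table mirroring the help text: which options each
# --override-level needs (Python-style names, converted to flags below).
REQUIRED_BY_LEVEL: Dict[str, Tuple[str, ...]] = {
    "actor": ("workspace_id", "connector_id", "connector_type"),
    "workspace": ("workspace_id", "actor_definition_id"),
    "organization": ("organization_id", "actor_definition_id"),
}


def missing_options(
    level: Literal["actor", "workspace", "organization"], **options: Optional[str]
) -> list:
    """Return the CLI flags that are still missing for the chosen level."""
    if level not in REQUIRED_BY_LEVEL:
        raise ValueError(f"Unknown override level: {level!r}")
    return [
        "--" + name.replace("_", "-")
        for name in REQUIRED_BY_LEVEL[level]
        if not options.get(name)
    ]


# A workspace-level pin still needs the connector definition UUID.
print(missing_options("workspace", workspace_id="ws-123"))
# ['--actor-definition-id']
```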
The customer_tier_filter gates the operation: the call fails if the
actual tier of the target organization does not match. Use ALL to
proceed regardless of tier (a warning is shown for sensitive tiers).
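The tier gate can be read as: an explicit tier must equal the target's actual tier, while `ALL` always passes but warns on sensitive tiers. A minimal sketch under that reading follows; `check_tier_gate` is hypothetical, and treating TIER_0 and TIER_1 as "sensitive" is an assumption (the help text only says TIER_2 means non-sensitive customers).

```python
import warnings

# Assumption: TIER_0 and TIER_1 are the "sensitive" tiers; the help text only
# states that TIER_2 means non-sensitive customers.
SENSITIVE_TIERS = {"TIER_0", "TIER_1"}
TIER_FILTERS = {"TIER_0", "TIER_1", "TIER_2", "ALL"}


def check_tier_gate(tier_filter: str, actual_tier: str) -> None:
    """Raise unless the filter admits the target organization's actual tier."""
    if tier_filter not in TIER_FILTERS:
        raise ValueError(f"Invalid tier filter: {tier_filter!r}")
    if tier_filter == "ALL":
        # ALL bypasses the gate, but sensitive tiers still get a warning.
        if actual_tier in SENSITIVE_TIERS:
            warnings.warn(f"Proceeding on sensitive tier {actual_tier}")
        return
    if tier_filter != actual_tier:
        raise PermissionError(
            f"Tier mismatch: filter={tier_filter}, actual={actual_tier}"
        )


check_tier_gate("TIER_2", "TIER_2")  # Matches, so no error is raised
```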
Parameters:
- VERSION, --version: The semver version string to pin to (e.g., '2.1.5-preview.abc1234'). [required]
- REASON, --reason: Explanation for the override (min 10 characters). [required]
- ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
- APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
- OVERRIDE-LEVEL, --override-level: Scope at which to apply the version override: 'actor' (single deployed connector instance), 'workspace' (all instances of a connector type within a workspace), or 'organization' (all instances across an organization). Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
- ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
- CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
- ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
- AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
- REASON-URL, --reason-url: Optional URL with more context (e.g., issue link).
- CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
airbyte-ops cloud connector clear-version-override
airbyte-ops cloud connector clear-version-override ISSUE-URL APPROVAL-COMMENT-URL [ARGS]
Clear a version override from a deployed connector.
Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and
AIRBYTE_INTERNAL_ADMIN_USER environment variables.
The --override-level flag selects the scope at which the pin is removed:
- actor (default): clears a single deployed connector instance pin. Requires --workspace-id, --connector-id, and --connector-type.
- workspace: clears the workspace-level pin for a connector type. Requires --workspace-id and --actor-definition-id.
- organization: clears the organization-level pin for a connector type. Requires --organization-id and --actor-definition-id.
Parameters:
- ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
- APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
- OVERRIDE-LEVEL, --override-level: Scope at which to clear the version override: 'actor', 'workspace', or 'organization'. Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
- ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
- CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
- ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
- AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
- CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
airbyte-ops cloud connector regression-test
airbyte-ops cloud connector regression-test [ARGS]
Run regression tests on connectors.
This command supports two modes:
Comparison mode (skip_compare=False, default): Runs the specified Airbyte protocol command against both the target (new) and control (baseline) connector versions, then compares the results. This helps identify regressions between versions.
Single-version mode (skip_compare=True): Runs the specified Airbyte protocol command against a single connector and validates the output. No comparison is performed.
Results are written to the output directory and to GitHub Actions outputs if running in CI.
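In GitHub Actions, step outputs are appended to the file named by the `GITHUB_OUTPUT` environment variable, using `name=value` for single-line values and a `name<<DELIMITER` block for multi-line ones. The sketch below illustrates that mechanism; `append_github_output` is a hypothetical helper, and the module's own `write_github_output` may be implemented differently.

```python
import os
import uuid


def append_github_output(name: str, value: str) -> bool:
    """Append name/value to $GITHUB_OUTPUT; return False when not in CI."""
    output_path = os.environ.get("GITHUB_OUTPUT")
    if not output_path:
        return False  # Not running in GitHub Actions: nothing to write
    with open(output_path, "a", encoding="utf-8") as fh:
        if "\n" in value:
            # Multi-line values use the heredoc-style delimiter syntax.
            delimiter = f"ghadelimiter_{uuid.uuid4().hex}"
            fh.write(f"{name}<<{delimiter}\n{value}\n{delimiter}\n")
        else:
            fh.write(f"{name}={value}\n")
    return True
```

A random delimiter avoids collisions with report text that might itself contain a fixed marker such as `EOF`.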
You can provide the test image in three ways:
- --test-image: Use a pre-built image from Docker registry
- --connector-name: Build the image locally from source code
- --connection-id: Auto-detect from an Airbyte Cloud connection
You can provide config/catalog either via file paths OR via a connection_id that fetches them from Airbyte Cloud.
Parameters:
- SKIP-COMPARE, --skip-compare, --no-skip-compare: If True, skip comparison and run single-version tests only. If False (default), run comparison tests (target vs control). [default: False]
- TEST-IMAGE, --test-image: Test connector image with tag (e.g., airbyte/source-github:1.0.0). This is the image under test - in comparison mode, it's compared against control_image.
- CONTROL-IMAGE, --control-image: Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). Ignored if skip_compare=True.
- CONNECTOR-NAME, --connector-name: Connector name to build image from source (e.g., 'source-pokeapi'). If provided, builds the image locally with tag 'dev'. For comparison tests (default), this builds the target image. For single-version tests (skip_compare=True), this builds the test image.
- REPO-ROOT, --repo-root: Path to the airbyte repo root. Required if connector_name is provided and the repo cannot be auto-detected.
- COMMAND, --command: The Airbyte command to run. [choices: spec, check, discover, read] [default: check]
- CONNECTION-ID, --connection-id: Airbyte Cloud connection ID to fetch config/catalog from. Mutually exclusive with config-path/catalog-path. If provided, test_image/control_image can be auto-detected.
- CONFIG-PATH, --config-path: Path to the connector config JSON file.
- CATALOG-PATH, --catalog-path: Path to the configured catalog JSON file (required for read).
- STATE-PATH, --state-path: Path to the state JSON file (optional for read).
- OUTPUT-DIR, --output-dir: Directory to store test artifacts. [default: /tmp/regression_test_artifacts]
- ENABLE-HTTP-METRICS, --enable-http-metrics, --no-enable-http-metrics: Capture HTTP traffic metrics via mitmproxy (experimental). Requires mitmdump to be installed. Only used in comparison mode. [default: False]
- SELECTED-STREAMS, --selected-streams: Comma-separated list of stream names to include in the read. Only these streams will be included in the configured catalog. This is useful to limit data volume by testing only specific streams.
- ENABLE-DEBUG-LOGS, --enable-debug-logs, --no-enable-debug-logs: Enable debug-level logging for regression test output. Also passed as LOG_LEVEL=DEBUG to the connector Docker container. [default: False]
- WITH-STATE, --with-state, --no-state: Fetch and pass the connection's current state to the read command, producing a warm read instead of a cold read. Defaults to True when --connection-id is provided, False otherwise. Has no effect unless the command is read. Ignored when --state-path is explicitly provided.
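Two of these options carry non-obvious rules: the `--with-state` default depends on other flags, and `--selected-streams` trims the configured catalog. The sketch below restates both rules as code; `resolve_with_state` and `filter_streams` are hypothetical helpers (the module itself imports a `filter_configured_catalog_file` whose exact behavior is not shown here), and the catalog shape assumed is the Airbyte protocol's `streams[].stream.name`.

```python
from typing import Optional


def resolve_with_state(
    command: str,
    with_state: Optional[bool],
    connection_id: Optional[str],
    state_path: Optional[str],
) -> bool:
    """Decide whether to fetch the connection's state from Cloud for a warm read."""
    if command != "read" or state_path is not None:
        return False  # No effect outside `read`; an explicit --state-path wins
    if with_state is None:
        return connection_id is not None  # Default depends on --connection-id
    return with_state


def filter_streams(configured_catalog: dict, selected_streams: str) -> dict:
    """Keep only the comma-separated stream names in a configured catalog."""
    wanted = {name.strip() for name in selected_streams.split(",") if name.strip()}
    streams = [
        s for s in configured_catalog["streams"] if s["stream"]["name"] in wanted
    ]
    return {**configured_catalog, "streams": streams}
```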
airbyte-ops cloud connector fetch-connection-config
airbyte-ops cloud connector fetch-connection-config CONNECTION-ID [ARGS]
Fetch connection configuration from Airbyte Cloud to a local file.
This command retrieves the source configuration for a given connection ID
and writes it to a JSON file. When --output-path is omitted the file is
written to the platform temp directory to avoid accidentally committing
secrets to a git repository.
Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET environment variables.
When --with-secrets is specified, the command fetches unmasked secrets from the internal database using the connection-retriever. This additionally requires:
- An OC issue URL for audit logging (--oc-issue-url)
- GCP credentials via GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login
- Cloud SQL Python Connector access to the Prod DB replica.
Parameters:
- CONNECTION-ID, --connection-id: The UUID of the Airbyte Cloud connection. [required]
- OUTPUT-PATH, --output-path: Path to output file or directory. If directory, writes connection--config.json inside it. Default: platform temp directory (e.g. /tmp/connection- -config.json)
- WITH-SECRETS, --with-secrets, --no-secrets: If set, fetches unmasked secrets from the internal database. Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login. Must be used with --oc-issue-url. [default: False]
- OC-ISSUE-URL, --oc-issue-url: OC issue URL for audit logging. Required when using --with-secrets.
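Before requesting unmasked secrets, a wrapper script could check the two local preconditions listed above. This is a hedged sketch: `check_with_secrets_prereqs` is hypothetical, and the Application Default Credentials path it probes is the usual Linux/macOS location written by `gcloud auth application-default login` (Windows uses a different directory). It cannot verify Cloud SQL access itself.

```python
import os
from pathlib import Path
from typing import List, Optional


def check_with_secrets_prereqs(
    with_secrets: bool, oc_issue_url: Optional[str]
) -> List[str]:
    """Return human-readable problems that would block a --with-secrets fetch."""
    if not with_secrets:
        return []
    problems: List[str] = []
    if not oc_issue_url:
        problems.append("--with-secrets must be used with --oc-issue-url")
    if not os.environ.get("GCP_PROD_DB_ACCESS_CREDENTIALS"):
        # Assumption: default ADC file location on Linux/macOS.
        adc_file = (
            Path.home() / ".config" / "gcloud" / "application_default_credentials.json"
        )
        if not adc_file.exists():
            problems.append(
                "Set GCP_PROD_DB_ACCESS_CREDENTIALS or run "
                "`gcloud auth application-default login`"
            )
    return problems
```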
airbyte-ops cloud connector rollout
Connector rollout operations.
Commands:
- autopilot: AutoPilot rollout orchestration commands.
- list: List connector rollouts from the production database.
airbyte-ops cloud connector rollout autopilot
AutoPilot rollout orchestration commands.
airbyte-ops cloud connector rollout autopilot auto-start
airbyte-ops cloud connector rollout autopilot auto-start [ARGS]
Start INITIALIZED rollouts that have autopilot auto-start enabled.
Queries the prod DB for rollouts in initialized state, checks the
connector's autopilotConfig.autoStart gate, and starts eligible
rollouts via the Cloud Config API.
Parameters:
- CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
- DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]
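The eligibility check described for auto-start is a filter over rollout rows. The sketch below models it with a hypothetical, simplified `Rollout` record; the real command reads the prod DB, and the exact state string and `autopilotConfig` layout are assumptions here. With `--dry-run`, a caller would print the selected rollouts instead of starting them.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Rollout:
    """Hypothetical, simplified view of a rollout row from the prod DB."""

    id: str
    connector: str
    state: str
    autopilot_config: dict = field(default_factory=dict)


def select_auto_start(
    rollouts: List[Rollout], connector: Optional[str] = None
) -> List[Rollout]:
    """Pick initialized rollouts whose autopilotConfig.autoStart gate is on."""
    return [
        r
        for r in rollouts
        if r.state == "initialized"
        and r.autopilot_config.get("autoStart") is True
        and (connector is None or r.connector == connector)  # --connector filter
    ]
```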
airbyte-ops cloud connector rollout autopilot auto-advance
airbyte-ops cloud connector rollout autopilot auto-advance [ARGS]
Advance IN_PROGRESS rollouts within their current tier.
Finds rollouts with automated strategy that haven't reached their
target percentage and advances them based on the configured strategy
pacing (fast/slow/default).
Parameters:
- CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
- DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]
airbyte-ops cloud connector rollout autopilot auto-promote
airbyte-ops cloud connector rollout autopilot auto-promote [ARGS]
Promote rollouts when at 100% of current tier.
Checks the autopilotConfig.autoPromoteStages gate. Currently
finalizes rollouts at ALL tier as succeeded (GA promotion).
Cross-tier promotion (TIER_2 -> TIER_1 -> ALL) is not yet implemented.
Parameters:
- CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
- DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]
airbyte-ops cloud connector rollout autopilot auto-triage-failed
airbyte-ops cloud connector rollout autopilot auto-triage-failed [ARGS]
Triage failed rollouts: log all failures, check unpin eligibility.
Finds rollouts in errored or paused state. Logs every failure
unconditionally. Checks safe-to-downgrade eligibility per
unsafeDowngrades. Actor-level unpinning is not yet implemented.
Parameters:
- CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
- DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]
airbyte-ops cloud connector rollout autopilot auto-supersede
airbyte-ops cloud connector rollout autopilot auto-supersede [ARGS]
Cancel older rollouts when a newer RC exists for the same connector.
Detects connectors with multiple active rollouts at different RC versions.
Cancels older rollouts with retain_pins_on_cancellation=True so pinned
actors remain on the old version until the new rollout progressively
advances to include them.
Parameters:
- CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
- DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]
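Supersede detection groups active rollouts by connector and, where more than one RC version is in flight, keeps only the newest. The sketch below assumes "older" means earlier `created_at`; the `ActiveRollout` record and `rollouts_to_supersede` are hypothetical. The returned IDs are the rollouts that would be canceled with `retain_pins_on_cancellation=True`.

```python
from dataclasses import dataclass
from datetime import datetime
from itertools import groupby
from operator import attrgetter
from typing import List


@dataclass
class ActiveRollout:
    """Hypothetical record for an active rollout."""

    id: str
    connector: str
    rc_version: str
    created_at: datetime


def rollouts_to_supersede(active: List[ActiveRollout]) -> List[str]:
    """Return IDs of older rollouts for connectors with several RCs in flight."""
    # groupby needs rows for the same connector to be contiguous.
    ordered = sorted(active, key=attrgetter("connector", "created_at"))
    to_cancel: List[str] = []
    for _, group in groupby(ordered, key=attrgetter("connector")):
        rollouts = list(group)
        if len({r.rc_version for r in rollouts}) < 2:
            continue  # Only one RC version active: nothing to supersede
        # Keep the newest rollout; everything older gets canceled.
        to_cancel.extend(r.id for r in rollouts[:-1])
    return to_cancel
```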
airbyte-ops cloud connector rollout autopilot auto-rollback-failed
airbyte-ops cloud connector rollout autopilot auto-rollback-failed [ARGS]
Full rollback/cancel of failed rollouts when safe to downgrade.
Finds rollouts in errored state. Checks the unsafeDowngrades gate.
If the version is safe to downgrade, calls finalize_connector_rollout
with state failed_rolled_back to cancel the entire rollout.
Parameters:
- CONNECTOR, --connector: Filter to a specific connector by canonical name (e.g., 'source-github').
- DRY-RUN, --dry-run, --no-dry-run: Preview actions without executing them. [default: False]
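The rollback decision reduces to two checks: the rollout must be errored, and its version must not be flagged as unsafe to downgrade from. This sketch assumes `unsafeDowngrades` can be treated as a collection of version strings, which is a simplification; `rollback_state` is hypothetical.

```python
from typing import Collection, Optional


def rollback_state(
    rollout_state: str, version: str, unsafe_downgrades: Collection[str]
) -> Optional[str]:
    """Return the finalize state for an errored rollout, or None to skip it."""
    if rollout_state != "errored":
        return None  # Only errored rollouts are rollback candidates
    if version in unsafe_downgrades:
        return None  # Downgrading away from this version is flagged unsafe
    return "failed_rolled_back"
```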
airbyte-ops cloud connector rollout list
airbyte-ops cloud connector rollout list [OPTIONS]
List connector rollouts from the production database.
Parameters:
- --include-inactive, --no-include-inactive: Include terminal (canceled/succeeded/errored) rollouts. Requires --limit. [default: False]
- --limit: Maximum number of rollouts to return. Required when --include-inactive is set. [default: 0]
airbyte-ops cloud db
Database operations for Airbyte Cloud Prod DB Replica.
airbyte-ops cloud db start-proxy
airbyte-ops cloud db start-proxy [ARGS]
Start the Cloud SQL Proxy for database access.
This command starts the Cloud SQL Auth Proxy to enable connections to the Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.
By default, runs as a daemon (background process). Use --no-daemon to run in foreground mode where you can see logs and stop with Ctrl+C.
Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, which should contain the service account JSON credentials.
After starting the proxy, set these environment variables to use database tools:
    export USE_CLOUD_SQL_PROXY=1
    export DB_PORT={port}
Parameters:
- PORT, --port: Port for the Cloud SQL Proxy to listen on. [default: 15432]
- DAEMON, --daemon, --no-daemon: Run as daemon in background (default). Use --no-daemon for foreground. [default: True]
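In daemon mode the command only waits briefly to confirm the process did not exit, so the proxy may not yet accept connections when it returns. A script that runs database tools right afterwards could poll the port first. The sketch below uses only the standard library; `wait_for_port` is a hypothetical helper, and 15432 is the documented default port.

```python
import socket
import time


def wait_for_port(port: int = 15432, host: str = "127.0.0.1", timeout: float = 10.0) -> bool:
    """Poll until something accepts TCP connections on host:port."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=0.5):
                return True  # A listener accepted the connection
        except OSError:
            time.sleep(0.2)  # Not listening yet; retry shortly
    return False
```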
airbyte-ops cloud db stop-proxy
airbyte-ops cloud db stop-proxy
Stop the Cloud SQL Proxy daemon.
This command stops a Cloud SQL Proxy that was started with 'start-proxy'. It reads the PID from the PID file and sends a SIGTERM signal to stop the process.
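The PID-file protocol described here can be sketched with the standard library alone. `stop_from_pid_file` is a hypothetical stand-in that mirrors the steps named above (read PID, probe with signal 0, send SIGTERM) and always removes the PID file; it is POSIX-only and does not handle a PID owned by another user.

```python
import os
import signal
from pathlib import Path


def stop_from_pid_file(pid_file: Path) -> str:
    """Send SIGTERM to the PID recorded in pid_file, then report what happened."""
    if not pid_file.exists():
        return "no-pid-file"
    pid_str = pid_file.read_text().strip()
    pid_file.unlink()  # The file is stale after this call either way
    if not pid_str.isdigit():
        return "invalid-pid"
    pid = int(pid_str)
    try:
        os.kill(pid, 0)  # Signal 0 only checks that the process exists
    except ProcessLookupError:
        return "not-running"
    os.kill(pid, signal.SIGTERM)
    return "terminated"
```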
airbyte-ops cloud connection
Connection operations in Airbyte Cloud.
airbyte-ops cloud connection state
Connection state operations in Airbyte Cloud.
Commands:
- get: Get the current state for an Airbyte Cloud connection.
- reset: Reset a configured stream's state so the next sync full-refreshes it.
- set: Set the state for an Airbyte Cloud connection.
airbyte-ops cloud connection state get
airbyte-ops cloud connection state get CONNECTION-ID [ARGS]
Get the current state for an Airbyte Cloud connection.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch state for. [required]
- STREAM, --stream: Optional stream name to filter state for a single stream.
- NAMESPACE, --namespace: Optional stream namespace to narrow the stream filter.
- OUTPUT, --output: Path to write the state JSON output to a file.
airbyte-ops cloud connection state set
airbyte-ops cloud connection state set CONNECTION-ID [ARGS]
Set the state for an Airbyte Cloud connection.
State JSON can be provided as a positional argument, via --input, or via STDIN.
Uses the safe variant that prevents updates while a sync is running. When --stream is provided, only that stream's state is updated within the existing connection state.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
- STATE-JSON, --state-json: The connection state as a JSON string. When --stream is used, this is just the stream's state blob (e.g., '{"cursor": "2024-01-01"}'). Otherwise, must include 'stateType', 'connectionId', and the appropriate state field. Can also be provided via --input or STDIN.
- STREAM, --stream: Optional stream name to update state for a single stream only.
- NAMESPACE, --namespace: Optional stream namespace to identify the stream.
- INPUT, --input: Path to a JSON file containing the state to set.
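With --stream, only one stream's blob changes inside the existing connection state. The sketch below shows that merge for a per-stream state, assuming a Config API-style shape with `stateType == "stream"` and a `streamState` list of `{streamDescriptor, streamState}` entries; `set_stream_state` is hypothetical and does not cover global state.

```python
import copy
from typing import Any, Dict, Optional


def set_stream_state(
    connection_state: Dict[str, Any],
    stream: str,
    blob: Dict[str, Any],
    namespace: Optional[str] = None,
) -> Dict[str, Any]:
    """Return a copy of a per-stream state with one stream's blob replaced."""
    if connection_state.get("stateType") != "stream":
        raise ValueError("Per-stream updates need stateType == 'stream'")
    updated = copy.deepcopy(connection_state)  # Leave the input untouched
    for entry in updated.get("streamState", []):
        descriptor = entry["streamDescriptor"]
        if descriptor.get("name") == stream and descriptor.get("namespace") == namespace:
            entry["streamState"] = blob  # Other streams keep their state
            return updated
    raise KeyError(f"No state entry for stream {stream!r} (namespace={namespace!r})")
```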
airbyte-ops cloud connection state reset
airbyte-ops cloud connection state reset CONNECTION-ID STREAM [ARGS]
Reset a configured stream's state so the next sync full-refreshes it.
Uses the safe variant that prevents updates while a sync is running.
Returns a restorable previous_state_backup in raw Config API format.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
- STREAM, --stream: The configured stream name whose state should be reset. [required]
- NAMESPACE, --namespace: Optional stream namespace to identify the stream.
airbyte-ops cloud connection catalog
Connection catalog operations in Airbyte Cloud.
Commands:
- get: Get the configured catalog for an Airbyte Cloud connection.
- set: Set the configured catalog for an Airbyte Cloud connection.
airbyte-ops cloud connection catalog get
airbyte-ops cloud connection catalog get CONNECTION-ID [ARGS]
Get the configured catalog for an Airbyte Cloud connection.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch catalog for. [required]
- OUTPUT, --output: Path to write the catalog JSON output to a file.
airbyte-ops cloud connection catalog set
airbyte-ops cloud connection catalog set CONNECTION-ID [ARGS]
Set the configured catalog for an Airbyte Cloud connection.
Catalog JSON can be provided as a positional argument, via --input, or via STDIN.
WARNING: This replaces the entire configured catalog.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update catalog for. [required]
- CATALOG-JSON, --catalog-json: The configured catalog as a JSON string. Can also be provided via --input or STDIN.
- INPUT, --input: Path to a JSON file containing the catalog to set.
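Because `set` replaces the entire configured catalog, a safe edit is read-modify-write: fetch with `catalog get CONNECTION-ID --output catalog.json`, change the full document, then write it back with `catalog set CONNECTION-ID --input catalog.json`. The sketch below performs the middle step, assuming Config API-style entries with `stream.name` and `config.selected`; `set_stream_selected` is hypothetical.

```python
import copy


def set_stream_selected(catalog: dict, stream: str, selected: bool) -> dict:
    """Return a full catalog copy with one stream's `selected` flag changed."""
    new_catalog = copy.deepcopy(catalog)  # Keep every other stream as-is
    for entry in new_catalog["streams"]:
        if entry["stream"]["name"] == stream:
            entry["config"]["selected"] = selected
            return new_catalog
    raise KeyError(f"Stream not found in catalog: {stream}")
```

Sending back the whole edited document, rather than a partial one, avoids silently dropping streams that the edit did not touch.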
airbyte-ops cloud logs
GCP Cloud Logging operations for Airbyte Cloud.
airbyte-ops cloud logs lookup-cloud-backend-error
airbyte-ops cloud logs lookup-cloud-backend-error ERROR-ID [ARGS]
Look up error details from GCP Cloud Logging by error ID.
When an Airbyte Cloud API returns an error response with only an error ID (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command fetches the full stack trace and error details from GCP Cloud Logging.
Requires GCP credentials with Logs Viewer role on the target project. Set up credentials with: gcloud auth application-default login
Parameters:
- ERROR-ID, --error-id: The error ID (UUID) to search for. This is typically returned in API error responses as {'errorId': '...'} [required]
- LOOKBACK-DAYS, --lookback-days: Number of days to look back in logs. [default: 7]
- MIN-SEVERITY-FILTER, --min-severity-filter: Optional minimum severity level to filter logs. [choices: debug, info, notice, warning, error, critical, alert, emergency]
- RAW, --raw, --no-raw: Output raw JSON instead of formatted text. [default: False]
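The lookup options map naturally onto a Cloud Logging query: a quoted search term for the error ID, a timestamp lower bound for the lookback window, and an optional severity floor. The sketch below builds such a filter string; `build_error_filter` is hypothetical, and `fetch_error_logs` may construct its query differently.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_error_filter(
    error_id: str,
    lookback_days: int = 7,
    min_severity: Optional[str] = None,
    now: Optional[datetime] = None,
) -> str:
    """Build a Cloud Logging query that searches recent entries for an error ID."""
    now = now or datetime.now(timezone.utc)
    since = now - timedelta(days=lookback_days)
    clauses = [
        f'"{error_id}"',  # A bare quoted string searches all log entry fields
        f'timestamp>="{since.strftime("%Y-%m-%dT%H:%M:%SZ")}"',
    ]
    if min_severity:
        clauses.append(f"severity>={min_severity.upper()}")
    return " AND ".join(clauses)
```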
airbyte-ops cloud organization
Organization operations in Airbyte Cloud.
airbyte-ops cloud organization search
airbyte-ops cloud organization search NAME-CONTAINS [ARGS]
Search organizations by name or email substring.
Parameters:
- NAME-CONTAINS, --name-contains: Case-insensitive substring to match against organization name or email. [required]
- LIMIT, --limit: Maximum number of results to return. [default: 20]
airbyte-ops cloud organization info
airbyte-ops cloud organization info ORGANIZATION-ID
Get information about an organization (stub — not yet implemented).
Parameters:
ORGANIZATION-ID, --organization-id: The organization UUID. [required]
airbyte-ops cloud workspace
Workspace operations in Airbyte Cloud.
airbyte-ops cloud workspace search
airbyte-ops cloud workspace search [ARGS]
Search workspaces by name substring or email domain.
Parameters:
- NAME-CONTAINS, --name-contains: Case-insensitive substring to match against workspace name or slug.
- EMAIL-DOMAIN, --email-domain: Email domain to search for (e.g. 'motherduck.com').
- LIMIT, --limit: Maximum number of results to return. [default: 100]
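The matching rules can be illustrated over in-memory records: a case-insensitive substring test on name or slug, and an exact domain comparison on member emails. This is a sketch with hypothetical record fields (`name`, `slug`, `emails`); the real command queries the prod DB, and it is an assumption here that a workspace matching either filter is returned.

```python
from typing import Any, Dict, List, Optional


def match_workspaces(
    workspaces: List[Dict[str, Any]],
    name_contains: Optional[str] = None,
    email_domain: Optional[str] = None,
    limit: int = 100,
) -> List[Dict[str, Any]]:
    """Filter workspace records by name/slug substring or member email domain."""
    needle = name_contains.lower() if name_contains else None
    domain = email_domain.lower().lstrip("@") if email_domain else None
    results: List[Dict[str, Any]] = []
    for ws in workspaces:
        name_hit = needle is not None and (
            needle in ws["name"].lower() or needle in ws["slug"].lower()
        )
        domain_hit = domain is not None and any(
            email.lower().rsplit("@", 1)[-1] == domain for email in ws.get("emails", [])
        )
        if name_hit or domain_hit:
            results.append(ws)
            if len(results) >= limit:
                break  # Respect --limit
    return results
```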
airbyte-ops cloud workspace info
airbyte-ops cloud workspace info WORKSPACE-ID
Get information about a workspace.
Parameters:
WORKSPACE-ID, --workspace-id: The workspace UUID. [required]
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""CLI commands for Airbyte Cloud operations. 3 4Commands: 5 airbyte-ops cloud connector get-version-info - Get connector version info 6 airbyte-ops cloud connector set-version-override - Set connector version override 7 airbyte-ops cloud connector clear-version-override - Clear connector version override 8 airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison) 9 airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file 10 airbyte-ops cloud organization search - Search organizations by name/email 11 airbyte-ops cloud organization info - Get organization info 12 airbyte-ops cloud workspace search - Search workspaces by name/slug or email domain 13 airbyte-ops cloud workspace info - Get workspace info 14 15## CLI reference 16 17The commands below are regenerated by `poe docs-generate` via cyclopts's 18programmatic docs API; see `docs/generate_cli.py`. 19 20.. include:: ../../../docs/generated/cli/cloud.md 21 :start-line: 2 22""" 23 24from __future__ import annotations 25 26# Hide Python-level members from the pdoc page for this module; the rendered 27# docs for this CLI group come entirely from the grafted `.. include::` in 28# the module docstring above. 
29__all__: list[str] = [] 30 31import json 32import logging 33import os 34import shutil 35import signal 36import socket 37import subprocess 38import sys 39import tempfile 40import time 41from pathlib import Path 42from typing import Annotated, Literal 43 44import requests 45import yaml 46from airbyte.cloud.workspaces import CloudWorkspace 47from airbyte.exceptions import PyAirbyteInputError 48from airbyte_cdk.models.connector_metadata import MetadataFile 49from airbyte_cdk.utils.connector_paths import find_connector_root_from_name 50from airbyte_cdk.utils.docker import build_connector_image, verify_docker_installation 51from airbyte_protocol.models import ConfiguredAirbyteCatalog 52from cyclopts import Parameter 53from fastmcp_extensions.cli import ( 54 exit_with_error, 55 print_error, 56 print_json, 57 print_success, 58 print_warning, 59) 60 61from airbyte_ops_mcp.cli._base import App, app 62from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError 63from airbyte_ops_mcp.cloud_admin.connection_config import fetch_connection_config 64from airbyte_ops_mcp.cloud_admin.connection_state import ( 65 reset_stream_state as reset_cloud_stream_state, 66) 67from airbyte_ops_mcp.cloud_admin.registry_lookup import ( 68 resolve_definition_id_to_canonical_info, 69) 70from airbyte_ops_mcp.cloud_admin.version_overrides import ( 71 ResolvedCloudAuth, 72 VersionOverrideTarget, 73 get_connector_version_info, 74) 75from airbyte_ops_mcp.cloud_admin.version_overrides import ( 76 set_version_override as set_cloud_version_override, 77) 78from airbyte_ops_mcp.constants import ( 79 CLOUD_SQL_INSTANCE, 80 CLOUD_SQL_PROXY_PID_FILE, 81 DEFAULT_CLOUD_SQL_PROXY_PORT, 82 ENV_GCP_PROD_DB_ACCESS_CREDENTIALS, 83) 84from airbyte_ops_mcp.gcp_logs import GCPSeverity, fetch_error_logs 85from airbyte_ops_mcp.prod_db_access.queries import ( 86 query_workspace_info, 87 query_workspaces_by_email_domain, 88) 89from airbyte_ops_mcp.prod_db_access.queries import ( 90 search_organizations as 
_search_organizations, 91) 92from airbyte_ops_mcp.prod_db_access.queries import ( 93 search_workspaces as _search_workspaces, 94) 95from airbyte_ops_mcp.regression_tests.cdk_secrets import get_first_config_from_secrets 96from airbyte_ops_mcp.regression_tests.ci_output import ( 97 generate_regression_report, 98 generate_single_version_report, 99 write_github_output, 100 write_github_outputs, 101 write_github_summary, 102 write_json_output, 103 write_test_summary, 104) 105from airbyte_ops_mcp.regression_tests.config_overrides import ( 106 filter_configured_catalog_file, 107) 108from airbyte_ops_mcp.regression_tests.connection_fetcher import ( 109 fetch_connection_data, 110 fetch_connection_state, 111 save_connection_data_to_files, 112) 113from airbyte_ops_mcp.regression_tests.connection_secret_retriever import ( 114 SecretRetrievalError, 115 enrich_config_with_secrets, 116 should_use_secret_retriever, 117) 118from airbyte_ops_mcp.regression_tests.connector_runner import ( 119 ConnectorRunner, 120 ensure_image_available, 121) 122from airbyte_ops_mcp.regression_tests.http_metrics import ( 123 MitmproxyManager, 124 parse_http_dump, 125) 126from airbyte_ops_mcp.regression_tests.models import ( 127 Command, 128 ConnectorUnderTest, 129 ExecutionInputs, 130 TargetOrControl, 131) 132from airbyte_ops_mcp.telemetry import track_regression_test 133from airbyte_ops_mcp.tier_cache import resolve_workspace 134 135# Path to connectors directory within the airbyte repo 136CONNECTORS_SUBDIR = Path("airbyte-integrations") / "connectors" 137 138# Create the cloud sub-app 139cloud_app = App(name="cloud", help="Airbyte Cloud operations.") 140app.command(cloud_app) 141 142# Create the connector sub-app under cloud 143connector_app = App( 144 name="connector", help="Deployed connector operations in Airbyte Cloud." 
145) 146cloud_app.command(connector_app) 147 148# Create the db sub-app under cloud 149db_app = App(name="db", help="Database operations for Airbyte Cloud Prod DB Replica.") 150cloud_app.command(db_app) 151 152# Create the connection sub-app under cloud 153connection_app = App(name="connection", help="Connection operations in Airbyte Cloud.") 154cloud_app.command(connection_app) 155 156# Create the state sub-app under connection 157state_app = App(name="state", help="Connection state operations in Airbyte Cloud.") 158connection_app.command(state_app) 159 160# Create the catalog sub-app under connection 161catalog_app = App( 162 name="catalog", help="Connection catalog operations in Airbyte Cloud." 163) 164connection_app.command(catalog_app) 165 166# Create the logs sub-app under cloud 167logs_app = App(name="logs", help="GCP Cloud Logging operations for Airbyte Cloud.") 168cloud_app.command(logs_app) 169 170# Create the organization sub-app under cloud 171organization_app = App( 172 name="organization", help="Organization operations in Airbyte Cloud." 173) 174cloud_app.command(organization_app) 175 176# Create the workspace sub-app under cloud 177workspace_app = App(name="workspace", help="Workspace operations in Airbyte Cloud.") 178cloud_app.command(workspace_app) 179 180 181@db_app.command(name="start-proxy") 182def start_proxy( 183 port: Annotated[ 184 int, 185 Parameter(help="Port for the Cloud SQL Proxy to listen on."), 186 ] = DEFAULT_CLOUD_SQL_PROXY_PORT, 187 daemon: Annotated[ 188 bool, 189 Parameter( 190 help="Run as daemon in background (default). Use --no-daemon for foreground." 191 ), 192 ] = True, 193) -> None: 194 """Start the Cloud SQL Proxy for database access. 195 196 This command starts the Cloud SQL Auth Proxy to enable connections to the 197 Airbyte Cloud Prod DB Replica. The proxy is required for database query tools. 198 199 By default, runs as a daemon (background process). 
Use --no-daemon to run in 200 foreground mode where you can see logs and stop with Ctrl+C. 201 202 Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, 203 which should contain the service account JSON credentials. 204 205 After starting the proxy, set these environment variables to use database tools: 206 export USE_CLOUD_SQL_PROXY=1 207 export DB_PORT={port} 208 209 Example: 210 airbyte-ops cloud db start-proxy 211 airbyte-ops cloud db start-proxy --port 15432 212 airbyte-ops cloud db start-proxy --no-daemon 213 """ 214 # Check if proxy is already running on the requested port (idempotency) 215 try: 216 with socket.create_connection(("127.0.0.1", port), timeout=0.5): 217 # Something is already listening on this port 218 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 219 pid_info = "" 220 if pid_file.exists(): 221 pid_info = f" (PID: {pid_file.read_text().strip()})" 222 print_success( 223 f"Cloud SQL Proxy is already running on port {port}{pid_info}" 224 ) 225 print_success("") 226 print_success("To use database tools, set these environment variables:") 227 print_success(" export USE_CLOUD_SQL_PROXY=1") 228 print_success(f" export DB_PORT={port}") 229 return 230 except (OSError, TimeoutError, ConnectionRefusedError): 231 pass # Port not in use, proceed with starting proxy 232 233 # Check if cloud-sql-proxy is installed 234 proxy_path = shutil.which("cloud-sql-proxy") 235 if not proxy_path: 236 exit_with_error( 237 "cloud-sql-proxy not found in PATH. " 238 "Install it from: https://cloud.google.com/sql/docs/mysql/sql-proxy" 239 ) 240 241 # Get credentials from environment 242 creds_json = os.getenv(ENV_GCP_PROD_DB_ACCESS_CREDENTIALS) 243 if not creds_json: 244 exit_with_error( 245 f"{ENV_GCP_PROD_DB_ACCESS_CREDENTIALS} environment variable is not set. " 246 "This should contain the GCP service account JSON credentials." 
        )

    # Build the command using --json-credentials to avoid writing to disk
    cmd = [
        proxy_path,
        CLOUD_SQL_INSTANCE,
        f"--port={port}",
        f"--json-credentials={creds_json}",
    ]

    print_success(f"Starting Cloud SQL Proxy on port {port}...")
    print_success(f"Instance: {CLOUD_SQL_INSTANCE}")
    print_success("")
    print_success("To use database tools, set these environment variables:")
    print_success(" export USE_CLOUD_SQL_PROXY=1")
    print_success(f" export DB_PORT={port}")
    print_success("")

    if daemon:
        # Run in background (daemon mode) with log file for diagnostics
        log_file_path = Path("/tmp/airbyte-cloud-sql-proxy.log")
        log_file = log_file_path.open("ab")
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=log_file,
            start_new_session=True,
        )

        # Brief wait to verify the process started successfully
        time.sleep(0.5)
        if process.poll() is not None:
            # Process exited immediately - read any error output
            log_file.close()
            error_output = ""
            if log_file_path.exists():
                error_output = log_file_path.read_text()[-1000:]  # Last 1000 chars
            exit_with_error(
                f"Cloud SQL Proxy failed to start (exit code: {process.returncode}).\n"
                f"Check logs at {log_file_path}\n"
                f"Recent output: {error_output}"
            )

        # Write PID to file for stop-proxy command
        pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)
        pid_file.write_text(str(process.pid))
        print_success(f"Cloud SQL Proxy started as daemon (PID: {process.pid})")
        print_success(f"Logs: {log_file_path}")
        print_success("To stop: airbyte-ops cloud db stop-proxy")
    else:
        # Run in foreground - replace current process
        # Signals (Ctrl+C) will be handled directly by the cloud-sql-proxy process
        print_success("Running in foreground. Press Ctrl+C to stop the proxy.")
        print_success("")
        os.execv(proxy_path, cmd)


@db_app.command(name="stop-proxy")
def stop_proxy() -> None:
    """Stop the Cloud SQL Proxy daemon.

    This command stops a Cloud SQL Proxy that was started with 'start-proxy'.
    It reads the PID from the PID file and sends a SIGTERM signal to stop the process.

    Example:
        airbyte-ops cloud db stop-proxy
    """
    pid_file = Path(CLOUD_SQL_PROXY_PID_FILE)

    if not pid_file.exists():
        exit_with_error(
            f"PID file not found at {CLOUD_SQL_PROXY_PID_FILE}. "
            "No Cloud SQL Proxy daemon appears to be running."
        )

    pid_str = pid_file.read_text().strip()
    if not pid_str.isdigit():
        pid_file.unlink()
        exit_with_error(f"Invalid PID in {CLOUD_SQL_PROXY_PID_FILE}: {pid_str}")

    pid = int(pid_str)

    # Check if process is still running
    try:
        os.kill(pid, 0)  # Signal 0 just checks if process exists
    except ProcessLookupError:
        pid_file.unlink()
        print_success(
            f"Cloud SQL Proxy (PID: {pid}) is not running. Cleaned up PID file."
        )
        return
    except PermissionError:
        exit_with_error(f"Permission denied to check process {pid}.")

    # Send SIGTERM to stop the process
    try:
        os.kill(pid, signal.SIGTERM)
        print_success(f"Sent SIGTERM to Cloud SQL Proxy (PID: {pid}).")
    except ProcessLookupError:
        print_success(f"Cloud SQL Proxy (PID: {pid}) already stopped.")
    except PermissionError:
        exit_with_error(f"Permission denied to stop process {pid}.")

    # Clean up PID file
    pid_file.unlink(missing_ok=True)
    print_success("Cloud SQL Proxy stopped.")


@connector_app.command(name="get-version-info")
def get_version_info(
    workspace_id: Annotated[
        str,
        Parameter(help="The Airbyte Cloud workspace ID."),
    ],
    connector_id: Annotated[
        str,
        Parameter(help="The ID of the deployed connector (source or destination)."),
    ],
    connector_type: Annotated[
        Literal["source", "destination"],
        Parameter(help="The type of connector."),
    ],
) -> None:
    """Get the current version information for a deployed connector."""
    result = get_connector_version_info(
        auth=_resolve_cli_cloud_auth(),
        workspace_id=workspace_id,
        actor_id=connector_id,
        actor_type=connector_type,
    )
    print_json(result.model_dump())


_OverrideLevel = Literal["actor", "workspace", "organization"]
_TierFilter = Literal["TIER_0", "TIER_1", "TIER_2", "ALL"]


def _resolve_cli_cloud_auth() -> ResolvedCloudAuth:
    """Resolve Airbyte Cloud auth from environment variables for CLI use.

    Mirrors the priority order the MCP server uses:

    1. `AIRBYTE_CLOUD_BEARER_TOKEN`
    2. `AIRBYTE_CLOUD_CLIENT_ID` + `AIRBYTE_CLOUD_CLIENT_SECRET`

    Raises a CLI error via `exit_with_error` if no usable credentials are present.
    """
    bearer_token = os.environ.get("AIRBYTE_CLOUD_BEARER_TOKEN")
    if bearer_token:
        return ResolvedCloudAuth(bearer_token=bearer_token)

    client_id = os.environ.get("AIRBYTE_CLOUD_CLIENT_ID")
    client_secret = os.environ.get("AIRBYTE_CLOUD_CLIENT_SECRET")
    if client_id and client_secret:
        return ResolvedCloudAuth(client_id=client_id, client_secret=client_secret)

    exit_with_error(
        "Missing Airbyte Cloud credentials. Set AIRBYTE_CLOUD_BEARER_TOKEN, or "
        "both AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET."
    )


def _validate_override_scope_args(
    *,
    override_level: _OverrideLevel,
    workspace_id: str | None,
    organization_id: str | None,
    connector_id: str | None,
    connector_type: Literal["source", "destination"] | None,
    actor_definition_id: str | None,
) -> None:
    """Validate scope-specific arguments for `set/clear-version-override`.

    Calls `exit_with_error` (which terminates the process) if the supplied
    arguments are inconsistent with the requested `override_level`.
    """
    if override_level == "actor":
        missing = [
            name
            for name, value in (
                ("--workspace-id", workspace_id),
                ("--connector-id", connector_id),
                ("--connector-type", connector_type),
            )
            if not value
        ]
        if missing:
            exit_with_error(
                "Override level 'actor' requires " + ", ".join(missing) + "."
            )
        if actor_definition_id is not None:
            exit_with_error(
                "Override level 'actor' must not be combined with "
                "--actor-definition-id."
            )
        if organization_id is not None:
            exit_with_error(
                "Override level 'actor' must not be combined with --organization-id."
            )
        return

    if override_level == "workspace":
        missing = [
            name
            for name, value in (
                ("--workspace-id", workspace_id),
                ("--actor-definition-id", actor_definition_id),
            )
            if not value
        ]
        if missing:
            exit_with_error(
                "Override level 'workspace' requires " + ", ".join(missing) + "."
            )
        if connector_id is not None or connector_type is not None:
            exit_with_error(
                "Override level 'workspace' must not be combined with "
                "--connector-id or --connector-type."
            )
        if organization_id is not None:
            exit_with_error(
                "Override level 'workspace' must not be combined with "
                "--organization-id."
            )
        return

    # override_level == "organization"
    missing = [
        name
        for name, value in (
            ("--organization-id", organization_id),
            ("--actor-definition-id", actor_definition_id),
        )
        if not value
    ]
    if missing:
        exit_with_error(
            "Override level 'organization' requires " + ", ".join(missing) + "."
        )
    if connector_id is not None or connector_type is not None:
        exit_with_error(
            "Override level 'organization' must not be combined with "
            "--connector-id or --connector-type."
        )
    if workspace_id is not None:
        exit_with_error(
            "Override level 'organization' must not be combined with --workspace-id."
        )


def _dispatch_version_override(
    *,
    override_level: _OverrideLevel,
    workspace_id: str | None,
    organization_id: str | None,
    connector_id: str | None,
    connector_type: Literal["source", "destination"] | None,
    actor_definition_id: str | None,
    version: str | None,
    unset: bool,
    reason: str | None,
    reason_url: str | None,
    issue_url: str,
    approval_comment_url: str,
    ai_agent_session_url: str | None,
    customer_tier_filter: _TierFilter,
) -> None:
    """Route a version-override request through the normalized core helper.

    Resolves `--actor-definition-id` to its canonical name and connector type
    via the cloud registry for `workspace`/`organization` scopes. Prints the
    operation result as JSON.
    """
    _validate_override_scope_args(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
    )

    auth = _resolve_cli_cloud_auth()

    if override_level == "actor":
        assert workspace_id is not None
        assert connector_id is not None
        assert connector_type is not None
        assert organization_id is None
        try:
            ws_resolution = resolve_workspace(workspace_id)
            if not ws_resolution.organization_id:
                exit_with_error("Could not resolve organization for workspace.")
            result = set_cloud_version_override(
                auth=auth,
                target=VersionOverrideTarget(
                    scope="actor",
                    organization_id=ws_resolution.organization_id,
                    workspace_id=workspace_id,
                    actor_id=connector_id,
                    connector_type=connector_type,
                ),
                approval_comment_url=approval_comment_url,
                version=version,
                unset=unset,
                override_reason=reason,
                override_reason_reference_url=reason_url,
                issue_url=issue_url,
                ai_agent_session_url=ai_agent_session_url,
                customer_tier_filter=customer_tier_filter,
            )
        except (PyAirbyteInputError, CloudAuthError) as e:
            exit_with_error(str(e))
        _print_override_result(result.success, result.message)
        print_json(result.model_dump())
        return

    try:
        assert actor_definition_id is not None
        canonical_name, resolved_connector_type = (
            resolve_definition_id_to_canonical_info(actor_definition_id)
        )
    except (PyAirbyteInputError, requests.RequestException) as e:
        exit_with_error(f"Failed to resolve --actor-definition-id: {e}")
    typed_connector_type: Literal["source", "destination"] = (
        "source" if resolved_connector_type == "source" else "destination"
    )

    if override_level == "workspace":
        assert workspace_id is not None
        try:
            ws_resolution = resolve_workspace(workspace_id)
            if not ws_resolution.organization_id:
                exit_with_error("Could not resolve organization for workspace.")
            target = VersionOverrideTarget(
                scope="workspace",
                organization_id=ws_resolution.organization_id,
                workspace_id=workspace_id,
                connector_name=canonical_name,
                connector_type=typed_connector_type,
            )
        except PyAirbyteInputError as e:
            exit_with_error(str(e))
    else:
        assert organization_id is not None
        target = VersionOverrideTarget(
            scope="organization",
            organization_id=organization_id,
            connector_name=canonical_name,
            connector_type=typed_connector_type,
        )

    try:
        result = set_cloud_version_override(
            auth=auth,
            target=target,
            approval_comment_url=approval_comment_url,
            version=version,
            unset=unset,
            override_reason=reason,
            override_reason_reference_url=reason_url,
            issue_url=issue_url,
            ai_agent_session_url=ai_agent_session_url,
            customer_tier_filter=customer_tier_filter,
        )
    except (PyAirbyteInputError, CloudAuthError) as e:
        exit_with_error(str(e))
    _print_override_result(result.success, result.message)
    print_json(result.model_dump())


def _print_override_result(success: bool, message: str) -> None:
    """Emit an override-operation status message via the shared CLI helpers."""
    if success:
        print_success(message)
    else:
        print_error(message)


@connector_app.command(name="set-version-override")
def set_version_override(
    version: Annotated[
        str,
        Parameter(
            help="The semver version string to pin to (e.g., '2.1.5-preview.abc1234')."
        ),
    ],
    reason: Annotated[
        str,
        Parameter(help="Explanation for the override (min 10 characters)."),
    ],
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    override_level: Annotated[
        _OverrideLevel,
        Parameter(
            help=(
                "Scope at which to apply the version override: 'actor' (single "
                "deployed connector instance), 'workspace' (all instances of a "
                "connector type within a workspace), or 'organization' (all "
                "instances across an organization). Defaults to 'actor'."
            ),
        ),
    ] = "actor",
    workspace_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud workspace ID. Required for 'actor' and "
                "'workspace' override levels."
            )
        ),
    ] = None,
    organization_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud organization ID. Required for "
                "'organization' override level."
            )
        ),
    ] = None,
    connector_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The ID of the deployed connector (source or destination). "
                "Required for 'actor' override level."
            )
        ),
    ] = None,
    connector_type: Annotated[
        Literal["source", "destination"] | None,
        Parameter(help="The type of connector. Required for 'actor' override level."),
    ] = None,
    actor_definition_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The connector definition UUID. Required for 'workspace' and "
                "'organization' override levels."
            )
        ),
    ] = None,
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    reason_url: Annotated[
        str | None,
        Parameter(help="Optional URL with more context (e.g., issue link)."),
    ] = None,
    customer_tier_filter: Annotated[
        _TierFilter,
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Set a version override for a deployed connector.

    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.

    The `--override-level` flag selects the scope at which the pin is applied:

    - `actor` (default): pins a single deployed connector instance. Requires
      `--workspace-id`, `--connector-id`, and `--connector-type`.
    - `workspace`: pins ALL instances of a connector type within a workspace.
      Requires `--workspace-id` and `--actor-definition-id`.
    - `organization`: pins ALL instances of a connector type across an
      organization. Requires `--organization-id` and `--actor-definition-id`.

    The `customer_tier_filter` gates the operation: the call fails if the
    actual tier of the target organization does not match. Use `ALL` to
    proceed regardless of tier (a warning is shown for sensitive tiers).
    """
    _dispatch_version_override(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
        version=version,
        unset=False,
        reason=reason,
        reason_url=reason_url,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )


@connector_app.command(name="clear-version-override")
def clear_version_override(
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    override_level: Annotated[
        _OverrideLevel,
        Parameter(
            help=(
                "Scope at which to clear the version override: 'actor', "
                "'workspace', or 'organization'. Defaults to 'actor'."
            ),
        ),
    ] = "actor",
    workspace_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud workspace ID. Required for 'actor' and "
                "'workspace' override levels."
            )
        ),
    ] = None,
    organization_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud organization ID. Required for "
                "'organization' override level."
            )
        ),
    ] = None,
    connector_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The ID of the deployed connector (source or destination). "
                "Required for 'actor' override level."
            )
        ),
    ] = None,
    connector_type: Annotated[
        Literal["source", "destination"] | None,
        Parameter(help="The type of connector. Required for 'actor' override level."),
    ] = None,
    actor_definition_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The connector definition UUID. Required for 'workspace' and "
                "'organization' override levels."
            )
        ),
    ] = None,
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    customer_tier_filter: Annotated[
        _TierFilter,
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Clear a version override from a deployed connector.

    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.

    The `--override-level` flag selects the scope at which the pin is removed:

    - `actor` (default): clears a single deployed connector instance pin.
      Requires `--workspace-id`, `--connector-id`, and `--connector-type`.
    - `workspace`: clears the workspace-level pin for a connector type.
      Requires `--workspace-id` and `--actor-definition-id`.
    - `organization`: clears the organization-level pin for a connector type.
      Requires `--organization-id` and `--actor-definition-id`.
    """
    _dispatch_version_override(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
        version=None,
        unset=True,
        reason=None,
        reason_url=None,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )


def _load_json_file(file_path: Path) -> dict | None:
    """Load a JSON file and return its contents.

    Returns None if the file doesn't exist or contains invalid JSON.
    """
    if not file_path.exists():
        return None
    try:
        return json.loads(file_path.read_text())
    except json.JSONDecodeError as e:
        print_error(f"Failed to parse JSON in file: {file_path}\nError: {e}")
        return None


def _run_connector_command(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    config_path: Path | None = None,
    catalog_path: Path | None = None,
    state_path: Path | None = None,
    proxy_url: str | None = None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command and return results as a dict.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        proxy_url: Optional HTTP proxy URL for traffic capture.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results.
    """
    connector = ConnectorUnderTest.from_image_name(connector_image, target_or_control)

    config = _load_json_file(config_path) if config_path else None
    state = _load_json_file(state_path) if state_path else None

    configured_catalog = None
    if catalog_path and catalog_path.exists():
        catalog_json = catalog_path.read_text()
        configured_catalog = ConfiguredAirbyteCatalog.parse_raw(catalog_json)

    # Pass log level as an environment variable to the connector container
    container_env: dict[str, str] = {}
    if enable_debug_logs:
        container_env["LOG_LEVEL"] = "DEBUG"

    execution_inputs = ExecutionInputs(
        connector_under_test=connector,
        command=command,
        output_dir=output_dir,
        config=config,
        configured_catalog=configured_catalog,
        state=state,
        environment_variables=container_env or None,
    )

    runner = ConnectorRunner(execution_inputs, proxy_url=proxy_url)
    result = runner.run()

    result.save_artifacts(output_dir)

    result_dict: dict[str, object] = {
        "connector": connector_image,
        "command": command.value,
        "success": result.success,
        "exit_code": result.exit_code,
        "stdout_file": str(result.stdout_file_path),
        "stderr_file": str(result.stderr_file_path),
        "message_counts": {
            k.value: v for k, v in result.get_message_count_per_type().items()
        },
        "record_counts_per_stream": result.get_record_count_per_stream(),
    }

    # For CHECK, include the parsed connectionStatus from the Airbyte protocol.
    # The CDK exits 0 regardless of check outcome; the real pass/fail signal
    # lives in the CONNECTION_STATUS message (status=SUCCEEDED or FAILED).
    if command == Command.CHECK:
        conn_status = result.get_connection_status()
        if conn_status is not None:
            result_dict["connection_status"] = conn_status.status.value
            result_dict["connection_status_message"] = conn_status.message or ""
        else:
            result_dict["connection_status"] = None
            result_dict["connection_status_message"] = ""

    return result_dict


def _build_connector_image_from_source(
    connector_name: str,
    repo_root: Path | None = None,
    tag: str = "dev",
) -> str | None:
    """Build a connector image from source code.

    Args:
        connector_name: Name of the connector (e.g., 'source-pokeapi').
        repo_root: Optional path to the airbyte repo root. If not provided,
            will attempt to auto-detect from current directory.
        tag: Tag to apply to the built image (default: 'dev').

    Returns:
        The full image name with tag if successful, None if build fails.
    """
    if not verify_docker_installation():
        print_error("Docker is not installed or not running")
        return None

    try:
        connector_directory = find_connector_root_from_name(connector_name)
    except FileNotFoundError:
        if repo_root:
            connector_directory = repo_root / CONNECTORS_SUBDIR / connector_name
            if not connector_directory.exists():
                print_error(f"Connector directory not found: {connector_directory}")
                return None
        else:
            print_error(
                f"Could not find connector '{connector_name}'. "
                "Try providing --repo-root to specify the airbyte repo location."
            )
            return None

    metadata_file_path = connector_directory / "metadata.yaml"
    if not metadata_file_path.exists():
        print_error(f"metadata.yaml not found at {metadata_file_path}")
        return None

    metadata = MetadataFile.from_file(metadata_file_path)
    print_success(f"Building image for connector: {connector_name}")

    built_image = build_connector_image(
        connector_name=connector_name,
        connector_directory=connector_directory,
        metadata=metadata,
        tag=tag,
        no_verify=False,
    )
    print_success(f"Successfully built image: {built_image}")
    return built_image


def _fetch_control_image_from_metadata(connector_name: str) -> str | None:
    """Fetch the current released connector image from metadata.yaml on main branch.

    This fetches the connector's metadata.yaml from the airbyte monorepo's master branch
    and extracts the dockerRepository and dockerImageTag to construct the control image.

    Args:
        connector_name: The connector name (e.g., 'source-github').

    Returns:
        The full connector image with tag (e.g., 'airbyte/source-github:1.0.0'),
        or None if the metadata could not be fetched or parsed.
    """
    metadata_url = (
        f"https://raw.githubusercontent.com/airbytehq/airbyte/master/"
        f"airbyte-integrations/connectors/{connector_name}/metadata.yaml"
    )
    response = requests.get(metadata_url, timeout=30)
    if not response.ok:
        print_error(
            f"Failed to fetch metadata for {connector_name}: "
            f"HTTP {response.status_code} from {metadata_url}"
        )
        return None

    metadata = yaml.safe_load(response.text)
    if not isinstance(metadata, dict):
        print_error(f"Invalid metadata format for {connector_name}: expected dict")
        return None

    data = metadata.get("data", {})
    docker_repository = data.get("dockerRepository")
    docker_image_tag = data.get("dockerImageTag")

    if not docker_repository or not docker_image_tag:
        print_error(
            f"Could not find dockerRepository/dockerImageTag in metadata for {connector_name}"
        )
        return None

    return f"{docker_repository}:{docker_image_tag}"


def _run_with_optional_http_metrics(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    enable_http_metrics: bool,
    config_path: Path | None,
    catalog_path: Path | None,
    state_path: Path | None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command with optional HTTP metrics capture.

    When enable_http_metrics is True, starts mitmproxy to capture HTTP traffic.
    If mitmproxy fails to start, falls back to running without metrics.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        enable_http_metrics: Whether to capture HTTP metrics via mitmproxy.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results, optionally including http_metrics.
    """
    if not enable_http_metrics:
        return _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            enable_debug_logs=enable_debug_logs,
        )

    with MitmproxyManager.start(output_dir) as session:
        if session is None:
            print_error("Mitmproxy unavailable, running without HTTP metrics")
            return _run_connector_command(
                connector_image=connector_image,
                command=command,
                output_dir=output_dir,
                target_or_control=target_or_control,
                config_path=config_path,
                catalog_path=catalog_path,
                state_path=state_path,
                enable_debug_logs=enable_debug_logs,
            )

        print_success(f"Started mitmproxy on {session.proxy_url}")
        result = _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            proxy_url=session.proxy_url,
            enable_debug_logs=enable_debug_logs,
        )

    http_metrics = parse_http_dump(session.dump_file_path)
    result["http_metrics"] = {
        "flow_count": http_metrics.flow_count,
        "duplicate_flow_count": http_metrics.duplicate_flow_count,
    }
    print_success(
        f"Captured {http_metrics.flow_count} HTTP flows "
        f"({http_metrics.duplicate_flow_count} duplicates)"
    )
    return result


@connector_app.command(name="regression-test")
def regression_test(
    skip_compare: Annotated[
        bool,
        Parameter(
            help="If True, skip comparison and run single-version tests only. "
            "If False (default), run comparison tests (target vs control)."
        ),
    ] = False,
    test_image: Annotated[
        str | None,
        Parameter(
            help="Test connector image with tag (e.g., airbyte/source-github:1.0.0). "
            "This is the image under test - in comparison mode, it's compared against control_image."
        ),
    ] = None,
    control_image: Annotated[
        str | None,
        Parameter(
            help="Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). "
            "Ignored if `skip_compare=True`."
        ),
    ] = None,
    connector_name: Annotated[
        str | None,
        Parameter(
            help="Connector name to build image from source (e.g., 'source-pokeapi'). "
            "If provided, builds the image locally with tag 'dev'. "
            "For comparison tests (default), this builds the target image. "
            "For single-version tests (skip_compare=True), this builds the test image."
        ),
    ] = None,
    repo_root: Annotated[
        str | None,
        Parameter(
            help="Path to the airbyte repo root. Required if connector_name is provided "
            "and the repo cannot be auto-detected."
        ),
    ] = None,
    command: Annotated[
        Literal["spec", "check", "discover", "read"],
        Parameter(help="The Airbyte command to run."),
    ] = "check",
    connection_id: Annotated[
        str | None,
        Parameter(
            help="Airbyte Cloud connection ID to fetch config/catalog from. "
            "Mutually exclusive with config-path/catalog-path. "
            "If provided, test_image/control_image can be auto-detected."
        ),
    ] = None,
    config_path: Annotated[
        str | None,
        Parameter(help="Path to the connector config JSON file."),
    ] = None,
    catalog_path: Annotated[
        str | None,
        Parameter(help="Path to the configured catalog JSON file (required for read)."),
    ] = None,
    state_path: Annotated[
        str | None,
        Parameter(help="Path to the state JSON file (optional for read)."),
    ] = None,
    output_dir: Annotated[
        str,
        Parameter(help="Directory to store test artifacts."),
    ] = "/tmp/regression_test_artifacts",
    enable_http_metrics: Annotated[
        bool,
        Parameter(
            help="Capture HTTP traffic metrics via mitmproxy (experimental). "
            "Requires mitmdump to be installed. Only used in comparison mode."
        ),
    ] = False,
    selected_streams: Annotated[
        str | None,
        Parameter(
            help="Comma-separated list of stream names to include in the read. "
            "Only these streams will be included in the configured catalog. "
            "This is useful to limit data volume by testing only specific streams."
        ),
    ] = None,
    enable_debug_logs: Annotated[
        bool,
        Parameter(
            help="Enable debug-level logging for regression test output. "
            "Also passed as `LOG_LEVEL=DEBUG` to the connector Docker container."
        ),
    ] = False,
    with_state: Annotated[
        bool | None,
        Parameter(
            negative="--no-state",
            help="Fetch and pass the connection's current state to the read command, "
            "producing a warm read instead of a cold read. Defaults to `True` when "
            "`--connection-id` is provided, `False` otherwise. Has no effect unless "
            "the command is `read`. Ignored when `--state-path` is explicitly provided.",
        ),
    ] = None,
) -> None:
    """Run regression tests on connectors.
1244 1245 This command supports two modes: 1246 1247 Comparison mode (skip_compare=False, default): 1248 Runs the specified Airbyte protocol command against both the target (new) 1249 and control (baseline) connector versions, then compares the results. 1250 This helps identify regressions between versions. 1251 1252 Single-version mode (skip_compare=True): 1253 Runs the specified Airbyte protocol command against a single connector 1254 and validates the output. No comparison is performed. 1255 1256 Results are written to the output directory and to GitHub Actions outputs 1257 if running in CI. 1258 1259 You can provide the test image in three ways: 1260 1. --test-image: Use a pre-built image from Docker registry 1261 2. --connector-name: Build the image locally from source code 1262 3. --connection-id: Auto-detect from an Airbyte Cloud connection 1263 1264 You can provide config/catalog either via file paths OR via a connection_id 1265 that fetches them from Airbyte Cloud. 1266 """ 1267 # Configure debug logging for the regression test harness when requested 1268 if enable_debug_logs: 1269 logging.basicConfig( 1270 level=logging.DEBUG, 1271 format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 1272 force=True, 1273 ) 1274 1275 output_path = Path(output_dir) 1276 output_path.mkdir(parents=True, exist_ok=True) 1277 1278 cmd = Command(command) 1279 1280 config_file: Path | None = None 1281 catalog_file: Path | None = None 1282 state_file = Path(state_path) if state_path else None 1283 1284 # Resolve the test image (used in both single-version and comparison modes) 1285 resolved_test_image: str | None = test_image 1286 resolved_control_image: str | None = control_image 1287 1288 # Validate conflicting parameters 1289 # Single-version mode: reject comparison-specific parameters 1290 if skip_compare and control_image: 1291 write_github_output("success", False) 1292 write_github_output( 1293 "error", "Cannot specify control_image with skip_compare=True" 1294 ) 
1295 exit_with_error( 1296 "Cannot specify --control-image with --skip-compare. " 1297 "Control image is only used in comparison mode." 1298 ) 1299 1300 # If connector_name is provided, build the image from source 1301 if connector_name: 1302 if resolved_test_image: 1303 write_github_output("success", False) 1304 write_github_output( 1305 "error", "Cannot specify both test_image and connector_name" 1306 ) 1307 exit_with_error("Cannot specify both --test-image and --connector-name") 1308 1309 repo_root_path = Path(repo_root) if repo_root else None 1310 built_image = _build_connector_image_from_source( 1311 connector_name=connector_name, 1312 repo_root=repo_root_path, 1313 tag="dev", 1314 ) 1315 if not built_image: 1316 write_github_output("success", False) 1317 write_github_output("error", f"Failed to build image for {connector_name}") 1318 exit_with_error(f"Failed to build image for {connector_name}") 1319 resolved_test_image = built_image 1320 1321 if connection_id: 1322 if config_path or catalog_path: 1323 write_github_output("success", False) 1324 write_github_output( 1325 "error", "Cannot specify both connection_id and file paths" 1326 ) 1327 exit_with_error( 1328 "Cannot specify both connection_id and config_path/catalog_path" 1329 ) 1330 1331 print_success(f"Fetching config/catalog from connection: {connection_id}") 1332 connection_data = fetch_connection_data(connection_id) 1333 1334 # Check if we should retrieve unmasked secrets 1335 if should_use_secret_retriever(): 1336 print_success( 1337 "USE_CONNECTION_SECRET_RETRIEVER enabled - enriching config with unmasked secrets..." 
1338 ) 1339 try: 1340 connection_data = enrich_config_with_secrets( 1341 connection_data, 1342 retrieval_reason="Regression test with USE_CONNECTION_SECRET_RETRIEVER=true", 1343 ) 1344 print_success("Successfully retrieved unmasked secrets from database") 1345 except SecretRetrievalError as e: 1346 write_github_output("success", False) 1347 write_github_output("error", str(e)) 1348 exit_with_error( 1349 f"{e}\n\n" 1350 f"This connection cannot be used for regression testing. " 1351 f"Please use a connection from a non-EU workspace, or use GSM-based " 1352 f"integration test credentials instead (by omitting --connection-id)." 1353 ) 1354 1355 # Resolve with_state default: True when connection_id is provided 1356 resolved_with_state = with_state if with_state is not None else True 1357 1358 if resolved_with_state and not state_file and command == "read": 1359 print_success("Fetching connection state for warm read...") 1360 try: 1361 connection_data.state = fetch_connection_state(connection_data) 1362 if connection_data.state: 1363 print_success( 1364 f"Fetched state with {len(connection_data.state)} stream(s)" 1365 ) 1366 else: 1367 connection_data.state = None 1368 print_success( 1369 "No state available for this connection (initial sync)" 1370 ) 1371 except Exception as exc: 1372 print_error(f"Failed to fetch connection state: {exc}") 1373 print_success("Falling back to cold read (no state)") 1374 1375 config_file, catalog_file, state_file_from_conn = save_connection_data_to_files( 1376 connection_data, output_path / "connection_data" 1377 ) 1378 1379 if state_file_from_conn and not state_file: 1380 state_file = state_file_from_conn 1381 1382 print_success( 1383 f"Fetched config for source: {connection_data.source_name} " 1384 f"with {len(connection_data.stream_names)} streams" 1385 ) 1386 1387 # Auto-detect test/control image from connection if not provided 1388 if not resolved_test_image and connection_data.connector_image: 1389 resolved_test_image = 
connection_data.connector_image 1390 print_success(f"Auto-detected test image: {resolved_test_image}") 1391 1392 if ( 1393 not skip_compare 1394 and not resolved_control_image 1395 and connection_data.connector_image 1396 ): 1397 resolved_control_image = connection_data.connector_image 1398 print_success(f"Auto-detected control image: {resolved_control_image}") 1399 elif config_path: 1400 config_file = Path(config_path) 1401 catalog_file = Path(catalog_path) if catalog_path else None 1402 elif connector_name: 1403 # Fallback: fetch integration test secrets from GSM using PyAirbyte API 1404 print_success( 1405 f"No connection_id or config_path provided. " 1406 f"Attempting to fetch integration test config from GSM for {connector_name}..." 1407 ) 1408 gsm_config = get_first_config_from_secrets(connector_name) 1409 if gsm_config: 1410 # Write config to a temp file (not in output_path to avoid artifact upload) 1411 gsm_config_dir = Path( 1412 tempfile.mkdtemp(prefix=f"gsm-config-{connector_name}-") 1413 ) 1414 gsm_config_dir.chmod(0o700) 1415 gsm_config_file = gsm_config_dir / "config.json" 1416 gsm_config_file.write_text(json.dumps(gsm_config, indent=2)) 1417 gsm_config_file.chmod(0o600) 1418 config_file = gsm_config_file 1419 # Use catalog_path if provided (e.g., generated from discover output) 1420 catalog_file = Path(catalog_path) if catalog_path else None 1421 print_success( 1422 f"Fetched integration test config from GSM for {connector_name}" 1423 ) 1424 else: 1425 print_error( 1426 f"Failed to fetch integration test config from GSM for {connector_name}." 
1427 ) 1428 config_file = None 1429 # Use catalog_path if provided (e.g., generated from discover output) 1430 catalog_file = Path(catalog_path) if catalog_path else None 1431 else: 1432 config_file = None 1433 catalog_file = Path(catalog_path) if catalog_path else None 1434 1435 # Auto-detect control_image from metadata.yaml if connector_name is provided (comparison mode only) 1436 if not skip_compare and not resolved_control_image and connector_name: 1437 resolved_control_image = _fetch_control_image_from_metadata(connector_name) 1438 if resolved_control_image: 1439 print_success( 1440 f"Auto-detected control image from metadata.yaml: {resolved_control_image}" 1441 ) 1442 1443 # Validate that we have the required images 1444 if not resolved_test_image: 1445 write_github_output("success", False) 1446 write_github_output("error", "No test image specified") 1447 exit_with_error( 1448 "You must provide one of the following: a test_image, a connector_name " 1449 "to build the image from source, or a connection_id to auto-detect the image." 1450 ) 1451 1452 if not skip_compare and not resolved_control_image: 1453 write_github_output("success", False) 1454 write_github_output("error", "No control image specified") 1455 exit_with_error( 1456 "You must provide one of the following: a control_image, a connection_id " 1457 "for a connection that has an associated connector image, or a connector_name " 1458 "to auto-detect the control image from the airbyte repo's metadata.yaml." 
1459 ) 1460 1461 # Pull images if they weren't just built locally 1462 # If connector_name was provided, we just built the test image locally 1463 if not connector_name and not ensure_image_available(resolved_test_image): 1464 write_github_output("success", False) 1465 write_github_output("error", f"Failed to pull image: {resolved_test_image}") 1466 exit_with_error(f"Failed to pull test image: {resolved_test_image}") 1467 1468 if ( 1469 not skip_compare 1470 and resolved_control_image 1471 and not ensure_image_available(resolved_control_image) 1472 ): 1473 write_github_output("success", False) 1474 write_github_output("error", f"Failed to pull image: {resolved_control_image}") 1475 exit_with_error( 1476 f"Failed to pull control connector image: {resolved_control_image}" 1477 ) 1478 1479 # Apply selected_streams filter to catalog if requested 1480 if selected_streams and catalog_file: 1481 streams_set = {s.strip() for s in selected_streams.split(",") if s.strip()} 1482 if streams_set: 1483 print_success( 1484 f"Filtering catalog to {len(streams_set)} selected streams: " 1485 f"{', '.join(sorted(streams_set))}" 1486 ) 1487 filter_configured_catalog_file(catalog_file, streams_set) 1488 1489 # Upgrade READ → READ_WITH_STATE when a state file is available 1490 if cmd == Command.READ and state_file: 1491 cmd = Command.READ_WITH_STATE 1492 print_success("State available — using read-with-state (warm read)") 1493 1494 # Track telemetry for the regression test 1495 # Extract version from image tag (e.g., "airbyte/source-github:1.0.0" -> "1.0.0") 1496 target_version = ( 1497 resolved_test_image.rsplit(":", 1)[-1] 1498 if ":" in resolved_test_image 1499 else "unknown" 1500 ) 1501 control_version = None 1502 if resolved_control_image and ":" in resolved_control_image: 1503 control_version = resolved_control_image.rsplit(":", 1)[-1] 1504 1505 # Get tester identity from environment (GitHub Actions sets GITHUB_ACTOR) 1506 tester = os.getenv("GITHUB_ACTOR") or os.getenv("USER") 
1507 1508 track_regression_test( 1509 user_id=tester, 1510 connector_image=resolved_test_image, 1511 command=command, 1512 target_version=target_version, 1513 control_version=control_version, 1514 additional_properties={ 1515 "connection_id": connection_id, 1516 "skip_compare": skip_compare, 1517 "with_state": cmd == Command.READ_WITH_STATE, 1518 }, 1519 ) 1520 1521 # Execute the appropriate mode 1522 if skip_compare: 1523 # Single-version mode: run only the connector image 1524 result = _run_connector_command( 1525 connector_image=resolved_test_image, 1526 command=cmd, 1527 output_dir=output_path, 1528 target_or_control=TargetOrControl.TARGET, 1529 config_path=config_file, 1530 catalog_path=catalog_file, 1531 state_path=state_file, 1532 enable_debug_logs=enable_debug_logs, 1533 ) 1534 1535 print_json(result) 1536 1537 write_github_outputs( 1538 { 1539 "success": result["success"], 1540 "connector": resolved_test_image, 1541 "command": command, 1542 "exit_code": result["exit_code"], 1543 } 1544 ) 1545 1546 write_test_summary( 1547 connector_image=resolved_test_image, 1548 test_type="regression-test", 1549 success=result["success"], 1550 results={ 1551 "command": command, 1552 "exit_code": result["exit_code"], 1553 "output_dir": output_dir, 1554 }, 1555 ) 1556 1557 # Generate report.md with detailed metrics 1558 report_path = generate_single_version_report( 1559 connector_image=resolved_test_image, 1560 command=command, 1561 result=result, 1562 output_dir=output_path, 1563 ) 1564 print_success(f"Generated report: {report_path}") 1565 1566 # Write report to GITHUB_STEP_SUMMARY (if env var exists) 1567 write_github_summary(report_path.read_text()) 1568 1569 if result["success"]: 1570 print_success( 1571 f"Single-version regression test passed for {resolved_test_image}" 1572 ) 1573 else: 1574 print_error( 1575 f"Single-version regression test failed for {resolved_test_image}" 1576 ) 1577 else: 1578 # Comparison mode: run both target and control images 1579 
target_output = output_path / "target" 1580 control_output = output_path / "control" 1581 1582 target_result = _run_with_optional_http_metrics( 1583 connector_image=resolved_test_image, 1584 command=cmd, 1585 output_dir=target_output, 1586 target_or_control=TargetOrControl.TARGET, 1587 enable_http_metrics=enable_http_metrics, 1588 config_path=config_file, 1589 catalog_path=catalog_file, 1590 state_path=state_file, 1591 enable_debug_logs=enable_debug_logs, 1592 ) 1593 1594 control_result = _run_with_optional_http_metrics( 1595 connector_image=resolved_control_image, # type: ignore[arg-type] 1596 command=cmd, 1597 output_dir=control_output, 1598 target_or_control=TargetOrControl.CONTROL, 1599 enable_http_metrics=enable_http_metrics, 1600 config_path=config_file, 1601 catalog_path=catalog_file, 1602 state_path=state_file, 1603 enable_debug_logs=enable_debug_logs, 1604 ) 1605 1606 both_succeeded = target_result["success"] and control_result["success"] 1607 regression_detected = target_result["success"] != control_result["success"] 1608 1609 # For CHECK commands, override pass/fail using connectionStatus instead 1610 # of exit codes. The CDK exits 0 even when a check fails; the real 1611 # signal is in CONNECTION_STATUS.status. 1612 if command == "check": 1613 target_conn = target_result.get("connection_status") 1614 control_conn = control_result.get("connection_status") 1615 if target_conn is not None and control_conn is not None: 1616 both_succeeded = ( 1617 target_conn == "SUCCEEDED" and control_conn == "SUCCEEDED" 1618 ) 1619 # Only flag a regression when the target got worse. 1620 # An improvement (target SUCCEEDED, control FAILED) is not a 1621 # regression. 
1622 regression_detected = ( 1623 target_conn == "FAILED" and control_conn == "SUCCEEDED" 1624 ) 1625 1626 combined_result = { 1627 "target": target_result, 1628 "control": control_result, 1629 "both_succeeded": both_succeeded, 1630 "regression_detected": regression_detected, 1631 } 1632 1633 print_json(combined_result) 1634 1635 both_failed = not target_result["success"] and not control_result["success"] 1636 1637 # For CHECK, also flag both-failed when both have FAILED connectionStatus 1638 if command == "check": 1639 target_conn = target_result.get("connection_status") 1640 control_conn = control_result.get("connection_status") 1641 if target_conn == "FAILED" and control_conn == "FAILED": 1642 both_failed = True 1643 1644 gh_outputs: dict[str, object] = { 1645 "success": not regression_detected, 1646 "target_image": resolved_test_image, 1647 "control_image": resolved_control_image, 1648 "command": command, 1649 "target_exit_code": target_result["exit_code"], 1650 "control_exit_code": control_result["exit_code"], 1651 "regression_detected": regression_detected, 1652 "both_failed": both_failed, 1653 } 1654 1655 if command == "check": 1656 gh_outputs["target_connection_status"] = target_result.get( 1657 "connection_status" 1658 ) 1659 gh_outputs["control_connection_status"] = control_result.get( 1660 "connection_status" 1661 ) 1662 1663 write_github_outputs(gh_outputs) 1664 1665 write_json_output("regression_report", combined_result) 1666 1667 report_path = generate_regression_report( 1668 target_image=resolved_test_image, 1669 control_image=resolved_control_image, # type: ignore[arg-type] 1670 command=command, 1671 target_result=target_result, 1672 control_result=control_result, 1673 output_dir=output_path, 1674 ) 1675 print_success(f"Generated regression report: {report_path}") 1676 1677 # Write report to GITHUB_STEP_SUMMARY (if env var exists) 1678 write_github_summary(report_path.read_text()) 1679 1680 if regression_detected: 1681 print_error( 1682 f"Regression 
detected between {resolved_test_image} and {resolved_control_image}" 1683 ) 1684 elif both_succeeded: 1685 print_success( 1686 f"Regression test passed for {resolved_test_image} vs {resolved_control_image}" 1687 ) 1688 else: 1689 # Both versions failed, but no regression between them was detected; treat this as a 1690 # test environment issue (e.g., expired credentials, transient API errors, rate limiting). 1691 # Exit successfully since no regression between versions was detected. 1692 print_warning( 1693 f"Both versions failed for {resolved_test_image} vs {resolved_control_image}. " 1694 "No regression detected. This may indicate expired credentials or a transient API issue." 1695 ) 1696 1697 1698@connector_app.command(name="fetch-connection-config") 1699def fetch_connection_config_cmd( 1700 connection_id: Annotated[ 1701 str, 1702 Parameter(help="The UUID of the Airbyte Cloud connection."), 1703 ], 1704 output_path: Annotated[ 1705 str | None, 1706 Parameter( 1707 help="Path to output file or directory. " 1708 "If directory, writes connection-<id>-config.json inside it. " 1709 "Default: platform temp directory (e.g. /tmp/connection-<id>-config.json)" 1710 ), 1711 ] = None, 1712 with_secrets: Annotated[ 1713 bool, 1714 Parameter( 1715 name="--with-secrets", 1716 negative="--no-secrets", 1717 help="If set, fetches unmasked secrets from the internal database. " 1718 "Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or `gcloud auth application-default login`. " 1719 "Must be used with --oc-issue-url.", 1720 ), 1721 ] = False, 1722 oc_issue_url: Annotated[ 1723 str | None, 1724 Parameter( 1725 help="OC issue URL for audit logging. Required when using --with-secrets." 1726 ), 1727 ] = None, 1728) -> None: 1729 """Fetch connection configuration from Airbyte Cloud to a local file. 1730 1731 This command retrieves the source configuration for a given connection ID 1732 and writes it to a JSON file. 
When `--output-path` is omitted the file is 1733 written to the platform temp directory to avoid accidentally committing 1734 secrets to a git repository. 1735 1736 Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and 1737 AIRBYTE_CLOUD_CLIENT_SECRET environment variables. 1738 1739 When --with-secrets is specified, the command fetches unmasked secrets from 1740 the internal database using the connection-retriever. This additionally requires: 1741 - An OC issue URL for audit logging (--oc-issue-url) 1742 - GCP credentials via `GCP_PROD_DB_ACCESS_CREDENTIALS` env var or `gcloud auth application-default login` 1743 - Cloud SQL Python Connector access to the Prod DB replica. 1744 """ 1745 path = Path(output_path) if output_path else None 1746 result = fetch_connection_config( 1747 connection_id=connection_id, 1748 output_path=path, 1749 with_secrets=with_secrets, 1750 oc_issue_url=oc_issue_url, 1751 ) 1752 if result.success: 1753 print_success(result.message) 1754 else: 1755 print_error(result.message) 1756 print_json(result.model_dump()) 1757 1758 1759def _read_json_input( 1760 json_str: str | None, 1761 input_file: str | None, 1762) -> dict: 1763 """Read JSON from a positional arg, --input file, or STDIN (in that priority).""" 1764 if json_str is not None: 1765 return json.loads(json_str) 1766 if input_file is not None: 1767 return json.loads(Path(input_file).read_text()) 1768 if not sys.stdin.isatty(): 1769 return json.loads(sys.stdin.read()) 1770 exit_with_error( 1771 "No JSON input provided. Pass it as a positional argument, " 1772 "via --input <file>, or pipe to STDIN." 
1773 ) 1774 raise SystemExit(1) # unreachable, but satisfies type checker 1775 1776 1777@state_app.command(name="get") 1778def get_connection_state( 1779 connection_id: Annotated[ 1780 str, 1781 Parameter(help="The connection ID (UUID) to fetch state for."), 1782 ], 1783 stream_name: Annotated[ 1784 str | None, 1785 Parameter( 1786 help="Optional stream name to filter state for a single stream.", 1787 name="--stream", 1788 ), 1789 ] = None, 1790 stream_namespace: Annotated[ 1791 str | None, 1792 Parameter( 1793 help="Optional stream namespace to narrow the stream filter.", 1794 name="--namespace", 1795 ), 1796 ] = None, 1797 output: Annotated[ 1798 str | None, 1799 Parameter( 1800 help="Path to write the state JSON output to a file.", 1801 name="--output", 1802 ), 1803 ] = None, 1804) -> None: 1805 """Get the current state for an Airbyte Cloud connection.""" 1806 workspace = CloudWorkspace.from_env() 1807 conn = workspace.get_connection(connection_id) 1808 1809 if stream_name is not None: 1810 stream_state = conn.get_stream_state( 1811 stream_name=stream_name, 1812 stream_namespace=stream_namespace, 1813 ) 1814 result = { 1815 "connection_id": connection_id, 1816 "stream_name": stream_name, 1817 "stream_namespace": stream_namespace, 1818 "stream_state": stream_state, 1819 } 1820 else: 1821 result = conn.dump_raw_state() 1822 1823 json_output = json.dumps(result, indent=2, default=str) 1824 if output is not None: 1825 Path(output).write_text(json_output + "\n") 1826 print(f"State written to {output}", file=sys.stderr) 1827 else: 1828 print(json_output) 1829 1830 1831@state_app.command(name="set") 1832def set_connection_state( 1833 connection_id: Annotated[ 1834 str, 1835 Parameter(help="The connection ID (UUID) to update state for."), 1836 ], 1837 state_json: Annotated[ 1838 str | None, 1839 Parameter( 1840 help="The connection state as a JSON string. When --stream is used, " 1841 'this is just the stream\'s state blob (e.g., \'{"cursor": "2024-01-01"}\').' 
1842 " Otherwise, must include 'stateType', 'connectionId', and the appropriate " 1843 "state field. Can also be provided via --input or STDIN." 1844 ), 1845 ] = None, 1846 stream_name: Annotated[ 1847 str | None, 1848 Parameter( 1849 help="Optional stream name to update state for a single stream only.", 1850 name="--stream", 1851 ), 1852 ] = None, 1853 stream_namespace: Annotated[ 1854 str | None, 1855 Parameter( 1856 help="Optional stream namespace to identify the stream.", 1857 name="--namespace", 1858 ), 1859 ] = None, 1860 input_file: Annotated[ 1861 str | None, 1862 Parameter( 1863 help="Path to a JSON file containing the state to set.", 1864 name="--input", 1865 ), 1866 ] = None, 1867) -> None: 1868 """Set the state for an Airbyte Cloud connection. 1869 1870 State JSON can be provided as a positional argument, via --input <file>, 1871 or piped through STDIN. 1872 1873 Uses the safe variant that prevents updates while a sync is running. 1874 When --stream is provided, only that stream's state is updated within 1875 the existing connection state. 
1876 """ 1877 parsed_state = _read_json_input(state_json, input_file) 1878 workspace = CloudWorkspace.from_env() 1879 conn = workspace.get_connection(connection_id) 1880 1881 if stream_name is not None: 1882 conn.set_stream_state( 1883 stream_name=stream_name, 1884 state_blob_dict=parsed_state, 1885 stream_namespace=stream_namespace, 1886 ) 1887 else: 1888 conn.import_raw_state(parsed_state) 1889 1890 result = conn.dump_raw_state() 1891 json_output = json.dumps(result, indent=2, default=str) 1892 print(json_output) 1893 1894 1895@state_app.command(name="reset") 1896def reset_stream_state( 1897 connection_id: Annotated[ 1898 str, 1899 Parameter(help="The connection ID (UUID) to update state for."), 1900 ], 1901 stream_name: Annotated[ 1902 str, 1903 Parameter( 1904 help="The configured stream name whose state should be reset.", 1905 name="--stream", 1906 ), 1907 ], 1908 stream_namespace: Annotated[ 1909 str | None, 1910 Parameter( 1911 help="Optional stream namespace to identify the stream.", 1912 name="--namespace", 1913 ), 1914 ] = None, 1915) -> None: 1916 """Reset a configured stream's state so the next sync full-refreshes it. 1917 1918 Uses the safe variant that prevents updates while a sync is running. 1919 Returns a restorable `previous_state_backup` in raw Config API format. 
1920 """ 1921 workspace = CloudWorkspace.from_env() 1922 conn = workspace.get_connection(connection_id) 1923 1924 try: 1925 result = reset_cloud_stream_state( 1926 conn, 1927 stream_name=stream_name, 1928 stream_namespace=stream_namespace, 1929 ) 1930 except PyAirbyteInputError as e: 1931 exit_with_error(str(e)) 1932 1933 print(json.dumps(result.model_dump(), indent=2, default=str)) 1934 1935 1936@catalog_app.command(name="get") 1937def get_connection_catalog( 1938 connection_id: Annotated[ 1939 str, 1940 Parameter(help="The connection ID (UUID) to fetch catalog for."), 1941 ], 1942 output: Annotated[ 1943 str | None, 1944 Parameter( 1945 help="Path to write the catalog JSON output to a file.", 1946 name="--output", 1947 ), 1948 ] = None, 1949) -> None: 1950 """Get the configured catalog for an Airbyte Cloud connection.""" 1951 workspace = CloudWorkspace.from_env() 1952 conn = workspace.get_connection(connection_id) 1953 result = conn.dump_raw_catalog() 1954 if result is None: 1955 exit_with_error("No configured catalog found for this connection.") 1956 raise SystemExit(1) # unreachable, but satisfies type checker 1957 1958 json_output = json.dumps(result, indent=2, default=str) 1959 if output is not None: 1960 Path(output).write_text(json_output + "\n") 1961 print(f"Catalog written to {output}", file=sys.stderr) 1962 else: 1963 print(json_output) 1964 1965 1966@catalog_app.command(name="set") 1967def set_connection_catalog( 1968 connection_id: Annotated[ 1969 str, 1970 Parameter(help="The connection ID (UUID) to update catalog for."), 1971 ], 1972 catalog_json: Annotated[ 1973 str | None, 1974 Parameter( 1975 help="The configured catalog as a JSON string. " 1976 "Can also be provided via --input or STDIN." 
1977 ), 1978 ] = None, 1979 input_file: Annotated[ 1980 str | None, 1981 Parameter( 1982 help="Path to a JSON file containing the catalog to set.", 1983 name="--input", 1984 ), 1985 ] = None, 1986) -> None: 1987 """Set the configured catalog for an Airbyte Cloud connection. 1988 1989 Catalog JSON can be provided as a positional argument, via --input <file>, 1990 or piped through STDIN. 1991 1992 WARNING: This replaces the entire configured catalog. 1993 """ 1994 parsed_catalog = _read_json_input(catalog_json, input_file) 1995 workspace = CloudWorkspace.from_env() 1996 conn = workspace.get_connection(connection_id) 1997 conn.import_raw_catalog(parsed_catalog) 1998 1999 result = conn.dump_raw_catalog() 2000 if result is None: 2001 exit_with_error("Failed to retrieve catalog after import.") 2002 raise SystemExit(1) # unreachable, but satisfies type checker 2003 2004 json_output = json.dumps(result, indent=2, default=str) 2005 print(json_output) 2006 2007 2008@logs_app.command(name="lookup-cloud-backend-error") 2009def lookup_cloud_backend_error( 2010 error_id: Annotated[ 2011 str, 2012 Parameter( 2013 help=( 2014 "The error ID (UUID) to search for. This is typically returned " 2015 "in API error responses as {'errorId': '...'}" 2016 ) 2017 ), 2018 ], 2019 lookback_days: Annotated[ 2020 int, 2021 Parameter(help="Number of days to look back in logs."), 2022 ] = 7, 2023 min_severity_filter: Annotated[ 2024 GCPSeverity | None, 2025 Parameter( 2026 help="Optional minimum severity level to filter logs.", 2027 ), 2028 ] = None, 2029 raw: Annotated[ 2030 bool, 2031 Parameter(help="Output raw JSON instead of formatted text."), 2032 ] = False, 2033) -> None: 2034 """Look up error details from GCP Cloud Logging by error ID. 2035 2036 When an Airbyte Cloud API returns an error response with only an error ID 2037 (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command 2038 fetches the full stack trace and error details from GCP Cloud Logging. 
2039 2040 Requires GCP credentials with Logs Viewer role on the target project. 2041 Set up credentials with: gcloud auth application-default login 2042 """ 2043 print(f"Searching for error ID: {error_id}", file=sys.stderr) 2044 print(f"Lookback days: {lookback_days}", file=sys.stderr) 2045 if min_severity_filter: 2046 print(f"Severity filter: {min_severity_filter}", file=sys.stderr) 2047 print(file=sys.stderr) 2048 2049 result = fetch_error_logs( 2050 error_id=error_id, 2051 lookback_days=lookback_days, 2052 min_severity_filter=min_severity_filter, 2053 ) 2054 2055 if raw: 2056 print_json(result.model_dump()) 2057 return 2058 2059 print(f"Found {result.total_entries_found} log entries", file=sys.stderr) 2060 print(file=sys.stderr) 2061 2062 if result.payloads: 2063 for i, payload in enumerate(result.payloads): 2064 print(f"=== Log Group {i + 1} ===") 2065 print(f"Timestamp: {payload.timestamp}") 2066 print(f"Severity: {payload.severity}") 2067 if payload.resource.labels.pod_name: 2068 print(f"Pod: {payload.resource.labels.pod_name}") 2069 print(f"Lines: {payload.num_log_lines}") 2070 print() 2071 print(payload.message) 2072 print() 2073 elif result.entries: 2074 print("No grouped payloads, showing raw entries:", file=sys.stderr) 2075 for entry in result.entries: 2076 print(f"[{entry.timestamp}] {entry.severity}: {entry.payload}") 2077 else: 2078 print_error("No log entries found for this error ID.") 2079 2080 2081# ============================================================================= 2082# Organization commands 2083# ============================================================================= 2084 2085 2086@organization_app.command(name="search") 2087def organization_search( 2088 name_contains: Annotated[ 2089 str, 2090 Parameter( 2091 name="--name-contains", 2092 help="Case-insensitive substring to match against organization name or email.", 2093 ), 2094 ], 2095 limit: Annotated[ 2096 int, 2097 Parameter( 2098 name="--limit", 2099 help="Maximum number of 
results to return.", 2100 ), 2101 ] = 20, 2102) -> None: 2103 """Search organizations by name or email substring.""" 2104 rows = _search_organizations(name_contains=name_contains, limit=limit) 2105 if not rows: 2106 print_error(f"No organizations found matching '{name_contains}'.") 2107 return 2108 2109 print_json([dict(row) for row in rows]) 2110 2111 2112@organization_app.command(name="info") 2113def organization_info( 2114 organization_id: Annotated[ 2115 str, 2116 Parameter(help="The organization UUID."), 2117 ], 2118) -> None: 2119 """Get information about an organization (stub — not yet implemented).""" 2120 exit_with_error( 2121 "organization info is not yet implemented. " 2122 "Use 'airbyte-ops cloud organization search' to find organizations by name." 2123 ) 2124 2125 2126# ============================================================================= 2127# Workspace commands 2128# ============================================================================= 2129 2130 2131@workspace_app.command(name="search") 2132def workspace_search( 2133 name_contains: Annotated[ 2134 str | None, 2135 Parameter( 2136 name="--name-contains", 2137 help="Case-insensitive substring to match against workspace name or slug.", 2138 ), 2139 ] = None, 2140 email_domain: Annotated[ 2141 str | None, 2142 Parameter( 2143 name="--email-domain", 2144 help="Email domain to search for (e.g. 'motherduck.com').", 2145 ), 2146 ] = None, 2147 limit: Annotated[ 2148 int, 2149 Parameter( 2150 name="--limit", 2151 help="Maximum number of results to return.", 2152 ), 2153 ] = 100, 2154) -> None: 2155 """Search workspaces by name substring or email domain.""" 2156 if not name_contains and not email_domain: 2157 exit_with_error( 2158 "At least one of --name-contains or --email-domain must be provided." 
2159 ) 2160 2161 if name_contains: 2162 rows = _search_workspaces(name_contains=name_contains, limit=limit) 2163 else: 2164 rows = query_workspaces_by_email_domain( 2165 email_domain=email_domain.lstrip("@"), # type: ignore[union-attr] 2166 limit=limit, 2167 ) 2168 2169 if not rows: 2170 search_term = name_contains or email_domain 2171 print_error(f"No workspaces found matching '{search_term}'.") 2172 return 2173 2174 print_json([dict(row) for row in rows]) 2175 2176 2177@workspace_app.command(name="info") 2178def workspace_info_cmd( 2179 workspace_id: Annotated[ 2180 str, 2181 Parameter(help="The workspace UUID."), 2182 ], 2183) -> None: 2184 """Get information about a workspace.""" 2185 result = query_workspace_info(workspace_id=workspace_id) 2186 if not result: 2187 print_error(f"No workspace found with ID '{workspace_id}'.") 2188 return 2189 2190 print_json(dict(result))
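In comparison mode, `regression-test` decides pass, fail or warn from three flags: `both_succeeded`, `regression_detected` and `both_failed`. For `check`, the `connectionStatus` value overrides the exit code. The sketch below isolates that decision logic so it can be reasoned about without Docker or Airbyte Cloud. It is a minimal illustration, not the module's code. `Verdict`, `compute_verdict` and `outcome` are hypothetical names. Each run result is assumed to be a plain dict with a `"success"` key and an optional `"connection_status"` key, mirroring how the listing reads `target_result` and `control_result`.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class Verdict:
    """Hypothetical container for the comparison-mode flags."""

    both_succeeded: bool
    regression_detected: bool
    both_failed: bool


def compute_verdict(
    command: str,
    target_result: Mapping[str, Any],
    control_result: Mapping[str, Any],
) -> Verdict:
    """Sketch of the flag logic, assuming dict-shaped run results."""
    # Default signal: each run's process-level success flag.
    both_succeeded = target_result["success"] and control_result["success"]
    regression_detected = target_result["success"] != control_result["success"]
    both_failed = not target_result["success"] and not control_result["success"]

    if command == "check":
        target_conn = target_result.get("connection_status")
        control_conn = control_result.get("connection_status")
        # A check can exit 0 and still report FAILED, so when both runs carry
        # a connection status, that status replaces the exit-code signal.
        if target_conn is not None and control_conn is not None:
            both_succeeded = target_conn == "SUCCEEDED" and control_conn == "SUCCEEDED"
            # Only a target that got worse counts as a regression.
            regression_detected = target_conn == "FAILED" and control_conn == "SUCCEEDED"
        if target_conn == "FAILED" and control_conn == "FAILED":
            both_failed = True

    return Verdict(both_succeeded, regression_detected, both_failed)


def outcome(verdict: Verdict) -> str:
    """Map flags to the final message branch, in the same order as the command."""
    if verdict.regression_detected:
        return "regression"
    if verdict.both_succeeded:
        return "passed"
    return "no-regression-warning"


if __name__ == "__main__":
    # Both processes exit 0, but the target's check reports FAILED.
    v = compute_verdict(
        "check",
        {"success": True, "connection_status": "FAILED"},
        {"success": True, "connection_status": "SUCCEEDED"},
    )
    print(v, outcome(v))
```

Following the branch order above, a `check` that improves (target `SUCCEEDED`, control `FAILED`) raises neither `regression_detected` nor `both_succeeded`. It therefore lands in the same warning branch as a genuine double failure, which is why `both_failed` is reported as a separate output.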