airbyte_ops_mcp.cli.cloud
CLI commands for Airbyte Cloud operations.
Commands:
- airbyte-ops cloud connector get-version-info - Get connector version info
- airbyte-ops cloud connector set-version-override - Set connector version override
- airbyte-ops cloud connector clear-version-override - Clear connector version override
- airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
- airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file
CLI reference
The commands below are regenerated by poe docs-generate via cyclopts's
programmatic docs API; see docs/generate_cli.py.
airbyte-ops cloud COMMAND
Airbyte Cloud operations.
Commands:
- connection: Connection operations in Airbyte Cloud.
- connector: Deployed connector operations in Airbyte Cloud.
- db: Database operations for Airbyte Cloud Prod DB Replica.
- logs: GCP Cloud Logging operations for Airbyte Cloud.
airbyte-ops cloud connector
Deployed connector operations in Airbyte Cloud.
airbyte-ops cloud connector get-version-info
airbyte-ops cloud connector get-version-info WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE
Get the current version information for a deployed connector.
Parameters:
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
- CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]
airbyte-ops cloud connector set-version-override
airbyte-ops cloud connector set-version-override WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE VERSION REASON ISSUE-URL APPROVAL-COMMENT-URL [ARGS]
Set a version override for a deployed connector.
Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.
The customer_tier_filter gates the operation: the call fails if
the actual tier of the workspace's organization does not match.
Use ALL to proceed regardless of tier (a warning is shown for sensitive tiers).
Parameters:
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
- CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]
- VERSION, --version: The semver version string to pin to (e.g., '2.1.5-preview.abc1234'). [required]
- REASON, --reason: Explanation for the override (min 10 characters). [required]
- ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
- APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
- AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
- REASON-URL, --reason-url: Optional URL with more context (e.g., issue link).
- CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
airbyte-ops cloud connector clear-version-override
airbyte-ops cloud connector clear-version-override WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE ISSUE-URL APPROVAL-COMMENT-URL [ARGS]
Clear a version override from a deployed connector.
Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment variables.
The customer_tier_filter gates the operation: the call fails if
the actual tier of the workspace's organization does not match.
Use ALL to proceed regardless of tier (a warning is shown for sensitive tiers).
Parameters:
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
- CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]
- ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
- APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
- AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
- CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
airbyte-ops cloud connector regression-test
airbyte-ops cloud connector regression-test [ARGS]
Run regression tests on connectors.
This command supports two modes:
Comparison mode (skip_compare=False, default): Runs the specified Airbyte protocol command against both the target (new) and control (baseline) connector versions, then compares the results. This helps identify regressions between versions.
Single-version mode (skip_compare=True): Runs the specified Airbyte protocol command against a single connector and validates the output. No comparison is performed.
Results are written to the output directory and to GitHub Actions outputs if running in CI.
You can provide the test image in three ways:
- --test-image: Use a pre-built image from Docker registry
- --connector-name: Build the image locally from source code
- --connection-id: Auto-detect from an Airbyte Cloud connection
You can provide config/catalog either via file paths OR via a connection_id that fetches them from Airbyte Cloud.
Parameters:
- SKIP-COMPARE, --skip-compare, --no-skip-compare: If True, skip comparison and run single-version tests only. If False (default), run comparison tests (target vs control). [default: False]
- TEST-IMAGE, --test-image: Test connector image with tag (e.g., airbyte/source-github:1.0.0). This is the image under test - in comparison mode, it's compared against control_image.
- CONTROL-IMAGE, --control-image: Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). Ignored if skip_compare=True.
- CONNECTOR-NAME, --connector-name: Connector name to build image from source (e.g., 'source-pokeapi'). If provided, builds the image locally with tag 'dev'. For comparison tests (default), this builds the target image. For single-version tests (skip_compare=True), this builds the test image.
- REPO-ROOT, --repo-root: Path to the airbyte repo root. Required if connector_name is provided and the repo cannot be auto-detected.
- COMMAND, --command: The Airbyte command to run. [choices: spec, check, discover, read] [default: check]
- CONNECTION-ID, --connection-id: Airbyte Cloud connection ID to fetch config/catalog from. Mutually exclusive with config-path/catalog-path. If provided, test_image/control_image can be auto-detected.
- CONFIG-PATH, --config-path: Path to the connector config JSON file.
- CATALOG-PATH, --catalog-path: Path to the configured catalog JSON file (required for read).
- STATE-PATH, --state-path: Path to the state JSON file (optional for read).
- OUTPUT-DIR, --output-dir: Directory to store test artifacts. [default: /tmp/regression_test_artifacts]
- ENABLE-HTTP-METRICS, --enable-http-metrics, --no-enable-http-metrics: Capture HTTP traffic metrics via mitmproxy (experimental). Requires mitmdump to be installed. Only used in comparison mode. [default: False]
- SELECTED-STREAMS, --selected-streams: Comma-separated list of stream names to include in the read. Only these streams will be included in the configured catalog. This is useful to limit data volume by testing only specific streams.
- ENABLE-DEBUG-LOGS, --enable-debug-logs, --no-enable-debug-logs: Enable debug-level logging for regression test output. Also passed as LOG_LEVEL=DEBUG to the connector Docker container. [default: False]
- WITH-STATE, --with-state, --no-state: Fetch and pass the connection's current state to the read command, producing a warm read instead of a cold read. Defaults to True when --connection-id is provided, False otherwise. Has no effect unless the command is read. Ignored when --state-path is explicitly provided.
airbyte-ops cloud connector fetch-connection-config
airbyte-ops cloud connector fetch-connection-config CONNECTION-ID [ARGS]
Fetch connection configuration from Airbyte Cloud to a local file.
This command retrieves the source configuration for a given connection ID
and writes it to a JSON file. When --output-path is omitted the file is
written to the platform temp directory to avoid accidentally committing
secrets to a git repository.
Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET environment variables.
When --with-secrets is specified, the command fetches unmasked secrets from the internal database using the connection-retriever. This additionally requires:
- An OC issue URL for audit logging (--oc-issue-url)
- GCP credentials via the GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login
- If CI=true: expects cloud-sql-proxy running on localhost, or direct network access to the Cloud SQL instance.
Parameters:
- CONNECTION-ID, --connection-id: The UUID of the Airbyte Cloud connection. [required]
- OUTPUT-PATH, --output-path: Path to output file or directory. If directory, writes connection--config.json inside it. Default: platform temp directory (e.g. /tmp/connection- -config.json)
- WITH-SECRETS, --with-secrets, --no-secrets: If set, fetches unmasked secrets from the internal database. Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login. Must be used with --oc-issue-url. [default: False]
- OC-ISSUE-URL, --oc-issue-url: OC issue URL for audit logging. Required when using --with-secrets.
airbyte-ops cloud db
Database operations for Airbyte Cloud Prod DB Replica.
airbyte-ops cloud db start-proxy
airbyte-ops cloud db start-proxy [ARGS]
Start the Cloud SQL Proxy for database access.
This command starts the Cloud SQL Auth Proxy to enable connections to the Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.
By default, runs as a daemon (background process). Use --no-daemon to run in foreground mode where you can see logs and stop with Ctrl+C.
Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, which should contain the service account JSON credentials.
After starting the proxy, set these environment variables to use database tools:
    export USE_CLOUD_SQL_PROXY=1
    export DB_PORT={port}
Parameters:
- PORT, --port: Port for the Cloud SQL Proxy to listen on. [default: 15432]
- DAEMON, --daemon, --no-daemon: Run as daemon in background (default). Use --no-daemon for foreground. [default: True]
airbyte-ops cloud db stop-proxy
airbyte-ops cloud db stop-proxy
Stop the Cloud SQL Proxy daemon.
This command stops a Cloud SQL Proxy that was started with 'start-proxy'. It reads the PID from the PID file and sends a SIGTERM signal to stop the process.
airbyte-ops cloud connection
Connection operations in Airbyte Cloud.
airbyte-ops cloud connection state
Connection state operations in Airbyte Cloud.
Commands:
- get: Get the current state for an Airbyte Cloud connection.
- set: Set the state for an Airbyte Cloud connection.
airbyte-ops cloud connection state get
airbyte-ops cloud connection state get CONNECTION-ID [ARGS]
Get the current state for an Airbyte Cloud connection.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch state for. [required]
- STREAM, --stream: Optional stream name to filter state for a single stream.
- NAMESPACE, --namespace: Optional stream namespace to narrow the stream filter.
- OUTPUT, --output: Path to write the state JSON output to a file.
airbyte-ops cloud connection state set
airbyte-ops cloud connection state set CONNECTION-ID [ARGS]
Set the state for an Airbyte Cloud connection.
State JSON can be provided as a positional argument, via --input, or via STDIN.
Uses the safe variant that prevents updates while a sync is running. When --stream is provided, only that stream's state is updated within the existing connection state.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
- STATE-JSON, --state-json: The connection state as a JSON string. When --stream is used, this is just the stream's state blob (e.g., '{"cursor": "2024-01-01"}'). Otherwise, must include 'stateType', 'connectionId', and the appropriate state field. Can also be provided via --input or STDIN.
- STREAM, --stream: Optional stream name to update state for a single stream only.
- NAMESPACE, --namespace: Optional stream namespace to identify the stream.
- INPUT, --input: Path to a JSON file containing the state to set.
airbyte-ops cloud connection catalog
Connection catalog operations in Airbyte Cloud.
Commands:
- get: Get the configured catalog for an Airbyte Cloud connection.
- set: Set the configured catalog for an Airbyte Cloud connection.
airbyte-ops cloud connection catalog get
airbyte-ops cloud connection catalog get CONNECTION-ID [ARGS]
Get the configured catalog for an Airbyte Cloud connection.
Parameters:
CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch catalog for. [required]OUTPUT, --output: Path to write the catalog JSON output to a file.
airbyte-ops cloud connection catalog set
airbyte-ops cloud connection catalog set CONNECTION-ID [ARGS]
Set the configured catalog for an Airbyte Cloud connection.
Catalog JSON can be provided as a positional argument, via --input, or via STDIN.
WARNING: This replaces the entire configured catalog.
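Because set replaces the whole catalog, the safe pattern is get, modify, set. A sketch of one common modification, trimming a fetched catalog down to selected streams (assumes the standard configured-catalog shape, {"streams": [{"stream": {"name": ...}, ...}]}):

```python
def keep_streams(configured_catalog: dict, selected: set[str]) -> dict:
    """Return a copy of the configured catalog containing only the selected
    streams, suitable for passing back to 'catalog set'."""
    return {
        **configured_catalog,
        "streams": [
            s
            for s in configured_catalog.get("streams", [])
            if s.get("stream", {}).get("name") in selected
        ],
    }
```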
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update catalog for. [required]
- CATALOG-JSON, --catalog-json: The configured catalog as a JSON string. Can also be provided via --input or STDIN.
- INPUT, --input: Path to a JSON file containing the catalog to set.
airbyte-ops cloud logs
GCP Cloud Logging operations for Airbyte Cloud.
airbyte-ops cloud logs lookup-cloud-backend-error
airbyte-ops cloud logs lookup-cloud-backend-error ERROR-ID [ARGS]
Look up error details from GCP Cloud Logging by error ID.
When an Airbyte Cloud API returns an error response with only an error ID (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command fetches the full stack trace and error details from GCP Cloud Logging.
Requires GCP credentials with Logs Viewer role on the target project. Set up credentials with: gcloud auth application-default login
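Conceptually, the lookup is a time-bounded search of Cloud Logging for the error ID. The sketch below only illustrates that shape; the actual log fields and filter the command uses are internal, so this filter string is an assumption, not the real query:

```python
from datetime import datetime, timedelta, timezone


def build_error_filter(error_id: str, lookback_days: int = 7) -> str:
    """Illustrative Cloud Logging filter: bound the window to the lookback
    period, then match the error ID as quoted text anywhere in the entry."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=lookback_days)
    ts = cutoff.strftime("%Y-%m-%dT%H:%M:%SZ")
    return f'"{error_id}" AND timestamp >= "{ts}"'
```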
Parameters:
- ERROR-ID, --error-id: The error ID (UUID) to search for. This is typically returned in API error responses as {'errorId': '...'} [required]
- LOOKBACK-DAYS, --lookback-days: Number of days to look back in logs. [default: 7]
- MIN-SEVERITY-FILTER, --min-severity-filter: Optional minimum severity level to filter logs. [choices: debug, info, notice, warning, error, critical, alert, emergency]
- RAW, --raw, --no-raw: Output raw JSON instead of formatted text. [default: False]
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""CLI commands for Airbyte Cloud operations. 3 4Commands: 5 airbyte-ops cloud connector get-version-info - Get connector version info 6 airbyte-ops cloud connector set-version-override - Set connector version override 7 airbyte-ops cloud connector clear-version-override - Clear connector version override 8 airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison) 9 airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file 10 11## CLI reference 12 13The commands below are regenerated by `poe docs-generate` via cyclopts's 14programmatic docs API; see `docs/generate_cli.py`. 15 16.. include:: ../../../docs/generated/cli/cloud.md 17 :start-line: 2 18""" 19 20from __future__ import annotations 21 22# Hide Python-level members from the pdoc page for this module; the rendered 23# docs for this CLI group come entirely from the grafted `.. include::` in 24# the module docstring above. 
25__all__: list[str] = [] 26 27import json 28import logging 29import os 30import shutil 31import signal 32import socket 33import subprocess 34import sys 35import tempfile 36import time 37from pathlib import Path 38from typing import Annotated, Literal 39 40import requests 41import yaml 42from airbyte.cloud.workspaces import CloudWorkspace 43from airbyte_cdk.models.connector_metadata import MetadataFile 44from airbyte_cdk.utils.connector_paths import find_connector_root_from_name 45from airbyte_cdk.utils.docker import build_connector_image, verify_docker_installation 46from airbyte_protocol.models import ConfiguredAirbyteCatalog 47from cyclopts import Parameter 48 49from airbyte_ops_mcp.cli._base import App, app 50from airbyte_ops_mcp.cli._shared import ( 51 exit_with_error, 52 print_error, 53 print_json, 54 print_success, 55 print_warning, 56) 57from airbyte_ops_mcp.cloud_admin.connection_config import fetch_connection_config 58from airbyte_ops_mcp.constants import ( 59 CLOUD_SQL_INSTANCE, 60 CLOUD_SQL_PROXY_PID_FILE, 61 DEFAULT_CLOUD_SQL_PROXY_PORT, 62 ENV_GCP_PROD_DB_ACCESS_CREDENTIALS, 63) 64from airbyte_ops_mcp.gcp_logs import GCPSeverity, fetch_error_logs 65from airbyte_ops_mcp.mcp.cloud_connector_versions import ( 66 get_cloud_connector_version, 67 set_cloud_connector_version_override, 68) 69from airbyte_ops_mcp.regression_tests.cdk_secrets import get_first_config_from_secrets 70from airbyte_ops_mcp.regression_tests.ci_output import ( 71 generate_regression_report, 72 generate_single_version_report, 73 write_github_output, 74 write_github_outputs, 75 write_github_summary, 76 write_json_output, 77 write_test_summary, 78) 79from airbyte_ops_mcp.regression_tests.config_overrides import ( 80 filter_configured_catalog_file, 81) 82from airbyte_ops_mcp.regression_tests.connection_fetcher import ( 83 fetch_connection_data, 84 fetch_connection_state, 85 save_connection_data_to_files, 86) 87from airbyte_ops_mcp.regression_tests.connection_secret_retriever import ( 88 
SecretRetrievalError, 89 enrich_config_with_secrets, 90 should_use_secret_retriever, 91) 92from airbyte_ops_mcp.regression_tests.connector_runner import ( 93 ConnectorRunner, 94 ensure_image_available, 95) 96from airbyte_ops_mcp.regression_tests.http_metrics import ( 97 MitmproxyManager, 98 parse_http_dump, 99) 100from airbyte_ops_mcp.regression_tests.models import ( 101 Command, 102 ConnectorUnderTest, 103 ExecutionInputs, 104 TargetOrControl, 105) 106from airbyte_ops_mcp.telemetry import track_regression_test 107 108# Path to connectors directory within the airbyte repo 109CONNECTORS_SUBDIR = Path("airbyte-integrations") / "connectors" 110 111# Create the cloud sub-app 112cloud_app = App(name="cloud", help="Airbyte Cloud operations.") 113app.command(cloud_app) 114 115# Create the connector sub-app under cloud 116connector_app = App( 117 name="connector", help="Deployed connector operations in Airbyte Cloud." 118) 119cloud_app.command(connector_app) 120 121# Create the db sub-app under cloud 122db_app = App(name="db", help="Database operations for Airbyte Cloud Prod DB Replica.") 123cloud_app.command(db_app) 124 125# Create the connection sub-app under cloud 126connection_app = App(name="connection", help="Connection operations in Airbyte Cloud.") 127cloud_app.command(connection_app) 128 129# Create the state sub-app under connection 130state_app = App(name="state", help="Connection state operations in Airbyte Cloud.") 131connection_app.command(state_app) 132 133# Create the catalog sub-app under connection 134catalog_app = App( 135 name="catalog", help="Connection catalog operations in Airbyte Cloud." 
136) 137connection_app.command(catalog_app) 138 139# Create the logs sub-app under cloud 140logs_app = App(name="logs", help="GCP Cloud Logging operations for Airbyte Cloud.") 141cloud_app.command(logs_app) 142 143 144@db_app.command(name="start-proxy") 145def start_proxy( 146 port: Annotated[ 147 int, 148 Parameter(help="Port for the Cloud SQL Proxy to listen on."), 149 ] = DEFAULT_CLOUD_SQL_PROXY_PORT, 150 daemon: Annotated[ 151 bool, 152 Parameter( 153 help="Run as daemon in background (default). Use --no-daemon for foreground." 154 ), 155 ] = True, 156) -> None: 157 """Start the Cloud SQL Proxy for database access. 158 159 This command starts the Cloud SQL Auth Proxy to enable connections to the 160 Airbyte Cloud Prod DB Replica. The proxy is required for database query tools. 161 162 By default, runs as a daemon (background process). Use --no-daemon to run in 163 foreground mode where you can see logs and stop with Ctrl+C. 164 165 Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, 166 which should contain the service account JSON credentials. 
167 168 After starting the proxy, set these environment variables to use database tools: 169 export USE_CLOUD_SQL_PROXY=1 170 export DB_PORT={port} 171 172 Example: 173 airbyte-ops cloud db start-proxy 174 airbyte-ops cloud db start-proxy --port 15432 175 airbyte-ops cloud db start-proxy --no-daemon 176 """ 177 # Check if proxy is already running on the requested port (idempotency) 178 try: 179 with socket.create_connection(("127.0.0.1", port), timeout=0.5): 180 # Something is already listening on this port 181 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 182 pid_info = "" 183 if pid_file.exists(): 184 pid_info = f" (PID: {pid_file.read_text().strip()})" 185 print_success( 186 f"Cloud SQL Proxy is already running on port {port}{pid_info}" 187 ) 188 print_success("") 189 print_success("To use database tools, set these environment variables:") 190 print_success(" export USE_CLOUD_SQL_PROXY=1") 191 print_success(f" export DB_PORT={port}") 192 return 193 except (OSError, TimeoutError, ConnectionRefusedError): 194 pass # Port not in use, proceed with starting proxy 195 196 # Check if cloud-sql-proxy is installed 197 proxy_path = shutil.which("cloud-sql-proxy") 198 if not proxy_path: 199 exit_with_error( 200 "cloud-sql-proxy not found in PATH. " 201 "Install it from: https://cloud.google.com/sql/docs/mysql/sql-proxy" 202 ) 203 204 # Get credentials from environment 205 creds_json = os.getenv(ENV_GCP_PROD_DB_ACCESS_CREDENTIALS) 206 if not creds_json: 207 exit_with_error( 208 f"{ENV_GCP_PROD_DB_ACCESS_CREDENTIALS} environment variable is not set. " 209 "This should contain the GCP service account JSON credentials." 
210 ) 211 212 # Build the command using --json-credentials to avoid writing to disk 213 cmd = [ 214 proxy_path, 215 CLOUD_SQL_INSTANCE, 216 f"--port={port}", 217 f"--json-credentials={creds_json}", 218 ] 219 220 print_success(f"Starting Cloud SQL Proxy on port {port}...") 221 print_success(f"Instance: {CLOUD_SQL_INSTANCE}") 222 print_success("") 223 print_success("To use database tools, set these environment variables:") 224 print_success(" export USE_CLOUD_SQL_PROXY=1") 225 print_success(f" export DB_PORT={port}") 226 print_success("") 227 228 if daemon: 229 # Run in background (daemon mode) with log file for diagnostics 230 log_file_path = Path("/tmp/airbyte-cloud-sql-proxy.log") 231 log_file = log_file_path.open("ab") 232 process = subprocess.Popen( 233 cmd, 234 stdout=subprocess.DEVNULL, 235 stderr=log_file, 236 start_new_session=True, 237 ) 238 239 # Brief wait to verify the process started successfully 240 time.sleep(0.5) 241 if process.poll() is not None: 242 # Process exited immediately - read any error output 243 log_file.close() 244 error_output = "" 245 if log_file_path.exists(): 246 error_output = log_file_path.read_text()[-1000:] # Last 1000 chars 247 exit_with_error( 248 f"Cloud SQL Proxy failed to start (exit code: {process.returncode}).\n" 249 f"Check logs at {log_file_path}\n" 250 f"Recent output: {error_output}" 251 ) 252 253 # Write PID to file for stop-proxy command 254 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 255 pid_file.write_text(str(process.pid)) 256 print_success(f"Cloud SQL Proxy started as daemon (PID: {process.pid})") 257 print_success(f"Logs: {log_file_path}") 258 print_success("To stop: airbyte-ops cloud db stop-proxy") 259 else: 260 # Run in foreground - replace current process 261 # Signals (Ctrl+C) will be handled directly by the cloud-sql-proxy process 262 print_success("Running in foreground. 
Press Ctrl+C to stop the proxy.") 263 print_success("") 264 os.execv(proxy_path, cmd) 265 266 267@db_app.command(name="stop-proxy") 268def stop_proxy() -> None: 269 """Stop the Cloud SQL Proxy daemon. 270 271 This command stops a Cloud SQL Proxy that was started with 'start-proxy'. 272 It reads the PID from the PID file and sends a SIGTERM signal to stop the process. 273 274 Example: 275 airbyte-ops cloud db stop-proxy 276 """ 277 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 278 279 if not pid_file.exists(): 280 exit_with_error( 281 f"PID file not found at {CLOUD_SQL_PROXY_PID_FILE}. " 282 "No Cloud SQL Proxy daemon appears to be running." 283 ) 284 285 pid_str = pid_file.read_text().strip() 286 if not pid_str.isdigit(): 287 pid_file.unlink() 288 exit_with_error(f"Invalid PID in {CLOUD_SQL_PROXY_PID_FILE}: {pid_str}") 289 290 pid = int(pid_str) 291 292 # Check if process is still running 293 try: 294 os.kill(pid, 0) # Signal 0 just checks if process exists 295 except ProcessLookupError: 296 pid_file.unlink() 297 print_success( 298 f"Cloud SQL Proxy (PID: {pid}) is not running. Cleaned up PID file." 
299 ) 300 return 301 except PermissionError: 302 exit_with_error(f"Permission denied to check process {pid}.") 303 304 # Send SIGTERM to stop the process 305 try: 306 os.kill(pid, signal.SIGTERM) 307 print_success(f"Sent SIGTERM to Cloud SQL Proxy (PID: {pid}).") 308 except ProcessLookupError: 309 print_success(f"Cloud SQL Proxy (PID: {pid}) already stopped.") 310 except PermissionError: 311 exit_with_error(f"Permission denied to stop process {pid}.") 312 313 # Clean up PID file 314 pid_file.unlink(missing_ok=True) 315 print_success("Cloud SQL Proxy stopped.") 316 317 318@connector_app.command(name="get-version-info") 319def get_version_info( 320 workspace_id: Annotated[ 321 str, 322 Parameter(help="The Airbyte Cloud workspace ID."), 323 ], 324 connector_id: Annotated[ 325 str, 326 Parameter(help="The ID of the deployed connector (source or destination)."), 327 ], 328 connector_type: Annotated[ 329 Literal["source", "destination"], 330 Parameter(help="The type of connector."), 331 ], 332) -> None: 333 """Get the current version information for a deployed connector.""" 334 result = get_cloud_connector_version( 335 workspace_id=workspace_id, 336 actor_id=connector_id, 337 actor_type=connector_type, 338 ) 339 print_json(result.model_dump()) 340 341 342@connector_app.command(name="set-version-override") 343def set_version_override( 344 workspace_id: Annotated[ 345 str, 346 Parameter(help="The Airbyte Cloud workspace ID."), 347 ], 348 connector_id: Annotated[ 349 str, 350 Parameter(help="The ID of the deployed connector (source or destination)."), 351 ], 352 connector_type: Annotated[ 353 Literal["source", "destination"], 354 Parameter(help="The type of connector."), 355 ], 356 version: Annotated[ 357 str, 358 Parameter( 359 help="The semver version string to pin to (e.g., '2.1.5-preview.abc1234')." 
360 ), 361 ], 362 reason: Annotated[ 363 str, 364 Parameter(help="Explanation for the override (min 10 characters)."), 365 ], 366 issue_url: Annotated[ 367 str, 368 Parameter(help="GitHub issue URL providing context for this operation."), 369 ], 370 approval_comment_url: Annotated[ 371 str, 372 Parameter( 373 help="Slack approval record URL where admin authorized this deployment." 374 ), 375 ], 376 ai_agent_session_url: Annotated[ 377 str | None, 378 Parameter( 379 help="URL to AI agent session driving this operation (for auditability)." 380 ), 381 ] = None, 382 reason_url: Annotated[ 383 str | None, 384 Parameter(help="Optional URL with more context (e.g., issue link)."), 385 ] = None, 386 customer_tier_filter: Annotated[ 387 Literal["TIER_0", "TIER_1", "TIER_2", "ALL"], 388 Parameter( 389 help=( 390 "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. " 391 "The operation will be rejected if the actual customer tier does not match. " 392 "Defaults to 'TIER_2' (non-sensitive customers)." 393 ), 394 ), 395 ] = "TIER_2", 396) -> None: 397 """Set a version override for a deployed connector. 398 399 Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and 400 AIRBYTE_INTERNAL_ADMIN_USER environment variables. 401 402 The `customer_tier_filter` gates the operation: the call fails if 403 the actual tier of the workspace's organization does not match. 404 Use `ALL` to proceed regardless of tier (a warning is shown for sensitive tiers). 
405 """ 406 admin_user_email = os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER") 407 result = set_cloud_connector_version_override( 408 workspace_id=workspace_id, 409 actor_id=connector_id, 410 actor_type=connector_type, 411 version=version, 412 unset=False, 413 override_reason=reason, 414 override_reason_reference_url=reason_url, 415 admin_user_email=admin_user_email, 416 issue_url=issue_url, 417 approval_comment_url=approval_comment_url, 418 ai_agent_session_url=ai_agent_session_url, 419 customer_tier_filter=customer_tier_filter, 420 ) 421 if result.success: 422 print_success(result.message) 423 else: 424 print_error(result.message) 425 print_json(result.model_dump()) 426 427 428@connector_app.command(name="clear-version-override") 429def clear_version_override( 430 workspace_id: Annotated[ 431 str, 432 Parameter(help="The Airbyte Cloud workspace ID."), 433 ], 434 connector_id: Annotated[ 435 str, 436 Parameter(help="The ID of the deployed connector (source or destination)."), 437 ], 438 connector_type: Annotated[ 439 Literal["source", "destination"], 440 Parameter(help="The type of connector."), 441 ], 442 issue_url: Annotated[ 443 str, 444 Parameter(help="GitHub issue URL providing context for this operation."), 445 ], 446 approval_comment_url: Annotated[ 447 str, 448 Parameter( 449 help="Slack approval record URL where admin authorized this deployment." 450 ), 451 ], 452 ai_agent_session_url: Annotated[ 453 str | None, 454 Parameter( 455 help="URL to AI agent session driving this operation (for auditability)." 456 ), 457 ] = None, 458 customer_tier_filter: Annotated[ 459 Literal["TIER_0", "TIER_1", "TIER_2", "ALL"], 460 Parameter( 461 help=( 462 "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. " 463 "The operation will be rejected if the actual customer tier does not match. " 464 "Defaults to 'TIER_2' (non-sensitive customers)." 465 ), 466 ), 467 ] = "TIER_2", 468) -> None: 469 """Clear a version override from a deployed connector. 
470 471 Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and 472 AIRBYTE_INTERNAL_ADMIN_USER environment variables. 473 474 The `customer_tier_filter` gates the operation: the call fails if 475 the actual tier of the workspace's organization does not match. 476 Use `ALL` to proceed regardless of tier (a warning is shown for sensitive tiers). 477 """ 478 admin_user_email = os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER") 479 result = set_cloud_connector_version_override( 480 workspace_id=workspace_id, 481 actor_id=connector_id, 482 actor_type=connector_type, 483 version=None, 484 unset=True, 485 override_reason=None, 486 override_reason_reference_url=None, 487 admin_user_email=admin_user_email, 488 issue_url=issue_url, 489 approval_comment_url=approval_comment_url, 490 ai_agent_session_url=ai_agent_session_url, 491 customer_tier_filter=customer_tier_filter, 492 ) 493 if result.success: 494 print_success(result.message) 495 else: 496 print_error(result.message) 497 print_json(result.model_dump()) 498 499 500def _load_json_file(file_path: Path) -> dict | None: 501 """Load a JSON file and return its contents. 502 503 Returns None if the file doesn't exist or contains invalid JSON. 504 """ 505 if not file_path.exists(): 506 return None 507 try: 508 return json.loads(file_path.read_text()) 509 except json.JSONDecodeError as e: 510 print_error(f"Failed to parse JSON in file: {file_path}\nError: {e}") 511 return None 512 513 514def _run_connector_command( 515 connector_image: str, 516 command: Command, 517 output_dir: Path, 518 target_or_control: TargetOrControl, 519 config_path: Path | None = None, 520 catalog_path: Path | None = None, 521 state_path: Path | None = None, 522 proxy_url: str | None = None, 523 enable_debug_logs: bool = False, 524) -> dict: 525 """Run a connector command and return results as a dict. 526 527 Args: 528 connector_image: Full connector image name with tag. 529 command: The Airbyte command to run. 
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        proxy_url: Optional HTTP proxy URL for traffic capture.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results.
    """
    connector = ConnectorUnderTest.from_image_name(connector_image, target_or_control)

    config = _load_json_file(config_path) if config_path else None
    state = _load_json_file(state_path) if state_path else None

    configured_catalog = None
    if catalog_path and catalog_path.exists():
        catalog_json = catalog_path.read_text()
        configured_catalog = ConfiguredAirbyteCatalog.parse_raw(catalog_json)

    # Pass log level as an environment variable to the connector container
    container_env: dict[str, str] = {}
    if enable_debug_logs:
        container_env["LOG_LEVEL"] = "DEBUG"

    execution_inputs = ExecutionInputs(
        connector_under_test=connector,
        command=command,
        output_dir=output_dir,
        config=config,
        configured_catalog=configured_catalog,
        state=state,
        environment_variables=container_env or None,
    )

    runner = ConnectorRunner(execution_inputs, proxy_url=proxy_url)
    result = runner.run()

    result.save_artifacts(output_dir)

    return {
        "connector": connector_image,
        "command": command.value,
        "success": result.success,
        "exit_code": result.exit_code,
        "stdout_file": str(result.stdout_file_path),
        "stderr_file": str(result.stderr_file_path),
        "message_counts": {
            k.value: v for k, v in result.get_message_count_per_type().items()
        },
        "record_counts_per_stream": result.get_record_count_per_stream(),
    }


def _build_connector_image_from_source(
    connector_name: str,
    repo_root: Path | None = None,
    tag: str = "dev",
) -> str | None:
    """Build a connector image from source code.

    Args:
        connector_name: Name of the connector (e.g., 'source-pokeapi').
        repo_root: Optional path to the airbyte repo root. If not provided,
            will attempt to auto-detect from current directory.
        tag: Tag to apply to the built image (default: 'dev').

    Returns:
        The full image name with tag if successful, None if build fails.
    """
    if not verify_docker_installation():
        print_error("Docker is not installed or not running")
        return None

    try:
        connector_directory = find_connector_root_from_name(connector_name)
    except FileNotFoundError:
        if repo_root:
            connector_directory = repo_root / CONNECTORS_SUBDIR / connector_name
            if not connector_directory.exists():
                print_error(f"Connector directory not found: {connector_directory}")
                return None
        else:
            print_error(
                f"Could not find connector '{connector_name}'. "
                "Try providing --repo-root to specify the airbyte repo location."
            )
            return None

    metadata_file_path = connector_directory / "metadata.yaml"
    if not metadata_file_path.exists():
        print_error(f"metadata.yaml not found at {metadata_file_path}")
        return None

    metadata = MetadataFile.from_file(metadata_file_path)
    print_success(f"Building image for connector: {connector_name}")

    built_image = build_connector_image(
        connector_name=connector_name,
        connector_directory=connector_directory,
        metadata=metadata,
        tag=tag,
        no_verify=False,
    )
    print_success(f"Successfully built image: {built_image}")
    return built_image


def _fetch_control_image_from_metadata(connector_name: str) -> str | None:
    """Fetch the current released connector image from metadata.yaml on the master branch.

    This fetches the connector's metadata.yaml from the airbyte monorepo's master branch
    and extracts the dockerRepository and dockerImageTag to construct the control image.

    Args:
        connector_name: The connector name (e.g., 'source-github').

    Returns:
        The full connector image with tag (e.g., 'airbyte/source-github:1.0.0'),
        or None if the metadata could not be fetched or parsed.
    """
    metadata_url = (
        f"https://raw.githubusercontent.com/airbytehq/airbyte/master/"
        f"airbyte-integrations/connectors/{connector_name}/metadata.yaml"
    )
    response = requests.get(metadata_url, timeout=30)
    if not response.ok:
        print_error(
            f"Failed to fetch metadata for {connector_name}: "
            f"HTTP {response.status_code} from {metadata_url}"
        )
        return None

    metadata = yaml.safe_load(response.text)
    if not isinstance(metadata, dict):
        print_error(f"Invalid metadata format for {connector_name}: expected dict")
        return None

    data = metadata.get("data", {})
    docker_repository = data.get("dockerRepository")
    docker_image_tag = data.get("dockerImageTag")

    if not docker_repository or not docker_image_tag:
        print_error(
            f"Could not find dockerRepository/dockerImageTag in metadata for {connector_name}"
        )
        return None

    return f"{docker_repository}:{docker_image_tag}"


def _run_with_optional_http_metrics(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    enable_http_metrics: bool,
    config_path: Path | None,
    catalog_path: Path | None,
    state_path: Path | None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command with optional HTTP metrics capture.

    When enable_http_metrics is True, starts mitmproxy to capture HTTP traffic.
    If mitmproxy fails to start, falls back to running without metrics.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        enable_http_metrics: Whether to capture HTTP metrics via mitmproxy.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results, optionally including http_metrics.
    """
    if not enable_http_metrics:
        return _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            enable_debug_logs=enable_debug_logs,
        )

    with MitmproxyManager.start(output_dir) as session:
        if session is None:
            print_error("Mitmproxy unavailable, running without HTTP metrics")
            return _run_connector_command(
                connector_image=connector_image,
                command=command,
                output_dir=output_dir,
                target_or_control=target_or_control,
                config_path=config_path,
                catalog_path=catalog_path,
                state_path=state_path,
                enable_debug_logs=enable_debug_logs,
            )

        print_success(f"Started mitmproxy on {session.proxy_url}")
        result = _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            proxy_url=session.proxy_url,
            enable_debug_logs=enable_debug_logs,
        )

        http_metrics = parse_http_dump(session.dump_file_path)
        result["http_metrics"] = {
            "flow_count": http_metrics.flow_count,
            "duplicate_flow_count": http_metrics.duplicate_flow_count,
        }
        print_success(
            f"Captured {http_metrics.flow_count} HTTP flows "
            f"({http_metrics.duplicate_flow_count} duplicates)"
        )
        return result


@connector_app.command(name="regression-test")
def regression_test(
    skip_compare: Annotated[
        bool,
        Parameter(
            help="If True, skip comparison and run single-version tests only. "
            "If False (default), run comparison tests (target vs control)."
        ),
    ] = False,
    test_image: Annotated[
        str | None,
        Parameter(
            help="Test connector image with tag (e.g., airbyte/source-github:1.0.0). "
            "This is the image under test - in comparison mode, it's compared against control_image."
        ),
    ] = None,
    control_image: Annotated[
        str | None,
        Parameter(
            help="Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). "
            "Ignored if `skip_compare=True`."
        ),
    ] = None,
    connector_name: Annotated[
        str | None,
        Parameter(
            help="Connector name to build image from source (e.g., 'source-pokeapi'). "
            "If provided, builds the image locally with tag 'dev'. "
            "For comparison tests (default), this builds the target image. "
            "For single-version tests (skip_compare=True), this builds the test image."
        ),
    ] = None,
    repo_root: Annotated[
        str | None,
        Parameter(
            help="Path to the airbyte repo root. Required if connector_name is provided "
            "and the repo cannot be auto-detected."
        ),
    ] = None,
    command: Annotated[
        Literal["spec", "check", "discover", "read"],
        Parameter(help="The Airbyte command to run."),
    ] = "check",
    connection_id: Annotated[
        str | None,
        Parameter(
            help="Airbyte Cloud connection ID to fetch config/catalog from. "
            "Mutually exclusive with config-path/catalog-path. "
            "If provided, test_image/control_image can be auto-detected."
        ),
    ] = None,
    config_path: Annotated[
        str | None,
        Parameter(help="Path to the connector config JSON file."),
    ] = None,
    catalog_path: Annotated[
        str | None,
        Parameter(help="Path to the configured catalog JSON file (required for read)."),
    ] = None,
    state_path: Annotated[
        str | None,
        Parameter(help="Path to the state JSON file (optional for read)."),
    ] = None,
    output_dir: Annotated[
        str,
        Parameter(help="Directory to store test artifacts."),
    ] = "/tmp/regression_test_artifacts",
    enable_http_metrics: Annotated[
        bool,
        Parameter(
            help="Capture HTTP traffic metrics via mitmproxy (experimental). "
            "Requires mitmdump to be installed. Only used in comparison mode."
        ),
    ] = False,
    selected_streams: Annotated[
        str | None,
        Parameter(
            help="Comma-separated list of stream names to include in the read. "
            "Only these streams will be included in the configured catalog. "
            "This is useful to limit data volume by testing only specific streams."
        ),
    ] = None,
    enable_debug_logs: Annotated[
        bool,
        Parameter(
            help="Enable debug-level logging for regression test output. "
            "Also passed as `LOG_LEVEL=DEBUG` to the connector Docker container."
        ),
    ] = False,
    with_state: Annotated[
        bool | None,
        Parameter(
            negative="--no-state",
            help="Fetch and pass the connection's current state to the read command, "
            "producing a warm read instead of a cold read. Defaults to `True` when "
            "`--connection-id` is provided, `False` otherwise. Has no effect unless "
            "the command is `read`. Ignored when `--state-path` is explicitly provided.",
        ),
    ] = None,
) -> None:
    """Run regression tests on connectors.

    This command supports two modes:

    Comparison mode (skip_compare=False, default):
        Runs the specified Airbyte protocol command against both the target (new)
        and control (baseline) connector versions, then compares the results.
        This helps identify regressions between versions.

    Single-version mode (skip_compare=True):
        Runs the specified Airbyte protocol command against a single connector
        and validates the output. No comparison is performed.

    Results are written to the output directory and to GitHub Actions outputs
    if running in CI.

    You can provide the test image in three ways:
    1. --test-image: Use a pre-built image from a Docker registry
    2. --connector-name: Build the image locally from source code
    3. --connection-id: Auto-detect from an Airbyte Cloud connection

    You can provide config/catalog either via file paths OR via a connection_id
    that fetches them from Airbyte Cloud.

    Example:
        airbyte-ops cloud connector regression-test --connection-id <connection-id>
    """
    # Configure debug logging for the regression test harness when requested
    if enable_debug_logs:
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            force=True,
        )

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    cmd = Command(command)

    config_file: Path | None = None
    catalog_file: Path | None = None
    state_file = Path(state_path) if state_path else None

    # Resolve the test image (used in both single-version and comparison modes)
    resolved_test_image: str | None = test_image
    resolved_control_image: str | None = control_image

    # Validate conflicting parameters.
    # Single-version mode: reject comparison-specific parameters.
    if skip_compare and control_image:
        write_github_output("success", False)
        write_github_output(
            "error", "Cannot specify control_image with skip_compare=True"
        )
        exit_with_error(
            "Cannot specify --control-image with --skip-compare. "
            "Control image is only used in comparison mode."
        )

    # If connector_name is provided, build the image from source
    if connector_name:
        if resolved_test_image:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both test_image and connector_name"
            )
            exit_with_error("Cannot specify both --test-image and --connector-name")

        repo_root_path = Path(repo_root) if repo_root else None
        built_image = _build_connector_image_from_source(
            connector_name=connector_name,
            repo_root=repo_root_path,
            tag="dev",
        )
        if not built_image:
            write_github_output("success", False)
            write_github_output("error", f"Failed to build image for {connector_name}")
            exit_with_error(f"Failed to build image for {connector_name}")
        resolved_test_image = built_image

    if connection_id:
        if config_path or catalog_path:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both connection_id and file paths"
            )
            exit_with_error(
                "Cannot specify both connection_id and config_path/catalog_path"
            )

        print_success(f"Fetching config/catalog from connection: {connection_id}")
        connection_data = fetch_connection_data(connection_id)

        # Check if we should retrieve unmasked secrets
        if should_use_secret_retriever():
            print_success(
                "USE_CONNECTION_SECRET_RETRIEVER enabled - enriching config with unmasked secrets..."
            )
            try:
                connection_data = enrich_config_with_secrets(
                    connection_data,
                    retrieval_reason="Regression test with USE_CONNECTION_SECRET_RETRIEVER=true",
                )
                print_success("Successfully retrieved unmasked secrets from database")
            except SecretRetrievalError as e:
                write_github_output("success", False)
                write_github_output("error", str(e))
                exit_with_error(
                    f"{e}\n\n"
                    f"This connection cannot be used for regression testing. "
                    f"Please use a connection from a non-EU workspace, or use GSM-based "
                    f"integration test credentials instead (by omitting --connection-id)."
                )

        # Resolve with_state default: True when connection_id is provided
        resolved_with_state = with_state if with_state is not None else True

        if resolved_with_state and not state_file and command == "read":
            print_success("Fetching connection state for warm read...")
            try:
                connection_data.state = fetch_connection_state(connection_data)
                if connection_data.state:
                    print_success(
                        f"Fetched state with {len(connection_data.state)} stream(s)"
                    )
                else:
                    connection_data.state = None
                    print_success(
                        "No state available for this connection (initial sync)"
                    )
            except Exception as exc:
                print_error(f"Failed to fetch connection state: {exc}")
                print_success("Falling back to cold read (no state)")

        config_file, catalog_file, state_file_from_conn = save_connection_data_to_files(
            connection_data, output_path / "connection_data"
        )

        if state_file_from_conn and not state_file:
            state_file = state_file_from_conn

        print_success(
            f"Fetched config for source: {connection_data.source_name} "
            f"with {len(connection_data.stream_names)} streams"
        )

        # Auto-detect test/control image from connection if not provided
        if not resolved_test_image and connection_data.connector_image:
            resolved_test_image = connection_data.connector_image
            print_success(f"Auto-detected test image: {resolved_test_image}")

        if (
            not skip_compare
            and not resolved_control_image
            and connection_data.connector_image
        ):
            resolved_control_image = connection_data.connector_image
            print_success(f"Auto-detected control image: {resolved_control_image}")
    elif config_path:
        config_file = Path(config_path)
        catalog_file = Path(catalog_path) if catalog_path else None
    elif connector_name:
        # Fallback: fetch integration test secrets from GSM using the PyAirbyte API
        print_success(
            f"No connection_id or config_path provided. "
            f"Attempting to fetch integration test config from GSM for {connector_name}..."
        )
        gsm_config = get_first_config_from_secrets(connector_name)
        if gsm_config:
            # Write config to a temp file (not in output_path, to avoid artifact upload)
            gsm_config_dir = Path(
                tempfile.mkdtemp(prefix=f"gsm-config-{connector_name}-")
            )
            gsm_config_dir.chmod(0o700)
            gsm_config_file = gsm_config_dir / "config.json"
            gsm_config_file.write_text(json.dumps(gsm_config, indent=2))
            gsm_config_file.chmod(0o600)
            config_file = gsm_config_file
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
            print_success(
                f"Fetched integration test config from GSM for {connector_name}"
            )
        else:
            print_error(
                f"Failed to fetch integration test config from GSM for {connector_name}."
            )
            config_file = None
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
    else:
        config_file = None
        catalog_file = Path(catalog_path) if catalog_path else None

    # Auto-detect control_image from metadata.yaml if connector_name is provided (comparison mode only)
    if not skip_compare and not resolved_control_image and connector_name:
        resolved_control_image = _fetch_control_image_from_metadata(connector_name)
        if resolved_control_image:
            print_success(
                f"Auto-detected control image from metadata.yaml: {resolved_control_image}"
            )

    # Validate that we have the required images
    if not resolved_test_image:
        write_github_output("success", False)
        write_github_output("error", "No test image specified")
        exit_with_error(
            "You must provide one of the following: a test_image, a connector_name "
            "to build the image from source, or a connection_id to auto-detect the image."
        )

    if not skip_compare and not resolved_control_image:
        write_github_output("success", False)
        write_github_output("error", "No control image specified")
        exit_with_error(
            "You must provide one of the following: a control_image, a connection_id "
            "for a connection that has an associated connector image, or a connector_name "
            "to auto-detect the control image from the airbyte repo's metadata.yaml."
        )

    # Pull images if they weren't just built locally.
    # If connector_name was provided, we just built the test image locally.
    if not connector_name and not ensure_image_available(resolved_test_image):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_test_image}")
        exit_with_error(f"Failed to pull test image: {resolved_test_image}")

    if (
        not skip_compare
        and resolved_control_image
        and not ensure_image_available(resolved_control_image)
    ):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_control_image}")
        exit_with_error(
            f"Failed to pull control connector image: {resolved_control_image}"
        )

    # Apply selected_streams filter to catalog if requested
    if selected_streams and catalog_file:
        streams_set = {s.strip() for s in selected_streams.split(",") if s.strip()}
        if streams_set:
            print_success(
                f"Filtering catalog to {len(streams_set)} selected streams: "
                f"{', '.join(sorted(streams_set))}"
            )
            filter_configured_catalog_file(catalog_file, streams_set)

    # Upgrade READ → READ_WITH_STATE when a state file is available
    if cmd == Command.READ and state_file:
        cmd = Command.READ_WITH_STATE
        print_success("State available — using read-with-state (warm read)")

    # Track telemetry for the regression test.
    # Extract version from image tag (e.g., "airbyte/source-github:1.0.0" -> "1.0.0")
    target_version = (
        resolved_test_image.rsplit(":", 1)[-1]
        if ":" in resolved_test_image
        else "unknown"
    )
    control_version = None
    if resolved_control_image and ":" in resolved_control_image:
        control_version = resolved_control_image.rsplit(":", 1)[-1]

    # Get tester identity from environment (GitHub Actions sets GITHUB_ACTOR)
    tester = os.getenv("GITHUB_ACTOR") or os.getenv("USER")

    track_regression_test(
        user_id=tester,
        connector_image=resolved_test_image,
        command=command,
        target_version=target_version,
        control_version=control_version,
        additional_properties={
            "connection_id": connection_id,
            "skip_compare": skip_compare,
            "with_state": cmd == Command.READ_WITH_STATE,
        },
    )

    # Execute the appropriate mode
    if skip_compare:
        # Single-version mode: run only the connector image
        result = _run_connector_command(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=output_path,
            target_or_control=TargetOrControl.TARGET,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        print_json(result)

        write_github_outputs(
            {
                "success": result["success"],
                "connector": resolved_test_image,
                "command": command,
                "exit_code": result["exit_code"],
            }
        )

        write_test_summary(
            connector_image=resolved_test_image,
            test_type="regression-test",
            success=result["success"],
            results={
                "command": command,
                "exit_code": result["exit_code"],
                "output_dir": output_dir,
            },
        )

        # Generate report.md with detailed metrics
        report_path = generate_single_version_report(
            connector_image=resolved_test_image,
            command=command,
            result=result,
            output_dir=output_path,
        )
        print_success(f"Generated report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if result["success"]:
            print_success(
                f"Single-version regression test passed for {resolved_test_image}"
            )
        else:
            print_error(
                f"Single-version regression test failed for {resolved_test_image}"
            )
    else:
        # Comparison mode: run both target and control images
        target_output = output_path / "target"
        control_output = output_path / "control"

        target_result = _run_with_optional_http_metrics(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=target_output,
            target_or_control=TargetOrControl.TARGET,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        control_result = _run_with_optional_http_metrics(
            connector_image=resolved_control_image,  # type: ignore[arg-type]
            command=cmd,
            output_dir=control_output,
            target_or_control=TargetOrControl.CONTROL,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        both_succeeded = target_result["success"] and control_result["success"]
        regression_detected = target_result["success"] != control_result["success"]

        combined_result = {
            "target": target_result,
            "control": control_result,
            "both_succeeded": both_succeeded,
            "regression_detected": regression_detected,
        }

        print_json(combined_result)

        both_failed = not target_result["success"] and not control_result["success"]

        write_github_outputs(
            {
                "success": not regression_detected,
                "target_image": resolved_test_image,
                "control_image": resolved_control_image,
                "command": command,
                "target_exit_code": target_result["exit_code"],
                "control_exit_code": control_result["exit_code"],
                "regression_detected": regression_detected,
                "both_failed": both_failed,
            }
        )

        write_json_output("regression_report", combined_result)

        report_path = generate_regression_report(
            target_image=resolved_test_image,
            control_image=resolved_control_image,  # type: ignore[arg-type]
            command=command,
            target_result=target_result,
            control_result=control_result,
            output_dir=output_path,
        )
        print_success(f"Generated regression report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if regression_detected:
            print_error(
                f"Regression detected between {resolved_test_image} and {resolved_control_image}"
            )
        elif both_succeeded:
            print_success(
                f"Regression test passed for {resolved_test_image} vs {resolved_control_image}"
            )
        else:
            # Both versions failed with no regression between them; treat this as a
            # test-environment issue (e.g., expired credentials, transient API errors,
            # rate limiting) and exit successfully.
            print_warning(
                f"Both versions failed for {resolved_test_image} vs {resolved_control_image}. "
                "No regression detected. This may indicate expired credentials or a transient API issue."
            )


@connector_app.command(name="fetch-connection-config")
def fetch_connection_config_cmd(
    connection_id: Annotated[
        str,
        Parameter(help="The UUID of the Airbyte Cloud connection."),
    ],
    output_path: Annotated[
        str | None,
        Parameter(
            help="Path to output file or directory. "
            "If directory, writes connection-<id>-config.json inside it. "
            "Default: platform temp directory (e.g. /tmp/connection-<id>-config.json)"
        ),
    ] = None,
    with_secrets: Annotated[
        bool,
        Parameter(
            name="--with-secrets",
            negative="--no-secrets",
            help="If set, fetches unmasked secrets from the internal database. "
            "Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or `gcloud auth application-default login`. "
            "Must be used with --oc-issue-url.",
        ),
    ] = False,
    oc_issue_url: Annotated[
        str | None,
        Parameter(
            help="OC issue URL for audit logging. Required when using --with-secrets."
        ),
    ] = None,
) -> None:
    """Fetch connection configuration from Airbyte Cloud to a local file.

    This command retrieves the source configuration for a given connection ID
    and writes it to a JSON file. When `--output-path` is omitted, the file is
    written to the platform temp directory to avoid accidentally committing
    secrets to a git repository.

    Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and
    AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

    When --with-secrets is specified, the command fetches unmasked secrets from
    the internal database using the connection-retriever. This additionally requires:
    - An OC issue URL for audit logging (--oc-issue-url)
    - GCP credentials via `GCP_PROD_DB_ACCESS_CREDENTIALS` env var or `gcloud auth application-default login`
    - If `CI=true`: expects `cloud-sql-proxy` running on localhost, or
      direct network access to the Cloud SQL instance.
    """
    path = Path(output_path) if output_path else None
    result = fetch_connection_config(
        connection_id=connection_id,
        output_path=path,
        with_secrets=with_secrets,
        oc_issue_url=oc_issue_url,
    )
    if result.success:
        print_success(result.message)
    else:
        print_error(result.message)
    print_json(result.model_dump())


def _read_json_input(
    json_str: str | None,
    input_file: str | None,
) -> dict:
    """Read JSON from a positional arg, --input file, or STDIN (in that priority)."""
    if json_str is not None:
        return json.loads(json_str)
    if input_file is not None:
        return json.loads(Path(input_file).read_text())
    if not sys.stdin.isatty():
        return json.loads(sys.stdin.read())
    exit_with_error(
        "No JSON input provided. Pass it as a positional argument, "
        "via --input <file>, or pipe to STDIN."
    )
    raise SystemExit(1)  # unreachable, but satisfies type checker


@state_app.command(name="get")
def get_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to filter state for a single stream.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to narrow the stream filter.",
            name="--namespace",
        ),
    ] = None,
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the state JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the current state for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        result = {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }
    else:
        result = conn.dump_raw_state()

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"State written to {output}", file=sys.stderr)
    else:
        print(json_output)


@state_app.command(name="set")
def set_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    state_json: Annotated[
        str | None,
        Parameter(
            help="The connection state as a JSON string. When --stream is used, "
            'this is just the stream\'s state blob (e.g., \'{"cursor": "2024-01-01"}\').'
            " Otherwise, must include 'stateType', 'connectionId', and the appropriate "
            "state field. Can also be provided via --input or STDIN."
        ),
    ] = None,
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to update state for a single stream only.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the state to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the state for an Airbyte Cloud connection.

    State JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    Uses the safe variant that prevents updates while a sync is running.
    When --stream is provided, only that stream's state is updated within
    the existing connection state.
1465 """ 1466 parsed_state = _read_json_input(state_json, input_file) 1467 workspace = CloudWorkspace.from_env() 1468 conn = workspace.get_connection(connection_id) 1469 1470 if stream_name is not None: 1471 conn.set_stream_state( 1472 stream_name=stream_name, 1473 state_blob_dict=parsed_state, 1474 stream_namespace=stream_namespace, 1475 ) 1476 else: 1477 conn.import_raw_state(parsed_state) 1478 1479 result = conn.dump_raw_state() 1480 json_output = json.dumps(result, indent=2, default=str) 1481 print(json_output) 1482 1483 1484@catalog_app.command(name="get") 1485def get_connection_catalog( 1486 connection_id: Annotated[ 1487 str, 1488 Parameter(help="The connection ID (UUID) to fetch catalog for."), 1489 ], 1490 output: Annotated[ 1491 str | None, 1492 Parameter( 1493 help="Path to write the catalog JSON output to a file.", 1494 name="--output", 1495 ), 1496 ] = None, 1497) -> None: 1498 """Get the configured catalog for an Airbyte Cloud connection.""" 1499 workspace = CloudWorkspace.from_env() 1500 conn = workspace.get_connection(connection_id) 1501 result = conn.dump_raw_catalog() 1502 if result is None: 1503 exit_with_error("No configured catalog found for this connection.") 1504 raise SystemExit(1) # unreachable, but satisfies type checker 1505 1506 json_output = json.dumps(result, indent=2, default=str) 1507 if output is not None: 1508 Path(output).write_text(json_output + "\n") 1509 print(f"Catalog written to {output}", file=sys.stderr) 1510 else: 1511 print(json_output) 1512 1513 1514@catalog_app.command(name="set") 1515def set_connection_catalog( 1516 connection_id: Annotated[ 1517 str, 1518 Parameter(help="The connection ID (UUID) to update catalog for."), 1519 ], 1520 catalog_json: Annotated[ 1521 str | None, 1522 Parameter( 1523 help="The configured catalog as a JSON string. " 1524 "Can also be provided via --input or STDIN." 
1525 ), 1526 ] = None, 1527 input_file: Annotated[ 1528 str | None, 1529 Parameter( 1530 help="Path to a JSON file containing the catalog to set.", 1531 name="--input", 1532 ), 1533 ] = None, 1534) -> None: 1535 """Set the configured catalog for an Airbyte Cloud connection. 1536 1537 Catalog JSON can be provided as a positional argument, via --input <file>, 1538 or piped through STDIN. 1539 1540 WARNING: This replaces the entire configured catalog. 1541 """ 1542 parsed_catalog = _read_json_input(catalog_json, input_file) 1543 workspace = CloudWorkspace.from_env() 1544 conn = workspace.get_connection(connection_id) 1545 conn.import_raw_catalog(parsed_catalog) 1546 1547 result = conn.dump_raw_catalog() 1548 if result is None: 1549 exit_with_error("Failed to retrieve catalog after import.") 1550 raise SystemExit(1) # unreachable, but satisfies type checker 1551 1552 json_output = json.dumps(result, indent=2, default=str) 1553 print(json_output) 1554 1555 1556@logs_app.command(name="lookup-cloud-backend-error") 1557def lookup_cloud_backend_error( 1558 error_id: Annotated[ 1559 str, 1560 Parameter( 1561 help=( 1562 "The error ID (UUID) to search for. This is typically returned " 1563 "in API error responses as {'errorId': '...'}" 1564 ) 1565 ), 1566 ], 1567 lookback_days: Annotated[ 1568 int, 1569 Parameter(help="Number of days to look back in logs."), 1570 ] = 7, 1571 min_severity_filter: Annotated[ 1572 GCPSeverity | None, 1573 Parameter( 1574 help="Optional minimum severity level to filter logs.", 1575 ), 1576 ] = None, 1577 raw: Annotated[ 1578 bool, 1579 Parameter(help="Output raw JSON instead of formatted text."), 1580 ] = False, 1581) -> None: 1582 """Look up error details from GCP Cloud Logging by error ID. 1583 1584 When an Airbyte Cloud API returns an error response with only an error ID 1585 (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command 1586 fetches the full stack trace and error details from GCP Cloud Logging. 
1587 1588 Requires GCP credentials with Logs Viewer role on the target project. 1589 Set up credentials with: gcloud auth application-default login 1590 """ 1591 print(f"Searching for error ID: {error_id}", file=sys.stderr) 1592 print(f"Lookback days: {lookback_days}", file=sys.stderr) 1593 if min_severity_filter: 1594 print(f"Severity filter: {min_severity_filter}", file=sys.stderr) 1595 print(file=sys.stderr) 1596 1597 result = fetch_error_logs( 1598 error_id=error_id, 1599 lookback_days=lookback_days, 1600 min_severity_filter=min_severity_filter, 1601 ) 1602 1603 if raw: 1604 print_json(result.model_dump()) 1605 return 1606 1607 print(f"Found {result.total_entries_found} log entries", file=sys.stderr) 1608 print(file=sys.stderr) 1609 1610 if result.payloads: 1611 for i, payload in enumerate(result.payloads): 1612 print(f"=== Log Group {i + 1} ===") 1613 print(f"Timestamp: {payload.timestamp}") 1614 print(f"Severity: {payload.severity}") 1615 if payload.resource.labels.pod_name: 1616 print(f"Pod: {payload.resource.labels.pod_name}") 1617 print(f"Lines: {payload.num_log_lines}") 1618 print() 1619 print(payload.message) 1620 print() 1621 elif result.entries: 1622 print("No grouped payloads, showing raw entries:", file=sys.stderr) 1623 for entry in result.entries: 1624 print(f"[{entry.timestamp}] {entry.severity}: {entry.payload}") 1625 else: 1626 print_error("No log entries found for this error ID.")