airbyte_ops_mcp.cli.cloud
CLI commands for Airbyte Cloud operations.
Commands:
- airbyte-ops cloud connector get-version-info - Get connector version info
- airbyte-ops cloud connector set-version-override - Set connector version override
- airbyte-ops cloud connector clear-version-override - Clear connector version override
- airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
- airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file
CLI reference
The commands below are regenerated by poe docs-generate via cyclopts's
programmatic docs API; see docs/generate_cli.py.
airbyte-ops cloud COMMAND
Airbyte Cloud operations.
Commands:
- connection: Connection operations in Airbyte Cloud.
- connector: Deployed connector operations in Airbyte Cloud.
- db: Database operations for Airbyte Cloud Prod DB Replica.
- logs: GCP Cloud Logging operations for Airbyte Cloud.
airbyte-ops cloud connector
Deployed connector operations in Airbyte Cloud.
airbyte-ops cloud connector get-version-info
airbyte-ops cloud connector get-version-info WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE
Get the current version information for a deployed connector.
Parameters:
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
- CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]
airbyte-ops cloud connector set-version-override
airbyte-ops cloud connector set-version-override VERSION REASON ISSUE-URL APPROVAL-COMMENT-URL [ARGS]
Set a version override for a deployed connector.
Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and
AIRBYTE_INTERNAL_ADMIN_USER environment variables.
The --override-level flag selects the scope at which the pin is applied:
- actor (default): pins a single deployed connector instance. Requires --workspace-id, --connector-id, and --connector-type.
- workspace: pins ALL instances of a connector type within a workspace. Requires --workspace-id and --actor-definition-id.
- organization: pins ALL instances of a connector type across an organization. Requires --organization-id and --actor-definition-id.
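The per-level requirements above can be checked before invoking the command. The following is a minimal sketch, not the CLI's own validation: the names `REQUIRED_FLAGS` and `missing_flags` are hypothetical, and only the flag names are taken from this reference.

```python
from __future__ import annotations

# Flags each override level requires, as listed in the reference above.
# REQUIRED_FLAGS and missing_flags are illustrative names, not part of the CLI.
REQUIRED_FLAGS: dict[str, tuple[str, ...]] = {
    "actor": ("--workspace-id", "--connector-id", "--connector-type"),
    "workspace": ("--workspace-id", "--actor-definition-id"),
    "organization": ("--organization-id", "--actor-definition-id"),
}


def missing_flags(override_level: str, supplied: dict[str, str | None]) -> list[str]:
    """Return the required flags that are absent or empty for this level."""
    return [flag for flag in REQUIRED_FLAGS[override_level] if not supplied.get(flag)]
```

For example, calling `missing_flags("actor", {"--workspace-id": "ws"})` reports that `--connector-id` and `--connector-type` still need to be supplied.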
The customer_tier_filter gates the operation: the call fails if the
actual tier of the target organization does not match. Use ALL to
proceed regardless of tier (a warning is shown for sensitive tiers).
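The gating rule described above amounts to a small predicate. This sketch assumes the comparison is a plain equality check on the tier name; `tier_gate` is a hypothetical helper, and the real command additionally prints a warning for sensitive tiers when `ALL` is used.

```python
def tier_gate(actual_tier: str, customer_tier_filter: str = "TIER_2") -> bool:
    """Return True when the operation may proceed under the given filter."""
    # 'ALL' bypasses the tier check entirely.
    if customer_tier_filter == "ALL":
        return True
    # Otherwise the organization's actual tier must match the filter exactly.
    return actual_tier == customer_tier_filter
```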
Parameters:
- VERSION, --version: The semver version string to pin to (e.g., '2.1.5-preview.abc1234'). [required]
- REASON, --reason: Explanation for the override (min 10 characters). [required]
- ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
- APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
- OVERRIDE-LEVEL, --override-level: Scope at which to apply the version override: 'actor' (single deployed connector instance), 'workspace' (all instances of a connector type within a workspace), or 'organization' (all instances across an organization). Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
- ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
- CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
- ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
- AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
- REASON-URL, --reason-url: Optional URL with more context (e.g., issue link).
- CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
airbyte-ops cloud connector clear-version-override
airbyte-ops cloud connector clear-version-override ISSUE-URL APPROVAL-COMMENT-URL [ARGS]
Clear a version override from a deployed connector.
Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and
AIRBYTE_INTERNAL_ADMIN_USER environment variables.
The --override-level flag selects the scope at which the pin is removed:
- actor (default): clears a single deployed connector instance pin. Requires --workspace-id, --connector-id, and --connector-type.
- workspace: clears the workspace-level pin for a connector type. Requires --workspace-id and --actor-definition-id.
- organization: clears the organization-level pin for a connector type. Requires --organization-id and --actor-definition-id.
Parameters:
- ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
- APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
- OVERRIDE-LEVEL, --override-level: Scope at which to clear the version override: 'actor', 'workspace', or 'organization'. Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
- ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
- CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
- ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
- AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
- CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
airbyte-ops cloud connector regression-test
airbyte-ops cloud connector regression-test [ARGS]
Run regression tests on connectors.
This command supports two modes:
Comparison mode (skip_compare=False, default): Runs the specified Airbyte protocol command against both the target (new) and control (baseline) connector versions, then compares the results. This helps identify regressions between versions.
Single-version mode (skip_compare=True): Runs the specified Airbyte protocol command against a single connector and validates the output. No comparison is performed.
Results are written to the output directory and to GitHub Actions outputs if running in CI.
You can provide the test image in three ways:
- --test-image: Use a pre-built image from Docker registry
- --connector-name: Build the image locally from source code
- --connection-id: Auto-detect from an Airbyte Cloud connection
You can provide config/catalog either via file paths OR via a connection_id that fetches them from Airbyte Cloud.
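The two modes and the input options above can be illustrated by assembling the argument list for a scripted invocation. This is a sketch under stated assumptions: `regression_test_argv` is a hypothetical helper, it covers only a subset of the flags, and it only builds the list (for example, to hand to `subprocess.run`) without executing anything.

```python
from __future__ import annotations


def regression_test_argv(
    *,
    command: str = "check",
    test_image: str | None = None,
    control_image: str | None = None,
    connection_id: str | None = None,
    config_path: str | None = None,
    skip_compare: bool = False,
) -> list[str]:
    """Build an argv list for `airbyte-ops cloud connector regression-test`."""
    # The reference states --connection-id is mutually exclusive with file paths.
    if connection_id and config_path:
        raise ValueError("--connection-id is mutually exclusive with --config-path")
    argv = ["airbyte-ops", "cloud", "connector", "regression-test", "--command", command]
    if skip_compare:
        argv.append("--skip-compare")  # single-version mode
    if test_image:
        argv += ["--test-image", test_image]
    # The control image is ignored in single-version mode, so it is omitted there.
    if control_image and not skip_compare:
        argv += ["--control-image", control_image]
    if connection_id:
        argv += ["--connection-id", connection_id]
    if config_path:
        argv += ["--config-path", config_path]
    return argv
```

Keeping the mutual-exclusion check in the builder surfaces the conflict before any container is started.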
Parameters:
- SKIP-COMPARE, --skip-compare, --no-skip-compare: If True, skip comparison and run single-version tests only. If False (default), run comparison tests (target vs control). [default: False]
- TEST-IMAGE, --test-image: Test connector image with tag (e.g., airbyte/source-github:1.0.0). This is the image under test - in comparison mode, it's compared against control_image.
- CONTROL-IMAGE, --control-image: Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). Ignored if skip_compare=True.
- CONNECTOR-NAME, --connector-name: Connector name to build image from source (e.g., 'source-pokeapi'). If provided, builds the image locally with tag 'dev'. For comparison tests (default), this builds the target image. For single-version tests (skip_compare=True), this builds the test image.
- REPO-ROOT, --repo-root: Path to the airbyte repo root. Required if connector_name is provided and the repo cannot be auto-detected.
- COMMAND, --command: The Airbyte command to run. [choices: spec, check, discover, read] [default: check]
- CONNECTION-ID, --connection-id: Airbyte Cloud connection ID to fetch config/catalog from. Mutually exclusive with config-path/catalog-path. If provided, test_image/control_image can be auto-detected.
- CONFIG-PATH, --config-path: Path to the connector config JSON file.
- CATALOG-PATH, --catalog-path: Path to the configured catalog JSON file (required for read).
- STATE-PATH, --state-path: Path to the state JSON file (optional for read).
- OUTPUT-DIR, --output-dir: Directory to store test artifacts. [default: /tmp/regression_test_artifacts]
- ENABLE-HTTP-METRICS, --enable-http-metrics, --no-enable-http-metrics: Capture HTTP traffic metrics via mitmproxy (experimental). Requires mitmdump to be installed. Only used in comparison mode. [default: False]
- SELECTED-STREAMS, --selected-streams: Comma-separated list of stream names to include in the read. Only these streams will be included in the configured catalog. This is useful to limit data volume by testing only specific streams.
- ENABLE-DEBUG-LOGS, --enable-debug-logs, --no-enable-debug-logs: Enable debug-level logging for regression test output. Also passed as LOG_LEVEL=DEBUG to the connector Docker container. [default: False]
- WITH-STATE, --with-state, --no-state: Fetch and pass the connection's current state to the read command, producing a warm read instead of a cold read. Defaults to True when --connection-id is provided, False otherwise. Has no effect unless the command is read. Ignored when --state-path is explicitly provided.
airbyte-ops cloud connector fetch-connection-config
airbyte-ops cloud connector fetch-connection-config CONNECTION-ID [ARGS]
Fetch connection configuration from Airbyte Cloud to a local file.
This command retrieves the source configuration for a given connection ID
and writes it to a JSON file. When --output-path is omitted the file is
written to the platform temp directory to avoid accidentally committing
secrets to a git repository.
Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET environment variables.
When --with-secrets is specified, the command fetches unmasked secrets from the internal database using the connection-retriever. This additionally requires:
- An OC issue URL for audit logging (--oc-issue-url)
- GCP credentials via GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login
- If CI=true: expects cloud-sql-proxy running on localhost, or direct network access to the Cloud SQL instance.
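The prerequisites above can be collected into a preflight check that runs before the command is invoked. This is a minimal sketch: `preflight_problems` is a hypothetical helper, and it can only inspect environment variables, so it cannot detect credentials obtained through `gcloud auth application-default login`.

```python
from __future__ import annotations

from collections.abc import Mapping


def preflight_problems(
    env: Mapping[str, str],
    *,
    with_secrets: bool = False,
    oc_issue_url: str | None = None,
) -> list[str]:
    """List the unmet prerequisites for fetch-connection-config."""
    problems: list[str] = []
    # Always required, per the reference above.
    for name in ("AIRBYTE_CLOUD_CLIENT_ID", "AIRBYTE_CLOUD_CLIENT_SECRET"):
        if not env.get(name):
            problems.append(f"{name} is not set")
    if with_secrets:
        if not oc_issue_url:
            problems.append("--oc-issue-url is required with --with-secrets")
        # The env var is one of two accepted credential sources; application
        # default credentials are the other and are not checked here.
        if not env.get("GCP_PROD_DB_ACCESS_CREDENTIALS"):
            problems.append(
                "GCP_PROD_DB_ACCESS_CREDENTIALS is not set "
                "(application-default credentials may be used instead)"
            )
    return problems
```

Passing `os.environ` as `env` checks the current process; passing a plain dict makes the function easy to exercise in isolation.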
Parameters:
- CONNECTION-ID, --connection-id: The UUID of the Airbyte Cloud connection. [required]
- OUTPUT-PATH, --output-path: Path to output file or directory. If directory, writes connection--config.json inside it. Default: platform temp directory (e.g. /tmp/connection- -config.json)
- WITH-SECRETS, --with-secrets, --no-secrets: If set, fetches unmasked secrets from the internal database. Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login. Must be used with --oc-issue-url. [default: False]
- OC-ISSUE-URL, --oc-issue-url: OC issue URL for audit logging. Required when using --with-secrets.
airbyte-ops cloud db
Database operations for Airbyte Cloud Prod DB Replica.
airbyte-ops cloud db start-proxy
airbyte-ops cloud db start-proxy [ARGS]
Start the Cloud SQL Proxy for database access.
This command starts the Cloud SQL Auth Proxy to enable connections to the Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.
By default, runs as a daemon (background process). Use --no-daemon to run in foreground mode where you can see logs and stop with Ctrl+C.
Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, which should contain the service account JSON credentials.
After starting the proxy, set these environment variables to use database tools:
  export USE_CLOUD_SQL_PROXY=1
  export DB_PORT={port}
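From Python, the same setup can be done in-process. The sketch below assumes the proxy listens on 127.0.0.1 and uses the documented default port of 15432; `proxy_is_listening` and `configure_db_env` are hypothetical helper names. A successful TCP connect shows only that something is listening on the port, not that it is the Cloud SQL Proxy.

```python
import os
import socket


def proxy_is_listening(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if a TCP listener accepts connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections and timeouts alike.
        return False


def configure_db_env(port: int = 15432) -> None:
    """Set the environment variables the database tools read."""
    os.environ["USE_CLOUD_SQL_PROXY"] = "1"
    os.environ["DB_PORT"] = str(port)
```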
Parameters:
- PORT, --port: Port for the Cloud SQL Proxy to listen on. [default: 15432]
- DAEMON, --daemon, --no-daemon: Run as daemon in background (default). Use --no-daemon for foreground. [default: True]
airbyte-ops cloud db stop-proxy
airbyte-ops cloud db stop-proxy
Stop the Cloud SQL Proxy daemon.
This command stops a Cloud SQL Proxy that was started with 'start-proxy'. It reads the PID from the PID file and sends a SIGTERM signal to stop the process.
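The PID-file mechanism described above follows a common pattern, sketched here in simplified form. `stop_from_pid_file` is a hypothetical helper that returns a status string instead of printing; the caller supplies the PID-file path, and the sketch assumes a POSIX system where `SIGTERM` is delivered with `os.kill`.

```python
import os
import signal
from pathlib import Path


def stop_from_pid_file(pid_file: Path) -> str:
    """Send SIGTERM to the process recorded in pid_file and remove the file."""
    if not pid_file.exists():
        return "no-pid-file"
    pid_text = pid_file.read_text().strip()
    if not pid_text.isdigit():
        # A corrupt PID file is removed so the next start is not blocked by it.
        pid_file.unlink()
        return "invalid-pid"
    try:
        os.kill(int(pid_text), signal.SIGTERM)
        status = "terminated"
    except ProcessLookupError:
        # The recorded process has already exited.
        status = "not-running"
    pid_file.unlink(missing_ok=True)
    return status
```

Note that `SIGTERM` only requests termination; the sketch does not wait for the process to exit.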
airbyte-ops cloud connection
Connection operations in Airbyte Cloud.
airbyte-ops cloud connection state
Connection state operations in Airbyte Cloud.
Commands:
- get: Get the current state for an Airbyte Cloud connection.
- set: Set the state for an Airbyte Cloud connection.
airbyte-ops cloud connection state get
airbyte-ops cloud connection state get CONNECTION-ID [ARGS]
Get the current state for an Airbyte Cloud connection.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch state for. [required]
- STREAM, --stream: Optional stream name to filter state for a single stream.
- NAMESPACE, --namespace: Optional stream namespace to narrow the stream filter.
- OUTPUT, --output: Path to write the state JSON output to a file.
airbyte-ops cloud connection state set
airbyte-ops cloud connection state set CONNECTION-ID [ARGS]
Set the state for an Airbyte Cloud connection.
State JSON can be provided as a positional argument, via --input, or via STDIN.
Uses the safe variant that prevents updates while a sync is running. When --stream is provided, only that stream's state is updated within the existing connection state.
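The difference between a full-state update and a single-stream update can be sketched by building the argument list for each case. `state_set_argv` is a hypothetical helper; it checks only the two keys this reference names explicitly ('stateType' and 'connectionId') and leaves the remaining state field to the caller.

```python
from __future__ import annotations

import json

# Keys the reference says a full (non --stream) state payload must include.
REQUIRED_FULL_STATE_KEYS = ("stateType", "connectionId")


def state_set_argv(connection_id: str, state: dict, stream: str | None = None) -> list[str]:
    """Build an argv list for `airbyte-ops cloud connection state set`."""
    if stream is None:
        missing = [key for key in REQUIRED_FULL_STATE_KEYS if key not in state]
        if missing:
            raise ValueError(f"full state payload is missing: {', '.join(missing)}")
    argv = [
        "airbyte-ops", "cloud", "connection", "state", "set",
        "--connection-id", connection_id,
        "--state-json", json.dumps(state),
    ]
    if stream is not None:
        # With --stream, the payload is just that stream's state blob.
        argv += ["--stream", stream]
    return argv
```

For a single stream, the payload can be as small as `{"cursor": "2024-01-01"}`, matching the example in the parameter description.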
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
- STATE-JSON, --state-json: The connection state as a JSON string. When --stream is used, this is just the stream's state blob (e.g., '{"cursor": "2024-01-01"}'). Otherwise, must include 'stateType', 'connectionId', and the appropriate state field. Can also be provided via --input or STDIN.
- STREAM, --stream: Optional stream name to update state for a single stream only.
- NAMESPACE, --namespace: Optional stream namespace to identify the stream.
- INPUT, --input: Path to a JSON file containing the state to set.
airbyte-ops cloud connection catalog
Connection catalog operations in Airbyte Cloud.
Commands:
- get: Get the configured catalog for an Airbyte Cloud connection.
- set: Set the configured catalog for an Airbyte Cloud connection.
airbyte-ops cloud connection catalog get
airbyte-ops cloud connection catalog get CONNECTION-ID [ARGS]
Get the configured catalog for an Airbyte Cloud connection.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch catalog for. [required]
- OUTPUT, --output: Path to write the catalog JSON output to a file.
airbyte-ops cloud connection catalog set
airbyte-ops cloud connection catalog set CONNECTION-ID [ARGS]
Set the configured catalog for an Airbyte Cloud connection.
Catalog JSON can be provided as a positional argument, via --input, or via STDIN.
WARNING: This replaces the entire configured catalog.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update catalog for. [required]
- CATALOG-JSON, --catalog-json: The configured catalog as a JSON string. Can also be provided via --input or STDIN.
- INPUT, --input: Path to a JSON file containing the catalog to set.
airbyte-ops cloud logs
GCP Cloud Logging operations for Airbyte Cloud.
airbyte-ops cloud logs lookup-cloud-backend-error
airbyte-ops cloud logs lookup-cloud-backend-error ERROR-ID [ARGS]
Look up error details from GCP Cloud Logging by error ID.
When an Airbyte Cloud API returns an error response with only an error ID (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command fetches the full stack trace and error details from GCP Cloud Logging.
Requires GCP credentials with Logs Viewer role on the target project. Set up credentials with: gcloud auth application-default login
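When the error ID arrives inside an API response body, it can be extracted and validated before the lookup. This sketch assumes the body is a JSON object with a top-level `errorId` key, as in the example above; `lookup_argv` is a hypothetical helper that only builds the argument list.

```python
import json
import uuid


def lookup_argv(api_error_body: str, lookback_days: int = 7) -> list[str]:
    """Build an argv list for lookup-cloud-backend-error from an API error body."""
    error_id = json.loads(api_error_body)["errorId"]
    # Raises ValueError if the ID is not a well-formed UUID.
    uuid.UUID(error_id)
    return [
        "airbyte-ops", "cloud", "logs", "lookup-cloud-backend-error",
        "--error-id", error_id,
        "--lookback-days", str(lookback_days),
    ]
```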
Parameters:
- ERROR-ID, --error-id: The error ID (UUID) to search for. This is typically returned in API error responses as {'errorId': '...'} [required]
- LOOKBACK-DAYS, --lookback-days: Number of days to look back in logs. [default: 7]
- MIN-SEVERITY-FILTER, --min-severity-filter: Optional minimum severity level to filter logs. [choices: debug, info, notice, warning, error, critical, alert, emergency]
- RAW, --raw, --no-raw: Output raw JSON instead of formatted text. [default: False]
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""CLI commands for Airbyte Cloud operations. 3 4Commands: 5 airbyte-ops cloud connector get-version-info - Get connector version info 6 airbyte-ops cloud connector set-version-override - Set connector version override 7 airbyte-ops cloud connector clear-version-override - Clear connector version override 8 airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison) 9 airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file 10 11## CLI reference 12 13The commands below are regenerated by `poe docs-generate` via cyclopts's 14programmatic docs API; see `docs/generate_cli.py`. 15 16.. include:: ../../../docs/generated/cli/cloud.md 17 :start-line: 2 18""" 19 20from __future__ import annotations 21 22# Hide Python-level members from the pdoc page for this module; the rendered 23# docs for this CLI group come entirely from the grafted `.. include::` in 24# the module docstring above. 
25__all__: list[str] = [] 26 27import json 28import logging 29import os 30import shutil 31import signal 32import socket 33import subprocess 34import sys 35import tempfile 36import time 37from pathlib import Path 38from typing import Annotated, Literal 39 40import requests 41import yaml 42from airbyte.cloud.workspaces import CloudWorkspace 43from airbyte.exceptions import PyAirbyteInputError 44from airbyte_cdk.models.connector_metadata import MetadataFile 45from airbyte_cdk.utils.connector_paths import find_connector_root_from_name 46from airbyte_cdk.utils.docker import build_connector_image, verify_docker_installation 47from airbyte_protocol.models import ConfiguredAirbyteCatalog 48from cyclopts import Parameter 49 50from airbyte_ops_mcp.cli._base import App, app 51from airbyte_ops_mcp.cli._shared import ( 52 exit_with_error, 53 print_error, 54 print_json, 55 print_success, 56 print_warning, 57) 58from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError 59from airbyte_ops_mcp.cloud_admin.connection_config import fetch_connection_config 60from airbyte_ops_mcp.cloud_admin.registry_lookup import ( 61 resolve_definition_id_to_canonical_info, 62) 63from airbyte_ops_mcp.cloud_admin.version_overrides import ( 64 ResolvedCloudAuth, 65 get_connector_version_info, 66 set_actor_version_override, 67 set_organization_version_override, 68 set_workspace_version_override, 69) 70from airbyte_ops_mcp.constants import ( 71 CLOUD_SQL_INSTANCE, 72 CLOUD_SQL_PROXY_PID_FILE, 73 DEFAULT_CLOUD_SQL_PROXY_PORT, 74 ENV_GCP_PROD_DB_ACCESS_CREDENTIALS, 75) 76from airbyte_ops_mcp.gcp_logs import GCPSeverity, fetch_error_logs 77from airbyte_ops_mcp.regression_tests.cdk_secrets import get_first_config_from_secrets 78from airbyte_ops_mcp.regression_tests.ci_output import ( 79 generate_regression_report, 80 generate_single_version_report, 81 write_github_output, 82 write_github_outputs, 83 write_github_summary, 84 write_json_output, 85 write_test_summary, 86) 87from 
airbyte_ops_mcp.regression_tests.config_overrides import ( 88 filter_configured_catalog_file, 89) 90from airbyte_ops_mcp.regression_tests.connection_fetcher import ( 91 fetch_connection_data, 92 fetch_connection_state, 93 save_connection_data_to_files, 94) 95from airbyte_ops_mcp.regression_tests.connection_secret_retriever import ( 96 SecretRetrievalError, 97 enrich_config_with_secrets, 98 should_use_secret_retriever, 99) 100from airbyte_ops_mcp.regression_tests.connector_runner import ( 101 ConnectorRunner, 102 ensure_image_available, 103) 104from airbyte_ops_mcp.regression_tests.http_metrics import ( 105 MitmproxyManager, 106 parse_http_dump, 107) 108from airbyte_ops_mcp.regression_tests.models import ( 109 Command, 110 ConnectorUnderTest, 111 ExecutionInputs, 112 TargetOrControl, 113) 114from airbyte_ops_mcp.telemetry import track_regression_test 115 116# Path to connectors directory within the airbyte repo 117CONNECTORS_SUBDIR = Path("airbyte-integrations") / "connectors" 118 119# Create the cloud sub-app 120cloud_app = App(name="cloud", help="Airbyte Cloud operations.") 121app.command(cloud_app) 122 123# Create the connector sub-app under cloud 124connector_app = App( 125 name="connector", help="Deployed connector operations in Airbyte Cloud." 126) 127cloud_app.command(connector_app) 128 129# Create the db sub-app under cloud 130db_app = App(name="db", help="Database operations for Airbyte Cloud Prod DB Replica.") 131cloud_app.command(db_app) 132 133# Create the connection sub-app under cloud 134connection_app = App(name="connection", help="Connection operations in Airbyte Cloud.") 135cloud_app.command(connection_app) 136 137# Create the state sub-app under connection 138state_app = App(name="state", help="Connection state operations in Airbyte Cloud.") 139connection_app.command(state_app) 140 141# Create the catalog sub-app under connection 142catalog_app = App( 143 name="catalog", help="Connection catalog operations in Airbyte Cloud." 
144) 145connection_app.command(catalog_app) 146 147# Create the logs sub-app under cloud 148logs_app = App(name="logs", help="GCP Cloud Logging operations for Airbyte Cloud.") 149cloud_app.command(logs_app) 150 151 152@db_app.command(name="start-proxy") 153def start_proxy( 154 port: Annotated[ 155 int, 156 Parameter(help="Port for the Cloud SQL Proxy to listen on."), 157 ] = DEFAULT_CLOUD_SQL_PROXY_PORT, 158 daemon: Annotated[ 159 bool, 160 Parameter( 161 help="Run as daemon in background (default). Use --no-daemon for foreground." 162 ), 163 ] = True, 164) -> None: 165 """Start the Cloud SQL Proxy for database access. 166 167 This command starts the Cloud SQL Auth Proxy to enable connections to the 168 Airbyte Cloud Prod DB Replica. The proxy is required for database query tools. 169 170 By default, runs as a daemon (background process). Use --no-daemon to run in 171 foreground mode where you can see logs and stop with Ctrl+C. 172 173 Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, 174 which should contain the service account JSON credentials. 
175 176 After starting the proxy, set these environment variables to use database tools: 177 export USE_CLOUD_SQL_PROXY=1 178 export DB_PORT={port} 179 180 Example: 181 airbyte-ops cloud db start-proxy 182 airbyte-ops cloud db start-proxy --port 15432 183 airbyte-ops cloud db start-proxy --no-daemon 184 """ 185 # Check if proxy is already running on the requested port (idempotency) 186 try: 187 with socket.create_connection(("127.0.0.1", port), timeout=0.5): 188 # Something is already listening on this port 189 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 190 pid_info = "" 191 if pid_file.exists(): 192 pid_info = f" (PID: {pid_file.read_text().strip()})" 193 print_success( 194 f"Cloud SQL Proxy is already running on port {port}{pid_info}" 195 ) 196 print_success("") 197 print_success("To use database tools, set these environment variables:") 198 print_success(" export USE_CLOUD_SQL_PROXY=1") 199 print_success(f" export DB_PORT={port}") 200 return 201 except (OSError, TimeoutError, ConnectionRefusedError): 202 pass # Port not in use, proceed with starting proxy 203 204 # Check if cloud-sql-proxy is installed 205 proxy_path = shutil.which("cloud-sql-proxy") 206 if not proxy_path: 207 exit_with_error( 208 "cloud-sql-proxy not found in PATH. " 209 "Install it from: https://cloud.google.com/sql/docs/mysql/sql-proxy" 210 ) 211 212 # Get credentials from environment 213 creds_json = os.getenv(ENV_GCP_PROD_DB_ACCESS_CREDENTIALS) 214 if not creds_json: 215 exit_with_error( 216 f"{ENV_GCP_PROD_DB_ACCESS_CREDENTIALS} environment variable is not set. " 217 "This should contain the GCP service account JSON credentials." 
218 ) 219 220 # Build the command using --json-credentials to avoid writing to disk 221 cmd = [ 222 proxy_path, 223 CLOUD_SQL_INSTANCE, 224 f"--port={port}", 225 f"--json-credentials={creds_json}", 226 ] 227 228 print_success(f"Starting Cloud SQL Proxy on port {port}...") 229 print_success(f"Instance: {CLOUD_SQL_INSTANCE}") 230 print_success("") 231 print_success("To use database tools, set these environment variables:") 232 print_success(" export USE_CLOUD_SQL_PROXY=1") 233 print_success(f" export DB_PORT={port}") 234 print_success("") 235 236 if daemon: 237 # Run in background (daemon mode) with log file for diagnostics 238 log_file_path = Path("/tmp/airbyte-cloud-sql-proxy.log") 239 log_file = log_file_path.open("ab") 240 process = subprocess.Popen( 241 cmd, 242 stdout=subprocess.DEVNULL, 243 stderr=log_file, 244 start_new_session=True, 245 ) 246 247 # Brief wait to verify the process started successfully 248 time.sleep(0.5) 249 if process.poll() is not None: 250 # Process exited immediately - read any error output 251 log_file.close() 252 error_output = "" 253 if log_file_path.exists(): 254 error_output = log_file_path.read_text()[-1000:] # Last 1000 chars 255 exit_with_error( 256 f"Cloud SQL Proxy failed to start (exit code: {process.returncode}).\n" 257 f"Check logs at {log_file_path}\n" 258 f"Recent output: {error_output}" 259 ) 260 261 # Write PID to file for stop-proxy command 262 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 263 pid_file.write_text(str(process.pid)) 264 print_success(f"Cloud SQL Proxy started as daemon (PID: {process.pid})") 265 print_success(f"Logs: {log_file_path}") 266 print_success("To stop: airbyte-ops cloud db stop-proxy") 267 else: 268 # Run in foreground - replace current process 269 # Signals (Ctrl+C) will be handled directly by the cloud-sql-proxy process 270 print_success("Running in foreground. 
Press Ctrl+C to stop the proxy.") 271 print_success("") 272 os.execv(proxy_path, cmd) 273 274 275@db_app.command(name="stop-proxy") 276def stop_proxy() -> None: 277 """Stop the Cloud SQL Proxy daemon. 278 279 This command stops a Cloud SQL Proxy that was started with 'start-proxy'. 280 It reads the PID from the PID file and sends a SIGTERM signal to stop the process. 281 282 Example: 283 airbyte-ops cloud db stop-proxy 284 """ 285 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 286 287 if not pid_file.exists(): 288 exit_with_error( 289 f"PID file not found at {CLOUD_SQL_PROXY_PID_FILE}. " 290 "No Cloud SQL Proxy daemon appears to be running." 291 ) 292 293 pid_str = pid_file.read_text().strip() 294 if not pid_str.isdigit(): 295 pid_file.unlink() 296 exit_with_error(f"Invalid PID in {CLOUD_SQL_PROXY_PID_FILE}: {pid_str}") 297 298 pid = int(pid_str) 299 300 # Check if process is still running 301 try: 302 os.kill(pid, 0) # Signal 0 just checks if process exists 303 except ProcessLookupError: 304 pid_file.unlink() 305 print_success( 306 f"Cloud SQL Proxy (PID: {pid}) is not running. Cleaned up PID file." 
307 ) 308 return 309 except PermissionError: 310 exit_with_error(f"Permission denied to check process {pid}.") 311 312 # Send SIGTERM to stop the process 313 try: 314 os.kill(pid, signal.SIGTERM) 315 print_success(f"Sent SIGTERM to Cloud SQL Proxy (PID: {pid}).") 316 except ProcessLookupError: 317 print_success(f"Cloud SQL Proxy (PID: {pid}) already stopped.") 318 except PermissionError: 319 exit_with_error(f"Permission denied to stop process {pid}.") 320 321 # Clean up PID file 322 pid_file.unlink(missing_ok=True) 323 print_success("Cloud SQL Proxy stopped.") 324 325 326@connector_app.command(name="get-version-info") 327def get_version_info( 328 workspace_id: Annotated[ 329 str, 330 Parameter(help="The Airbyte Cloud workspace ID."), 331 ], 332 connector_id: Annotated[ 333 str, 334 Parameter(help="The ID of the deployed connector (source or destination)."), 335 ], 336 connector_type: Annotated[ 337 Literal["source", "destination"], 338 Parameter(help="The type of connector."), 339 ], 340) -> None: 341 """Get the current version information for a deployed connector.""" 342 result = get_connector_version_info( 343 auth=_resolve_cli_cloud_auth(), 344 workspace_id=workspace_id, 345 actor_id=connector_id, 346 actor_type=connector_type, 347 ) 348 print_json(result.model_dump()) 349 350 351_OverrideLevel = Literal["actor", "workspace", "organization"] 352_TierFilter = Literal["TIER_0", "TIER_1", "TIER_2", "ALL"] 353 354 355def _resolve_cli_cloud_auth() -> ResolvedCloudAuth: 356 """Resolve Airbyte Cloud auth from environment variables for CLI use. 357 358 Mirrors the priority order the MCP server uses: 359 360 1. `AIRBYTE_CLOUD_BEARER_TOKEN` 361 2. `AIRBYTE_CLOUD_CLIENT_ID` + `AIRBYTE_CLOUD_CLIENT_SECRET` 362 363 Raises a CLI error via `exit_with_error` if no usable credentials are present. 
364 """ 365 bearer_token = os.environ.get("AIRBYTE_CLOUD_BEARER_TOKEN") 366 if bearer_token: 367 return ResolvedCloudAuth(bearer_token=bearer_token) 368 369 client_id = os.environ.get("AIRBYTE_CLOUD_CLIENT_ID") 370 client_secret = os.environ.get("AIRBYTE_CLOUD_CLIENT_SECRET") 371 if client_id and client_secret: 372 return ResolvedCloudAuth(client_id=client_id, client_secret=client_secret) 373 374 exit_with_error( 375 "Missing Airbyte Cloud credentials. Set AIRBYTE_CLOUD_BEARER_TOKEN, or " 376 "both AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET." 377 ) 378 379 380def _validate_override_scope_args( 381 *, 382 override_level: _OverrideLevel, 383 workspace_id: str | None, 384 organization_id: str | None, 385 connector_id: str | None, 386 connector_type: Literal["source", "destination"] | None, 387 actor_definition_id: str | None, 388) -> None: 389 """Validate scope-specific arguments for `set/clear-version-override`. 390 391 Calls `exit_with_error` (which terminates the process) if the supplied 392 arguments are inconsistent with the requested `override_level`. 393 """ 394 if override_level == "actor": 395 missing = [ 396 name 397 for name, value in ( 398 ("--workspace-id", workspace_id), 399 ("--connector-id", connector_id), 400 ("--connector-type", connector_type), 401 ) 402 if not value 403 ] 404 if missing: 405 exit_with_error( 406 "Override level 'actor' requires " + ", ".join(missing) + "." 407 ) 408 if actor_definition_id is not None: 409 exit_with_error( 410 "Override level 'actor' must not be combined with " 411 "--actor-definition-id." 412 ) 413 if organization_id is not None: 414 exit_with_error( 415 "Override level 'actor' must not be combined with --organization-id." 
416 ) 417 return 418 419 if override_level == "workspace": 420 missing = [ 421 name 422 for name, value in ( 423 ("--workspace-id", workspace_id), 424 ("--actor-definition-id", actor_definition_id), 425 ) 426 if not value 427 ] 428 if missing: 429 exit_with_error( 430 "Override level 'workspace' requires " + ", ".join(missing) + "." 431 ) 432 if connector_id is not None or connector_type is not None: 433 exit_with_error( 434 "Override level 'workspace' must not be combined with " 435 "--connector-id or --connector-type." 436 ) 437 if organization_id is not None: 438 exit_with_error( 439 "Override level 'workspace' must not be combined with " 440 "--organization-id." 441 ) 442 return 443 444 # override_level == "organization" 445 missing = [ 446 name 447 for name, value in ( 448 ("--organization-id", organization_id), 449 ("--actor-definition-id", actor_definition_id), 450 ) 451 if not value 452 ] 453 if missing: 454 exit_with_error( 455 "Override level 'organization' requires " + ", ".join(missing) + "." 456 ) 457 if connector_id is not None or connector_type is not None: 458 exit_with_error( 459 "Override level 'organization' must not be combined with " 460 "--connector-id or --connector-type." 461 ) 462 if workspace_id is not None: 463 exit_with_error( 464 "Override level 'organization' must not be combined with --workspace-id." 465 ) 466 467 468def _dispatch_version_override( 469 *, 470 override_level: _OverrideLevel, 471 workspace_id: str | None, 472 organization_id: str | None, 473 connector_id: str | None, 474 connector_type: Literal["source", "destination"] | None, 475 actor_definition_id: str | None, 476 version: str | None, 477 unset: bool, 478 reason: str | None, 479 reason_url: str | None, 480 issue_url: str, 481 approval_comment_url: str, 482 ai_agent_session_url: str | None, 483 customer_tier_filter: _TierFilter, 484) -> None: 485 """Route a version-override request to the helper for `override_level`. 

    Resolves `--actor-definition-id` to its canonical name and connector type
    via the cloud registry for `workspace`/`organization` scopes, then calls
    the appropriate MCP-tool function. Prints the operation result as JSON.
    """
    _validate_override_scope_args(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
    )

    auth = _resolve_cli_cloud_auth()

    if override_level == "actor":
        assert workspace_id is not None
        assert connector_id is not None
        assert connector_type is not None
        try:
            actor_result = set_actor_version_override(
                auth=auth,
                workspace_id=workspace_id,
                actor_id=connector_id,
                actor_type=connector_type,
                approval_comment_url=approval_comment_url,
                version=version,
                unset=unset,
                override_reason=reason,
                override_reason_reference_url=reason_url,
                issue_url=issue_url,
                ai_agent_session_url=ai_agent_session_url,
                customer_tier_filter=customer_tier_filter,
            )
        except (PyAirbyteInputError, CloudAuthError) as e:
            exit_with_error(str(e))
        _print_override_result(actor_result.success, actor_result.message)
        print_json(actor_result.model_dump())
        return

    assert actor_definition_id is not None
    try:
        canonical_name, resolved_connector_type = (
            resolve_definition_id_to_canonical_info(actor_definition_id)
        )
    except (PyAirbyteInputError, requests.RequestException) as e:
        exit_with_error(f"Failed to resolve --actor-definition-id: {e}")
    typed_connector_type: Literal["source", "destination"] = (
        "source" if resolved_connector_type == "source" else "destination"
    )

    if override_level == "workspace":
        assert workspace_id is not None
        try:
            ws_result = set_workspace_version_override(
                auth=auth,
                workspace_id=workspace_id,
                connector_name=canonical_name,
                connector_type=typed_connector_type,
                approval_comment_url=approval_comment_url,
                version=version,
                unset=unset,
                override_reason=reason,
                override_reason_reference_url=reason_url,
                issue_url=issue_url,
                ai_agent_session_url=ai_agent_session_url,
                customer_tier_filter=customer_tier_filter,
            )
        except (PyAirbyteInputError, CloudAuthError) as e:
            exit_with_error(str(e))
        _print_override_result(ws_result.success, ws_result.message)
        print_json(ws_result.model_dump())
        return

    # override_level == "organization"
    assert organization_id is not None
    try:
        org_result = set_organization_version_override(
            auth=auth,
            organization_id=organization_id,
            connector_name=canonical_name,
            connector_type=typed_connector_type,
            approval_comment_url=approval_comment_url,
            version=version,
            unset=unset,
            override_reason=reason,
            override_reason_reference_url=reason_url,
            issue_url=issue_url,
            ai_agent_session_url=ai_agent_session_url,
            customer_tier_filter=customer_tier_filter,
        )
    except (PyAirbyteInputError, CloudAuthError) as e:
        exit_with_error(str(e))
    _print_override_result(org_result.success, org_result.message)
    print_json(org_result.model_dump())


def _print_override_result(success: bool, message: str) -> None:
    """Emit an override-operation status message via the shared CLI helpers."""
    if success:
        print_success(message)
    else:
        print_error(message)


@connector_app.command(name="set-version-override")
def set_version_override(
    version: Annotated[
        str,
        Parameter(
            help="The semver version string to pin to (e.g., '2.1.5-preview.abc1234')."
        ),
    ],
    reason: Annotated[
        str,
        Parameter(help="Explanation for the override (min 10 characters)."),
    ],
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    override_level: Annotated[
        _OverrideLevel,
        Parameter(
            help=(
                "Scope at which to apply the version override: 'actor' (single "
                "deployed connector instance), 'workspace' (all instances of a "
                "connector type within a workspace), or 'organization' (all "
                "instances across an organization). Defaults to 'actor'."
            ),
        ),
    ] = "actor",
    workspace_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud workspace ID. Required for 'actor' and "
                "'workspace' override levels."
            )
        ),
    ] = None,
    organization_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud organization ID. Required for "
                "'organization' override level."
            )
        ),
    ] = None,
    connector_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The ID of the deployed connector (source or destination). "
                "Required for 'actor' override level."
            )
        ),
    ] = None,
    connector_type: Annotated[
        Literal["source", "destination"] | None,
        Parameter(help="The type of connector. Required for 'actor' override level."),
    ] = None,
    actor_definition_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The connector definition UUID. Required for 'workspace' and "
                "'organization' override levels."
            )
        ),
    ] = None,
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    reason_url: Annotated[
        str | None,
        Parameter(help="Optional URL with more context (e.g., issue link)."),
    ] = None,
    customer_tier_filter: Annotated[
        _TierFilter,
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Set a version override for a deployed connector.

    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.

    The `--override-level` flag selects the scope at which the pin is applied:

    - `actor` (default): pins a single deployed connector instance. Requires
      `--workspace-id`, `--connector-id`, and `--connector-type`.
    - `workspace`: pins ALL instances of a connector type within a workspace.
      Requires `--workspace-id` and `--actor-definition-id`.
    - `organization`: pins ALL instances of a connector type across an
      organization. Requires `--organization-id` and `--actor-definition-id`.

    The `customer_tier_filter` gates the operation: the call fails if the
    actual tier of the target organization does not match. Use `ALL` to
    proceed regardless of tier (a warning is shown for sensitive tiers).
    """
    _dispatch_version_override(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
        version=version,
        unset=False,
        reason=reason,
        reason_url=reason_url,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )


@connector_app.command(name="clear-version-override")
def clear_version_override(
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    override_level: Annotated[
        _OverrideLevel,
        Parameter(
            help=(
                "Scope at which to clear the version override: 'actor', "
                "'workspace', or 'organization'. Defaults to 'actor'."
            ),
        ),
    ] = "actor",
    workspace_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud workspace ID. Required for 'actor' and "
                "'workspace' override levels."
            )
        ),
    ] = None,
    organization_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud organization ID. Required for "
                "'organization' override level."
            )
        ),
    ] = None,
    connector_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The ID of the deployed connector (source or destination). "
                "Required for 'actor' override level."
            )
        ),
    ] = None,
    connector_type: Annotated[
        Literal["source", "destination"] | None,
        Parameter(help="The type of connector. Required for 'actor' override level."),
    ] = None,
    actor_definition_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The connector definition UUID. Required for 'workspace' and "
                "'organization' override levels."
            )
        ),
    ] = None,
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    customer_tier_filter: Annotated[
        _TierFilter,
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Clear a version override from a deployed connector.

    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.

    The `--override-level` flag selects the scope at which the pin is removed:

    - `actor` (default): clears a single deployed connector instance pin.
      Requires `--workspace-id`, `--connector-id`, and `--connector-type`.
    - `workspace`: clears the workspace-level pin for a connector type.
      Requires `--workspace-id` and `--actor-definition-id`.
    - `organization`: clears the organization-level pin for a connector type.
      Requires `--organization-id` and `--actor-definition-id`.
    """
    _dispatch_version_override(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
        version=None,
        unset=True,
        reason=None,
        reason_url=None,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )


def _load_json_file(file_path: Path) -> dict | None:
    """Load a JSON file and return its contents.

    Returns None if the file doesn't exist or contains invalid JSON.
    """
    if not file_path.exists():
        return None
    try:
        return json.loads(file_path.read_text())
    except json.JSONDecodeError as e:
        print_error(f"Failed to parse JSON in file: {file_path}\nError: {e}")
        return None


def _run_connector_command(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    config_path: Path | None = None,
    catalog_path: Path | None = None,
    state_path: Path | None = None,
    proxy_url: str | None = None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command and return results as a dict.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        proxy_url: Optional HTTP proxy URL for traffic capture.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results.
    """
    connector = ConnectorUnderTest.from_image_name(connector_image, target_or_control)

    config = _load_json_file(config_path) if config_path else None
    state = _load_json_file(state_path) if state_path else None

    configured_catalog = None
    if catalog_path and catalog_path.exists():
        catalog_json = catalog_path.read_text()
        configured_catalog = ConfiguredAirbyteCatalog.parse_raw(catalog_json)

    # Pass log level as an environment variable to the connector container
    container_env: dict[str, str] = {}
    if enable_debug_logs:
        container_env["LOG_LEVEL"] = "DEBUG"

    execution_inputs = ExecutionInputs(
        connector_under_test=connector,
        command=command,
        output_dir=output_dir,
        config=config,
        configured_catalog=configured_catalog,
        state=state,
        environment_variables=container_env or None,
    )

    runner = ConnectorRunner(execution_inputs, proxy_url=proxy_url)
    result = runner.run()

    result.save_artifacts(output_dir)

    return {
        "connector": connector_image,
        "command": command.value,
        "success": result.success,
        "exit_code": result.exit_code,
        "stdout_file": str(result.stdout_file_path),
        "stderr_file": str(result.stderr_file_path),
        "message_counts": {
            k.value: v for k, v in result.get_message_count_per_type().items()
        },
        "record_counts_per_stream": result.get_record_count_per_stream(),
    }


def _build_connector_image_from_source(
    connector_name: str,
    repo_root: Path | None = None,
    tag: str = "dev",
) -> str | None:
    """Build a connector image from source code.

    Args:
        connector_name: Name of the connector (e.g., 'source-pokeapi').
        repo_root: Optional path to the airbyte repo root. If not provided,
            will attempt to auto-detect from current directory.
        tag: Tag to apply to the built image (default: 'dev').

    Returns:
        The full image name with tag if successful, None if build fails.
    """
    if not verify_docker_installation():
        print_error("Docker is not installed or not running")
        return None

    try:
        connector_directory = find_connector_root_from_name(connector_name)
    except FileNotFoundError:
        if repo_root:
            connector_directory = repo_root / CONNECTORS_SUBDIR / connector_name
            if not connector_directory.exists():
                print_error(f"Connector directory not found: {connector_directory}")
                return None
        else:
            print_error(
                f"Could not find connector '{connector_name}'. "
                "Try providing --repo-root to specify the airbyte repo location."
            )
            return None

    metadata_file_path = connector_directory / "metadata.yaml"
    if not metadata_file_path.exists():
        print_error(f"metadata.yaml not found at {metadata_file_path}")
        return None

    metadata = MetadataFile.from_file(metadata_file_path)
    print_success(f"Building image for connector: {connector_name}")

    built_image = build_connector_image(
        connector_name=connector_name,
        connector_directory=connector_directory,
        metadata=metadata,
        tag=tag,
        no_verify=False,
    )
    print_success(f"Successfully built image: {built_image}")
    return built_image


def _fetch_control_image_from_metadata(connector_name: str) -> str | None:
    """Fetch the current released connector image from metadata.yaml on the master branch.

    This fetches the connector's metadata.yaml from the airbyte monorepo's master branch
    and extracts the dockerRepository and dockerImageTag to construct the control image.

    Args:
        connector_name: The connector name (e.g., 'source-github').

    Returns:
        The full connector image with tag (e.g., 'airbyte/source-github:1.0.0'),
        or None if the metadata could not be fetched or parsed.
    """
    metadata_url = (
        f"https://raw.githubusercontent.com/airbytehq/airbyte/master/"
        f"airbyte-integrations/connectors/{connector_name}/metadata.yaml"
    )
    response = requests.get(metadata_url, timeout=30)
    if not response.ok:
        print_error(
            f"Failed to fetch metadata for {connector_name}: "
            f"HTTP {response.status_code} from {metadata_url}"
        )
        return None

    metadata = yaml.safe_load(response.text)
    if not isinstance(metadata, dict):
        print_error(f"Invalid metadata format for {connector_name}: expected dict")
        return None

    data = metadata.get("data", {})
    docker_repository = data.get("dockerRepository")
    docker_image_tag = data.get("dockerImageTag")

    if not docker_repository or not docker_image_tag:
        print_error(
            f"Could not find dockerRepository/dockerImageTag in metadata for {connector_name}"
        )
        return None

    return f"{docker_repository}:{docker_image_tag}"


def _run_with_optional_http_metrics(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    enable_http_metrics: bool,
    config_path: Path | None,
    catalog_path: Path | None,
    state_path: Path | None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command with optional HTTP metrics capture.

    When enable_http_metrics is True, starts mitmproxy to capture HTTP traffic.
    If mitmproxy fails to start, falls back to running without metrics.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        enable_http_metrics: Whether to capture HTTP metrics via mitmproxy.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results, optionally including http_metrics.
    """
    if not enable_http_metrics:
        return _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            enable_debug_logs=enable_debug_logs,
        )

    with MitmproxyManager.start(output_dir) as session:
        if session is None:
            print_error("Mitmproxy unavailable, running without HTTP metrics")
            return _run_connector_command(
                connector_image=connector_image,
                command=command,
                output_dir=output_dir,
                target_or_control=target_or_control,
                config_path=config_path,
                catalog_path=catalog_path,
                state_path=state_path,
                enable_debug_logs=enable_debug_logs,
            )

        print_success(f"Started mitmproxy on {session.proxy_url}")
        result = _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            proxy_url=session.proxy_url,
            enable_debug_logs=enable_debug_logs,
        )

        http_metrics = parse_http_dump(session.dump_file_path)
        result["http_metrics"] = {
            "flow_count": http_metrics.flow_count,
            "duplicate_flow_count": http_metrics.duplicate_flow_count,
        }
        print_success(
            f"Captured {http_metrics.flow_count} HTTP flows "
            f"({http_metrics.duplicate_flow_count} duplicates)"
        )
        return result


@connector_app.command(name="regression-test")
def regression_test(
    skip_compare: Annotated[
        bool,
        Parameter(
            help="If True, skip comparison and run single-version tests only. "
            "If False (default), run comparison tests (target vs control)."
        ),
    ] = False,
    test_image: Annotated[
        str | None,
        Parameter(
            help="Test connector image with tag (e.g., airbyte/source-github:1.0.0). "
            "This is the image under test - in comparison mode, it's compared against control_image."
        ),
    ] = None,
    control_image: Annotated[
        str | None,
        Parameter(
            help="Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). "
            "Ignored if `skip_compare=True`."
        ),
    ] = None,
    connector_name: Annotated[
        str | None,
        Parameter(
            help="Connector name to build image from source (e.g., 'source-pokeapi'). "
            "If provided, builds the image locally with tag 'dev'. "
            "For comparison tests (default), this builds the target image. "
            "For single-version tests (skip_compare=True), this builds the test image."
        ),
    ] = None,
    repo_root: Annotated[
        str | None,
        Parameter(
            help="Path to the airbyte repo root. Required if connector_name is provided "
            "and the repo cannot be auto-detected."
        ),
    ] = None,
    command: Annotated[
        Literal["spec", "check", "discover", "read"],
        Parameter(help="The Airbyte command to run."),
    ] = "check",
    connection_id: Annotated[
        str | None,
        Parameter(
            help="Airbyte Cloud connection ID to fetch config/catalog from. "
            "Mutually exclusive with config-path/catalog-path. "
            "If provided, test_image/control_image can be auto-detected."
        ),
    ] = None,
    config_path: Annotated[
        str | None,
        Parameter(help="Path to the connector config JSON file."),
    ] = None,
    catalog_path: Annotated[
        str | None,
        Parameter(help="Path to the configured catalog JSON file (required for read)."),
    ] = None,
    state_path: Annotated[
        str | None,
        Parameter(help="Path to the state JSON file (optional for read)."),
    ] = None,
    output_dir: Annotated[
        str,
        Parameter(help="Directory to store test artifacts."),
    ] = "/tmp/regression_test_artifacts",
    enable_http_metrics: Annotated[
        bool,
        Parameter(
            help="Capture HTTP traffic metrics via mitmproxy (experimental). "
            "Requires mitmdump to be installed. Only used in comparison mode."
        ),
    ] = False,
    selected_streams: Annotated[
        str | None,
        Parameter(
            help="Comma-separated list of stream names to include in the read. "
            "Only these streams will be included in the configured catalog. "
            "This is useful to limit data volume by testing only specific streams."
        ),
    ] = None,
    enable_debug_logs: Annotated[
        bool,
        Parameter(
            help="Enable debug-level logging for regression test output. "
            "Also passed as `LOG_LEVEL=DEBUG` to the connector Docker container."
        ),
    ] = False,
    with_state: Annotated[
        bool | None,
        Parameter(
            negative="--no-state",
            help="Fetch and pass the connection's current state to the read command, "
            "producing a warm read instead of a cold read. Defaults to `True` when "
            "`--connection-id` is provided, `False` otherwise. Has no effect unless "
            "the command is `read`. Ignored when `--state-path` is explicitly provided.",
        ),
    ] = None,
) -> None:
    """Run regression tests on connectors.

    This command supports two modes:

    Comparison mode (skip_compare=False, default):
        Runs the specified Airbyte protocol command against both the target (new)
        and control (baseline) connector versions, then compares the results.
        This helps identify regressions between versions.

    Single-version mode (skip_compare=True):
        Runs the specified Airbyte protocol command against a single connector
        and validates the output. No comparison is performed.

    Results are written to the output directory and to GitHub Actions outputs
    if running in CI.

    You can provide the test image in three ways:
    1. --test-image: Use a pre-built image from Docker registry
    2. --connector-name: Build the image locally from source code
    3. --connection-id: Auto-detect from an Airbyte Cloud connection

    You can provide config/catalog either via file paths OR via a connection_id
    that fetches them from Airbyte Cloud.
    """
    # Configure debug logging for the regression test harness when requested
    if enable_debug_logs:
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            force=True,
        )

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    cmd = Command(command)

    config_file: Path | None = None
    catalog_file: Path | None = None
    state_file = Path(state_path) if state_path else None

    # Resolve the test image (used in both single-version and comparison modes)
    resolved_test_image: str | None = test_image
    resolved_control_image: str | None = control_image

    # Validate conflicting parameters
    # Single-version mode: reject comparison-specific parameters
    if skip_compare and control_image:
        write_github_output("success", False)
        write_github_output(
            "error", "Cannot specify control_image with skip_compare=True"
        )
        exit_with_error(
            "Cannot specify --control-image with --skip-compare. "
            "Control image is only used in comparison mode."
        )

    # If connector_name is provided, build the image from source
    if connector_name:
        if resolved_test_image:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both test_image and connector_name"
            )
            exit_with_error("Cannot specify both --test-image and --connector-name")

        repo_root_path = Path(repo_root) if repo_root else None
        built_image = _build_connector_image_from_source(
            connector_name=connector_name,
            repo_root=repo_root_path,
            tag="dev",
        )
        if not built_image:
            write_github_output("success", False)
            write_github_output("error", f"Failed to build image for {connector_name}")
            exit_with_error(f"Failed to build image for {connector_name}")
        resolved_test_image = built_image

    if connection_id:
        if config_path or catalog_path:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both connection_id and file paths"
            )
            exit_with_error(
                "Cannot specify both connection_id and config_path/catalog_path"
            )

        print_success(f"Fetching config/catalog from connection: {connection_id}")
        connection_data = fetch_connection_data(connection_id)

        # Check if we should retrieve unmasked secrets
        if should_use_secret_retriever():
            print_success(
                "USE_CONNECTION_SECRET_RETRIEVER enabled - enriching config with unmasked secrets..."
            )
            try:
                connection_data = enrich_config_with_secrets(
                    connection_data,
                    retrieval_reason="Regression test with USE_CONNECTION_SECRET_RETRIEVER=true",
                )
                print_success("Successfully retrieved unmasked secrets from database")
            except SecretRetrievalError as e:
                write_github_output("success", False)
                write_github_output("error", str(e))
                exit_with_error(
                    f"{e}\n\n"
                    f"This connection cannot be used for regression testing. "
                    f"Please use a connection from a non-EU workspace, or use GSM-based "
                    f"integration test credentials instead (by omitting --connection-id)."
                )

        # Resolve with_state default: True when connection_id is provided
        resolved_with_state = with_state if with_state is not None else True

        if resolved_with_state and not state_file and command == "read":
            print_success("Fetching connection state for warm read...")
            try:
                connection_data.state = fetch_connection_state(connection_data)
                if connection_data.state:
                    print_success(
                        f"Fetched state with {len(connection_data.state)} stream(s)"
                    )
                else:
                    connection_data.state = None
                    print_success(
                        "No state available for this connection (initial sync)"
                    )
            except Exception as exc:
                print_error(f"Failed to fetch connection state: {exc}")
                print_success("Falling back to cold read (no state)")

        config_file, catalog_file, state_file_from_conn = save_connection_data_to_files(
            connection_data, output_path / "connection_data"
        )

        if state_file_from_conn and not state_file:
            state_file = state_file_from_conn

        print_success(
            f"Fetched config for source: {connection_data.source_name} "
            f"with {len(connection_data.stream_names)} streams"
        )

        # Auto-detect test/control image from connection if not provided
        if not resolved_test_image and connection_data.connector_image:
            resolved_test_image = connection_data.connector_image
            print_success(f"Auto-detected test image: {resolved_test_image}")

        if (
            not skip_compare
            and not resolved_control_image
            and connection_data.connector_image
        ):
            resolved_control_image = connection_data.connector_image
            print_success(f"Auto-detected control image: {resolved_control_image}")
    elif config_path:
        config_file = Path(config_path)
        catalog_file = Path(catalog_path) if catalog_path else None
    elif connector_name:
        # Fallback: fetch integration test secrets from GSM using PyAirbyte API
        print_success(
            f"No connection_id or config_path provided. "
            f"Attempting to fetch integration test config from GSM for {connector_name}..."
        )
        gsm_config = get_first_config_from_secrets(connector_name)
        if gsm_config:
            # Write config to a temp file (not in output_path to avoid artifact upload)
            gsm_config_dir = Path(
                tempfile.mkdtemp(prefix=f"gsm-config-{connector_name}-")
            )
            gsm_config_dir.chmod(0o700)
            gsm_config_file = gsm_config_dir / "config.json"
            gsm_config_file.write_text(json.dumps(gsm_config, indent=2))
            gsm_config_file.chmod(0o600)
            config_file = gsm_config_file
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
            print_success(
                f"Fetched integration test config from GSM for {connector_name}"
            )
        else:
            print_error(
                f"Failed to fetch integration test config from GSM for {connector_name}."
            )
            config_file = None
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
    else:
        config_file = None
        catalog_file = Path(catalog_path) if catalog_path else None

    # Auto-detect control_image from metadata.yaml if connector_name is provided (comparison mode only)
    if not skip_compare and not resolved_control_image and connector_name:
        resolved_control_image = _fetch_control_image_from_metadata(connector_name)
        if resolved_control_image:
            print_success(
                f"Auto-detected control image from metadata.yaml: {resolved_control_image}"
            )

    # Validate that we have the required images
    if not resolved_test_image:
        write_github_output("success", False)
        write_github_output("error", "No test image specified")
        exit_with_error(
            "You must provide one of the following: a test_image, a connector_name "
            "to build the image from source, or a connection_id to auto-detect the image."
        )

    if not skip_compare and not resolved_control_image:
        write_github_output("success", False)
        write_github_output("error", "No control image specified")
        exit_with_error(
            "You must provide one of the following: a control_image, a connection_id "
            "for a connection that has an associated connector image, or a connector_name "
            "to auto-detect the control image from the airbyte repo's metadata.yaml."
        )

    # Pull images if they weren't just built locally
    # If connector_name was provided, we just built the test image locally
    if not connector_name and not ensure_image_available(resolved_test_image):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_test_image}")
        exit_with_error(f"Failed to pull test image: {resolved_test_image}")

    if (
        not skip_compare
        and resolved_control_image
        and not ensure_image_available(resolved_control_image)
    ):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_control_image}")
        exit_with_error(
            f"Failed to pull control connector image: {resolved_control_image}"
        )

    # Apply selected_streams filter to catalog if requested
    if selected_streams and catalog_file:
        streams_set = {s.strip() for s in selected_streams.split(",") if s.strip()}
        if streams_set:
            print_success(
                f"Filtering catalog to {len(streams_set)} selected streams: "
                f"{', '.join(sorted(streams_set))}"
            )
            filter_configured_catalog_file(catalog_file, streams_set)

    # Upgrade READ → READ_WITH_STATE when a state file is available
    if cmd == Command.READ and state_file:
        cmd = Command.READ_WITH_STATE
        print_success("State available — using read-with-state (warm read)")

    # Track telemetry for the regression test
    # Extract version from image tag (e.g., "airbyte/source-github:1.0.0" -> "1.0.0")
    target_version = (
        resolved_test_image.rsplit(":", 1)[-1]
        if ":" in resolved_test_image
        else "unknown"
    )
    control_version = None
    if resolved_control_image and ":" in resolved_control_image:
        control_version = resolved_control_image.rsplit(":", 1)[-1]

    # Get tester identity from environment (GitHub Actions sets GITHUB_ACTOR)
    tester = os.getenv("GITHUB_ACTOR") or os.getenv("USER")

    track_regression_test(
        user_id=tester,
        connector_image=resolved_test_image,
        command=command,
        target_version=target_version,
        control_version=control_version,
        additional_properties={
            "connection_id": connection_id,
            "skip_compare": skip_compare,
            "with_state": cmd == Command.READ_WITH_STATE,
        },
    )

    # Execute the appropriate mode
    if skip_compare:
        # Single-version mode: run only the connector image
        result = _run_connector_command(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=output_path,
            target_or_control=TargetOrControl.TARGET,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        print_json(result)

        write_github_outputs(
            {
                "success": result["success"],
                "connector": resolved_test_image,
                "command": command,
                "exit_code": result["exit_code"],
            }
        )

        write_test_summary(
            connector_image=resolved_test_image,
            test_type="regression-test",
            success=result["success"],
            results={
                "command": command,
                "exit_code": result["exit_code"],
                "output_dir": output_dir,
            },
        )

        # Generate report.md with detailed metrics
        report_path = generate_single_version_report(
            connector_image=resolved_test_image,
            command=command,
            result=result,
            output_dir=output_path,
        )
        print_success(f"Generated report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if result["success"]:
            print_success(
                f"Single-version regression test passed for {resolved_test_image}"
            )
        else:
            print_error(
                f"Single-version regression test failed for {resolved_test_image}"
            )
    else:
        # Comparison mode: run both target and control images
        target_output = output_path / "target"
        control_output = output_path / "control"

        target_result = _run_with_optional_http_metrics(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=target_output,
            target_or_control=TargetOrControl.TARGET,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        control_result = _run_with_optional_http_metrics(
            connector_image=resolved_control_image,  # type: ignore[arg-type]
            command=cmd,
            output_dir=control_output,
            target_or_control=TargetOrControl.CONTROL,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        both_succeeded = target_result["success"] and control_result["success"]
        regression_detected = target_result["success"] != control_result["success"]

        combined_result = {
            "target": target_result,
            "control": control_result,
            "both_succeeded": both_succeeded,
            "regression_detected": regression_detected,
        }

        print_json(combined_result)

        both_failed = not target_result["success"] and not control_result["success"]

        write_github_outputs(
            {
                "success": not regression_detected,
                "target_image": resolved_test_image,
                "control_image": resolved_control_image,
                "command": command,
                "target_exit_code": target_result["exit_code"],
                "control_exit_code": control_result["exit_code"],
                "regression_detected": regression_detected,
                "both_failed": both_failed,
            }
        )

        write_json_output("regression_report", combined_result)

        report_path = generate_regression_report(
            target_image=resolved_test_image,
            control_image=resolved_control_image,  # type: ignore[arg-type]
            command=command,
            target_result=target_result,
            control_result=control_result,
            output_dir=output_path,
        )
        print_success(f"Generated regression report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if regression_detected:
            print_error(
                f"Regression detected between {resolved_test_image} and {resolved_control_image}"
            )
        elif both_succeeded:
            print_success(
                f"Regression test passed for {resolved_test_image} vs {resolved_control_image}"
            )
        else:
            # Both versions failed, but no regression between them was detected; treat this as a
            # test environment issue (e.g., expired credentials, transient API errors, rate limiting).
            # Exit successfully since no regression between versions was detected.
            print_warning(
                f"Both versions failed for {resolved_test_image} vs {resolved_control_image}. "
                "No regression detected. This may indicate expired credentials or a transient API issue."
            )


@connector_app.command(name="fetch-connection-config")
def fetch_connection_config_cmd(
    connection_id: Annotated[
        str,
        Parameter(help="The UUID of the Airbyte Cloud connection."),
    ],
    output_path: Annotated[
        str | None,
        Parameter(
            help="Path to output file or directory. "
            "If directory, writes connection-<id>-config.json inside it. "
            "Default: platform temp directory (e.g. /tmp/connection-<id>-config.json)"
        ),
    ] = None,
    with_secrets: Annotated[
        bool,
        Parameter(
            name="--with-secrets",
            negative="--no-secrets",
            help="If set, fetches unmasked secrets from the internal database. "
            "Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or `gcloud auth application-default login`. "
            "Must be used with --oc-issue-url.",
        ),
    ] = False,
    oc_issue_url: Annotated[
        str | None,
        Parameter(
            help="OC issue URL for audit logging. Required when using --with-secrets."
        ),
    ] = None,
) -> None:
    """Fetch connection configuration from Airbyte Cloud to a local file.

    This command retrieves the source configuration for a given connection ID
    and writes it to a JSON file. When `--output-path` is omitted the file is
    written to the platform temp directory to avoid accidentally committing
    secrets to a git repository.

    Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and
    AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

    When --with-secrets is specified, the command fetches unmasked secrets from
    the internal database using the connection-retriever. This additionally requires:
    - An OC issue URL for audit logging (--oc-issue-url)
    - GCP credentials via `GCP_PROD_DB_ACCESS_CREDENTIALS` env var or `gcloud auth application-default login`
    - If `CI=true`: expects `cloud-sql-proxy` running on localhost, or
      direct network access to the Cloud SQL instance.
    """
    path = Path(output_path) if output_path else None
    result = fetch_connection_config(
        connection_id=connection_id,
        output_path=path,
        with_secrets=with_secrets,
        oc_issue_url=oc_issue_url,
    )
    if result.success:
        print_success(result.message)
    else:
        print_error(result.message)
    print_json(result.model_dump())


def _read_json_input(
    json_str: str | None,
    input_file: str | None,
) -> dict:
    """Read JSON from a positional arg, --input file, or STDIN (in that priority)."""
    if json_str is not None:
        return json.loads(json_str)
    if input_file is not None:
        return json.loads(Path(input_file).read_text())
    if not sys.stdin.isatty():
        return json.loads(sys.stdin.read())
    exit_with_error(
        "No JSON input provided. Pass it as a positional argument, "
        "via --input <file>, or pipe to STDIN."
    )
    raise SystemExit(1)  # unreachable, but satisfies type checker


@state_app.command(name="get")
def get_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to filter state for a single stream.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to narrow the stream filter.",
            name="--namespace",
        ),
    ] = None,
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the state JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the current state for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        result = {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }
    else:
        result = conn.dump_raw_state()

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"State written to {output}", file=sys.stderr)
    else:
        print(json_output)


@state_app.command(name="set")
def set_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    state_json: Annotated[
        str | None,
        Parameter(
            help="The connection state as a JSON string. When --stream is used, "
            'this is just the stream\'s state blob (e.g., \'{"cursor": "2024-01-01"}\').'
            " Otherwise, must include 'stateType', 'connectionId', and the appropriate "
            "state field. Can also be provided via --input or STDIN."
        ),
    ] = None,
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to update state for a single stream only.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the state to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the state for an Airbyte Cloud connection.

    State JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    Uses the safe variant that prevents updates while a sync is running.
    When --stream is provided, only that stream's state is updated within
    the existing connection state.
    """
    parsed_state = _read_json_input(state_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        conn.set_stream_state(
            stream_name=stream_name,
            state_blob_dict=parsed_state,
            stream_namespace=stream_namespace,
        )
    else:
        conn.import_raw_state(parsed_state)

    result = conn.dump_raw_state()
    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


@catalog_app.command(name="get")
def get_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch catalog for."),
    ],
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the catalog JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the configured catalog for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("No configured catalog found for this connection.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"Catalog written to {output}", file=sys.stderr)
    else:
        print(json_output)


@catalog_app.command(name="set")
def set_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update catalog for."),
    ],
    catalog_json: Annotated[
        str | None,
        Parameter(
            help="The configured catalog as a JSON string. "
            "Can also be provided via --input or STDIN."
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the catalog to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the configured catalog for an Airbyte Cloud connection.

    Catalog JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    WARNING: This replaces the entire configured catalog.
    """
    parsed_catalog = _read_json_input(catalog_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    conn.import_raw_catalog(parsed_catalog)

    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("Failed to retrieve catalog after import.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


@logs_app.command(name="lookup-cloud-backend-error")
def lookup_cloud_backend_error(
    error_id: Annotated[
        str,
        Parameter(
            help=(
                "The error ID (UUID) to search for. This is typically returned "
                "in API error responses as {'errorId': '...'}"
            )
        ),
    ],
    lookback_days: Annotated[
        int,
        Parameter(help="Number of days to look back in logs."),
    ] = 7,
    min_severity_filter: Annotated[
        GCPSeverity | None,
        Parameter(
            help="Optional minimum severity level to filter logs.",
        ),
    ] = None,
    raw: Annotated[
        bool,
        Parameter(help="Output raw JSON instead of formatted text."),
    ] = False,
) -> None:
    """Look up error details from GCP Cloud Logging by error ID.

    When an Airbyte Cloud API returns an error response with only an error ID
    (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command
    fetches the full stack trace and error details from GCP Cloud Logging.

    Requires GCP credentials with Logs Viewer role on the target project.
    Set up credentials with: gcloud auth application-default login
    """
    print(f"Searching for error ID: {error_id}", file=sys.stderr)
    print(f"Lookback days: {lookback_days}", file=sys.stderr)
    if min_severity_filter:
        print(f"Severity filter: {min_severity_filter}", file=sys.stderr)
    print(file=sys.stderr)

    result = fetch_error_logs(
        error_id=error_id,
        lookback_days=lookback_days,
        min_severity_filter=min_severity_filter,
    )

    if raw:
        print_json(result.model_dump())
        return

    print(f"Found {result.total_entries_found} log entries", file=sys.stderr)
    print(file=sys.stderr)

    if result.payloads:
        for i, payload in enumerate(result.payloads):
            print(f"=== Log Group {i + 1} ===")
            print(f"Timestamp: {payload.timestamp}")
            print(f"Severity: {payload.severity}")
            if payload.resource.labels.pod_name:
                print(f"Pod: {payload.resource.labels.pod_name}")
            print(f"Lines: {payload.num_log_lines}")
            print()
            print(payload.message)
            print()
    elif result.entries:
        print("No grouped payloads, showing raw entries:", file=sys.stderr)
        for entry in result.entries:
            print(f"[{entry.timestamp}] {entry.severity}: {entry.payload}")
    else:
        print_error("No log entries found for this error ID.")
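
The comparison-mode branch of the regression-test command in the listing above derives three flags from the two run results: `both_succeeded`, `regression_detected`, and `both_failed`. The sketch below restates that decision table as a standalone function so the outcomes are easier to see side by side. The `Outcome` enum and the `classify` and `github_success_output` helpers are hypothetical names introduced only for illustration; they are not part of the module.

```python
from enum import Enum


class Outcome(Enum):
    """Hypothetical labels; the CLI itself only prints messages."""

    PASSED = "passed"
    REGRESSION = "regression"
    BOTH_FAILED = "both_failed"


def classify(target_success: bool, control_success: bool) -> Outcome:
    """Restate the comparison-mode flags as a single outcome."""
    # As in the listing, any mismatch between the two runs is flagged,
    # including the case where only the control image fails.
    if target_success != control_success:
        return Outcome.REGRESSION
    if target_success and control_success:
        return Outcome.PASSED
    return Outcome.BOTH_FAILED


def github_success_output(target_success: bool, control_success: bool) -> bool:
    """Value written to the "success" output: `not regression_detected`."""
    return classify(target_success, control_success) is not Outcome.REGRESSION


if __name__ == "__main__":
    for target in (True, False):
        for control in (True, False):
            outcome = classify(target, control)
            print(f"target={target} control={control} -> {outcome.value}")
```

Note that under this logic a run where both images fail still reports `success` as true, which matches the warning branch in the listing that treats a double failure as a likely environment issue rather than a regression.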