airbyte_ops_mcp.cli.cloud
CLI commands for Airbyte Cloud operations.
Commands:
    airbyte-ops cloud connector get-version-info - Get connector version info
    airbyte-ops cloud connector set-version-override - Set connector version override
    airbyte-ops cloud connector clear-version-override - Clear connector version override
    airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison)
    airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file
CLI reference
The commands below are regenerated by poe docs-generate via cyclopts's
programmatic docs API; see docs/generate_cli.py.
airbyte-ops cloud COMMAND
Airbyte Cloud operations.
Commands:
- connection: Connection operations in Airbyte Cloud.
- connector: Deployed connector operations in Airbyte Cloud.
- db: Database operations for Airbyte Cloud Prod DB Replica.
- logs: GCP Cloud Logging operations for Airbyte Cloud.
airbyte-ops cloud connector
Deployed connector operations in Airbyte Cloud.
airbyte-ops cloud connector get-version-info
airbyte-ops cloud connector get-version-info WORKSPACE-ID CONNECTOR-ID CONNECTOR-TYPE
Get the current version information for a deployed connector.
Parameters:
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. [required]
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). [required]
- CONNECTOR-TYPE, --connector-type: The type of connector. [required] [choices: source, destination]
airbyte-ops cloud connector set-version-override
airbyte-ops cloud connector set-version-override VERSION REASON ISSUE-URL APPROVAL-COMMENT-URL [ARGS]
Set a version override for a deployed connector.
Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and
AIRBYTE_INTERNAL_ADMIN_USER environment variables.
The --override-level flag selects the scope at which the pin is applied:
- actor (default): pins a single deployed connector instance. Requires --workspace-id, --connector-id, and --connector-type.
- workspace: pins ALL instances of a connector type within a workspace. Requires --workspace-id and --actor-definition-id.
- organization: pins ALL instances of a connector type across an organization. Requires --organization-id and --actor-definition-id.
The customer_tier_filter gates the operation: the call fails if the
actual tier of the target organization does not match. Use ALL to
proceed regardless of tier (a warning is shown for sensitive tiers).
Parameters:
- VERSION, --version: The semver version string to pin to (e.g., '2.1.5-preview.abc1234'). [required]
- REASON, --reason: Explanation for the override (min 10 characters). [required]
- ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
- APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
- OVERRIDE-LEVEL, --override-level: Scope at which to apply the version override: 'actor' (single deployed connector instance), 'workspace' (all instances of a connector type within a workspace), or 'organization' (all instances across an organization). Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
- ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
- CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
- ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
- AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
- REASON-URL, --reason-url: Optional URL with more context (e.g., issue link).
- CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
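The per-scope flag requirements described for --override-level can be sketched as a small lookup table. This is an illustrative sketch only, not the module's implementation; `REQUIRED_FLAGS` and `missing_flags` are hypothetical names introduced here.

```python
from __future__ import annotations

# Flags each --override-level value requires, as listed in the text above.
# REQUIRED_FLAGS and missing_flags are hypothetical names for illustration.
REQUIRED_FLAGS: dict[str, tuple[str, ...]] = {
    "actor": ("--workspace-id", "--connector-id", "--connector-type"),
    "workspace": ("--workspace-id", "--actor-definition-id"),
    "organization": ("--organization-id", "--actor-definition-id"),
}


def missing_flags(override_level: str, supplied: dict[str, str | None]) -> list[str]:
    """Return the required flags that are absent or empty for the given scope."""
    return [
        flag
        for flag in REQUIRED_FLAGS[override_level]
        if not supplied.get(flag)
    ]
```

A caller could report the returned list in an error message before attempting the override, which keeps the three scopes in one table instead of three separate branches.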
airbyte-ops cloud connector clear-version-override
airbyte-ops cloud connector clear-version-override ISSUE-URL APPROVAL-COMMENT-URL [ARGS]
Clear a version override from a deployed connector.
Requires admin authentication via AIRBYTE_INTERNAL_ADMIN_FLAG and
AIRBYTE_INTERNAL_ADMIN_USER environment variables.
The --override-level flag selects the scope at which the pin is removed:
- actor (default): clears a single deployed connector instance pin. Requires --workspace-id, --connector-id, and --connector-type.
- workspace: clears the workspace-level pin for a connector type. Requires --workspace-id and --actor-definition-id.
- organization: clears the organization-level pin for a connector type. Requires --organization-id and --actor-definition-id.
Parameters:
- ISSUE-URL, --issue-url: GitHub issue URL providing context for this operation. [required]
- APPROVAL-COMMENT-URL, --approval-comment-url: Slack approval record URL where admin authorized this deployment. [required]
- OVERRIDE-LEVEL, --override-level: Scope at which to clear the version override: 'actor', 'workspace', or 'organization'. Defaults to 'actor'. [choices: actor, workspace, organization] [default: actor]
- WORKSPACE-ID, --workspace-id: The Airbyte Cloud workspace ID. Required for 'actor' and 'workspace' override levels.
- ORGANIZATION-ID, --organization-id: The Airbyte Cloud organization ID. Required for 'organization' override level.
- CONNECTOR-ID, --connector-id: The ID of the deployed connector (source or destination). Required for 'actor' override level.
- CONNECTOR-TYPE, --connector-type: The type of connector. Required for 'actor' override level. [choices: source, destination]
- ACTOR-DEFINITION-ID, --actor-definition-id: The connector definition UUID. Required for 'workspace' and 'organization' override levels.
- AI-AGENT-SESSION-URL, --ai-agent-session-url: URL to AI agent session driving this operation (for auditability).
- CUSTOMER-TIER-FILTER, --customer-tier-filter: Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. The operation will be rejected if the actual customer tier does not match. Defaults to 'TIER_2' (non-sensitive customers). [choices: TIER_0, TIER_1, TIER_2, ALL] [default: TIER_2]
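The customer tier gate used by both override commands can be sketched roughly as follows. This is a hedged illustration, not the real implementation: `check_tier` and `SENSITIVE_TIERS` are hypothetical names, and treating TIER_0 and TIER_1 as the "sensitive" tiers is an assumption inferred from TIER_2 being described as non-sensitive.

```python
from __future__ import annotations

# Assumption: TIER_0 and TIER_1 are the sensitive tiers, because the default
# TIER_2 is described as covering non-sensitive customers.
SENSITIVE_TIERS = {"TIER_0", "TIER_1"}


def check_tier(actual_tier: str, tier_filter: str = "TIER_2") -> str | None:
    """Return a warning string (or None) if allowed; raise ValueError if rejected."""
    if tier_filter == "ALL":
        # ALL proceeds regardless of tier, warning only for sensitive tiers.
        if actual_tier in SENSITIVE_TIERS:
            return f"Proceeding against sensitive tier {actual_tier}."
        return None
    if actual_tier != tier_filter:
        raise ValueError(
            f"Customer tier {actual_tier} does not match filter {tier_filter}."
        )
    return None
```

Under these assumptions, the default filter rejects any TIER_0 or TIER_1 target unless the caller names that tier or passes ALL explicitly.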
airbyte-ops cloud connector regression-test
airbyte-ops cloud connector regression-test [ARGS]
Run regression tests on connectors.
This command supports two modes:
Comparison mode (skip_compare=False, default): Runs the specified Airbyte protocol command against both the target (new) and control (baseline) connector versions, then compares the results. This helps identify regressions between versions.
Single-version mode (skip_compare=True): Runs the specified Airbyte protocol command against a single connector and validates the output. No comparison is performed.
Results are written to the output directory and to GitHub Actions outputs if running in CI.
You can provide the test image in three ways:
- --test-image: Use a pre-built image from Docker registry
- --connector-name: Build the image locally from source code
- --connection-id: Auto-detect from an Airbyte Cloud connection
You can provide config/catalog either via file paths OR via a connection_id that fetches them from Airbyte Cloud.
Parameters:
- SKIP-COMPARE, --skip-compare, --no-skip-compare: If True, skip comparison and run single-version tests only. If False (default), run comparison tests (target vs control). [default: False]
- TEST-IMAGE, --test-image: Test connector image with tag (e.g., airbyte/source-github:1.0.0). This is the image under test - in comparison mode, it's compared against control_image.
- CONTROL-IMAGE, --control-image: Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). Ignored if skip_compare=True.
- CONNECTOR-NAME, --connector-name: Connector name to build image from source (e.g., 'source-pokeapi'). If provided, builds the image locally with tag 'dev'. For comparison tests (default), this builds the target image. For single-version tests (skip_compare=True), this builds the test image.
- REPO-ROOT, --repo-root: Path to the airbyte repo root. Required if connector_name is provided and the repo cannot be auto-detected.
- COMMAND, --command: The Airbyte command to run. [choices: spec, check, discover, read] [default: check]
- CONNECTION-ID, --connection-id: Airbyte Cloud connection ID to fetch config/catalog from. Mutually exclusive with config-path/catalog-path. If provided, test_image/control_image can be auto-detected.
- CONFIG-PATH, --config-path: Path to the connector config JSON file.
- CATALOG-PATH, --catalog-path: Path to the configured catalog JSON file (required for read).
- STATE-PATH, --state-path: Path to the state JSON file (optional for read).
- OUTPUT-DIR, --output-dir: Directory to store test artifacts. [default: /tmp/regression_test_artifacts]
- ENABLE-HTTP-METRICS, --enable-http-metrics, --no-enable-http-metrics: Capture HTTP traffic metrics via mitmproxy (experimental). Requires mitmdump to be installed. Only used in comparison mode. [default: False]
- SELECTED-STREAMS, --selected-streams: Comma-separated list of stream names to include in the read. Only these streams will be included in the configured catalog. This is useful to limit data volume by testing only specific streams.
- ENABLE-DEBUG-LOGS, --enable-debug-logs, --no-enable-debug-logs: Enable debug-level logging for regression test output. Also passed as LOG_LEVEL=DEBUG to the connector Docker container. [default: False]
- WITH-STATE, --with-state, --no-state: Fetch and pass the connection's current state to the read command, producing a warm read instead of a cold read. Defaults to True when --connection-id is provided, False otherwise. Has no effect unless the command is read. Ignored when --state-path is explicitly provided.
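The interaction between --with-state, --connection-id, --state-path, and the chosen command is easy to misread, so here is a minimal sketch of the documented rules. `should_fetch_state` is a hypothetical helper, not a function in this module, and the case where --with-state is passed without --connection-id is not specified above; the sketch simply returns the flag in that case.

```python
from __future__ import annotations


def should_fetch_state(
    command: str,
    connection_id: str | None,
    state_path: str | None,
    with_state: bool | None = None,
) -> bool:
    """Decide whether to fetch the connection's current state for a warm read."""
    if command != "read":
        return False  # --with-state has no effect for other commands
    if state_path is not None:
        return False  # an explicit --state-path takes precedence
    if with_state is None:
        # Default: True when --connection-id is provided, False otherwise.
        return connection_id is not None
    return with_state
```

Passing `with_state=None` models the flag being omitted on the command line, which is what triggers the connection-dependent default.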
airbyte-ops cloud connector fetch-connection-config
airbyte-ops cloud connector fetch-connection-config CONNECTION-ID [ARGS]
Fetch connection configuration from Airbyte Cloud to a local file.
This command retrieves the source configuration for a given connection ID
and writes it to a JSON file. When --output-path is omitted the file is
written to the platform temp directory to avoid accidentally committing
secrets to a git repository.
Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET environment variables.
When --with-secrets is specified, the command fetches unmasked secrets from the internal database using the connection-retriever. This additionally requires:
- An OC issue URL for audit logging (--oc-issue-url)
- GCP credentials via GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login
- Cloud SQL Python Connector access to the Prod DB replica.
Parameters:
- CONNECTION-ID, --connection-id: The UUID of the Airbyte Cloud connection. [required]
- OUTPUT-PATH, --output-path: Path to output file or directory. If directory, writes connection--config.json inside it. Default: platform temp directory (e.g. /tmp/connection- -config.json)
- WITH-SECRETS, --with-secrets, --no-secrets: If set, fetches unmasked secrets from the internal database. Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or gcloud auth application-default login. Must be used with --oc-issue-url. [default: False]
- OC-ISSUE-URL, --oc-issue-url: OC issue URL for audit logging. Required when using --with-secrets.
airbyte-ops cloud db
Database operations for Airbyte Cloud Prod DB Replica.
airbyte-ops cloud db start-proxy
airbyte-ops cloud db start-proxy [ARGS]
Start the Cloud SQL Proxy for database access.
This command starts the Cloud SQL Auth Proxy to enable connections to the Airbyte Cloud Prod DB Replica. The proxy is required for database query tools.
By default, runs as a daemon (background process). Use --no-daemon to run in foreground mode where you can see logs and stop with Ctrl+C.
Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, which should contain the service account JSON credentials.
After starting the proxy, set these environment variables to use database tools:
    export USE_CLOUD_SQL_PROXY=1
    export DB_PORT={port}
Parameters:
- PORT, --port: Port for the Cloud SQL Proxy to listen on. [default: 15432]
- DAEMON, --daemon, --no-daemon: Run as daemon in background (default). Use --no-daemon for foreground. [default: True]
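Before pointing database tools at the proxy, it can help to confirm that something is actually listening on the chosen port. The following is a minimal sketch using only the standard library; `is_port_listening` is a hypothetical helper name, and the 0.5 second timeout is an arbitrary choice.

```python
import socket


def is_port_listening(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        # create_connection raises OSError (including ConnectionRefusedError
        # and timeouts) when nothing is accepting connections.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `is_port_listening(15432)` checks the default proxy port. A successful connection only shows that some process is listening there, not that the listener is the Cloud SQL Proxy.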
airbyte-ops cloud db stop-proxy
airbyte-ops cloud db stop-proxy
Stop the Cloud SQL Proxy daemon.
This command stops a Cloud SQL Proxy that was started with 'start-proxy'. It reads the PID from the PID file and sends a SIGTERM signal to stop the process.
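The PID-file approach described above can be sketched as follows. This is a simplified illustration for POSIX systems, not the command's exact logic; `stop_from_pid_file` is a hypothetical name and the PID file location is left to the caller.

```python
import os
import signal
from pathlib import Path


def stop_from_pid_file(pid_file: Path) -> bool:
    """Send SIGTERM to the PID stored in pid_file; return True if a signal was sent."""
    pid_text = pid_file.read_text().strip()
    if not pid_text.isdigit():
        pid_file.unlink()  # discard the unusable file before reporting the error
        raise ValueError(f"Invalid PID in {pid_file}: {pid_text!r}")
    try:
        os.kill(int(pid_text), signal.SIGTERM)
        sent = True
    except ProcessLookupError:
        sent = False  # process already gone; only the stale file remains
    pid_file.unlink(missing_ok=True)
    return sent
```

SIGTERM asks the process to shut down rather than killing it outright, so the proxy gets a chance to close its connections.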
airbyte-ops cloud connection
Connection operations in Airbyte Cloud.
airbyte-ops cloud connection state
Connection state operations in Airbyte Cloud.
Commands:
- get: Get the current state for an Airbyte Cloud connection.
- reset: Reset a configured stream's state so the next sync full-refreshes it.
- set: Set the state for an Airbyte Cloud connection.
airbyte-ops cloud connection state get
airbyte-ops cloud connection state get CONNECTION-ID [ARGS]
Get the current state for an Airbyte Cloud connection.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch state for. [required]
- STREAM, --stream: Optional stream name to filter state for a single stream.
- NAMESPACE, --namespace: Optional stream namespace to narrow the stream filter.
- OUTPUT, --output: Path to write the state JSON output to a file.
airbyte-ops cloud connection state set
airbyte-ops cloud connection state set CONNECTION-ID [ARGS]
Set the state for an Airbyte Cloud connection.
State JSON can be provided as a positional argument, via --input, or via STDIN.
Uses the safe variant that prevents updates while a sync is running. When --stream is provided, only that stream's state is updated within the existing connection state.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
- STATE-JSON, --state-json: The connection state as a JSON string. When --stream is used, this is just the stream's state blob (e.g., '{"cursor": "2024-01-01"}'). Otherwise, must include 'stateType', 'connectionId', and the appropriate state field. Can also be provided via --input or STDIN.
- STREAM, --stream: Optional stream name to update state for a single stream only.
- NAMESPACE, --namespace: Optional stream namespace to identify the stream.
- INPUT, --input: Path to a JSON file containing the state to set.
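The three input channels for the state JSON (positional string, --input file, STDIN) could be resolved along these lines. This is only a sketch: `resolve_json_input` is a hypothetical helper, and the precedence shown (string first, then file, then stream) is an assumption, since the text above does not state which source wins when more than one is given.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any, TextIO


def resolve_json_input(
    json_arg: str | None,
    input_path: Path | None,
    stdin: TextIO | None = None,
) -> Any:
    """Load JSON from the positional string, a file, or a stream, in that order."""
    if json_arg is not None:
        return json.loads(json_arg)
    if input_path is not None:
        return json.loads(input_path.read_text())
    if stdin is not None:
        return json.loads(stdin.read())
    raise ValueError("No JSON provided: pass a string, --input, or pipe via STDIN.")
```

Taking the stream as a parameter instead of reading `sys.stdin` directly keeps the helper easy to exercise with `io.StringIO`.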
airbyte-ops cloud connection state reset
airbyte-ops cloud connection state reset CONNECTION-ID STREAM [ARGS]
Reset a configured stream's state so the next sync full-refreshes it.
Uses the safe variant that prevents updates while a sync is running.
Returns a restorable previous_state_backup in raw Config API format.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update state for. [required]
- STREAM, --stream: The configured stream name whose state should be reset. [required]
- NAMESPACE, --namespace: Optional stream namespace to identify the stream.
airbyte-ops cloud connection catalog
Connection catalog operations in Airbyte Cloud.
Commands:
- get: Get the configured catalog for an Airbyte Cloud connection.
- set: Set the configured catalog for an Airbyte Cloud connection.
airbyte-ops cloud connection catalog get
airbyte-ops cloud connection catalog get CONNECTION-ID [ARGS]
Get the configured catalog for an Airbyte Cloud connection.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to fetch catalog for. [required]
- OUTPUT, --output: Path to write the catalog JSON output to a file.
airbyte-ops cloud connection catalog set
airbyte-ops cloud connection catalog set CONNECTION-ID [ARGS]
Set the configured catalog for an Airbyte Cloud connection.
Catalog JSON can be provided as a positional argument, via --input, or via STDIN.
WARNING: This replaces the entire configured catalog.
Parameters:
- CONNECTION-ID, --connection-id: The connection ID (UUID) to update catalog for. [required]
- CATALOG-JSON, --catalog-json: The configured catalog as a JSON string. Can also be provided via --input or STDIN.
- INPUT, --input: Path to a JSON file containing the catalog to set.
airbyte-ops cloud logs
GCP Cloud Logging operations for Airbyte Cloud.
airbyte-ops cloud logs lookup-cloud-backend-error
airbyte-ops cloud logs lookup-cloud-backend-error ERROR-ID [ARGS]
Look up error details from GCP Cloud Logging by error ID.
When an Airbyte Cloud API returns an error response with only an error ID (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command fetches the full stack trace and error details from GCP Cloud Logging.
Requires GCP credentials with Logs Viewer role on the target project. Set up credentials with: gcloud auth application-default login
Parameters:
- ERROR-ID, --error-id: The error ID (UUID) to search for. This is typically returned in API error responses as {'errorId': '...'} [required]
- LOOKBACK-DAYS, --lookback-days: Number of days to look back in logs. [default: 7]
- MIN-SEVERITY-FILTER, --min-severity-filter: Optional minimum severity level to filter logs. [choices: debug, info, notice, warning, error, critical, alert, emergency]
- RAW, --raw, --no-raw: Output raw JSON instead of formatted text. [default: False]
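To make the --lookback-days and --min-severity-filter parameters concrete, here is a small sketch of how a time window and a minimum severity might be interpreted. It is an assumption-laden illustration: `SEVERITIES`, `severities_at_or_above`, and `lookback_cutoff` are hypothetical names, and whether the command applies its filters exactly this way is not stated above.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Severity names in ascending order, matching the --min-severity-filter choices.
SEVERITIES = [
    "debug", "info", "notice", "warning",
    "error", "critical", "alert", "emergency",
]


def severities_at_or_above(min_severity: str) -> list[str]:
    """Return every severity name at or above the given minimum."""
    return SEVERITIES[SEVERITIES.index(min_severity.lower()):]


def lookback_cutoff(lookback_days: int = 7, now: datetime | None = None) -> datetime:
    """Return the earliest timestamp that falls inside the lookback window."""
    now = now or datetime.now(timezone.utc)
    return now - timedelta(days=lookback_days)
```

An unknown severity name raises `ValueError` from `list.index`, which fails fast instead of silently matching nothing.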
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""CLI commands for Airbyte Cloud operations. 3 4Commands: 5 airbyte-ops cloud connector get-version-info - Get connector version info 6 airbyte-ops cloud connector set-version-override - Set connector version override 7 airbyte-ops cloud connector clear-version-override - Clear connector version override 8 airbyte-ops cloud connector regression-test - Run regression tests (single-version or comparison) 9 airbyte-ops cloud connector fetch-connection-config - Fetch connection config to local file 10 11## CLI reference 12 13The commands below are regenerated by `poe docs-generate` via cyclopts's 14programmatic docs API; see `docs/generate_cli.py`. 15 16.. include:: ../../../docs/generated/cli/cloud.md 17 :start-line: 2 18""" 19 20from __future__ import annotations 21 22# Hide Python-level members from the pdoc page for this module; the rendered 23# docs for this CLI group come entirely from the grafted `.. include::` in 24# the module docstring above. 
25__all__: list[str] = [] 26 27import json 28import logging 29import os 30import shutil 31import signal 32import socket 33import subprocess 34import sys 35import tempfile 36import time 37from pathlib import Path 38from typing import Annotated, Literal 39 40import requests 41import yaml 42from airbyte.cloud.workspaces import CloudWorkspace 43from airbyte.exceptions import PyAirbyteInputError 44from airbyte_cdk.models.connector_metadata import MetadataFile 45from airbyte_cdk.utils.connector_paths import find_connector_root_from_name 46from airbyte_cdk.utils.docker import build_connector_image, verify_docker_installation 47from airbyte_protocol.models import ConfiguredAirbyteCatalog 48from cyclopts import Parameter 49 50from airbyte_ops_mcp.cli._base import App, app 51from airbyte_ops_mcp.cli._shared import ( 52 exit_with_error, 53 print_error, 54 print_json, 55 print_success, 56 print_warning, 57) 58from airbyte_ops_mcp.cloud_admin.auth import CloudAuthError 59from airbyte_ops_mcp.cloud_admin.connection_config import fetch_connection_config 60from airbyte_ops_mcp.cloud_admin.connection_state import ( 61 reset_stream_state as reset_cloud_stream_state, 62) 63from airbyte_ops_mcp.cloud_admin.registry_lookup import ( 64 resolve_definition_id_to_canonical_info, 65) 66from airbyte_ops_mcp.cloud_admin.version_overrides import ( 67 ResolvedCloudAuth, 68 VersionOverrideTarget, 69 get_connector_version_info, 70) 71from airbyte_ops_mcp.cloud_admin.version_overrides import ( 72 set_version_override as set_cloud_version_override, 73) 74from airbyte_ops_mcp.constants import ( 75 CLOUD_SQL_INSTANCE, 76 CLOUD_SQL_PROXY_PID_FILE, 77 DEFAULT_CLOUD_SQL_PROXY_PORT, 78 ENV_GCP_PROD_DB_ACCESS_CREDENTIALS, 79) 80from airbyte_ops_mcp.gcp_logs import GCPSeverity, fetch_error_logs 81from airbyte_ops_mcp.regression_tests.cdk_secrets import get_first_config_from_secrets 82from airbyte_ops_mcp.regression_tests.ci_output import ( 83 generate_regression_report, 84 generate_single_version_report, 
85 write_github_output, 86 write_github_outputs, 87 write_github_summary, 88 write_json_output, 89 write_test_summary, 90) 91from airbyte_ops_mcp.regression_tests.config_overrides import ( 92 filter_configured_catalog_file, 93) 94from airbyte_ops_mcp.regression_tests.connection_fetcher import ( 95 fetch_connection_data, 96 fetch_connection_state, 97 save_connection_data_to_files, 98) 99from airbyte_ops_mcp.regression_tests.connection_secret_retriever import ( 100 SecretRetrievalError, 101 enrich_config_with_secrets, 102 should_use_secret_retriever, 103) 104from airbyte_ops_mcp.regression_tests.connector_runner import ( 105 ConnectorRunner, 106 ensure_image_available, 107) 108from airbyte_ops_mcp.regression_tests.http_metrics import ( 109 MitmproxyManager, 110 parse_http_dump, 111) 112from airbyte_ops_mcp.regression_tests.models import ( 113 Command, 114 ConnectorUnderTest, 115 ExecutionInputs, 116 TargetOrControl, 117) 118from airbyte_ops_mcp.telemetry import track_regression_test 119from airbyte_ops_mcp.tier_cache import resolve_workspace 120 121# Path to connectors directory within the airbyte repo 122CONNECTORS_SUBDIR = Path("airbyte-integrations") / "connectors" 123 124# Create the cloud sub-app 125cloud_app = App(name="cloud", help="Airbyte Cloud operations.") 126app.command(cloud_app) 127 128# Create the connector sub-app under cloud 129connector_app = App( 130 name="connector", help="Deployed connector operations in Airbyte Cloud." 
131) 132cloud_app.command(connector_app) 133 134# Create the db sub-app under cloud 135db_app = App(name="db", help="Database operations for Airbyte Cloud Prod DB Replica.") 136cloud_app.command(db_app) 137 138# Create the connection sub-app under cloud 139connection_app = App(name="connection", help="Connection operations in Airbyte Cloud.") 140cloud_app.command(connection_app) 141 142# Create the state sub-app under connection 143state_app = App(name="state", help="Connection state operations in Airbyte Cloud.") 144connection_app.command(state_app) 145 146# Create the catalog sub-app under connection 147catalog_app = App( 148 name="catalog", help="Connection catalog operations in Airbyte Cloud." 149) 150connection_app.command(catalog_app) 151 152# Create the logs sub-app under cloud 153logs_app = App(name="logs", help="GCP Cloud Logging operations for Airbyte Cloud.") 154cloud_app.command(logs_app) 155 156 157@db_app.command(name="start-proxy") 158def start_proxy( 159 port: Annotated[ 160 int, 161 Parameter(help="Port for the Cloud SQL Proxy to listen on."), 162 ] = DEFAULT_CLOUD_SQL_PROXY_PORT, 163 daemon: Annotated[ 164 bool, 165 Parameter( 166 help="Run as daemon in background (default). Use --no-daemon for foreground." 167 ), 168 ] = True, 169) -> None: 170 """Start the Cloud SQL Proxy for database access. 171 172 This command starts the Cloud SQL Auth Proxy to enable connections to the 173 Airbyte Cloud Prod DB Replica. The proxy is required for database query tools. 174 175 By default, runs as a daemon (background process). Use --no-daemon to run in 176 foreground mode where you can see logs and stop with Ctrl+C. 177 178 Credentials are read from the GCP_PROD_DB_ACCESS_CREDENTIALS environment variable, 179 which should contain the service account JSON credentials. 
180 181 After starting the proxy, set these environment variables to use database tools: 182 export USE_CLOUD_SQL_PROXY=1 183 export DB_PORT={port} 184 185 Example: 186 airbyte-ops cloud db start-proxy 187 airbyte-ops cloud db start-proxy --port 15432 188 airbyte-ops cloud db start-proxy --no-daemon 189 """ 190 # Check if proxy is already running on the requested port (idempotency) 191 try: 192 with socket.create_connection(("127.0.0.1", port), timeout=0.5): 193 # Something is already listening on this port 194 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 195 pid_info = "" 196 if pid_file.exists(): 197 pid_info = f" (PID: {pid_file.read_text().strip()})" 198 print_success( 199 f"Cloud SQL Proxy is already running on port {port}{pid_info}" 200 ) 201 print_success("") 202 print_success("To use database tools, set these environment variables:") 203 print_success(" export USE_CLOUD_SQL_PROXY=1") 204 print_success(f" export DB_PORT={port}") 205 return 206 except (OSError, TimeoutError, ConnectionRefusedError): 207 pass # Port not in use, proceed with starting proxy 208 209 # Check if cloud-sql-proxy is installed 210 proxy_path = shutil.which("cloud-sql-proxy") 211 if not proxy_path: 212 exit_with_error( 213 "cloud-sql-proxy not found in PATH. " 214 "Install it from: https://cloud.google.com/sql/docs/mysql/sql-proxy" 215 ) 216 217 # Get credentials from environment 218 creds_json = os.getenv(ENV_GCP_PROD_DB_ACCESS_CREDENTIALS) 219 if not creds_json: 220 exit_with_error( 221 f"{ENV_GCP_PROD_DB_ACCESS_CREDENTIALS} environment variable is not set. " 222 "This should contain the GCP service account JSON credentials." 
223 ) 224 225 # Build the command using --json-credentials to avoid writing to disk 226 cmd = [ 227 proxy_path, 228 CLOUD_SQL_INSTANCE, 229 f"--port={port}", 230 f"--json-credentials={creds_json}", 231 ] 232 233 print_success(f"Starting Cloud SQL Proxy on port {port}...") 234 print_success(f"Instance: {CLOUD_SQL_INSTANCE}") 235 print_success("") 236 print_success("To use database tools, set these environment variables:") 237 print_success(" export USE_CLOUD_SQL_PROXY=1") 238 print_success(f" export DB_PORT={port}") 239 print_success("") 240 241 if daemon: 242 # Run in background (daemon mode) with log file for diagnostics 243 log_file_path = Path("/tmp/airbyte-cloud-sql-proxy.log") 244 log_file = log_file_path.open("ab") 245 process = subprocess.Popen( 246 cmd, 247 stdout=subprocess.DEVNULL, 248 stderr=log_file, 249 start_new_session=True, 250 ) 251 252 # Brief wait to verify the process started successfully 253 time.sleep(0.5) 254 if process.poll() is not None: 255 # Process exited immediately - read any error output 256 log_file.close() 257 error_output = "" 258 if log_file_path.exists(): 259 error_output = log_file_path.read_text()[-1000:] # Last 1000 chars 260 exit_with_error( 261 f"Cloud SQL Proxy failed to start (exit code: {process.returncode}).\n" 262 f"Check logs at {log_file_path}\n" 263 f"Recent output: {error_output}" 264 ) 265 266 # Write PID to file for stop-proxy command 267 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 268 pid_file.write_text(str(process.pid)) 269 print_success(f"Cloud SQL Proxy started as daemon (PID: {process.pid})") 270 print_success(f"Logs: {log_file_path}") 271 print_success("To stop: airbyte-ops cloud db stop-proxy") 272 else: 273 # Run in foreground - replace current process 274 # Signals (Ctrl+C) will be handled directly by the cloud-sql-proxy process 275 print_success("Running in foreground. 
Press Ctrl+C to stop the proxy.") 276 print_success("") 277 os.execv(proxy_path, cmd) 278 279 280@db_app.command(name="stop-proxy") 281def stop_proxy() -> None: 282 """Stop the Cloud SQL Proxy daemon. 283 284 This command stops a Cloud SQL Proxy that was started with 'start-proxy'. 285 It reads the PID from the PID file and sends a SIGTERM signal to stop the process. 286 287 Example: 288 airbyte-ops cloud db stop-proxy 289 """ 290 pid_file = Path(CLOUD_SQL_PROXY_PID_FILE) 291 292 if not pid_file.exists(): 293 exit_with_error( 294 f"PID file not found at {CLOUD_SQL_PROXY_PID_FILE}. " 295 "No Cloud SQL Proxy daemon appears to be running." 296 ) 297 298 pid_str = pid_file.read_text().strip() 299 if not pid_str.isdigit(): 300 pid_file.unlink() 301 exit_with_error(f"Invalid PID in {CLOUD_SQL_PROXY_PID_FILE}: {pid_str}") 302 303 pid = int(pid_str) 304 305 # Check if process is still running 306 try: 307 os.kill(pid, 0) # Signal 0 just checks if process exists 308 except ProcessLookupError: 309 pid_file.unlink() 310 print_success( 311 f"Cloud SQL Proxy (PID: {pid}) is not running. Cleaned up PID file." 
312 ) 313 return 314 except PermissionError: 315 exit_with_error(f"Permission denied to check process {pid}.") 316 317 # Send SIGTERM to stop the process 318 try: 319 os.kill(pid, signal.SIGTERM) 320 print_success(f"Sent SIGTERM to Cloud SQL Proxy (PID: {pid}).") 321 except ProcessLookupError: 322 print_success(f"Cloud SQL Proxy (PID: {pid}) already stopped.") 323 except PermissionError: 324 exit_with_error(f"Permission denied to stop process {pid}.") 325 326 # Clean up PID file 327 pid_file.unlink(missing_ok=True) 328 print_success("Cloud SQL Proxy stopped.") 329 330 331@connector_app.command(name="get-version-info") 332def get_version_info( 333 workspace_id: Annotated[ 334 str, 335 Parameter(help="The Airbyte Cloud workspace ID."), 336 ], 337 connector_id: Annotated[ 338 str, 339 Parameter(help="The ID of the deployed connector (source or destination)."), 340 ], 341 connector_type: Annotated[ 342 Literal["source", "destination"], 343 Parameter(help="The type of connector."), 344 ], 345) -> None: 346 """Get the current version information for a deployed connector.""" 347 result = get_connector_version_info( 348 auth=_resolve_cli_cloud_auth(), 349 workspace_id=workspace_id, 350 actor_id=connector_id, 351 actor_type=connector_type, 352 ) 353 print_json(result.model_dump()) 354 355 356_OverrideLevel = Literal["actor", "workspace", "organization"] 357_TierFilter = Literal["TIER_0", "TIER_1", "TIER_2", "ALL"] 358 359 360def _resolve_cli_cloud_auth() -> ResolvedCloudAuth: 361 """Resolve Airbyte Cloud auth from environment variables for CLI use. 362 363 Mirrors the priority order the MCP server uses: 364 365 1. `AIRBYTE_CLOUD_BEARER_TOKEN` 366 2. `AIRBYTE_CLOUD_CLIENT_ID` + `AIRBYTE_CLOUD_CLIENT_SECRET` 367 368 Raises a CLI error via `exit_with_error` if no usable credentials are present. 
369 """ 370 bearer_token = os.environ.get("AIRBYTE_CLOUD_BEARER_TOKEN") 371 if bearer_token: 372 return ResolvedCloudAuth(bearer_token=bearer_token) 373 374 client_id = os.environ.get("AIRBYTE_CLOUD_CLIENT_ID") 375 client_secret = os.environ.get("AIRBYTE_CLOUD_CLIENT_SECRET") 376 if client_id and client_secret: 377 return ResolvedCloudAuth(client_id=client_id, client_secret=client_secret) 378 379 exit_with_error( 380 "Missing Airbyte Cloud credentials. Set AIRBYTE_CLOUD_BEARER_TOKEN, or " 381 "both AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET." 382 ) 383 384 385def _validate_override_scope_args( 386 *, 387 override_level: _OverrideLevel, 388 workspace_id: str | None, 389 organization_id: str | None, 390 connector_id: str | None, 391 connector_type: Literal["source", "destination"] | None, 392 actor_definition_id: str | None, 393) -> None: 394 """Validate scope-specific arguments for `set/clear-version-override`. 395 396 Calls `exit_with_error` (which terminates the process) if the supplied 397 arguments are inconsistent with the requested `override_level`. 398 """ 399 if override_level == "actor": 400 missing = [ 401 name 402 for name, value in ( 403 ("--workspace-id", workspace_id), 404 ("--connector-id", connector_id), 405 ("--connector-type", connector_type), 406 ) 407 if not value 408 ] 409 if missing: 410 exit_with_error( 411 "Override level 'actor' requires " + ", ".join(missing) + "." 412 ) 413 if actor_definition_id is not None: 414 exit_with_error( 415 "Override level 'actor' must not be combined with " 416 "--actor-definition-id." 417 ) 418 if organization_id is not None: 419 exit_with_error( 420 "Override level 'actor' must not be combined with --organization-id." 
421 ) 422 return 423 424 if override_level == "workspace": 425 missing = [ 426 name 427 for name, value in ( 428 ("--workspace-id", workspace_id), 429 ("--actor-definition-id", actor_definition_id), 430 ) 431 if not value 432 ] 433 if missing: 434 exit_with_error( 435 "Override level 'workspace' requires " + ", ".join(missing) + "." 436 ) 437 if connector_id is not None or connector_type is not None: 438 exit_with_error( 439 "Override level 'workspace' must not be combined with " 440 "--connector-id or --connector-type." 441 ) 442 if organization_id is not None: 443 exit_with_error( 444 "Override level 'workspace' must not be combined with " 445 "--organization-id." 446 ) 447 return 448 449 # override_level == "organization" 450 missing = [ 451 name 452 for name, value in ( 453 ("--organization-id", organization_id), 454 ("--actor-definition-id", actor_definition_id), 455 ) 456 if not value 457 ] 458 if missing: 459 exit_with_error( 460 "Override level 'organization' requires " + ", ".join(missing) + "." 461 ) 462 if connector_id is not None or connector_type is not None: 463 exit_with_error( 464 "Override level 'organization' must not be combined with " 465 "--connector-id or --connector-type." 466 ) 467 if workspace_id is not None: 468 exit_with_error( 469 "Override level 'organization' must not be combined with --workspace-id." 470 ) 471 472 473def _dispatch_version_override( 474 *, 475 override_level: _OverrideLevel, 476 workspace_id: str | None, 477 organization_id: str | None, 478 connector_id: str | None, 479 connector_type: Literal["source", "destination"] | None, 480 actor_definition_id: str | None, 481 version: str | None, 482 unset: bool, 483 reason: str | None, 484 reason_url: str | None, 485 issue_url: str, 486 approval_comment_url: str, 487 ai_agent_session_url: str | None, 488 customer_tier_filter: _TierFilter, 489) -> None: 490 """Route a version-override request through the normalized core helper. 

    Resolves `--actor-definition-id` to its canonical name and connector type
    via the cloud registry for `workspace`/`organization` scopes. Prints the
    operation result as JSON.
    """
    _validate_override_scope_args(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
    )

    auth = _resolve_cli_cloud_auth()

    if override_level == "actor":
        assert workspace_id is not None
        assert connector_id is not None
        assert connector_type is not None
        assert organization_id is None
        try:
            ws_resolution = resolve_workspace(workspace_id)
            if not ws_resolution.organization_id:
                exit_with_error("Could not resolve organization for workspace.")
            result = set_cloud_version_override(
                auth=auth,
                target=VersionOverrideTarget(
                    scope="actor",
                    organization_id=ws_resolution.organization_id,
                    workspace_id=workspace_id,
                    actor_id=connector_id,
                    connector_type=connector_type,
                ),
                approval_comment_url=approval_comment_url,
                version=version,
                unset=unset,
                override_reason=reason,
                override_reason_reference_url=reason_url,
                issue_url=issue_url,
                ai_agent_session_url=ai_agent_session_url,
                customer_tier_filter=customer_tier_filter,
            )
        except (PyAirbyteInputError, CloudAuthError) as e:
            exit_with_error(str(e))
        _print_override_result(result.success, result.message)
        print_json(result.model_dump())
        return

    try:
        assert actor_definition_id is not None
        canonical_name, resolved_connector_type = (
            resolve_definition_id_to_canonical_info(actor_definition_id)
        )
    except (PyAirbyteInputError, requests.RequestException) as e:
        exit_with_error(f"Failed to resolve --actor-definition-id: {e}")
    typed_connector_type: Literal["source", "destination"] = (
        "source" if resolved_connector_type == "source" else "destination"
    )

    if override_level == "workspace":
        assert workspace_id is not None
        try:
            ws_resolution = resolve_workspace(workspace_id)
            if not ws_resolution.organization_id:
                exit_with_error("Could not resolve organization for workspace.")
            target = VersionOverrideTarget(
                scope="workspace",
                organization_id=ws_resolution.organization_id,
                workspace_id=workspace_id,
                connector_name=canonical_name,
                connector_type=typed_connector_type,
            )
        except PyAirbyteInputError as e:
            exit_with_error(str(e))
    else:
        assert organization_id is not None
        target = VersionOverrideTarget(
            scope="organization",
            organization_id=organization_id,
            connector_name=canonical_name,
            connector_type=typed_connector_type,
        )

    try:
        result = set_cloud_version_override(
            auth=auth,
            target=target,
            approval_comment_url=approval_comment_url,
            version=version,
            unset=unset,
            override_reason=reason,
            override_reason_reference_url=reason_url,
            issue_url=issue_url,
            ai_agent_session_url=ai_agent_session_url,
            customer_tier_filter=customer_tier_filter,
        )
    except (PyAirbyteInputError, CloudAuthError) as e:
        exit_with_error(str(e))
    _print_override_result(result.success, result.message)
    print_json(result.model_dump())


def _print_override_result(success: bool, message: str) -> None:
    """Emit an override-operation status message via the shared CLI helpers."""
    if success:
        print_success(message)
    else:
        print_error(message)


@connector_app.command(name="set-version-override")
def set_version_override(
    version: Annotated[
        str,
        Parameter(
            help="The semver version string to pin to (e.g., '2.1.5-preview.abc1234')."
        ),
    ],
    reason: Annotated[
        str,
        Parameter(help="Explanation for the override (min 10 characters)."),
    ],
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    override_level: Annotated[
        _OverrideLevel,
        Parameter(
            help=(
                "Scope at which to apply the version override: 'actor' (single "
                "deployed connector instance), 'workspace' (all instances of a "
                "connector type within a workspace), or 'organization' (all "
                "instances across an organization). Defaults to 'actor'."
            ),
        ),
    ] = "actor",
    workspace_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud workspace ID. Required for 'actor' and "
                "'workspace' override levels."
            )
        ),
    ] = None,
    organization_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud organization ID. Required for "
                "'organization' override level."
            )
        ),
    ] = None,
    connector_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The ID of the deployed connector (source or destination). "
                "Required for 'actor' override level."
            )
        ),
    ] = None,
    connector_type: Annotated[
        Literal["source", "destination"] | None,
        Parameter(help="The type of connector. Required for 'actor' override level."),
    ] = None,
    actor_definition_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The connector definition UUID. Required for 'workspace' and "
                "'organization' override levels."
            )
        ),
    ] = None,
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    reason_url: Annotated[
        str | None,
        Parameter(help="Optional URL with more context (e.g., issue link)."),
    ] = None,
    customer_tier_filter: Annotated[
        _TierFilter,
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Set a version override for a deployed connector.

    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.

    The `--override-level` flag selects the scope at which the pin is applied:

    - `actor` (default): pins a single deployed connector instance. Requires
      `--workspace-id`, `--connector-id`, and `--connector-type`.
    - `workspace`: pins ALL instances of a connector type within a workspace.
      Requires `--workspace-id` and `--actor-definition-id`.
    - `organization`: pins ALL instances of a connector type across an
      organization. Requires `--organization-id` and `--actor-definition-id`.

    The `customer_tier_filter` gates the operation: the call fails if the
    actual tier of the target organization does not match. Use `ALL` to
    proceed regardless of tier (a warning is shown for sensitive tiers).
    """
    _dispatch_version_override(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
        version=version,
        unset=False,
        reason=reason,
        reason_url=reason_url,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )


@connector_app.command(name="clear-version-override")
def clear_version_override(
    issue_url: Annotated[
        str,
        Parameter(help="GitHub issue URL providing context for this operation."),
    ],
    approval_comment_url: Annotated[
        str,
        Parameter(
            help="Slack approval record URL where admin authorized this deployment."
        ),
    ],
    override_level: Annotated[
        _OverrideLevel,
        Parameter(
            help=(
                "Scope at which to clear the version override: 'actor', "
                "'workspace', or 'organization'. Defaults to 'actor'."
            ),
        ),
    ] = "actor",
    workspace_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud workspace ID. Required for 'actor' and "
                "'workspace' override levels."
            )
        ),
    ] = None,
    organization_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The Airbyte Cloud organization ID. Required for "
                "'organization' override level."
            )
        ),
    ] = None,
    connector_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The ID of the deployed connector (source or destination). "
                "Required for 'actor' override level."
            )
        ),
    ] = None,
    connector_type: Annotated[
        Literal["source", "destination"] | None,
        Parameter(help="The type of connector. Required for 'actor' override level."),
    ] = None,
    actor_definition_id: Annotated[
        str | None,
        Parameter(
            help=(
                "The connector definition UUID. Required for 'workspace' and "
                "'organization' override levels."
            )
        ),
    ] = None,
    ai_agent_session_url: Annotated[
        str | None,
        Parameter(
            help="URL to AI agent session driving this operation (for auditability)."
        ),
    ] = None,
    customer_tier_filter: Annotated[
        _TierFilter,
        Parameter(
            help=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "The operation will be rejected if the actual customer tier does not match. "
                "Defaults to 'TIER_2' (non-sensitive customers)."
            ),
        ),
    ] = "TIER_2",
) -> None:
    """Clear a version override from a deployed connector.

    Requires admin authentication via `AIRBYTE_INTERNAL_ADMIN_FLAG` and
    `AIRBYTE_INTERNAL_ADMIN_USER` environment variables.

    The `--override-level` flag selects the scope at which the pin is removed:

    - `actor` (default): clears a single deployed connector instance pin.
      Requires `--workspace-id`, `--connector-id`, and `--connector-type`.
    - `workspace`: clears the workspace-level pin for a connector type.
      Requires `--workspace-id` and `--actor-definition-id`.
    - `organization`: clears the organization-level pin for a connector type.
      Requires `--organization-id` and `--actor-definition-id`.
    """
    _dispatch_version_override(
        override_level=override_level,
        workspace_id=workspace_id,
        organization_id=organization_id,
        connector_id=connector_id,
        connector_type=connector_type,
        actor_definition_id=actor_definition_id,
        version=None,
        unset=True,
        reason=None,
        reason_url=None,
        issue_url=issue_url,
        approval_comment_url=approval_comment_url,
        ai_agent_session_url=ai_agent_session_url,
        customer_tier_filter=customer_tier_filter,
    )


def _load_json_file(file_path: Path) -> dict | None:
    """Load a JSON file and return its contents.

    Returns None if the file doesn't exist or contains invalid JSON.
    """
    if not file_path.exists():
        return None
    try:
        return json.loads(file_path.read_text())
    except json.JSONDecodeError as e:
        print_error(f"Failed to parse JSON in file: {file_path}\nError: {e}")
        return None


def _run_connector_command(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    config_path: Path | None = None,
    catalog_path: Path | None = None,
    state_path: Path | None = None,
    proxy_url: str | None = None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command and return results as a dict.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        proxy_url: Optional HTTP proxy URL for traffic capture.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results.
    """
    connector = ConnectorUnderTest.from_image_name(connector_image, target_or_control)

    config = _load_json_file(config_path) if config_path else None
    state = _load_json_file(state_path) if state_path else None

    configured_catalog = None
    if catalog_path and catalog_path.exists():
        catalog_json = catalog_path.read_text()
        configured_catalog = ConfiguredAirbyteCatalog.parse_raw(catalog_json)

    # Pass log level as an environment variable to the connector container
    container_env: dict[str, str] = {}
    if enable_debug_logs:
        container_env["LOG_LEVEL"] = "DEBUG"

    execution_inputs = ExecutionInputs(
        connector_under_test=connector,
        command=command,
        output_dir=output_dir,
        config=config,
        configured_catalog=configured_catalog,
        state=state,
        environment_variables=container_env or None,
    )

    runner = ConnectorRunner(execution_inputs, proxy_url=proxy_url)
    result = runner.run()

    result.save_artifacts(output_dir)

    return {
        "connector": connector_image,
        "command": command.value,
        "success": result.success,
        "exit_code": result.exit_code,
        "stdout_file": str(result.stdout_file_path),
        "stderr_file": str(result.stderr_file_path),
        "message_counts": {
            k.value: v for k, v in result.get_message_count_per_type().items()
        },
        "record_counts_per_stream": result.get_record_count_per_stream(),
    }


def _build_connector_image_from_source(
    connector_name: str,
    repo_root: Path | None = None,
    tag: str = "dev",
) -> str | None:
    """Build a connector image from source code.

    Args:
        connector_name: Name of the connector (e.g., 'source-pokeapi').
        repo_root: Optional path to the airbyte repo root. If not provided,
            will attempt to auto-detect from current directory.
        tag: Tag to apply to the built image (default: 'dev').

    Returns:
        The full image name with tag if successful, None if build fails.
    """
    if not verify_docker_installation():
        print_error("Docker is not installed or not running")
        return None

    try:
        connector_directory = find_connector_root_from_name(connector_name)
    except FileNotFoundError:
        if repo_root:
            connector_directory = repo_root / CONNECTORS_SUBDIR / connector_name
            if not connector_directory.exists():
                print_error(f"Connector directory not found: {connector_directory}")
                return None
        else:
            print_error(
                f"Could not find connector '{connector_name}'. "
                "Try providing --repo-root to specify the airbyte repo location."
            )
            return None

    metadata_file_path = connector_directory / "metadata.yaml"
    if not metadata_file_path.exists():
        print_error(f"metadata.yaml not found at {metadata_file_path}")
        return None

    metadata = MetadataFile.from_file(metadata_file_path)
    print_success(f"Building image for connector: {connector_name}")

    built_image = build_connector_image(
        connector_name=connector_name,
        connector_directory=connector_directory,
        metadata=metadata,
        tag=tag,
        no_verify=False,
    )
    print_success(f"Successfully built image: {built_image}")
    return built_image


def _fetch_control_image_from_metadata(connector_name: str) -> str | None:
    """Fetch the current released connector image from metadata.yaml on the master branch.

    This fetches the connector's metadata.yaml from the airbyte monorepo's master branch
    and extracts the dockerRepository and dockerImageTag to construct the control image.

    Args:
        connector_name: The connector name (e.g., 'source-github').

    Returns:
        The full connector image with tag (e.g., 'airbyte/source-github:1.0.0'),
        or None if the metadata could not be fetched or parsed.
    """
    metadata_url = (
        f"https://raw.githubusercontent.com/airbytehq/airbyte/master/"
        f"airbyte-integrations/connectors/{connector_name}/metadata.yaml"
    )
    response = requests.get(metadata_url, timeout=30)
    if not response.ok:
        print_error(
            f"Failed to fetch metadata for {connector_name}: "
            f"HTTP {response.status_code} from {metadata_url}"
        )
        return None

    metadata = yaml.safe_load(response.text)
    if not isinstance(metadata, dict):
        print_error(f"Invalid metadata format for {connector_name}: expected dict")
        return None

    data = metadata.get("data", {})
    docker_repository = data.get("dockerRepository")
    docker_image_tag = data.get("dockerImageTag")

    if not docker_repository or not docker_image_tag:
        print_error(
            f"Could not find dockerRepository/dockerImageTag in metadata for {connector_name}"
        )
        return None

    return f"{docker_repository}:{docker_image_tag}"


def _run_with_optional_http_metrics(
    connector_image: str,
    command: Command,
    output_dir: Path,
    target_or_control: TargetOrControl,
    enable_http_metrics: bool,
    config_path: Path | None,
    catalog_path: Path | None,
    state_path: Path | None,
    enable_debug_logs: bool = False,
) -> dict:
    """Run a connector command with optional HTTP metrics capture.

    When enable_http_metrics is True, starts mitmproxy to capture HTTP traffic.
    If mitmproxy fails to start, falls back to running without metrics.

    Args:
        connector_image: Full connector image name with tag.
        command: The Airbyte command to run.
        output_dir: Directory to store output files.
        target_or_control: Whether this is target or control version.
        enable_http_metrics: Whether to capture HTTP metrics via mitmproxy.
        config_path: Path to connector config JSON file.
        catalog_path: Path to configured catalog JSON file.
        state_path: Path to state JSON file.
        enable_debug_logs: If `True`, pass `LOG_LEVEL=DEBUG` to the connector container.

    Returns:
        Dictionary with execution results, optionally including http_metrics.
    """
    if not enable_http_metrics:
        return _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            enable_debug_logs=enable_debug_logs,
        )

    with MitmproxyManager.start(output_dir) as session:
        if session is None:
            print_error("Mitmproxy unavailable, running without HTTP metrics")
            return _run_connector_command(
                connector_image=connector_image,
                command=command,
                output_dir=output_dir,
                target_or_control=target_or_control,
                config_path=config_path,
                catalog_path=catalog_path,
                state_path=state_path,
                enable_debug_logs=enable_debug_logs,
            )

        print_success(f"Started mitmproxy on {session.proxy_url}")
        result = _run_connector_command(
            connector_image=connector_image,
            command=command,
            output_dir=output_dir,
            target_or_control=target_or_control,
            config_path=config_path,
            catalog_path=catalog_path,
            state_path=state_path,
            proxy_url=session.proxy_url,
            enable_debug_logs=enable_debug_logs,
        )

        http_metrics = parse_http_dump(session.dump_file_path)
        result["http_metrics"] = {
            "flow_count": http_metrics.flow_count,
            "duplicate_flow_count": http_metrics.duplicate_flow_count,
        }
        print_success(
            f"Captured {http_metrics.flow_count} HTTP flows "
            f"({http_metrics.duplicate_flow_count} duplicates)"
        )
        return result


@connector_app.command(name="regression-test")
def regression_test(
    skip_compare: Annotated[
        bool,
        Parameter(
            help="If True, skip comparison and run single-version tests only. "
            "If False (default), run comparison tests (target vs control)."
        ),
    ] = False,
    test_image: Annotated[
        str | None,
        Parameter(
            help="Test connector image with tag (e.g., airbyte/source-github:1.0.0). "
            "This is the image under test - in comparison mode, it's compared against control_image."
        ),
    ] = None,
    control_image: Annotated[
        str | None,
        Parameter(
            help="Control connector image (baseline version) with tag (e.g., airbyte/source-github:1.0.0). "
            "Ignored if `skip_compare=True`."
        ),
    ] = None,
    connector_name: Annotated[
        str | None,
        Parameter(
            help="Connector name to build image from source (e.g., 'source-pokeapi'). "
            "If provided, builds the image locally with tag 'dev'. "
            "For comparison tests (default), this builds the target image. "
            "For single-version tests (skip_compare=True), this builds the test image."
        ),
    ] = None,
    repo_root: Annotated[
        str | None,
        Parameter(
            help="Path to the airbyte repo root. Required if connector_name is provided "
            "and the repo cannot be auto-detected."
        ),
    ] = None,
    command: Annotated[
        Literal["spec", "check", "discover", "read"],
        Parameter(help="The Airbyte command to run."),
    ] = "check",
    connection_id: Annotated[
        str | None,
        Parameter(
            help="Airbyte Cloud connection ID to fetch config/catalog from. "
            "Mutually exclusive with config-path/catalog-path. "
            "If provided, test_image/control_image can be auto-detected."
        ),
    ] = None,
    config_path: Annotated[
        str | None,
        Parameter(help="Path to the connector config JSON file."),
    ] = None,
    catalog_path: Annotated[
        str | None,
        Parameter(help="Path to the configured catalog JSON file (required for read)."),
    ] = None,
    state_path: Annotated[
        str | None,
        Parameter(help="Path to the state JSON file (optional for read)."),
    ] = None,
    output_dir: Annotated[
        str,
        Parameter(help="Directory to store test artifacts."),
    ] = "/tmp/regression_test_artifacts",
    enable_http_metrics: Annotated[
        bool,
        Parameter(
            help="Capture HTTP traffic metrics via mitmproxy (experimental). "
            "Requires mitmdump to be installed. Only used in comparison mode."
        ),
    ] = False,
    selected_streams: Annotated[
        str | None,
        Parameter(
            help="Comma-separated list of stream names to include in the read. "
            "Only these streams will be included in the configured catalog. "
            "This is useful to limit data volume by testing only specific streams."
        ),
    ] = None,
    enable_debug_logs: Annotated[
        bool,
        Parameter(
            help="Enable debug-level logging for regression test output. "
            "Also passed as `LOG_LEVEL=DEBUG` to the connector Docker container."
        ),
    ] = False,
    with_state: Annotated[
        bool | None,
        Parameter(
            negative="--no-state",
            help="Fetch and pass the connection's current state to the read command, "
            "producing a warm read instead of a cold read. Defaults to `True` when "
            "`--connection-id` is provided, `False` otherwise. Has no effect unless "
            "the command is `read`. Ignored when `--state-path` is explicitly provided.",
        ),
    ] = None,
) -> None:
    """Run regression tests on connectors.

    This command supports two modes:

    Comparison mode (skip_compare=False, default):
        Runs the specified Airbyte protocol command against both the target (new)
        and control (baseline) connector versions, then compares the results.
        This helps identify regressions between versions.

    Single-version mode (skip_compare=True):
        Runs the specified Airbyte protocol command against a single connector
        and validates the output. No comparison is performed.

    Results are written to the output directory and to GitHub Actions outputs
    if running in CI.

    You can provide the test image in three ways:
    1. --test-image: Use a pre-built image from Docker registry
    2. --connector-name: Build the image locally from source code
    3. --connection-id: Auto-detect from an Airbyte Cloud connection

    You can provide config/catalog either via file paths OR via a connection_id
    that fetches them from Airbyte Cloud.
    """
    # Configure debug logging for the regression test harness when requested
    if enable_debug_logs:
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            force=True,
        )

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    cmd = Command(command)

    config_file: Path | None = None
    catalog_file: Path | None = None
    state_file = Path(state_path) if state_path else None

    # Resolve the test image (used in both single-version and comparison modes)
    resolved_test_image: str | None = test_image
    resolved_control_image: str | None = control_image

    # Validate conflicting parameters
    # Single-version mode: reject comparison-specific parameters
    if skip_compare and control_image:
        write_github_output("success", False)
        write_github_output(
            "error", "Cannot specify control_image with skip_compare=True"
        )
        exit_with_error(
            "Cannot specify --control-image with --skip-compare. "
            "Control image is only used in comparison mode."
        )

    # If connector_name is provided, build the image from source
    if connector_name:
        if resolved_test_image:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both test_image and connector_name"
            )
            exit_with_error("Cannot specify both --test-image and --connector-name")

        repo_root_path = Path(repo_root) if repo_root else None
        built_image = _build_connector_image_from_source(
            connector_name=connector_name,
            repo_root=repo_root_path,
            tag="dev",
        )
        if not built_image:
            write_github_output("success", False)
            write_github_output("error", f"Failed to build image for {connector_name}")
            exit_with_error(f"Failed to build image for {connector_name}")
        resolved_test_image = built_image

    if connection_id:
        if config_path or catalog_path:
            write_github_output("success", False)
            write_github_output(
                "error", "Cannot specify both connection_id and file paths"
            )
            exit_with_error(
                "Cannot specify both connection_id and config_path/catalog_path"
            )

        print_success(f"Fetching config/catalog from connection: {connection_id}")
        connection_data = fetch_connection_data(connection_id)

        # Check if we should retrieve unmasked secrets
        if should_use_secret_retriever():
            print_success(
                "USE_CONNECTION_SECRET_RETRIEVER enabled - enriching config with unmasked secrets..."
            )
            try:
                connection_data = enrich_config_with_secrets(
                    connection_data,
                    retrieval_reason="Regression test with USE_CONNECTION_SECRET_RETRIEVER=true",
                )
                print_success("Successfully retrieved unmasked secrets from database")
            except SecretRetrievalError as e:
                write_github_output("success", False)
                write_github_output("error", str(e))
                exit_with_error(
                    f"{e}\n\n"
                    f"This connection cannot be used for regression testing. "
                    f"Please use a connection from a non-EU workspace, or use GSM-based "
                    f"integration test credentials instead (by omitting --connection-id)."
                )

        # Resolve with_state default: True when connection_id is provided
        resolved_with_state = with_state if with_state is not None else True

        if resolved_with_state and not state_file and command == "read":
            print_success("Fetching connection state for warm read...")
            try:
                connection_data.state = fetch_connection_state(connection_data)
                if connection_data.state:
                    print_success(
                        f"Fetched state with {len(connection_data.state)} stream(s)"
                    )
                else:
                    connection_data.state = None
                    print_success(
                        "No state available for this connection (initial sync)"
                    )
            except Exception as exc:
                print_error(f"Failed to fetch connection state: {exc}")
                print_success("Falling back to cold read (no state)")

        config_file, catalog_file, state_file_from_conn = save_connection_data_to_files(
            connection_data, output_path / "connection_data"
        )

        if state_file_from_conn and not state_file:
            state_file = state_file_from_conn

        print_success(
            f"Fetched config for source: {connection_data.source_name} "
            f"with {len(connection_data.stream_names)} streams"
        )

        # Auto-detect test/control image from connection if not provided
        if not resolved_test_image and connection_data.connector_image:
            resolved_test_image = connection_data.connector_image
            print_success(f"Auto-detected test image: {resolved_test_image}")

        if (
            not skip_compare
            and not resolved_control_image
            and connection_data.connector_image
        ):
            resolved_control_image = connection_data.connector_image
            print_success(f"Auto-detected control image: {resolved_control_image}")
    elif config_path:
        config_file = Path(config_path)
        catalog_file = Path(catalog_path) if catalog_path else None
    elif connector_name:
        # Fallback: fetch integration test secrets from GSM using PyAirbyte API
        print_success(
            f"No connection_id or config_path provided. "
            f"Attempting to fetch integration test config from GSM for {connector_name}..."
        )
        gsm_config = get_first_config_from_secrets(connector_name)
        if gsm_config:
            # Write config to a temp file (not in output_path to avoid artifact upload)
            gsm_config_dir = Path(
                tempfile.mkdtemp(prefix=f"gsm-config-{connector_name}-")
            )
            gsm_config_dir.chmod(0o700)
            gsm_config_file = gsm_config_dir / "config.json"
            gsm_config_file.write_text(json.dumps(gsm_config, indent=2))
            gsm_config_file.chmod(0o600)
            config_file = gsm_config_file
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
            print_success(
                f"Fetched integration test config from GSM for {connector_name}"
            )
        else:
            print_error(
                f"Failed to fetch integration test config from GSM for {connector_name}."
            )
            config_file = None
            # Use catalog_path if provided (e.g., generated from discover output)
            catalog_file = Path(catalog_path) if catalog_path else None
    else:
        config_file = None
        catalog_file = Path(catalog_path) if catalog_path else None

    # Auto-detect control_image from metadata.yaml if connector_name is provided (comparison mode only)
    if not skip_compare and not resolved_control_image and connector_name:
        resolved_control_image = _fetch_control_image_from_metadata(connector_name)
        if resolved_control_image:
            print_success(
                f"Auto-detected control image from metadata.yaml: {resolved_control_image}"
            )

    # Validate that we have the required images
    if not resolved_test_image:
        write_github_output("success", False)
        write_github_output("error", "No test image specified")
        exit_with_error(
            "You must provide one of the following: a test_image, a connector_name "
            "to build the image from source, or a connection_id to auto-detect the image."
        )

    if not skip_compare and not resolved_control_image:
        write_github_output("success", False)
        write_github_output("error", "No control image specified")
        exit_with_error(
            "You must provide one of the following: a control_image, a connection_id "
            "for a connection that has an associated connector image, or a connector_name "
            "to auto-detect the control image from the airbyte repo's metadata.yaml."
        )

    # Pull images if they weren't just built locally
    # If connector_name was provided, we just built the test image locally
    if not connector_name and not ensure_image_available(resolved_test_image):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_test_image}")
        exit_with_error(f"Failed to pull test image: {resolved_test_image}")

    if (
        not skip_compare
        and resolved_control_image
        and not ensure_image_available(resolved_control_image)
    ):
        write_github_output("success", False)
        write_github_output("error", f"Failed to pull image: {resolved_control_image}")
        exit_with_error(
            f"Failed to pull control connector image: {resolved_control_image}"
        )

    # Apply selected_streams filter to catalog if requested
    if selected_streams and catalog_file:
        streams_set = {s.strip() for s in selected_streams.split(",") if s.strip()}
        if streams_set:
            print_success(
                f"Filtering catalog to {len(streams_set)} selected streams: "
                f"{', '.join(sorted(streams_set))}"
            )
            filter_configured_catalog_file(catalog_file, streams_set)

    # Upgrade READ → READ_WITH_STATE when a state file is available
    if cmd == Command.READ and state_file:
        cmd = Command.READ_WITH_STATE
        print_success("State available — using read-with-state (warm read)")

    # Track telemetry for the regression test
    # Extract version from image tag (e.g., "airbyte/source-github:1.0.0" -> "1.0.0")
    target_version = (
        resolved_test_image.rsplit(":", 1)[-1]
        if ":" in resolved_test_image
        else "unknown"
    )
    control_version = None
    if resolved_control_image and ":" in resolved_control_image:
        control_version = resolved_control_image.rsplit(":", 1)[-1]

    # Get tester identity from environment (GitHub Actions sets GITHUB_ACTOR)
    tester = os.getenv("GITHUB_ACTOR") or os.getenv("USER")

    track_regression_test(
        user_id=tester,
        connector_image=resolved_test_image,
        command=command,
        target_version=target_version,
        control_version=control_version,
        additional_properties={
            "connection_id": connection_id,
            "skip_compare": skip_compare,
            "with_state": cmd == Command.READ_WITH_STATE,
        },
    )

    # Execute the appropriate mode
    if skip_compare:
        # Single-version mode: run only the connector image
        result = _run_connector_command(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=output_path,
            target_or_control=TargetOrControl.TARGET,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        print_json(result)

        write_github_outputs(
            {
                "success": result["success"],
                "connector": resolved_test_image,
                "command": command,
                "exit_code": result["exit_code"],
            }
        )

        write_test_summary(
            connector_image=resolved_test_image,
            test_type="regression-test",
            success=result["success"],
            results={
                "command": command,
                "exit_code": result["exit_code"],
                "output_dir": output_dir,
            },
        )

        # Generate report.md with detailed metrics
        report_path = generate_single_version_report(
            connector_image=resolved_test_image,
            command=command,
            result=result,
            output_dir=output_path,
        )
        print_success(f"Generated report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if result["success"]:
            print_success(
                f"Single-version regression test passed for {resolved_test_image}"
            )
        else:
            print_error(
                f"Single-version regression test failed for {resolved_test_image}"
            )
    else:
        # Comparison mode: run both target and control images
        target_output = output_path / "target"
        control_output = output_path / "control"

        target_result = _run_with_optional_http_metrics(
            connector_image=resolved_test_image,
            command=cmd,
            output_dir=target_output,
            target_or_control=TargetOrControl.TARGET,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        control_result = _run_with_optional_http_metrics(
            connector_image=resolved_control_image,  # type: ignore[arg-type]
            command=cmd,
            output_dir=control_output,
            target_or_control=TargetOrControl.CONTROL,
            enable_http_metrics=enable_http_metrics,
            config_path=config_file,
            catalog_path=catalog_file,
            state_path=state_file,
            enable_debug_logs=enable_debug_logs,
        )

        both_succeeded = target_result["success"] and control_result["success"]
        regression_detected = target_result["success"] != control_result["success"]

        combined_result = {
            "target": target_result,
            "control": control_result,
            "both_succeeded": both_succeeded,
            "regression_detected": regression_detected,
        }

        print_json(combined_result)

        both_failed = not target_result["success"] and not control_result["success"]

        write_github_outputs(
            {
                "success": not regression_detected,
                "target_image": resolved_test_image,
                "control_image": resolved_control_image,
                "command": command,
                "target_exit_code": target_result["exit_code"],
                "control_exit_code": control_result["exit_code"],
                "regression_detected": regression_detected,
                "both_failed": both_failed,
            }
        )

        write_json_output("regression_report", combined_result)

        report_path = generate_regression_report(
            target_image=resolved_test_image,
            control_image=resolved_control_image,  # type: ignore[arg-type]
            command=command,
            target_result=target_result,
            control_result=control_result,
            output_dir=output_path,
        )
        print_success(f"Generated regression report: {report_path}")

        # Write report to GITHUB_STEP_SUMMARY (if env var exists)
        write_github_summary(report_path.read_text())

        if regression_detected:
            print_error(
                f"Regression detected between {resolved_test_image} and {resolved_control_image}"
            )
        elif both_succeeded:
            print_success(
                f"Regression test passed for {resolved_test_image} vs {resolved_control_image}"
            )
        else:
            # Both versions failed, but no regression between them was detected; treat this as a
            # test environment issue (e.g., expired credentials, transient API errors, rate limiting).
            # Exit successfully since no regression between versions was detected.
            print_warning(
                f"Both versions failed for {resolved_test_image} vs {resolved_control_image}. "
                "No regression detected. This may indicate expired credentials or a transient API issue."
            )


@connector_app.command(name="fetch-connection-config")
def fetch_connection_config_cmd(
    connection_id: Annotated[
        str,
        Parameter(help="The UUID of the Airbyte Cloud connection."),
    ],
    output_path: Annotated[
        str | None,
        Parameter(
            help="Path to output file or directory. "
            "If directory, writes connection-<id>-config.json inside it. "
            "Default: platform temp directory (e.g. /tmp/connection-<id>-config.json)"
        ),
    ] = None,
    with_secrets: Annotated[
        bool,
        Parameter(
            name="--with-secrets",
            negative="--no-secrets",
            help="If set, fetches unmasked secrets from the internal database. "
            "Requires GCP_PROD_DB_ACCESS_CREDENTIALS env var or `gcloud auth application-default login`. "
            "Must be used with --oc-issue-url.",
        ),
    ] = False,
    oc_issue_url: Annotated[
        str | None,
        Parameter(
            help="OC issue URL for audit logging. Required when using --with-secrets."
        ),
    ] = None,
) -> None:
    """Fetch connection configuration from Airbyte Cloud to a local file.

    This command retrieves the source configuration for a given connection ID
    and writes it to a JSON file. When `--output-path` is omitted the file is
    written to the platform temp directory to avoid accidentally committing
    secrets to a git repository.

    Requires authentication via AIRBYTE_CLOUD_CLIENT_ID and
    AIRBYTE_CLOUD_CLIENT_SECRET environment variables.

    When --with-secrets is specified, the command fetches unmasked secrets from
    the internal database using the connection-retriever. This additionally requires:
    - An OC issue URL for audit logging (--oc-issue-url)
    - GCP credentials via `GCP_PROD_DB_ACCESS_CREDENTIALS` env var or `gcloud auth application-default login`
    - Cloud SQL Python Connector access to the Prod DB replica.
    """
    path = Path(output_path) if output_path else None
    result = fetch_connection_config(
        connection_id=connection_id,
        output_path=path,
        with_secrets=with_secrets,
        oc_issue_url=oc_issue_url,
    )
    if result.success:
        print_success(result.message)
    else:
        print_error(result.message)
    print_json(result.model_dump())


def _read_json_input(
    json_str: str | None,
    input_file: str | None,
) -> dict:
    """Read JSON from a positional arg, --input file, or STDIN (in that priority)."""
    if json_str is not None:
        return json.loads(json_str)
    if input_file is not None:
        return json.loads(Path(input_file).read_text())
    if not sys.stdin.isatty():
        return json.loads(sys.stdin.read())
    exit_with_error(
        "No JSON input provided. Pass it as a positional argument, "
        "via --input <file>, or pipe to STDIN."
    )
    raise SystemExit(1)  # unreachable, but satisfies type checker


@state_app.command(name="get")
def get_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch state for."),
    ],
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to filter state for a single stream.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to narrow the stream filter.",
            name="--namespace",
        ),
    ] = None,
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the state JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the current state for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        stream_state = conn.get_stream_state(
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
        result = {
            "connection_id": connection_id,
            "stream_name": stream_name,
            "stream_namespace": stream_namespace,
            "stream_state": stream_state,
        }
    else:
        result = conn.dump_raw_state()

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"State written to {output}", file=sys.stderr)
    else:
        print(json_output)


@state_app.command(name="set")
def set_connection_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    state_json: Annotated[
        str | None,
        Parameter(
            help="The connection state as a JSON string. When --stream is used, "
            'this is just the stream\'s state blob (e.g., \'{"cursor": "2024-01-01"}\').'
            " Otherwise, must include 'stateType', 'connectionId', and the appropriate "
            "state field. Can also be provided via --input or STDIN."
        ),
    ] = None,
    stream_name: Annotated[
        str | None,
        Parameter(
            help="Optional stream name to update state for a single stream only.",
            name="--stream",
        ),
    ] = None,
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the state to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the state for an Airbyte Cloud connection.

    State JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    Uses the safe variant that prevents updates while a sync is running.
    When --stream is provided, only that stream's state is updated within
    the existing connection state.
    """
    parsed_state = _read_json_input(state_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    if stream_name is not None:
        conn.set_stream_state(
            stream_name=stream_name,
            state_blob_dict=parsed_state,
            stream_namespace=stream_namespace,
        )
    else:
        conn.import_raw_state(parsed_state)

    result = conn.dump_raw_state()
    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


@state_app.command(name="reset")
def reset_stream_state(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update state for."),
    ],
    stream_name: Annotated[
        str,
        Parameter(
            help="The configured stream name whose state should be reset.",
            name="--stream",
        ),
    ],
    stream_namespace: Annotated[
        str | None,
        Parameter(
            help="Optional stream namespace to identify the stream.",
            name="--namespace",
        ),
    ] = None,
) -> None:
    """Reset a configured stream's state so the next sync full-refreshes it.

    Uses the safe variant that prevents updates while a sync is running.
    Returns a restorable `previous_state_backup` in raw Config API format.
    """
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)

    try:
        result = reset_cloud_stream_state(
            conn,
            stream_name=stream_name,
            stream_namespace=stream_namespace,
        )
    except PyAirbyteInputError as e:
        exit_with_error(str(e))

    print(json.dumps(result.model_dump(), indent=2, default=str))


@catalog_app.command(name="get")
def get_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to fetch catalog for."),
    ],
    output: Annotated[
        str | None,
        Parameter(
            help="Path to write the catalog JSON output to a file.",
            name="--output",
        ),
    ] = None,
) -> None:
    """Get the configured catalog for an Airbyte Cloud connection."""
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("No configured catalog found for this connection.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    if output is not None:
        Path(output).write_text(json_output + "\n")
        print(f"Catalog written to {output}", file=sys.stderr)
    else:
        print(json_output)


@catalog_app.command(name="set")
def set_connection_catalog(
    connection_id: Annotated[
        str,
        Parameter(help="The connection ID (UUID) to update catalog for."),
    ],
    catalog_json: Annotated[
        str | None,
        Parameter(
            help="The configured catalog as a JSON string. "
            "Can also be provided via --input or STDIN."
        ),
    ] = None,
    input_file: Annotated[
        str | None,
        Parameter(
            help="Path to a JSON file containing the catalog to set.",
            name="--input",
        ),
    ] = None,
) -> None:
    """Set the configured catalog for an Airbyte Cloud connection.

    Catalog JSON can be provided as a positional argument, via --input <file>,
    or piped through STDIN.

    WARNING: This replaces the entire configured catalog.
    """
    parsed_catalog = _read_json_input(catalog_json, input_file)
    workspace = CloudWorkspace.from_env()
    conn = workspace.get_connection(connection_id)
    conn.import_raw_catalog(parsed_catalog)

    result = conn.dump_raw_catalog()
    if result is None:
        exit_with_error("Failed to retrieve catalog after import.")
        raise SystemExit(1)  # unreachable, but satisfies type checker

    json_output = json.dumps(result, indent=2, default=str)
    print(json_output)


@logs_app.command(name="lookup-cloud-backend-error")
def lookup_cloud_backend_error(
    error_id: Annotated[
        str,
        Parameter(
            help=(
                "The error ID (UUID) to search for. This is typically returned "
                "in API error responses as {'errorId': '...'}"
            )
        ),
    ],
    lookback_days: Annotated[
        int,
        Parameter(help="Number of days to look back in logs."),
    ] = 7,
    min_severity_filter: Annotated[
        GCPSeverity | None,
        Parameter(
            help="Optional minimum severity level to filter logs.",
        ),
    ] = None,
    raw: Annotated[
        bool,
        Parameter(help="Output raw JSON instead of formatted text."),
    ] = False,
) -> None:
    """Look up error details from GCP Cloud Logging by error ID.

    When an Airbyte Cloud API returns an error response with only an error ID
    (e.g., {"errorId": "3173452e-8f22-4286-a1ec-b0f16c1e078a"}), this command
    fetches the full stack trace and error details from GCP Cloud Logging.

    Requires GCP credentials with Logs Viewer role on the target project.
    Set up credentials with: gcloud auth application-default login
    """
    print(f"Searching for error ID: {error_id}", file=sys.stderr)
    print(f"Lookback days: {lookback_days}", file=sys.stderr)
    if min_severity_filter:
        print(f"Severity filter: {min_severity_filter}", file=sys.stderr)
    print(file=sys.stderr)

    result = fetch_error_logs(
        error_id=error_id,
        lookback_days=lookback_days,
        min_severity_filter=min_severity_filter,
    )

    if raw:
        print_json(result.model_dump())
        return

    print(f"Found {result.total_entries_found} log entries", file=sys.stderr)
    print(file=sys.stderr)

    if result.payloads:
        for i, payload in enumerate(result.payloads):
            print(f"=== Log Group {i + 1} ===")
            print(f"Timestamp: {payload.timestamp}")
            print(f"Severity: {payload.severity}")
            if payload.resource.labels.pod_name:
                print(f"Pod: {payload.resource.labels.pod_name}")
            print(f"Lines: {payload.num_log_lines}")
            print()
            print(payload.message)
            print()
    elif result.entries:
        print("No grouped payloads, showing raw entries:", file=sys.stderr)
        for entry in result.entries:
            print(f"[{entry.timestamp}] {entry.severity}: {entry.payload}")
    else:
        print_error("No log entries found for this error ID.")
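
The comparison branch of the regression-test command in the listing above reduces to two small pure decisions: how a target/control pair is classified, and how a version string is taken from an image reference. The sketch below restates both outside the CLI so they can be tried in isolation. The names `RunResult`, `classify_comparison`, and `image_tag` are hypothetical and do not appear in the module; only the boolean expressions and the `rsplit` call are taken from the listing.

```python
from typing import TypedDict


class RunResult(TypedDict):
    """Hypothetical minimal shape of a connector run result (assumption)."""

    success: bool
    exit_code: int


def classify_comparison(target: RunResult, control: RunResult) -> str:
    """Classify a target/control pair using the same expressions as the listing."""
    both_succeeded = target["success"] and control["success"]
    # As in the listing, any mismatch in success is reported as a regression.
    # That includes the case where only the control run fails.
    regression_detected = target["success"] != control["success"]
    if regression_detected:
        return "regression"
    if both_succeeded:
        return "passed"
    # Neither run succeeded: the listing treats this as an environment issue.
    return "both_failed"


def image_tag(image: str) -> str:
    """Return the text after the last ':' in an image reference, or 'unknown'."""
    return image.rsplit(":", 1)[-1] if ":" in image else "unknown"


if __name__ == "__main__":
    ok: RunResult = {"success": True, "exit_code": 0}
    bad: RunResult = {"success": False, "exit_code": 1}
    print(classify_comparison(ok, ok))
    print(classify_comparison(bad, ok))
    print(classify_comparison(bad, bad))
    print(image_tag("airbyte/source-github:1.0.0"))
```

Two properties follow from these expressions and may be worth keeping in mind when reading the GitHub outputs. The inequality test is symmetric, so a run where the target succeeds and the control fails is also labelled a regression. The tag extraction splits on the last colon only, so an untagged reference that includes a registry port (for example `localhost:5000/source-foo`) yields `5000/source-foo` rather than `unknown`.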