airbyte.mcp.cloud_ops
Airbyte Cloud MCP operations.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte Cloud MCP operations."""

from typing import Annotated, Any

from fastmcp import FastMCP
from pydantic import Field

from airbyte import cloud, get_destination, get_source
from airbyte.cloud.auth import (
    resolve_cloud_api_url,
    resolve_cloud_client_id,
    resolve_cloud_client_secret,
    resolve_cloud_workspace_id,
)
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.destinations.util import get_noop_destination
from airbyte.mcp._util import resolve_config, resolve_list_of_strings


def _get_cloud_workspace() -> CloudWorkspace:
    """Get an authenticated CloudWorkspace using environment variables.

    The workspace ID, client ID, client secret and API root are each obtained
    from the corresponding `resolve_cloud_*` helper, called without arguments.
    """
    return CloudWorkspace(
        workspace_id=resolve_cloud_workspace_id(),
        client_id=resolve_cloud_client_id(),
        client_secret=resolve_cloud_client_secret(),
        api_root=resolve_cloud_api_url(),
    )


# @app.tool()  # << deferred
def deploy_source_to_cloud(
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the deployed source's ID and URL on success, or a
    message describing the failure if any step raises.
    """
    try:
        source = get_source(
            source_connector_name,
            install_if_missing=False,
        )
        config_dict = resolve_config(
            config=config,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)

        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_source = workspace.deploy_source(
            name=source_name,
            source=source,
            unique=unique,
        )

    except Exception as ex:
        # Failures are reported as text rather than raised.
        return f"Failed to deploy source '{source_name}': {ex}"
    else:
        return (
            f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
            f" and URL: {deployed_source.connector_url}"
        )


# @app.tool()  # << deferred
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the deployed destination's ID and URL on success, or a
    message describing the failure if any step raises.
    """
    try:
        destination = get_destination(
            destination_connector_name,
            install_if_missing=False,
        )
        config_dict = resolve_config(
            config=config,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=destination.config_spec,
        )
        destination.set_config(config_dict)

        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_destination = workspace.deploy_destination(
            name=destination_name,
            destination=destination,
            unique=unique,
        )

    except Exception as ex:
        # Failures are reported as text rather than raised.
        return f"Failed to deploy destination '{destination_name}': {ex}"
    else:
        # Report the URL as well as the ID, matching the source and no-op
        # destination deployment messages.
        return (
            f"Successfully deployed destination '{destination_name}' "
            f"with ID '{deployed_destination.connector_id}' and "
            f"URL: {deployed_destination.connector_url}"
        )


# @app.tool()  # << deferred
def create_connection_on_cloud(
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the new connection's ID and URL on success, or a
    message describing the failure if the deployment raises.
    """
    # NOTE: stream-name resolution runs outside the `try`, so an error raised
    # here propagates to the caller instead of being turned into a message.
    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_connection = workspace.deploy_connection(
            connection_name=connection_name,
            source=source_id,
            destination=destination_id,
            selected_streams=resolved_streams_list,
            table_prefix=table_prefix,
        )

    except Exception as ex:
        return f"Failed to create connection '{connection_name}': {ex}"
    else:
        return (
            f"Successfully created connection '{connection_name}' "
            f"with ID '{deployed_connection.connection_id}' and "
            f"URL: {deployed_connection.connection_url}"
        )


# @app.tool()  # << deferred
def run_cloud_sync(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    wait: Annotated[
        bool,
        Field(
            description="Whether to wait for the sync to complete.",
            default=True,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the job ID and job URL (plus the job status when
    `wait` is true), or a message describing the failure if the sync raises.
    """
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        connection = workspace.get_connection(connection_id=connection_id)
        sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    except Exception as ex:
        return f"Failed to run sync for connection '{connection_id}': {ex}"
    else:
        if wait:
            # The job status is only queried when the caller asked to wait.
            status = sync_result.get_job_status()
            return (
                f"Sync completed with status: {status}. "  # Sync completed.
                f"Job ID is '{sync_result.job_id}' and "
                f"job URL is: {sync_result.job_url}"
            )
        return (
            f"Sync started. "  # Sync started.
            f"Job ID is '{sync_result.job_id}' and "
            f"job URL is: {sync_result.job_url}"
        )


# @app.tool()  # << deferred
def check_airbyte_cloud_workspace() -> str:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns workspace ID and workspace URL for verification.
    """
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        workspace.connect()

    except Exception as ex:
        return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}"
    else:
        return (
            f"✅ Successfully connected to Airbyte Cloud workspace.\n"
            f"Workspace ID: {workspace.workspace_id}\n"
            f"Workspace URL: {workspace.workspace_url}"
        )


# @app.tool()  # << deferred
def deploy_noop_destination_to_cloud(
    name: str = "No-op Destination",
    *,
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the deployed destination's ID and URL on success, or a
    message describing the failure if any step raises.
    """
    try:
        destination = get_noop_destination()
        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_destination = workspace.deploy_destination(
            name=name,
            destination=destination,
            unique=unique,
        )
    except Exception as ex:
        return f"Failed to deploy No-op Destination: {ex}"
    else:
        return (
            f"Successfully deployed No-op Destination "
            f"with ID '{deployed_destination.connector_id}' and "
            f"URL: {deployed_destination.connector_url}"
        )


# @app.tool()  # << deferred
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a dict with `status`, `job_id` and `attempts` keys. When a job is found
    it also carries `bytes_synced`, `records_synced`, `start_time` and `job_url`;
    when the lookup raises it carries an `error` message instead.
    """
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        connection = workspace.get_connection(connection_id=connection_id)

        # Look up the requested job; `job_id` may be None, in which case the
        # choice of job is left to `get_sync_result`.
        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

        if not sync_result:
            return {"status": None, "job_id": None, "attempts": []}

        result = {
            "status": sync_result.get_job_status(),
            "job_id": sync_result.job_id,
            "bytes_synced": sync_result.bytes_synced,
            "records_synced": sync_result.records_synced,
            "start_time": sync_result.start_time.isoformat(),
            "job_url": sync_result.job_url,
            "attempts": [],
        }

        if include_attempts:
            # Attempts are only fetched on request.
            attempts = sync_result.get_attempts()
            result["attempts"] = [
                {
                    "attempt_number": attempt.attempt_number,
                    "attempt_id": attempt.attempt_id,
                    "status": attempt.status,
                    "bytes_synced": attempt.bytes_synced,
                    "records_synced": attempt.records_synced,
                    "created_at": attempt.created_at.isoformat(),
                }
                for attempt in attempts
            ]

        return result  # noqa: TRY300

    except Exception as ex:
        return {
            "status": None,
            "job_id": job_id,
            "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
            "attempts": [],
        }


# @app.tool()  # << deferred
def list_deployed_cloud_source_connectors() -> list[CloudSource]:
    """List all deployed source connectors in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    return workspace.list_sources()


# @app.tool()  # << deferred
def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
    """List all deployed destination connectors in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    return workspace.list_destinations()


# @app.tool()  # << deferred
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
) -> str:
    """Get the logs from a sync job attempt on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns the attempt's full log text, or a message explaining why no logs
    could be returned (no job, no attempts, unknown attempt number, empty logs,
    or an error raised during the lookup).
    """
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        connection = workspace.get_connection(connection_id=connection_id)

        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

        if not sync_result:
            return f"No sync job found for connection '{connection_id}'"

        attempts = sync_result.get_attempts()

        if not attempts:
            return f"No attempts found for job '{sync_result.job_id}'"

        if attempt_number is not None:
            # Pick the first attempt whose number matches the request.
            target_attempt = None
            for attempt in attempts:
                if attempt.attempt_number == attempt_number:
                    target_attempt = attempt
                    break

            if target_attempt is None:
                return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
        else:
            # No attempt requested: use the one with the highest attempt number.
            target_attempt = max(attempts, key=lambda a: a.attempt_number)

        logs = target_attempt.get_full_log_text()

        if not logs:
            return (
                f"No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}"
            )

        return logs  # noqa: TRY300

    except Exception as ex:
        return f"Failed to get logs for connection '{connection_id}': {ex}"


# @app.tool()  # << deferred
def list_deployed_cloud_connections() -> list[CloudConnection]:
    """List all deployed connections in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    return workspace.list_connections()


def register_cloud_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.
    """
    app.tool(check_airbyte_cloud_workspace)
    app.tool(deploy_source_to_cloud)
    app.tool(deploy_destination_to_cloud)
    app.tool(deploy_noop_destination_to_cloud)
    app.tool(create_connection_on_cloud)
    app.tool(run_cloud_sync)
    app.tool(get_cloud_sync_status)
    app.tool(get_cloud_sync_logs)
    app.tool(list_deployed_cloud_source_connectors)
    app.tool(list_deployed_cloud_destination_connectors)
    app.tool(list_deployed_cloud_connections)
def deploy_source_to_cloud(
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    config: Annotated[
        dict | str | None,
        Field(description="The configuration for the source connector.", default=None),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(description="The name of the secret containing the configuration.", default=None),
    ],
    unique: Annotated[
        bool,
        Field(description="Whether to require a unique name.", default=True),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the deployed source's ID and URL, or a failure
    message if configuring or deploying the source raises.
    """
    try:
        connector = get_source(source_connector_name, install_if_missing=False)
        resolved_config = resolve_config(
            config=config,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=connector.config_spec,
        )
        connector.set_config(resolved_config)
        cloud_source = _get_cloud_workspace().deploy_source(
            name=source_name,
            source=connector,
            unique=unique,
        )
    except Exception as ex:
        # Failures are reported as text rather than raised.
        return f"Failed to deploy source '{source_name}': {ex}"

    # Built outside the `try` so only the deployment steps are guarded.
    id_part = f"Successfully deployed source '{source_name}' with ID '{cloud_source.connector_id}'"
    url_part = f" and URL: {cloud_source.connector_url}"
    return id_part + url_part
Deploy a source connector to Airbyte Cloud.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the deployed destination's ID and URL on success, or a
    message describing the failure if any step raises.
    """
    try:
        destination = get_destination(
            destination_connector_name,
            install_if_missing=False,
        )
        config_dict = resolve_config(
            config=config,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=destination.config_spec,
        )
        destination.set_config(config_dict)

        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_destination = workspace.deploy_destination(
            name=destination_name,
            destination=destination,
            unique=unique,
        )

    except Exception as ex:
        # Failures are reported as text rather than raised.
        return f"Failed to deploy destination '{destination_name}': {ex}"
    else:
        # Report the URL as well as the ID, matching the source and no-op
        # destination deployment messages.
        return (
            f"Successfully deployed destination '{destination_name}' "
            f"with ID '{deployed_destination.connector_id}' and "
            f"URL: {deployed_destination.connector_url}"
        )
Deploy a destination connector to Airbyte Cloud.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def create_connection_on_cloud(
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the new connection's ID and URL, or a failure
    message if the deployment raises.
    """
    # Resolved before the guarded section: an error here propagates to the caller.
    stream_names: list[str] = resolve_list_of_strings(selected_streams)

    try:
        new_connection = _get_cloud_workspace().deploy_connection(
            connection_name=connection_name,
            source=source_id,
            destination=destination_id,
            selected_streams=stream_names,
            table_prefix=table_prefix,
        )
    except Exception as ex:
        return f"Failed to create connection '{connection_name}': {ex}"

    summary = f"Successfully created connection '{connection_name}' "
    summary += f"with ID '{new_connection.connection_id}' and "
    summary += f"URL: {new_connection.connection_url}"
    return summary
Create a connection between a deployed source and destination on Airbyte Cloud.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def run_cloud_sync(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    wait: Annotated[
        bool,
        Field(description="Whether to wait for the sync to complete.", default=True),
    ],
    wait_timeout: Annotated[
        int,
        Field(description="Maximum time to wait for sync completion (seconds).", default=300),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the job ID and job URL, prefixed with the job status
    when `wait` is true, or a failure message if starting the sync raises.
    """
    try:
        cloud_connection = _get_cloud_workspace().get_connection(connection_id=connection_id)
        job = cloud_connection.run_sync(wait=wait, wait_timeout=wait_timeout)
    except Exception as ex:
        return f"Failed to run sync for connection '{connection_id}': {ex}"

    # The job status is only queried when the caller asked to wait.
    if wait:
        headline = f"Sync completed with status: {job.get_job_status()}. "
    else:
        headline = "Sync started. "

    return headline + f"Job ID is '{job.job_id}' and " + f"job URL is: {job.job_url}"
Run a sync job on Airbyte Cloud.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def check_airbyte_cloud_workspace() -> str:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns workspace ID and workspace URL for verification, or a failure
    message if building the workspace or connecting to it raises.
    """
    try:
        cloud_workspace = _get_cloud_workspace()
        cloud_workspace.connect()
    except Exception as ex:
        return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}"

    report_lines = [
        "✅ Successfully connected to Airbyte Cloud workspace.",
        f"Workspace ID: {cloud_workspace.workspace_id}",
        f"Workspace URL: {cloud_workspace.workspace_url}",
    ]
    return "\n".join(report_lines)
Check if we have a valid Airbyte Cloud connection and return workspace info.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
Returns workspace ID and workspace URL for verification.
def deploy_noop_destination_to_cloud(
    name: str = "No-op Destination",
    *,
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a message with the deployed destination's ID and URL, or a failure
    message if any step raises.
    """
    try:
        noop_destination = get_noop_destination()
        cloud_destination = _get_cloud_workspace().deploy_destination(
            name=name,
            destination=noop_destination,
            unique=unique,
        )
    except Exception as ex:
        return f"Failed to deploy No-op Destination: {ex}"

    message = "Successfully deployed No-op Destination "
    message += f"with ID '{cloud_destination.connector_id}' and "
    message += f"URL: {cloud_destination.connector_url}"
    return message
Deploy the No-op destination to Airbyte Cloud for testing purposes.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    include_attempts: Annotated[
        bool,
        Field(description="Whether to include detailed attempts information.", default=False),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns a dict that always has `status`, `job_id` and `attempts` keys. A found
    job adds `bytes_synced`, `records_synced`, `start_time` and `job_url`; a raised
    error adds an `error` message instead.
    """
    try:
        cloud_connection = _get_cloud_workspace().get_connection(connection_id=connection_id)

        # `job_id` may be None; the choice of job is then left to `get_sync_result`.
        job: cloud.SyncResult | None = cloud_connection.get_sync_result(job_id=job_id)
        if not job:
            return {"status": None, "job_id": None, "attempts": []}

        status_report = {
            "status": job.get_job_status(),
            "job_id": job.job_id,
            "bytes_synced": job.bytes_synced,
            "records_synced": job.records_synced,
            "start_time": job.start_time.isoformat(),
            "job_url": job.job_url,
            "attempts": [],
        }

        if include_attempts:
            # Attempts are only fetched on request.
            attempt_summaries = []
            for job_attempt in job.get_attempts():
                attempt_summaries.append(
                    {
                        "attempt_number": job_attempt.attempt_number,
                        "attempt_id": job_attempt.attempt_id,
                        "status": job_attempt.status,
                        "bytes_synced": job_attempt.bytes_synced,
                        "records_synced": job_attempt.records_synced,
                        "created_at": job_attempt.created_at.isoformat(),
                    }
                )
            status_report["attempts"] = attempt_summaries

        return status_report  # noqa: TRY300

    except Exception as ex:
        failure_text = f"Failed to get sync status for connection '{connection_id}': {ex}"
        return {
            "status": None,
            "job_id": job_id,
            "error": failure_text,
            "attempts": [],
        }
Get the status of a sync job from the Airbyte Cloud.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def list_deployed_cloud_source_connectors() -> list[CloudSource]:
    """List all deployed source connectors in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Unlike the deploy/sync tools, errors are not caught here and propagate to the caller.
    """
    return _get_cloud_workspace().list_sources()
List all deployed source connectors in the Airbyte Cloud workspace.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
    """List all deployed destination connectors in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Unlike the deploy/sync tools, errors are not caught here and propagate to the caller.
    """
    return _get_cloud_workspace().list_destinations()
List all deployed destination connectors in the Airbyte Cloud workspace.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
) -> str:
    """Get the logs from a sync job attempt on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns the selected attempt's full log text, or a message explaining why no
    logs are available (no job, no attempts, unknown attempt number, empty logs,
    or an error raised during the lookup).
    """
    try:
        cloud_connection = _get_cloud_workspace().get_connection(connection_id=connection_id)
        job: cloud.SyncResult | None = cloud_connection.get_sync_result(job_id=job_id)
        if not job:
            return f"No sync job found for connection '{connection_id}'"

        job_attempts = job.get_attempts()
        if not job_attempts:
            return f"No attempts found for job '{job.job_id}'"

        if attempt_number is None:
            # No attempt requested: use the one with the highest attempt number.
            chosen = max(job_attempts, key=lambda a: a.attempt_number)
        else:
            # First attempt whose number matches the request, if any.
            chosen = next(
                (
                    candidate
                    for candidate in job_attempts
                    if candidate.attempt_number == attempt_number
                ),
                None,
            )
            if chosen is None:
                return f"Attempt number {attempt_number} not found for job '{job.job_id}'"

        log_text = chosen.get_full_log_text()
        if log_text:
            return log_text

        return (  # noqa: TRY300
            f"No logs available for job '{job.job_id}', "
            f"attempt {chosen.attempt_number}"
        )

    except Exception as ex:
        return f"Failed to get logs for connection '{connection_id}': {ex}"
Get the logs from a sync job attempt on Airbyte Cloud.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.
def list_deployed_cloud_connections() -> list[CloudConnection]:
    """List all deployed connections in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Unlike the deploy/sync tools, errors are not caught here and propagate to the caller.
    """
    return _get_cloud_workspace().list_connections()
List all deployed connections in the Airbyte Cloud workspace.
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API.