airbyte.mcp.cloud_ops

Airbyte Cloud MCP operations.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Airbyte Cloud MCP operations."""
  3
  4from typing import Annotated, Any
  5
  6from fastmcp import FastMCP
  7from pydantic import Field
  8
  9from airbyte import cloud, get_destination, get_source
 10from airbyte.cloud.auth import (
 11    resolve_cloud_api_url,
 12    resolve_cloud_client_id,
 13    resolve_cloud_client_secret,
 14    resolve_cloud_workspace_id,
 15)
 16from airbyte.cloud.connections import CloudConnection
 17from airbyte.cloud.connectors import CloudDestination, CloudSource
 18from airbyte.cloud.workspaces import CloudWorkspace
 19from airbyte.destinations.util import get_noop_destination
 20from airbyte.mcp._util import resolve_config, resolve_list_of_strings
 21
 22
 23def _get_cloud_workspace() -> CloudWorkspace:
 24    """Get an authenticated CloudWorkspace using environment variables."""
 25    return CloudWorkspace(
 26        workspace_id=resolve_cloud_workspace_id(),
 27        client_id=resolve_cloud_client_id(),
 28        client_secret=resolve_cloud_client_secret(),
 29        api_root=resolve_cloud_api_url(),
 30    )
 31
 32
 33# @app.tool()  # << deferred
 34def deploy_source_to_cloud(
 35    source_name: Annotated[
 36        str,
 37        Field(description="The name to use when deploying the source."),
 38    ],
 39    source_connector_name: Annotated[
 40        str,
 41        Field(description="The name of the source connector (e.g., 'source-faker')."),
 42    ],
 43    *,
 44    config: Annotated[
 45        dict | str | None,
 46        Field(
 47            description="The configuration for the source connector.",
 48            default=None,
 49        ),
 50    ],
 51    config_secret_name: Annotated[
 52        str | None,
 53        Field(
 54            description="The name of the secret containing the configuration.",
 55            default=None,
 56        ),
 57    ],
 58    unique: Annotated[
 59        bool,
 60        Field(
 61            description="Whether to require a unique name.",
 62            default=True,
 63        ),
 64    ],
 65) -> str:
 66    """Deploy a source connector to Airbyte Cloud.
 67
 68    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 69    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 70    Airbyte Cloud API.
 71    """
 72    try:
 73        source = get_source(
 74            source_connector_name,
 75            install_if_missing=False,
 76        )
 77        config_dict = resolve_config(
 78            config=config,
 79            config_secret_name=config_secret_name,
 80            config_spec_jsonschema=source.config_spec,
 81        )
 82        source.set_config(config_dict)
 83
 84        workspace: CloudWorkspace = _get_cloud_workspace()
 85        deployed_source = workspace.deploy_source(
 86            name=source_name,
 87            source=source,
 88            unique=unique,
 89        )
 90
 91    except Exception as ex:
 92        return f"Failed to deploy source '{source_name}': {ex}"
 93    else:
 94        return (
 95            f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 96            f" and URL: {deployed_source.connector_url}"
 97        )
 98
 99
100# @app.tool()  # << deferred
101def deploy_destination_to_cloud(
102    destination_name: Annotated[
103        str,
104        Field(description="The name to use when deploying the destination."),
105    ],
106    destination_connector_name: Annotated[
107        str,
108        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
109    ],
110    *,
111    config: Annotated[
112        dict | str | None,
113        Field(
114            description="The configuration for the destination connector.",
115            default=None,
116        ),
117    ],
118    config_secret_name: Annotated[
119        str | None,
120        Field(
121            description="The name of the secret containing the configuration.",
122            default=None,
123        ),
124    ],
125    unique: Annotated[
126        bool,
127        Field(
128            description="Whether to require a unique name.",
129            default=True,
130        ),
131    ],
132) -> str:
133    """Deploy a destination connector to Airbyte Cloud.
134
135    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
136    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
137    Airbyte Cloud API.
138    """
139    try:
140        destination = get_destination(
141            destination_connector_name,
142            install_if_missing=False,
143        )
144        config_dict = resolve_config(
145            config=config,
146            config_secret_name=config_secret_name,
147            config_spec_jsonschema=destination.config_spec,
148        )
149        destination.set_config(config_dict)
150
151        workspace: CloudWorkspace = _get_cloud_workspace()
152        deployed_destination = workspace.deploy_destination(
153            name=destination_name,
154            destination=destination,
155            unique=unique,
156        )
157
158    except Exception as ex:
159        return f"Failed to deploy destination '{destination_name}': {ex}"
160    else:
161        return (
162            f"Successfully deployed destination '{destination_name}' "
163            f"with ID: {deployed_destination.connector_id}"
164        )
165
166
167# @app.tool()  # << deferred
168def create_connection_on_cloud(
169    connection_name: Annotated[
170        str,
171        Field(description="The name of the connection."),
172    ],
173    source_id: Annotated[
174        str,
175        Field(description="The ID of the deployed source."),
176    ],
177    destination_id: Annotated[
178        str,
179        Field(description="The ID of the deployed destination."),
180    ],
181    selected_streams: Annotated[
182        str | list[str],
183        Field(
184            description=(
185                "The selected stream names to sync within the connection. "
186                "Must be an explicit stream name or list of streams. "
187                "Cannot be empty or '*'."
188            )
189        ),
190    ],
191    table_prefix: Annotated[
192        str | None,
193        Field(
194            description="Optional table prefix to use when syncing to the destination.",
195            default=None,
196        ),
197    ],
198) -> str:
199    """Create a connection between a deployed source and destination on Airbyte Cloud.
200
201    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
202    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
203    Airbyte Cloud API.
204    """
205    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
206    try:
207        workspace: CloudWorkspace = _get_cloud_workspace()
208        deployed_connection = workspace.deploy_connection(
209            connection_name=connection_name,
210            source=source_id,
211            destination=destination_id,
212            selected_streams=resolved_streams_list,
213            table_prefix=table_prefix,
214        )
215
216    except Exception as ex:
217        return f"Failed to create connection '{connection_name}': {ex}"
218    else:
219        return (
220            f"Successfully created connection '{connection_name}' "
221            f"with ID '{deployed_connection.connection_id}' and "
222            f"URL: {deployed_connection.connection_url}"
223        )
224
225
226# @app.tool()  # << deferred
227def run_cloud_sync(
228    connection_id: Annotated[
229        str,
230        Field(description="The ID of the Airbyte Cloud connection."),
231    ],
232    *,
233    wait: Annotated[
234        bool,
235        Field(
236            description="Whether to wait for the sync to complete.",
237            default=True,
238        ),
239    ],
240    wait_timeout: Annotated[
241        int,
242        Field(
243            description="Maximum time to wait for sync completion (seconds).",
244            default=300,
245        ),
246    ],
247) -> str:
248    """Run a sync job on Airbyte Cloud.
249
250    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
251    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
252    Airbyte Cloud API.
253    """
254    try:
255        workspace: CloudWorkspace = _get_cloud_workspace()
256        connection = workspace.get_connection(connection_id=connection_id)
257        sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
258
259    except Exception as ex:
260        return f"Failed to run sync for connection '{connection_id}': {ex}"
261    else:
262        if wait:
263            status = sync_result.get_job_status()
264            return (
265                f"Sync completed with status: {status}. "  # Sync completed.
266                f"Job ID is '{sync_result.job_id}' and "
267                f"job URL is: {sync_result.job_url}"
268            )
269        return (
270            f"Sync started. "  # Sync started.
271            f"Job ID is '{sync_result.job_id}' and "
272            f"job URL is: {sync_result.job_url}"
273        )
274
275
276# @app.tool()  # << deferred
277def check_airbyte_cloud_workspace() -> str:
278    """Check if we have a valid Airbyte Cloud connection and return workspace info.
279
280    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
281    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
282    Airbyte Cloud API.
283
284    Returns workspace ID and workspace URL for verification.
285    """
286    try:
287        workspace: CloudWorkspace = _get_cloud_workspace()
288        workspace.connect()
289
290    except Exception as ex:
291        return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}"
292    else:
293        return (
294            f"✅ Successfully connected to Airbyte Cloud workspace.\n"
295            f"Workspace ID: {workspace.workspace_id}\n"
296            f"Workspace URL: {workspace.workspace_url}"
297        )
298
299
300# @app.tool()  # << deferred
301def deploy_noop_destination_to_cloud(
302    name: str = "No-op Destination",
303    *,
304    unique: bool = True,
305) -> str:
306    """Deploy the No-op destination to Airbyte Cloud for testing purposes.
307
308    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
309    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
310    Airbyte Cloud API.
311    """
312    try:
313        destination = get_noop_destination()
314        workspace: CloudWorkspace = _get_cloud_workspace()
315        deployed_destination = workspace.deploy_destination(
316            name=name,
317            destination=destination,
318            unique=unique,
319        )
320    except Exception as ex:
321        return f"Failed to deploy No-op Destination: {ex}"
322    else:
323        return (
324            f"Successfully deployed No-op Destination "
325            f"with ID '{deployed_destination.connector_id}' and "
326            f"URL: {deployed_destination.connector_url}"
327        )
328
329
330# @app.tool()  # << deferred
331def get_cloud_sync_status(
332    connection_id: Annotated[
333        str,
334        Field(
335            description="The ID of the Airbyte Cloud connection.",
336        ),
337    ],
338    job_id: Annotated[
339        int | None,
340        Field(
341            description="Optional job ID. If not provided, the latest job will be used.",
342            default=None,
343        ),
344    ],
345    *,
346    include_attempts: Annotated[
347        bool,
348        Field(
349            description="Whether to include detailed attempts information.",
350            default=False,
351        ),
352    ],
353) -> dict[str, Any]:
354    """Get the status of a sync job from the Airbyte Cloud.
355
356    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
357    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
358    Airbyte Cloud API.
359    """
360    try:
361        workspace: CloudWorkspace = _get_cloud_workspace()
362        connection = workspace.get_connection(connection_id=connection_id)
363
364        # If a job ID is provided, get the job by ID.
365        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
366
367        if not sync_result:
368            return {"status": None, "job_id": None, "attempts": []}
369
370        result = {
371            "status": sync_result.get_job_status(),
372            "job_id": sync_result.job_id,
373            "bytes_synced": sync_result.bytes_synced,
374            "records_synced": sync_result.records_synced,
375            "start_time": sync_result.start_time.isoformat(),
376            "job_url": sync_result.job_url,
377            "attempts": [],
378        }
379
380        if include_attempts:
381            attempts = sync_result.get_attempts()
382            result["attempts"] = [
383                {
384                    "attempt_number": attempt.attempt_number,
385                    "attempt_id": attempt.attempt_id,
386                    "status": attempt.status,
387                    "bytes_synced": attempt.bytes_synced,
388                    "records_synced": attempt.records_synced,
389                    "created_at": attempt.created_at.isoformat(),
390                }
391                for attempt in attempts
392            ]
393
394        return result  # noqa: TRY300
395
396    except Exception as ex:
397        return {
398            "status": None,
399            "job_id": job_id,
400            "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
401            "attempts": [],
402        }
403
404
405# @app.tool()  # << deferred
406def list_deployed_cloud_source_connectors() -> list[CloudSource]:
407    """List all deployed source connectors in the Airbyte Cloud workspace.
408
409    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
410    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
411    Airbyte Cloud API.
412    """
413    workspace: CloudWorkspace = _get_cloud_workspace()
414    return workspace.list_sources()
415
416
417# @app.tool()  # << deferred
418def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
419    """List all deployed destination connectors in the Airbyte Cloud workspace.
420
421    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
422    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
423    Airbyte Cloud API.
424    """
425    workspace: CloudWorkspace = _get_cloud_workspace()
426    return workspace.list_destinations()
427
428
429# @app.tool()  # << deferred
430def get_cloud_sync_logs(
431    connection_id: Annotated[
432        str,
433        Field(description="The ID of the Airbyte Cloud connection."),
434    ],
435    job_id: Annotated[
436        int | None,
437        Field(description="Optional job ID. If not provided, the latest job will be used."),
438    ] = None,
439    attempt_number: Annotated[
440        int | None,
441        Field(
442            description="Optional attempt number. If not provided, the latest attempt will be used."
443        ),
444    ] = None,
445) -> str:
446    """Get the logs from a sync job attempt on Airbyte Cloud.
447
448    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
449    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
450    Airbyte Cloud API.
451    """
452    try:
453        workspace: CloudWorkspace = _get_cloud_workspace()
454        connection = workspace.get_connection(connection_id=connection_id)
455
456        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
457
458        if not sync_result:
459            return f"No sync job found for connection '{connection_id}'"
460
461        attempts = sync_result.get_attempts()
462
463        if not attempts:
464            return f"No attempts found for job '{sync_result.job_id}'"
465
466        if attempt_number is not None:
467            target_attempt = None
468            for attempt in attempts:
469                if attempt.attempt_number == attempt_number:
470                    target_attempt = attempt
471                    break
472
473            if target_attempt is None:
474                return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
475        else:
476            target_attempt = max(attempts, key=lambda a: a.attempt_number)
477
478        logs = target_attempt.get_full_log_text()
479
480        if not logs:
481            return (
482                f"No logs available for job '{sync_result.job_id}', "
483                f"attempt {target_attempt.attempt_number}"
484            )
485
486        return logs  # noqa: TRY300
487
488    except Exception as ex:
489        return f"Failed to get logs for connection '{connection_id}': {ex}"
490
491
492# @app.tool()  # << deferred
493def list_deployed_cloud_connections() -> list[CloudConnection]:
494    """List all deployed connections in the Airbyte Cloud workspace.
495
496    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
497    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
498    Airbyte Cloud API.
499    """
500    workspace: CloudWorkspace = _get_cloud_workspace()
501    return workspace.list_connections()
502
503
504def register_cloud_ops_tools(app: FastMCP) -> None:
505    """@private Register tools with the FastMCP app.
506
507    This is an internal function and should not be called directly.
508    """
509    app.tool(check_airbyte_cloud_workspace)
510    app.tool(deploy_source_to_cloud)
511    app.tool(deploy_destination_to_cloud)
512    app.tool(deploy_noop_destination_to_cloud)
513    app.tool(create_connection_on_cloud)
514    app.tool(run_cloud_sync)
515    app.tool(get_cloud_sync_status)
516    app.tool(get_cloud_sync_logs)
517    app.tool(list_deployed_cloud_source_connectors)
518    app.tool(list_deployed_cloud_destination_connectors)
519    app.tool(list_deployed_cloud_connections)
def deploy_source_to_cloud( source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the source.')], source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the source connector (e.g., 'source-faker').")], *, config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
35def deploy_source_to_cloud(
36    source_name: Annotated[
37        str,
38        Field(description="The name to use when deploying the source."),
39    ],
40    source_connector_name: Annotated[
41        str,
42        Field(description="The name of the source connector (e.g., 'source-faker')."),
43    ],
44    *,
45    config: Annotated[
46        dict | str | None,
47        Field(
48            description="The configuration for the source connector.",
49            default=None,
50        ),
51    ],
52    config_secret_name: Annotated[
53        str | None,
54        Field(
55            description="The name of the secret containing the configuration.",
56            default=None,
57        ),
58    ],
59    unique: Annotated[
60        bool,
61        Field(
62            description="Whether to require a unique name.",
63            default=True,
64        ),
65    ],
66) -> str:
67    """Deploy a source connector to Airbyte Cloud.
68
69    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
70    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
71    Airbyte Cloud API.
72    """
73    try:
74        source = get_source(
75            source_connector_name,
76            install_if_missing=False,
77        )
78        config_dict = resolve_config(
79            config=config,
80            config_secret_name=config_secret_name,
81            config_spec_jsonschema=source.config_spec,
82        )
83        source.set_config(config_dict)
84
85        workspace: CloudWorkspace = _get_cloud_workspace()
86        deployed_source = workspace.deploy_source(
87            name=source_name,
88            source=source,
89            unique=unique,
90        )
91
92    except Exception as ex:
93        return f"Failed to deploy source '{source_name}': {ex}"
94    else:
95        return (
96            f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
97            f" and URL: {deployed_source.connector_url}"
98        )

Deploy a source connector to Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def deploy_destination_to_cloud( destination_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the destination.')], destination_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the destination connector (e.g., 'destination-postgres').")], *, config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
102def deploy_destination_to_cloud(
103    destination_name: Annotated[
104        str,
105        Field(description="The name to use when deploying the destination."),
106    ],
107    destination_connector_name: Annotated[
108        str,
109        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
110    ],
111    *,
112    config: Annotated[
113        dict | str | None,
114        Field(
115            description="The configuration for the destination connector.",
116            default=None,
117        ),
118    ],
119    config_secret_name: Annotated[
120        str | None,
121        Field(
122            description="The name of the secret containing the configuration.",
123            default=None,
124        ),
125    ],
126    unique: Annotated[
127        bool,
128        Field(
129            description="Whether to require a unique name.",
130            default=True,
131        ),
132    ],
133) -> str:
134    """Deploy a destination connector to Airbyte Cloud.
135
136    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
137    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
138    Airbyte Cloud API.
139    """
140    try:
141        destination = get_destination(
142            destination_connector_name,
143            install_if_missing=False,
144        )
145        config_dict = resolve_config(
146            config=config,
147            config_secret_name=config_secret_name,
148            config_spec_jsonschema=destination.config_spec,
149        )
150        destination.set_config(config_dict)
151
152        workspace: CloudWorkspace = _get_cloud_workspace()
153        deployed_destination = workspace.deploy_destination(
154            name=destination_name,
155            destination=destination,
156            unique=unique,
157        )
158
159    except Exception as ex:
160        return f"Failed to deploy destination '{destination_name}': {ex}"
161    else:
162        return (
163            f"Successfully deployed destination '{destination_name}' "
164            f"with ID: {deployed_destination.connector_id}"
165        )

Deploy a destination connector to Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def create_connection_on_cloud( connection_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connection.')], source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source.')], destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination.')], selected_streams: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description="The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'.")], table_prefix: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional table prefix to use when syncing to the destination.')]) -> str:
169def create_connection_on_cloud(
170    connection_name: Annotated[
171        str,
172        Field(description="The name of the connection."),
173    ],
174    source_id: Annotated[
175        str,
176        Field(description="The ID of the deployed source."),
177    ],
178    destination_id: Annotated[
179        str,
180        Field(description="The ID of the deployed destination."),
181    ],
182    selected_streams: Annotated[
183        str | list[str],
184        Field(
185            description=(
186                "The selected stream names to sync within the connection. "
187                "Must be an explicit stream name or list of streams. "
188                "Cannot be empty or '*'."
189            )
190        ),
191    ],
192    table_prefix: Annotated[
193        str | None,
194        Field(
195            description="Optional table prefix to use when syncing to the destination.",
196            default=None,
197        ),
198    ],
199) -> str:
200    """Create a connection between a deployed source and destination on Airbyte Cloud.
201
202    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
203    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
204    Airbyte Cloud API.
205    """
206    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
207    try:
208        workspace: CloudWorkspace = _get_cloud_workspace()
209        deployed_connection = workspace.deploy_connection(
210            connection_name=connection_name,
211            source=source_id,
212            destination=destination_id,
213            selected_streams=resolved_streams_list,
214            table_prefix=table_prefix,
215        )
216
217    except Exception as ex:
218        return f"Failed to create connection '{connection_name}': {ex}"
219    else:
220        return (
221            f"Successfully created connection '{connection_name}' "
222            f"with ID '{deployed_connection.connection_id}' and "
223            f"URL: {deployed_connection.connection_url}"
224        )

Create a connection between a deployed source and destination on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def run_cloud_sync( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, wait: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to wait for the sync to complete.')], wait_timeout: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=300, description='Maximum time to wait for sync completion (seconds).')]) -> str:
228def run_cloud_sync(
229    connection_id: Annotated[
230        str,
231        Field(description="The ID of the Airbyte Cloud connection."),
232    ],
233    *,
234    wait: Annotated[
235        bool,
236        Field(
237            description="Whether to wait for the sync to complete.",
238            default=True,
239        ),
240    ],
241    wait_timeout: Annotated[
242        int,
243        Field(
244            description="Maximum time to wait for sync completion (seconds).",
245            default=300,
246        ),
247    ],
248) -> str:
249    """Run a sync job on Airbyte Cloud.
250
251    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
252    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
253    Airbyte Cloud API.
254    """
255    try:
256        workspace: CloudWorkspace = _get_cloud_workspace()
257        connection = workspace.get_connection(connection_id=connection_id)
258        sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
259
260    except Exception as ex:
261        return f"Failed to run sync for connection '{connection_id}': {ex}"
262    else:
263        if wait:
264            status = sync_result.get_job_status()
265            return (
266                f"Sync completed with status: {status}. "  # Sync completed.
267                f"Job ID is '{sync_result.job_id}' and "
268                f"job URL is: {sync_result.job_url}"
269            )
270        return (
271            f"Sync started. "  # Sync started.
272            f"Job ID is '{sync_result.job_id}' and "
273            f"job URL is: {sync_result.job_url}"
274        )

Run a sync job on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def check_airbyte_cloud_workspace() -> str:
278def check_airbyte_cloud_workspace() -> str:
279    """Check if we have a valid Airbyte Cloud connection and return workspace info.
280
281    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
282    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
283    Airbyte Cloud API.
284
285    Returns workspace ID and workspace URL for verification.
286    """
287    try:
288        workspace: CloudWorkspace = _get_cloud_workspace()
289        workspace.connect()
290
291    except Exception as ex:
292        return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}"
293    else:
294        return (
295            f"✅ Successfully connected to Airbyte Cloud workspace.\n"
296            f"Workspace ID: {workspace.workspace_id}\n"
297            f"Workspace URL: {workspace.workspace_url}"
298        )

Check if we have a valid Airbyte Cloud connection and return workspace info.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

Returns workspace ID and workspace URL for verification.

def deploy_noop_destination_to_cloud(name: str = 'No-op Destination', *, unique: bool = True) -> str:
302def deploy_noop_destination_to_cloud(
303    name: str = "No-op Destination",
304    *,
305    unique: bool = True,
306) -> str:
307    """Deploy the No-op destination to Airbyte Cloud for testing purposes.
308
309    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
310    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
311    Airbyte Cloud API.
312    """
313    try:
314        destination = get_noop_destination()
315        workspace: CloudWorkspace = _get_cloud_workspace()
316        deployed_destination = workspace.deploy_destination(
317            name=name,
318            destination=destination,
319            unique=unique,
320        )
321    except Exception as ex:
322        return f"Failed to deploy No-op Destination: {ex}"
323    else:
324        return (
325            f"Successfully deployed No-op Destination "
326            f"with ID '{deployed_destination.connector_id}' and "
327            f"URL: {deployed_destination.connector_url}"
328        )

Deploy the No-op destination to Airbyte Cloud for testing purposes.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def get_cloud_sync_status( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional job ID. If not provided, the latest job will be used.')], *, include_attempts: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to include detailed attempts information.')]) -> dict[str, typing.Any]:
332def get_cloud_sync_status(
333    connection_id: Annotated[
334        str,
335        Field(
336            description="The ID of the Airbyte Cloud connection.",
337        ),
338    ],
339    job_id: Annotated[
340        int | None,
341        Field(
342            description="Optional job ID. If not provided, the latest job will be used.",
343            default=None,
344        ),
345    ],
346    *,
347    include_attempts: Annotated[
348        bool,
349        Field(
350            description="Whether to include detailed attempts information.",
351            default=False,
352        ),
353    ],
354) -> dict[str, Any]:
355    """Get the status of a sync job from the Airbyte Cloud.
356
357    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
358    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
359    Airbyte Cloud API.
360    """
361    try:
362        workspace: CloudWorkspace = _get_cloud_workspace()
363        connection = workspace.get_connection(connection_id=connection_id)
364
365        # If a job ID is provided, get the job by ID.
366        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
367
368        if not sync_result:
369            return {"status": None, "job_id": None, "attempts": []}
370
371        result = {
372            "status": sync_result.get_job_status(),
373            "job_id": sync_result.job_id,
374            "bytes_synced": sync_result.bytes_synced,
375            "records_synced": sync_result.records_synced,
376            "start_time": sync_result.start_time.isoformat(),
377            "job_url": sync_result.job_url,
378            "attempts": [],
379        }
380
381        if include_attempts:
382            attempts = sync_result.get_attempts()
383            result["attempts"] = [
384                {
385                    "attempt_number": attempt.attempt_number,
386                    "attempt_id": attempt.attempt_id,
387                    "status": attempt.status,
388                    "bytes_synced": attempt.bytes_synced,
389                    "records_synced": attempt.records_synced,
390                    "created_at": attempt.created_at.isoformat(),
391                }
392                for attempt in attempts
393            ]
394
395        return result  # noqa: TRY300
396
397    except Exception as ex:
398        return {
399            "status": None,
400            "job_id": job_id,
401            "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
402            "attempts": [],
403        }

Get the status of a sync job from the Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def list_deployed_cloud_source_connectors() -> list[airbyte.cloud.connectors.CloudSource]:
407def list_deployed_cloud_source_connectors() -> list[CloudSource]:
408    """List all deployed source connectors in the Airbyte Cloud workspace.
409
410    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
411    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
412    Airbyte Cloud API.
413    """
414    workspace: CloudWorkspace = _get_cloud_workspace()
415    return workspace.list_sources()

List all deployed source connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def list_deployed_cloud_destination_connectors() -> list[airbyte.cloud.connectors.CloudDestination]:
419def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
420    """List all deployed destination connectors in the Airbyte Cloud workspace.
421
422    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
423    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
424    Airbyte Cloud API.
425    """
426    workspace: CloudWorkspace = _get_cloud_workspace()
427    return workspace.list_destinations()

List all deployed destination connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def get_cloud_sync_logs( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional job ID. If not provided, the latest job will be used.')] = None, attempt_number: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional attempt number. If not provided, the latest attempt will be used.')] = None) -> str:
431def get_cloud_sync_logs(
432    connection_id: Annotated[
433        str,
434        Field(description="The ID of the Airbyte Cloud connection."),
435    ],
436    job_id: Annotated[
437        int | None,
438        Field(description="Optional job ID. If not provided, the latest job will be used."),
439    ] = None,
440    attempt_number: Annotated[
441        int | None,
442        Field(
443            description="Optional attempt number. If not provided, the latest attempt will be used."
444        ),
445    ] = None,
446) -> str:
447    """Get the logs from a sync job attempt on Airbyte Cloud.
448
449    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
450    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
451    Airbyte Cloud API.
452    """
453    try:
454        workspace: CloudWorkspace = _get_cloud_workspace()
455        connection = workspace.get_connection(connection_id=connection_id)
456
457        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
458
459        if not sync_result:
460            return f"No sync job found for connection '{connection_id}'"
461
462        attempts = sync_result.get_attempts()
463
464        if not attempts:
465            return f"No attempts found for job '{sync_result.job_id}'"
466
467        if attempt_number is not None:
468            target_attempt = None
469            for attempt in attempts:
470                if attempt.attempt_number == attempt_number:
471                    target_attempt = attempt
472                    break
473
474            if target_attempt is None:
475                return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
476        else:
477            target_attempt = max(attempts, key=lambda a: a.attempt_number)
478
479        logs = target_attempt.get_full_log_text()
480
481        if not logs:
482            return (
483                f"No logs available for job '{sync_result.job_id}', "
484                f"attempt {target_attempt.attempt_number}"
485            )
486
487        return logs  # noqa: TRY300
488
489    except Exception as ex:
490        return f"Failed to get logs for connection '{connection_id}': {ex}"

Get the logs from a sync job attempt on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

def list_deployed_cloud_connections() -> list[airbyte.cloud.CloudConnection]:
494def list_deployed_cloud_connections() -> list[CloudConnection]:
495    """List all deployed connections in the Airbyte Cloud workspace.
496
497    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
498    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
499    Airbyte Cloud API.
500    """
501    workspace: CloudWorkspace = _get_cloud_workspace()
502    return workspace.list_connections()

List all deployed connections in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.