airbyte_ops_mcp.mcp.agent_message_bus

MCP tools for managing GitHub issue/PR subscriptions.

This module exposes subscription management as MCP tools for AI agents. It is a thin wrapper that calls the Cloud Run subscription API via HTTP.
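As a rough illustration of what "thin wrapper" means here, the sketch below assembles (but does not send) the kind of authenticated request the tools issue. The environment variable names and the `/subscriptions` path are taken from the module source; `build_list_request` is a hypothetical helper, the URLs and token are placeholders, and the standard library's `urllib` stands in for `requests` so the snippet has no third-party dependencies.

```python
from __future__ import annotations

import urllib.parse
import urllib.request


def build_list_request(env: dict[str, str], agent_session_url: str) -> urllib.request.Request:
    """Assemble a GET /subscriptions request without sending it.

    Hypothetical helper for illustration; the module itself uses `requests`.
    """
    base_url = env.get("SUBSCRIPTION_API_URL")
    token = env.get("SUBSCRIPTION_API_BEARER_TOKEN")
    if not base_url or not token:
        # The module raises ValueError in the same situation.
        raise ValueError("subscription API URL or bearer token is not configured")

    query = urllib.parse.urlencode({"session_url": agent_session_url})
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/subscriptions?{query}",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="GET",
    )


# Placeholder values, not real endpoints or credentials.
request = build_list_request(
    {
        "SUBSCRIPTION_API_URL": "https://subscriptions.example.invalid/",
        "SUBSCRIPTION_API_BEARER_TOKEN": "placeholder-token",
    },
    "https://devin.example.invalid/sessions/abc",
)
print(request.get_method(), request.full_url)
```

Passing the environment in as a plain mapping keeps the sketch easy to exercise in isolation; the module reads `os.environ` directly.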

MCP reference

MCP primitives registered by the agent_message_bus module of the airbyte-internal-ops server: 3 tools, 0 prompts, and 0 resources.

Tools (3)

list_github_subscriptions

Hints: read-only · idempotent · open-world

List all active GitHub issue/PR subscriptions for this session.

Returns the list of GitHub issues and PRs that this session is currently subscribed to, along with their expiry times.

Parameters:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| agent_session_url | string | yes | | Your Devin session URL. Use the session URL from your system prompt. |

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "agent_session_url": {
      "description": "Your Devin session URL. Use the session URL from your system prompt.",
      "type": "string"
    }
  },
  "required": [
    "agent_session_url"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Response from the list_github_subscriptions tool.",
  "properties": {
    "success": {
      "description": "Whether the listing was successful",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable status message",
      "type": "string"
    },
    "subscriptions": {
      "description": "List of active subscriptions with id, github_url, expires_at",
      "items": {
        "additionalProperties": {
          "type": "string"
        },
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "success",
    "message"
  ],
  "type": "object"
}
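One way an agent might consume this response is to find subscriptions that are about to lapse so they can be renewed. The sketch below is illustrative only: `expiring_within` is a hypothetical helper, the sample response is made up, and it assumes `expires_at` is an ISO 8601 string (as the subscribe tool's output schema states) or the literal `"unknown"` fallback visible in the module source.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def expiring_within(
    subscriptions: list[dict[str, str]],
    hours: float,
    now: datetime,
) -> list[str]:
    """Return the github_url of each subscription that expires within `hours`."""
    cutoff = now + timedelta(hours=hours)
    urls: list[str] = []
    for sub in subscriptions:
        raw = sub.get("expires_at", "unknown")
        try:
            # Rewrite a trailing "Z" so Python versions older than 3.11 can parse it.
            expires_at = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            # Unparseable values, such as the "unknown" fallback, are skipped.
            continue
        if expires_at.tzinfo is None:
            # Assumption: timestamps without an offset are UTC.
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        if expires_at <= cutoff:
            urls.append(sub["github_url"])
    return urls


# Made-up response shaped like the output schema above.
response = {
    "success": True,
    "message": "Found 2 active subscription(s).",
    "subscriptions": [
        {
            "id": "sub-1",
            "github_url": "https://github.com/airbytehq/airbyte/issues/123",
            "watch_events": "comment, close",
            "expires_at": "2025-01-02T00:00:00Z",
        },
        {
            "id": "sub-2",
            "github_url": "https://github.com/airbytehq/airbyte/pull/456",
            "watch_events": "",
            "expires_at": "unknown",
        },
    ],
}

now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
soon = expiring_within(response["subscriptions"], hours=24, now=now)
print(soon)
```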

subscribe_to_github_issue

Hints: idempotent · open-world

Subscribe to notifications on a GitHub issue or pull request.

Creates a subscription that will deliver real-time notifications back to your Devin session when activity occurs on the specified GitHub issue or PR. Notifications are triggered by GitHub webhooks and delivered within seconds.

If you are already subscribed to the same issue/PR, the subscription is updated (TTL extended, watch events merged).

Use this tool when you need to monitor a GitHub issue or PR for changes, new comments, merges, closures, or other activity.
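The update-on-resubscribe behavior described above can be pictured with a small in-memory model. This is only a sketch of one plausible reading: the backend is not part of this module, so the `Subscription` class, the `upsert` function, the `(github_url, session_url)` key, and the exact merge rules (union of events, expiry never shortened) are all assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Subscription:
    github_url: str
    session_url: str
    watch_events: set[str]
    expires_at: datetime


def upsert(
    store: dict[tuple[str, str], Subscription],
    github_url: str,
    session_url: str,
    watch_events: set[str],
    ttl_hours: int,
    now: datetime,
) -> Subscription:
    """Create a subscription, or merge into the existing one for the same key."""
    key = (github_url, session_url)
    new_expiry = now + timedelta(hours=ttl_hours)
    existing = store.get(key)
    if existing is None:
        store[key] = Subscription(github_url, session_url, set(watch_events), new_expiry)
    else:
        # Assumed merge rules: union the events and never shorten the expiry.
        existing.watch_events |= watch_events
        existing.expires_at = max(existing.expires_at, new_expiry)
    return store[key]


store: dict[tuple[str, str], Subscription] = {}
t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
url = "https://github.com/airbytehq/airbyte/pull/456"
session = "https://devin.example.invalid/sessions/abc"  # placeholder session URL

upsert(store, url, session, {"comment"}, 240, t0)
merged = upsert(store, url, session, {"merge"}, 240, t0 + timedelta(hours=24))
print(sorted(merged.watch_events), merged.expires_at.isoformat())
```

Under these assumptions a second call for the same issue leaves a single record whose event set has grown and whose expiry has moved forward, which is what makes repeated calls safe.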

Parameters:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| github_url | string | yes | | The GitHub issue or PR URL to subscribe to. Examples: https://github.com/airbytehq/airbyte/issues/123 or https://github.com/airbytehq/airbyte/pull/456 |
| agent_session_url | string | yes | | Your Devin session URL so notifications can be delivered back to your session. Use the session URL from your system prompt. |
| watch_events | array<string> \| null | no | null | Optional list of event types to watch. Valid values: 'comment', 'close', 'merge', 'reopen', 'label', 'synchronize', 'ready_for_review', 'assigned'. Defaults to all events if not specified. |
| ttl_hours | integer | no | 240 | Number of hours until the subscription expires. Default is 240 (10 days). |
| slack_users_cc | string \| null | no | null | Optional comma-delimited list of Slack user tags to CC on notifications. Example: '<@U12345>, <@U67890>'. |
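To make the parameter handling concrete, the sketch below shows how these arguments could be turned into the JSON body of the `POST /subscriptions` call. The field names and the omission of empty optional values mirror the module source; `build_subscribe_body` and the client-side `watch_events` check are hypothetical additions, since the tool itself forwards the values without validating them.

```python
from __future__ import annotations

VALID_WATCH_EVENTS = frozenset(
    {
        "comment",
        "close",
        "merge",
        "reopen",
        "label",
        "synchronize",
        "ready_for_review",
        "assigned",
    }
)


def build_subscribe_body(
    github_url: str,
    agent_session_url: str,
    watch_events: list[str] | None = None,
    ttl_hours: int = 240,
    slack_users_cc: str | None = None,
) -> dict[str, object]:
    """Build the JSON body for POST /subscriptions from the tool arguments."""
    if watch_events:
        # Illustrative client-side check; not performed by the tool itself.
        unknown = sorted(set(watch_events) - VALID_WATCH_EVENTS)
        if unknown:
            raise ValueError(f"unknown watch_events: {unknown}")

    body: dict[str, object] = {
        "github_url": github_url,
        "session_url": agent_session_url,  # the API field drops the "agent_" prefix
        "ttl_hours": ttl_hours,
    }
    # Optional fields are omitted rather than sent as null or empty values.
    if watch_events:
        body["watch_events"] = watch_events
    if slack_users_cc:
        body["slack_users_cc"] = slack_users_cc
    return body


minimal = build_subscribe_body(
    "https://github.com/airbytehq/airbyte/issues/123",
    "https://devin.example.invalid/sessions/abc",  # placeholder session URL
)
print(minimal)
```

Leaving `watch_events` out of the body is how "all events" is requested in this sketch, matching the documented default.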

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "github_url": {
      "description": "The GitHub issue or PR URL to subscribe to. Examples: https://github.com/airbytehq/airbyte/issues/123 or https://github.com/airbytehq/airbyte/pull/456",
      "type": "string"
    },
    "agent_session_url": {
      "description": "Your Devin session URL so notifications can be delivered back to your session. Use the session URL from your system prompt.",
      "type": "string"
    },
    "watch_events": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional list of event types to watch. Valid values: 'comment', 'close', 'merge', 'reopen', 'label', 'synchronize', 'ready_for_review', 'assigned'. Defaults to all events if not specified."
    },
    "ttl_hours": {
      "default": 240,
      "description": "Number of hours until the subscription expires. Default is 240 (10 days).",
      "type": "integer"
    },
    "slack_users_cc": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional comma-delimited list of Slack user tags to CC on notifications. Example: '<@U12345>, <@U67890>'."
    }
  },
  "required": [
    "github_url",
    "agent_session_url"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Response from the subscribe_to_github_issue tool.",
  "properties": {
    "success": {
      "description": "Whether the subscription was created successfully",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable status message",
      "type": "string"
    },
    "subscription_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "ID of the created or updated subscription"
    },
    "github_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "GitHub URL being watched"
    },
    "expires_at": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "When the subscription expires (ISO 8601)"
    }
  },
  "required": [
    "success",
    "message"
  ],
  "type": "object"
}
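Because only `success` and `message` are required, a caller should treat the remaining fields as optional. The following is a minimal sketch of that handling using a standard-library dataclass; `SubscribeResult` and `from_dict` are hypothetical names for illustration (the module defines its own pydantic model), and the sample payloads are made up.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SubscribeResult:
    success: bool
    message: str
    subscription_id: str | None = None
    github_url: str | None = None
    expires_at: str | None = None

    @classmethod
    def from_dict(cls, data: dict) -> SubscribeResult:
        # Only the two required keys are read unconditionally.
        return cls(
            success=bool(data["success"]),
            message=str(data["message"]),
            subscription_id=data.get("subscription_id"),
            github_url=data.get("github_url"),
            expires_at=data.get("expires_at"),
        )


# A failure response carries no subscription details.
failed = SubscribeResult.from_dict(
    {"success": False, "message": "Configuration error: example"}
)

# A success response may carry all of them.
created = SubscribeResult.from_dict(
    {
        "success": True,
        "message": "Subscribed.",
        "subscription_id": "sub-1",
        "github_url": "https://github.com/airbytehq/airbyte/issues/123",
        "expires_at": "2025-01-11T00:00:00Z",
    }
)
print(failed.subscription_id, created.subscription_id)
```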

unsubscribe_from_github_issue

Hints: idempotent · open-world

Unsubscribe from notifications on a GitHub issue or pull request.

Removes an active subscription so you will no longer receive notifications for the specified issue/PR.

You can unsubscribe by:

  • Providing a specific subscription_id (takes precedence; github_url is then ignored)
  • Providing github_url together with agent_session_url to unsubscribe from that specific issue/PR
  • Providing only agent_session_url to unsubscribe from all issues/PRs
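The three modes listed above map onto two shapes of `DELETE` request. The sketch below mirrors the branching in the module source without sending anything; `plan_unsubscribe` is a hypothetical helper, and the API base URL, session URL, and subscription ID are placeholders.

```python
from __future__ import annotations


def plan_unsubscribe(
    api_url: str,
    agent_session_url: str,
    github_url: str | None = None,
    subscription_id: str | None = None,
) -> tuple[str, dict[str, str]]:
    """Return the (url, query_params) of the DELETE request for the given arguments."""
    base = api_url.rstrip("/")
    if subscription_id:
        # Delete by ID: github_url is ignored and no query parameters are sent.
        return f"{base}/subscriptions/{subscription_id}", {}

    # Delete by match: always scoped to the session, optionally to one issue/PR.
    params = {"session_url": agent_session_url}
    if github_url:
        params["github_url"] = github_url
    return f"{base}/subscriptions", params


API = "https://subscriptions.example.invalid"  # placeholder base URL
SESSION = "https://devin.example.invalid/sessions/abc"  # placeholder session URL
ISSUE = "https://github.com/airbytehq/airbyte/issues/123"

by_id = plan_unsubscribe(API, SESSION, github_url=ISSUE, subscription_id="sub-1")
by_issue = plan_unsubscribe(API, SESSION, github_url=ISSUE)
everything = plan_unsubscribe(API, SESSION)
print(by_id, by_issue, everything, sep="\n")
```

Note that omitting both optional arguments removes every subscription for the session, so callers that mean to cancel a single subscription should always pass `github_url` or `subscription_id`.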

Parameters:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| agent_session_url | string | yes | | Your Devin session URL. Use the session URL from your system prompt. |
| github_url | string \| null | no | null | The GitHub issue or PR URL to unsubscribe from. If not provided, all subscriptions for this session are removed. |
| subscription_id | string \| null | no | null | Optional specific subscription ID to remove. Use this if you know the exact subscription to cancel. |

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "agent_session_url": {
      "description": "Your Devin session URL. Use the session URL from your system prompt.",
      "type": "string"
    },
    "github_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The GitHub issue or PR URL to unsubscribe from. If not provided, all subscriptions for this session are removed."
    },
    "subscription_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional specific subscription ID to remove. Use this if you know the exact subscription to cancel."
    }
  },
  "required": [
    "agent_session_url"
  ],
  "type": "object"
}

Output JSON schema:

{
  "description": "Response from the unsubscribe_from_github_issue tool.",
  "properties": {
    "success": {
      "description": "Whether the unsubscribe was successful",
      "type": "boolean"
    },
    "message": {
      "description": "Human-readable status message",
      "type": "string"
    },
    "deleted_count": {
      "default": 0,
      "description": "Number of subscriptions removed",
      "type": "integer"
    }
  },
  "required": [
    "success",
    "message"
  ],
  "type": "object"
}

# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for managing GitHub issue/PR subscriptions.

This module exposes subscription management as MCP tools for AI agents.
It is a thin wrapper that calls the Cloud Run subscription API via HTTP.

## MCP reference

.. include:: ../../../docs/mcp-generated/agent_message_bus.md
    :start-line: 2
"""

from __future__ import annotations

__all__: list[str] = []

import logging
import os
from typing import Annotated

import requests
from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

SUBSCRIPTION_API_URL_ENV = "SUBSCRIPTION_API_URL"
SUBSCRIPTION_API_TOKEN_ENV = "SUBSCRIPTION_API_BEARER_TOKEN"


def _get_api_url() -> str:
    """Get the subscription API base URL."""
    url = os.environ.get(SUBSCRIPTION_API_URL_ENV)
    if not url:
        raise ValueError(
            f"{SUBSCRIPTION_API_URL_ENV} environment variable is not set. "
            "Cannot reach the GitHub subscriptions backend."
        )
    return url.rstrip("/")


def _get_api_token() -> str:
    """Get the subscription API bearer token."""
    token = os.environ.get(SUBSCRIPTION_API_TOKEN_ENV)
    if not token:
        raise ValueError(
            f"{SUBSCRIPTION_API_TOKEN_ENV} environment variable is not set. "
            "Cannot authenticate to the GitHub subscriptions backend."
        )
    return token


def _api_headers() -> dict[str, str]:
    """Build headers for API requests."""
    return {
        "Authorization": f"Bearer {_get_api_token()}",
        "Content-Type": "application/json",
    }


class SubscribeResponse(BaseModel):
    """Response from the subscribe_to_github_issue tool."""

    success: bool = Field(
        description="Whether the subscription was created successfully"
    )
    message: str = Field(description="Human-readable status message")
    subscription_id: str | None = Field(
        default=None,
        description="ID of the created or updated subscription",
    )
    github_url: str | None = Field(
        default=None,
        description="GitHub URL being watched",
    )
    expires_at: str | None = Field(
        default=None,
        description="When the subscription expires (ISO 8601)",
    )


class UnsubscribeResponse(BaseModel):
    """Response from the unsubscribe_from_github_issue tool."""

    success: bool = Field(description="Whether the unsubscribe was successful")
    message: str = Field(description="Human-readable status message")
    deleted_count: int = Field(
        default=0,
        description="Number of subscriptions removed",
    )


class ListSubscriptionsResponse(BaseModel):
    """Response from the list_github_subscriptions tool."""

    success: bool = Field(description="Whether the listing was successful")
    message: str = Field(description="Human-readable status message")
    subscriptions: list[dict[str, str]] = Field(
        default_factory=list,
        description="List of active subscriptions with id, github_url, expires_at",
    )


@mcp_tool(
    read_only=False,
    idempotent=True,
    open_world=True,
)
def subscribe_to_github_issue(
    github_url: Annotated[
        str,
        "The GitHub issue or PR URL to subscribe to. "
        "Examples: https://github.com/airbytehq/airbyte/issues/123 "
        "or https://github.com/airbytehq/airbyte/pull/456",
    ],
    agent_session_url: Annotated[
        str,
        "Your Devin session URL so notifications can be delivered back to "
        "your session. Use the session URL from your system prompt.",
    ],
    watch_events: Annotated[
        list[str] | None,
        "Optional list of event types to watch. Valid values: "
        "'comment', 'close', 'merge', 'reopen', 'label', 'synchronize', "
        "'ready_for_review', 'assigned'. Defaults to all events if not specified.",
    ] = None,
    ttl_hours: Annotated[
        int,
        "Number of hours until the subscription expires. Default is 240 (10 days).",
    ] = 240,
    slack_users_cc: Annotated[
        str | None,
        "Optional comma-delimited list of Slack user tags to CC on "
        "notifications. Example: '<@U12345>, <@U67890>'.",
    ] = None,
) -> SubscribeResponse:
    """Subscribe to notifications on a GitHub issue or pull request.

    Creates a subscription that will deliver real-time notifications back
    to your Devin session when activity occurs on the specified GitHub
    issue or PR. Notifications are triggered by GitHub webhooks and
    delivered within seconds.

    If you are already subscribed to the same issue/PR, the subscription
    is updated (TTL extended, watch events merged).

    Use this tool when you need to monitor a GitHub issue or PR for
    changes, new comments, merges, closures, or other activity.
    """
    try:
        api_url = _get_api_url()
        body: dict[str, str | list[str] | int | None] = {
            "github_url": github_url,
            "session_url": agent_session_url,
            "ttl_hours": ttl_hours,
        }
        if watch_events:
            body["watch_events"] = watch_events
        if slack_users_cc:
            body["slack_users_cc"] = slack_users_cc

        response = requests.post(
            f"{api_url}/subscriptions",
            json=body,
            headers=_api_headers(),
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()

        return SubscribeResponse(
            success=True,
            message=(
                f"Subscribed to {github_url}. "
                f"You will receive notifications in this session until "
                f"{data.get('expires_at', 'expiry unknown')}."
            ),
            subscription_id=data.get("id"),
            github_url=github_url,
            expires_at=data.get("expires_at"),
        )

    except ValueError as e:
        return SubscribeResponse(
            success=False,
            message=f"Configuration error: {e}",
        )
    except requests.RequestException as e:
        logger.exception("Failed to create subscription")
        return SubscribeResponse(
            success=False,
            message=f"Failed to create subscription: {e}",
        )


@mcp_tool(
    read_only=False,
    idempotent=True,
    open_world=True,
)
def unsubscribe_from_github_issue(
    agent_session_url: Annotated[
        str,
        "Your Devin session URL. Use the session URL from your system prompt.",
    ],
    github_url: Annotated[
        str | None,
        "The GitHub issue or PR URL to unsubscribe from. "
        "If not provided, all subscriptions for this session are removed.",
    ] = None,
    subscription_id: Annotated[
        str | None,
        "Optional specific subscription ID to remove. "
        "Use this if you know the exact subscription to cancel.",
    ] = None,
) -> UnsubscribeResponse:
    """Unsubscribe from notifications on a GitHub issue or pull request.

    Removes an active subscription so you will no longer receive
    notifications for the specified issue/PR.

    You can unsubscribe by:
    - Providing a specific subscription_id (takes precedence; github_url is then ignored)
    - Providing github_url together with agent_session_url to unsubscribe from that
      specific issue/PR
    - Providing only agent_session_url to unsubscribe from all issues/PRs
    """
    try:
        api_url = _get_api_url()

        if subscription_id:
            # Delete by ID
            response = requests.delete(
                f"{api_url}/subscriptions/{subscription_id}",
                headers=_api_headers(),
                timeout=10,
            )
        else:
            # Delete by match
            params: dict[str, str] = {"session_url": agent_session_url}
            if github_url:
                params["github_url"] = github_url
            response = requests.delete(
                f"{api_url}/subscriptions",
                params=params,
                headers=_api_headers(),
                timeout=10,
            )

        response.raise_for_status()
        data = response.json()
        count = data.get("deleted_count", 0)

        return UnsubscribeResponse(
            success=True,
            message=f"Removed {count} subscription(s).",
            deleted_count=count,
        )

    except ValueError as e:
        return UnsubscribeResponse(
            success=False,
            message=f"Configuration error: {e}",
        )
    except requests.RequestException as e:
        logger.exception("Failed to unsubscribe")
        return UnsubscribeResponse(
            success=False,
            message=f"Failed to unsubscribe: {e}",
        )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_github_subscriptions(
    agent_session_url: Annotated[
        str,
        "Your Devin session URL. Use the session URL from your system prompt.",
    ],
) -> ListSubscriptionsResponse:
    """List all active GitHub issue/PR subscriptions for this session.

    Returns the list of GitHub issues and PRs that this session is
    currently subscribed to, along with their expiry times.
    """
    try:
        api_url = _get_api_url()

        response = requests.get(
            f"{api_url}/subscriptions",
            params={"session_url": agent_session_url},
            headers=_api_headers(),
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()

        subs = [
            {
                "id": s["id"],
                "github_url": s["github_url"],
                "watch_events": ", ".join(s.get("watch_events", [])),
                "expires_at": s.get("expires_at", "unknown"),
            }
            for s in data
        ]

        if not subs:
            return ListSubscriptionsResponse(
                success=True,
                message="No active subscriptions for this session.",
                subscriptions=[],
            )

        return ListSubscriptionsResponse(
            success=True,
            message=f"Found {len(subs)} active subscription(s).",
            subscriptions=subs,
        )

    except ValueError as e:
        return ListSubscriptionsResponse(
            success=False,
            message=f"Configuration error: {e}",
        )
    except requests.RequestException as e:
        logger.exception("Failed to list subscriptions")
        return ListSubscriptionsResponse(
            success=False,
            message=f"Failed to list subscriptions: {e}",
        )


def register_message_bus_tools(app: FastMCP) -> None:
    """Register message bus tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)