airbyte_ops_mcp.mcp.agent_message_bus
MCP tools for managing GitHub issue/PR subscriptions.
This module exposes subscription management as MCP tools for AI agents. It is a thin wrapper that calls the Cloud Run subscription API via HTTP.
MCP reference
MCP primitives registered by the agent_message_bus module of the airbyte-internal-ops server: 3 tool(s), 0 prompt(s), 0 resource(s).
Tools (3)
list_github_subscriptions
Hints: read-only · idempotent · open-world
List all active GitHub issue/PR subscriptions for this session.
Returns the list of GitHub issues and PRs that this session is currently subscribed to, along with their expiry times.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
agent_session_url |
string |
yes | — | Your Devin session URL. Use the session URL from your system prompt. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"agent_session_url": {
"description": "Your Devin session URL. Use the session URL from your system prompt.",
"type": "string"
}
},
"required": [
"agent_session_url"
],
"type": "object"
}
Show output JSON schema
{
"description": "Response from the list_github_subscriptions tool.",
"properties": {
"success": {
"description": "Whether the listing was successful",
"type": "boolean"
},
"message": {
"description": "Human-readable status message",
"type": "string"
},
"subscriptions": {
"description": "List of active subscriptions with id, github_url, expires_at",
"items": {
"additionalProperties": {
"type": "string"
},
"type": "object"
},
"type": "array"
}
},
"required": [
"success",
"message"
],
"type": "object"
}
subscribe_to_github_issue
Hints: idempotent · open-world
Subscribe to notifications on a GitHub issue or pull request.
Creates a subscription that will deliver real-time notifications back to your Devin session when activity occurs on the specified GitHub issue or PR. Notifications are triggered by GitHub webhooks and delivered within seconds.
If you are already subscribed to the same issue/PR, the subscription is updated (TTL extended, watch events merged).
Use this tool when you need to monitor a GitHub issue or PR for changes, new comments, merges, closures, or other activity.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
github_url |
string |
yes | — | The GitHub issue or PR URL to subscribe to. Examples: https://github.com/airbytehq/airbyte/issues/123 or https://github.com/airbytehq/airbyte/pull/456 |
agent_session_url |
string |
yes | — | Your Devin session URL so notifications can be delivered back to your session. Use the session URL from your system prompt. |
watch_events |
array<string> | null |
no | null |
Optional list of event types to watch. Valid values: 'comment', 'close', 'merge', 'reopen', 'label', 'synchronize', 'ready_for_review', 'assigned'. Defaults to all events if not specified. |
ttl_hours |
integer |
no | 240 |
Number of hours until the subscription expires. Default is 240 (10 days). |
slack_users_cc |
string | null |
no | null |
Optional comma-delimited list of Slack user tags to CC on notifications. Example: '<@U12345>, <@U67890>'. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"github_url": {
"description": "The GitHub issue or PR URL to subscribe to. Examples: https://github.com/airbytehq/airbyte/issues/123 or https://github.com/airbytehq/airbyte/pull/456",
"type": "string"
},
"agent_session_url": {
"description": "Your Devin session URL so notifications can be delivered back to your session. Use the session URL from your system prompt.",
"type": "string"
},
"watch_events": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional list of event types to watch. Valid values: 'comment', 'close', 'merge', 'reopen', 'label', 'synchronize', 'ready_for_review', 'assigned'. Defaults to all events if not specified."
},
"ttl_hours": {
"default": 240,
"description": "Number of hours until the subscription expires. Default is 240 (10 days).",
"type": "integer"
},
"slack_users_cc": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional comma-delimited list of Slack user tags to CC on notifications. Example: '<@U12345>, <@U67890>'."
}
},
"required": [
"github_url",
"agent_session_url"
],
"type": "object"
}
Show output JSON schema
{
"description": "Response from the subscribe_to_github_issue tool.",
"properties": {
"success": {
"description": "Whether the subscription was created successfully",
"type": "boolean"
},
"message": {
"description": "Human-readable status message",
"type": "string"
},
"subscription_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "ID of the created or updated subscription"
},
"github_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "GitHub URL being watched"
},
"expires_at": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the subscription expires (ISO 8601)"
}
},
"required": [
"success",
"message"
],
"type": "object"
}
unsubscribe_from_github_issue
Hints: idempotent · open-world
Unsubscribe from notifications on a GitHub issue or pull request.
Removes an active subscription so you will no longer receive notifications for the specified issue/PR.
You can unsubscribe by:
- Providing a specific subscription_id
- Providing a github_url + session_url to unsubscribe from that specific issue/PR
- Providing only session_url to unsubscribe from all issues/PRs
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
agent_session_url |
string |
yes | — | Your Devin session URL. Use the session URL from your system prompt. |
github_url |
string | null |
no | null |
The GitHub issue or PR URL to unsubscribe from. If not provided, all subscriptions for this session are removed. |
subscription_id |
string | null |
no | null |
Optional specific subscription ID to remove. Use this if you know the exact subscription to cancel. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"agent_session_url": {
"description": "Your Devin session URL. Use the session URL from your system prompt.",
"type": "string"
},
"github_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The GitHub issue or PR URL to unsubscribe from. If not provided, all subscriptions for this session are removed."
},
"subscription_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional specific subscription ID to remove. Use this if you know the exact subscription to cancel."
}
},
"required": [
"agent_session_url"
],
"type": "object"
}
Show output JSON schema
{
"description": "Response from the unsubscribe_from_github_issue tool.",
"properties": {
"success": {
"description": "Whether the unsubscribe was successful",
"type": "boolean"
},
"message": {
"description": "Human-readable status message",
"type": "string"
},
"deleted_count": {
"default": 0,
"description": "Number of subscriptions removed",
"type": "integer"
}
},
"required": [
"success",
"message"
],
"type": "object"
}
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""MCP tools for managing GitHub issue/PR subscriptions. 3 4This module exposes subscription management as MCP tools for AI agents. 5It is a thin wrapper that calls the Cloud Run subscription API via HTTP. 6 7## MCP reference 8 9.. include:: ../../../docs/mcp-generated/agent_message_bus.md 10 :start-line: 2 11""" 12 13from __future__ import annotations 14 15__all__: list[str] = [] 16 17import logging 18import os 19from typing import Annotated 20 21import requests 22from fastmcp import FastMCP 23from fastmcp_extensions import mcp_tool, register_mcp_tools 24from pydantic import BaseModel, Field 25 26logger = logging.getLogger(__name__) 27 28SUBSCRIPTION_API_URL_ENV = "SUBSCRIPTION_API_URL" 29SUBSCRIPTION_API_TOKEN_ENV = "SUBSCRIPTION_API_BEARER_TOKEN" 30 31 32def _get_api_url() -> str: 33 """Get the subscription API base URL.""" 34 url = os.environ.get(SUBSCRIPTION_API_URL_ENV) 35 if not url: 36 raise ValueError( 37 f"{SUBSCRIPTION_API_URL_ENV} environment variable is not set. " 38 "Cannot reach the GitHub subscriptions backend." 39 ) 40 return url.rstrip("/") 41 42 43def _get_api_token() -> str: 44 """Get the subscription API bearer token.""" 45 token = os.environ.get(SUBSCRIPTION_API_TOKEN_ENV) 46 if not token: 47 raise ValueError( 48 f"{SUBSCRIPTION_API_TOKEN_ENV} environment variable is not set. " 49 "Cannot authenticate to the GitHub subscriptions backend." 50 ) 51 return token 52 53 54def _api_headers() -> dict[str, str]: 55 """Build headers for API requests.""" 56 return { 57 "Authorization": f"Bearer {_get_api_token()}", 58 "Content-Type": "application/json", 59 } 60 61 62class SubscribeResponse(BaseModel): 63 """Response from the subscribe_to_github_issue tool.""" 64 65 success: bool = Field( 66 description="Whether the subscription was created successfully" 67 ) 68 message: str = Field(description="Human-readable status message") 69 subscription_id: str | None = Field( 70 default=None, 71 description="ID of the created or updated subscription", 72 ) 73 github_url: str | None = Field( 74 default=None, 75 description="GitHub URL being watched", 76 ) 77 expires_at: str | None = Field( 78 default=None, 79 description="When the subscription expires (ISO 8601)", 80 ) 81 82 83class UnsubscribeResponse(BaseModel): 84 """Response from the unsubscribe_from_github_issue tool.""" 85 86 success: bool = Field(description="Whether the unsubscribe was successful") 87 message: str = Field(description="Human-readable status message") 88 deleted_count: int = Field( 89 default=0, 90 description="Number of subscriptions removed", 91 ) 92 93 94class ListSubscriptionsResponse(BaseModel): 95 """Response from the list_github_subscriptions tool.""" 96 97 success: bool = Field(description="Whether the listing was successful") 98 message: str = Field(description="Human-readable status message") 99 subscriptions: list[dict[str, str]] = Field( 100 default_factory=list, 101 description="List of active subscriptions with id, github_url, expires_at", 102 ) 103 104 105@mcp_tool( 106 read_only=False, 107 idempotent=True, 108 open_world=True, 109) 110def subscribe_to_github_issue( 111 github_url: Annotated[ 112 str, 113 "The GitHub issue or PR URL to subscribe to. " 114 "Examples: https://github.com/airbytehq/airbyte/issues/123 " 115 "or https://github.com/airbytehq/airbyte/pull/456", 116 ], 117 agent_session_url: Annotated[ 118 str, 119 "Your Devin session URL so notifications can be delivered back to " 120 "your session. Use the session URL from your system prompt.", 121 ], 122 watch_events: Annotated[ 123 list[str] | None, 124 "Optional list of event types to watch. Valid values: " 125 "'comment', 'close', 'merge', 'reopen', 'label', 'synchronize', " 126 "'ready_for_review', 'assigned'. Defaults to all events if not specified.", 127 ] = None, 128 ttl_hours: Annotated[ 129 int, 130 "Number of hours until the subscription expires. Default is 240 (10 days).", 131 ] = 240, 132 slack_users_cc: Annotated[ 133 str | None, 134 "Optional comma-delimited list of Slack user tags to CC on " 135 "notifications. Example: '<@U12345>, <@U67890>'.", 136 ] = None, 137) -> SubscribeResponse: 138 """Subscribe to notifications on a GitHub issue or pull request. 139 140 Creates a subscription that will deliver real-time notifications back 141 to your Devin session when activity occurs on the specified GitHub 142 issue or PR. Notifications are triggered by GitHub webhooks and 143 delivered within seconds. 144 145 If you are already subscribed to the same issue/PR, the subscription 146 is updated (TTL extended, watch events merged). 147 148 Use this tool when you need to monitor a GitHub issue or PR for 149 changes, new comments, merges, closures, or other activity. 150 """ 151 try: 152 api_url = _get_api_url() 153 body: dict[str, str | list[str] | int | None] = { 154 "github_url": github_url, 155 "session_url": agent_session_url, 156 "ttl_hours": ttl_hours, 157 } 158 if watch_events: 159 body["watch_events"] = watch_events 160 if slack_users_cc: 161 body["slack_users_cc"] = slack_users_cc 162 163 response = requests.post( 164 f"{api_url}/subscriptions", 165 json=body, 166 headers=_api_headers(), 167 timeout=10, 168 ) 169 response.raise_for_status() 170 data = response.json() 171 172 return SubscribeResponse( 173 success=True, 174 message=( 175 f"Subscribed to {github_url}. " 176 f"You will receive notifications in this session until " 177 f"{data.get('expires_at', 'expiry unknown')}." 178 ), 179 subscription_id=data.get("id"), 180 github_url=github_url, 181 expires_at=data.get("expires_at"), 182 ) 183 184 except ValueError as e: 185 return SubscribeResponse( 186 success=False, 187 message=f"Configuration error: {e}", 188 ) 189 except requests.RequestException as e: 190 logger.exception("Failed to create subscription") 191 return SubscribeResponse( 192 success=False, 193 message=f"Failed to create subscription: {e}", 194 ) 195 196 197@mcp_tool( 198 read_only=False, 199 idempotent=True, 200 open_world=True, 201) 202def unsubscribe_from_github_issue( 203 agent_session_url: Annotated[ 204 str, 205 "Your Devin session URL. Use the session URL from your system prompt.", 206 ], 207 github_url: Annotated[ 208 str | None, 209 "The GitHub issue or PR URL to unsubscribe from. " 210 "If not provided, all subscriptions for this session are removed.", 211 ] = None, 212 subscription_id: Annotated[ 213 str | None, 214 "Optional specific subscription ID to remove. " 215 "Use this if you know the exact subscription to cancel.", 216 ] = None, 217) -> UnsubscribeResponse: 218 """Unsubscribe from notifications on a GitHub issue or pull request. 219 220 Removes an active subscription so you will no longer receive 221 notifications for the specified issue/PR. 222 223 You can unsubscribe by: 224 - Providing a specific subscription_id 225 - Providing a github_url + session_url to unsubscribe from that specific issue/PR 226 - Providing only session_url to unsubscribe from all issues/PRs 227 """ 228 try: 229 api_url = _get_api_url() 230 231 if subscription_id: 232 # Delete by ID 233 response = requests.delete( 234 f"{api_url}/subscriptions/{subscription_id}", 235 headers=_api_headers(), 236 timeout=10, 237 ) 238 else: 239 # Delete by match 240 params: dict[str, str] = {"session_url": agent_session_url} 241 if github_url: 242 params["github_url"] = github_url 243 response = requests.delete( 244 f"{api_url}/subscriptions", 245 params=params, 246 headers=_api_headers(), 247 timeout=10, 248 ) 249 250 response.raise_for_status() 251 data = response.json() 252 count = data.get("deleted_count", 0) 253 254 return UnsubscribeResponse( 255 success=True, 256 message=f"Removed {count} subscription(s).", 257 deleted_count=count, 258 ) 259 260 except ValueError as e: 261 return UnsubscribeResponse( 262 success=False, 263 message=f"Configuration error: {e}", 264 ) 265 except requests.RequestException as e: 266 logger.exception("Failed to unsubscribe") 267 return UnsubscribeResponse( 268 success=False, 269 message=f"Failed to unsubscribe: {e}", 270 ) 271 272 273@mcp_tool( 274 read_only=True, 275 idempotent=True, 276 open_world=True, 277) 278def list_github_subscriptions( 279 agent_session_url: Annotated[ 280 str, 281 "Your Devin session URL. Use the session URL from your system prompt.", 282 ], 283) -> ListSubscriptionsResponse: 284 """List all active GitHub issue/PR subscriptions for this session. 285 286 Returns the list of GitHub issues and PRs that this session is 287 currently subscribed to, along with their expiry times. 288 """ 289 try: 290 api_url = _get_api_url() 291 292 response = requests.get( 293 f"{api_url}/subscriptions", 294 params={"session_url": agent_session_url}, 295 headers=_api_headers(), 296 timeout=10, 297 ) 298 response.raise_for_status() 299 data = response.json() 300 301 subs = [ 302 { 303 "id": s["id"], 304 "github_url": s["github_url"], 305 "watch_events": ", ".join(s.get("watch_events", [])), 306 "expires_at": s.get("expires_at", "unknown"), 307 } 308 for s in data 309 ] 310 311 if not subs: 312 return ListSubscriptionsResponse( 313 success=True, 314 message="No active subscriptions for this session.", 315 subscriptions=[], 316 ) 317 318 return ListSubscriptionsResponse( 319 success=True, 320 message=f"Found {len(subs)} active subscription(s).", 321 subscriptions=subs, 322 ) 323 324 except ValueError as e: 325 return ListSubscriptionsResponse( 326 success=False, 327 message=f"Configuration error: {e}", 328 ) 329 except requests.RequestException as e: 330 logger.exception("Failed to list subscriptions") 331 return ListSubscriptionsResponse( 332 success=False, 333 message=f"Failed to list subscriptions: {e}", 334 ) 335 336 337def register_message_bus_tools(app: FastMCP) -> None: 338 """Register message bus tools with the FastMCP app.""" 339 register_mcp_tools(app, mcp_module=__name__)