airbyte_ops_mcp.mcp.tier_lookup
MCP tools for customer tier lookup and cache management.
Provides tools to resolve customer tiers for organizations, workspaces, and connections, and to manage the tier cache.
MCP reference
MCP primitives registered by the tier_lookup module of the airbyte-internal-ops server: 3 tool(s), 0 prompt(s), 0 resource(s).
Tools (3)
get_customer_tier_cache_stats
Hints: read-only · idempotent
Get current statistics about the customer tier cache.
Returns cache size, age, and file paths for both the tier cache (org -> tier) and workspace cache (workspace -> org + region).
Useful for checking cache freshness and diagnosing issues.
Parameters:
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"description": "Statistics about the tier cache state.",
"properties": {
"tier_cache_size": {
"description": "Number of orgs in the tier cache",
"type": "integer"
},
"workspace_cache_size": {
"description": "Number of workspaces in the workspace cache",
"type": "integer"
},
"tier_cache_age_seconds": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Age of the tier cache in seconds (None if not cached)"
},
"workspace_cache_age_seconds": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Age of the workspace cache in seconds (None if not cached)"
},
"tier_cache_path": {
"description": "Path to the tier cache file",
"type": "string"
},
"workspace_cache_path": {
"description": "Path to the workspace cache file",
"type": "string"
}
},
"required": [
"tier_cache_size",
"workspace_cache_size",
"tier_cache_path",
"workspace_cache_path"
],
"type": "object"
}
lookup_customer_tiers
Hints: read-only · idempotent
Look up customer tier classification for organizations, workspaces, and/or connections.
Accepts mixed lists of organization IDs, workspace IDs, and connection IDs. Resolves each to its organization and maps to a customer tier (TIER_0, TIER_1, or TIER_2).
Tier 0 and Tier 1 orgs are explicitly tracked in Salesforce and cached from BigQuery. Any org not in the cache defaults to TIER_2.
Returns enriched entries with tier, region (EU/US), and a summary of the distribution.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
organization_ids |
array<string> | null |
no | null |
List of organization UUIDs to look up tiers for. Example: ['664c690e-5263-49ba-b01f-4a6759b3330a'] |
workspace_ids |
array<string> | null |
no | null |
List of workspace UUIDs to look up tiers for. Each workspace will be resolved to its organization and tier. Example: ['266ebdfe-0d7b-4540-9817-de7e4505ba61'] |
connection_ids |
array<string> | null |
no | null |
List of connection UUIDs to look up tiers for. Each connection will be resolved to its workspace, organization, and tier. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"organization_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of organization UUIDs to look up tiers for. Example: ['664c690e-5263-49ba-b01f-4a6759b3330a']"
},
"workspace_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of workspace UUIDs to look up tiers for. Each workspace will be resolved to its organization and tier. Example: ['266ebdfe-0d7b-4540-9817-de7e4505ba61']"
},
"connection_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of connection UUIDs to look up tiers for. Each connection will be resolved to its workspace, organization, and tier."
}
},
"type": "object"
}
Show output JSON schema
{
"description": "Result of a customer tier lookup across multiple IDs.",
"properties": {
"entries": {
"description": "Individual tier lookup results for each input ID",
"items": {
"description": "A single resolved tier entry for an org, workspace, or connection.",
"properties": {
"input_id": {
"description": "The original ID that was looked up",
"type": "string"
},
"input_type": {
"description": "Type of the input ID: 'organization', 'workspace', or 'connection'",
"type": "string"
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Resolved organization UUID"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace UUID (if input was workspace or connection)"
},
"connection_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Connection UUID (if input was connection)"
},
"customer_tier": {
"description": "Resolved tier: TIER_0, TIER_1, or TIER_2",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2"
],
"type": "string"
},
"dataplane_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Dataplane region name (e.g., 'US', 'EU')"
},
"is_eu": {
"default": false,
"description": "Whether the entity is in the EU region",
"type": "boolean"
},
"resolved": {
"default": true,
"description": "Whether the ID was successfully resolved",
"type": "boolean"
}
},
"required": [
"input_id",
"input_type",
"customer_tier"
],
"type": "object"
},
"type": "array"
},
"summary": {
"description": "Tier distribution summary across all resolved entries",
"properties": {
"tier_0_count": {
"default": 0,
"description": "Number of TIER_0 entries",
"type": "integer"
},
"tier_1_count": {
"default": 0,
"description": "Number of TIER_1 entries",
"type": "integer"
},
"tier_2_count": {
"default": 0,
"description": "Number of TIER_2 entries",
"type": "integer"
},
"total": {
"default": 0,
"description": "Total number of entries",
"type": "integer"
}
},
"type": "object"
},
"summary_text": {
"description": "Human-readable tier distribution summary",
"type": "string"
}
},
"required": [
"entries",
"summary",
"summary_text"
],
"type": "object"
}
refresh_customer_tier_cache
Hints: idempotent
Force-refresh the customer tier cache from BigQuery.
The tier cache is automatically refreshed every 24 hours. Use this tool to manually trigger a refresh if you need the latest tier data immediately (e.g., after a Salesforce update).
Returns cache statistics after the refresh.
Parameters:
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"description": "Result of refreshing the tier cache.",
"properties": {
"stats": {
"description": "Cache statistics after refresh",
"properties": {
"tier_cache_size": {
"description": "Number of orgs in the tier cache",
"type": "integer"
},
"workspace_cache_size": {
"description": "Number of workspaces in the workspace cache",
"type": "integer"
},
"tier_cache_age_seconds": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Age of the tier cache in seconds (None if not cached)"
},
"workspace_cache_age_seconds": {
"anyOf": [
{
"type": "number"
},
{
"type": "null"
}
],
"default": null,
"description": "Age of the workspace cache in seconds (None if not cached)"
},
"tier_cache_path": {
"description": "Path to the tier cache file",
"type": "string"
},
"workspace_cache_path": {
"description": "Path to the workspace cache file",
"type": "string"
}
},
"required": [
"tier_cache_size",
"workspace_cache_size",
"tier_cache_path",
"workspace_cache_path"
],
"type": "object"
},
"message": {
"description": "Human-readable result message",
"type": "string"
}
},
"required": [
"stats",
"message"
],
"type": "object"
}
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""MCP tools for customer tier lookup and cache management. 3 4Provides tools to resolve customer tiers for organizations, workspaces, and connections, 5and to manage the tier cache. 6 7## MCP reference 8 9.. include:: ../../../docs/mcp-generated/tier_lookup.md 10 :start-line: 2 11""" 12 13from __future__ import annotations 14 15__all__: list[str] = [] 16 17from typing import Annotated, Any 18 19from fastmcp import FastMCP 20from fastmcp_extensions import mcp_tool, register_mcp_tools 21from pydantic import BaseModel, Field 22 23from airbyte_ops_mcp.prod_db_access.queries import query_connection_workspace_details 24from airbyte_ops_mcp.tier_cache import ( 25 CustomerTier, 26 TierCacheStats, 27 TierSummary, 28 build_tier_summary, 29 get_cache_stats, 30 get_org_tier, 31 get_org_tiers, 32 refresh_tier_cache, 33 resolve_workspaces, 34) 35 36# ============================================================================= 37# Pydantic Models for MCP Tool Responses 38# ============================================================================= 39 40 41class TierLookupEntry(BaseModel): 42 """A single resolved tier entry for an org, workspace, or connection.""" 43 44 input_id: str = Field(description="The original ID that was looked up") 45 input_type: str = Field( 46 description="Type of the input ID: 'organization', 'workspace', or 'connection'" 47 ) 48 organization_id: str | None = Field( 49 default=None, description="Resolved organization UUID" 50 ) 51 workspace_id: str | None = Field( 52 default=None, 53 description="Workspace UUID (if input was workspace or connection)", 54 ) 55 connection_id: str | None = Field( 56 default=None, description="Connection UUID (if input was connection)" 57 ) 58 customer_tier: CustomerTier = Field( 59 description="Resolved tier: TIER_0, TIER_1, or TIER_2" 60 ) 61 dataplane_name: str | None = Field( 62 default=None, description="Dataplane region name (e.g., 'US', 'EU')" 63 ) 64 is_eu: bool = Field( 65 default=False, description="Whether the entity is in the EU region" 66 ) 67 resolved: bool = Field( 68 default=True, description="Whether the ID was successfully resolved" 69 ) 70 71 72class TierLookupResult(BaseModel): 73 """Result of a customer tier lookup across multiple IDs.""" 74 75 entries: list[TierLookupEntry] = Field( 76 description="Individual tier lookup results for each input ID" 77 ) 78 summary: TierSummary = Field( 79 description="Tier distribution summary across all resolved entries" 80 ) 81 summary_text: str = Field(description="Human-readable tier distribution summary") 82 83 84class TierCacheRefreshResult(BaseModel): 85 """Result of refreshing the tier cache.""" 86 87 stats: TierCacheStats = Field(description="Cache statistics after refresh") 88 message: str = Field(description="Human-readable result message") 89 90 91# ============================================================================= 92# MCP Tools 93# ============================================================================= 94 95 96@mcp_tool( 97 read_only=True, 98 idempotent=True, 99) 100def lookup_customer_tiers( 101 organization_ids: Annotated[ 102 list[str] | None, 103 Field( 104 description=( 105 "List of organization UUIDs to look up tiers for. " 106 "Example: ['664c690e-5263-49ba-b01f-4a6759b3330a']" 107 ), 108 default=None, 109 ), 110 ] = None, 111 workspace_ids: Annotated[ 112 list[str] | None, 113 Field( 114 description=( 115 "List of workspace UUIDs to look up tiers for. " 116 "Each workspace will be resolved to its organization and tier. " 117 "Example: ['266ebdfe-0d7b-4540-9817-de7e4505ba61']" 118 ), 119 default=None, 120 ), 121 ] = None, 122 connection_ids: Annotated[ 123 list[str] | None, 124 Field( 125 description=( 126 "List of connection UUIDs to look up tiers for. " 127 "Each connection will be resolved to its workspace, organization, and tier." 128 ), 129 default=None, 130 ), 131 ] = None, 132) -> TierLookupResult: 133 """Look up customer tier classification for organizations, workspaces, and/or connections. 134 135 Accepts mixed lists of organization IDs, workspace IDs, and connection IDs. 136 Resolves each to its organization and maps to a customer tier (TIER_0, TIER_1, or TIER_2). 137 138 Tier 0 and Tier 1 orgs are explicitly tracked in Salesforce and cached from BigQuery. 139 Any org not in the cache defaults to TIER_2. 140 141 Returns enriched entries with tier, region (EU/US), and a summary of the distribution. 142 """ 143 entries: list[TierLookupEntry] = [] 144 145 # Resolve organization IDs directly 146 org_ids = organization_ids or [] 147 if org_ids: 148 org_results = get_org_tiers(org_ids) 149 for result in org_results: 150 entries.append( 151 TierLookupEntry( 152 input_id=result.organization_id, 153 input_type="organization", 154 organization_id=result.organization_id, 155 customer_tier=result.customer_tier, 156 resolved=True, 157 ) 158 ) 159 160 # Resolve workspace IDs -> org -> tier 161 ws_ids = workspace_ids or [] 162 if ws_ids: 163 ws_results = resolve_workspaces(ws_ids) 164 for result in ws_results: 165 entries.append( 166 TierLookupEntry( 167 input_id=result.workspace_id, 168 input_type="workspace", 169 organization_id=result.organization_id, 170 workspace_id=result.workspace_id, 171 customer_tier=result.customer_tier, 172 dataplane_name=result.dataplane_name, 173 is_eu=result.is_eu, 174 resolved=result.resolved, 175 ) 176 ) 177 178 # Resolve connection IDs -> workspace -> org -> tier 179 conn_ids = connection_ids or [] 180 if conn_ids: 181 conn_details = query_connection_workspace_details(conn_ids) 182 conn_map: dict[str, dict[str, Any]] = { 183 str(row["connection_id"]): row for row in conn_details 184 } 185 186 for conn_id in conn_ids: 187 row = conn_map.get(conn_id) 188 if row is None: 189 entries.append( 190 TierLookupEntry( 191 input_id=conn_id, 192 input_type="connection", 193 connection_id=conn_id, 194 customer_tier="TIER_2", 195 resolved=False, 196 ) 197 ) 198 continue 199 200 org_id = str(row["organization_id"]) 201 dataplane_name = row.get("dataplane_name") or "US" 202 org_result = get_org_tier(org_id) 203 204 entries.append( 205 TierLookupEntry( 206 input_id=conn_id, 207 input_type="connection", 208 organization_id=org_id, 209 workspace_id=str(row["workspace_id"]), 210 connection_id=conn_id, 211 customer_tier=org_result.customer_tier, 212 dataplane_name=dataplane_name, 213 is_eu=dataplane_name == "EU", 214 resolved=True, 215 ) 216 ) 217 218 # Build summary from enriched entries 219 summary_rows = [{"customer_tier": e.customer_tier} for e in entries if e.resolved] 220 summary = build_tier_summary(summary_rows) 221 222 return TierLookupResult( 223 entries=entries, 224 summary=summary, 225 summary_text=str(summary), 226 ) 227 228 229@mcp_tool( 230 read_only=False, 231 idempotent=True, 232) 233def refresh_customer_tier_cache() -> TierCacheRefreshResult: 234 """Force-refresh the customer tier cache from BigQuery. 235 236 The tier cache is automatically refreshed every 24 hours. Use this tool to 237 manually trigger a refresh if you need the latest tier data immediately 238 (e.g., after a Salesforce update). 239 240 Returns cache statistics after the refresh. 241 """ 242 stats = refresh_tier_cache() 243 return TierCacheRefreshResult( 244 stats=stats, 245 message=( 246 f"Tier cache refreshed: {stats.tier_cache_size} orgs cached, " 247 f"{stats.workspace_cache_size} workspaces cached." 248 ), 249 ) 250 251 252@mcp_tool( 253 read_only=True, 254 idempotent=True, 255) 256def get_customer_tier_cache_stats() -> TierCacheStats: 257 """Get current statistics about the customer tier cache. 258 259 Returns cache size, age, and file paths for both the tier cache 260 (org -> tier) and workspace cache (workspace -> org + region). 261 262 Useful for checking cache freshness and diagnosing issues. 263 """ 264 return get_cache_stats() 265 266 267def register_tier_lookup_tools(app: FastMCP) -> None: 268 """Register tier lookup tools with the FastMCP app.""" 269 register_mcp_tools(app, mcp_module=__name__)