airbyte_ops_mcp.airbyte_repo.list_connectors
Detect changed connectors in the Airbyte monorepo.
This module provides functionality to detect which connectors have been modified by comparing git diffs between branches.
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Detect changed connectors in the Airbyte monorepo. 3 4This module provides functionality to detect which connectors have been modified 5by comparing git diffs between branches. 6""" 7 8from __future__ import annotations 9 10import logging 11import re 12import subprocess 13from dataclasses import dataclass 14from enum import StrEnum 15from functools import lru_cache 16from pathlib import Path 17from typing import Any 18 19import requests 20import yaml 21 22from airbyte_ops_mcp.github_api import GitHubAPIError 23from airbyte_ops_mcp.registry._enums import ConnectorLanguage, ConnectorType 24 25CONNECTOR_PATH_PREFIX = "airbyte-integrations/connectors" 26METADATA_FILE_NAME = "metadata.yaml" 27GIT_DEFAULT_BRANCH = "origin/master" 28 29logger = logging.getLogger(__name__) 30 31 32class ConnectorSubtype(StrEnum): 33 """Connector subtypes based on data source category.""" 34 35 API = "api" 36 DATABASE = "database" 37 FILE = "file" 38 CUSTOM = "custom" 39 40 41def _extract_connector_names(changed_files: list[str]) -> list[str]: 42 """Extract unique connector names from a list of changed file paths. 43 44 Args: 45 changed_files: File paths relative to the repo root. 46 47 Returns: 48 Sorted list of connector technical names. 49 """ 50 changed_connectors: set[str] = set() 51 for file_path in changed_files: 52 if file_path.startswith(CONNECTOR_PATH_PREFIX + "/"): 53 # e.g. "airbyte-integrations/connectors/source-faker/..." -> "source-faker" 54 path_parts = file_path.split("/") 55 if len(path_parts) >= 3: 56 changed_connectors.add(path_parts[2]) 57 return sorted(changed_connectors) 58 59 60def get_modified_connectors( 61 repo_path: str | Path, 62 base_ref: str = GIT_DEFAULT_BRANCH, 63 head_ref: str = "HEAD", 64) -> list[str]: 65 """Get list of connector IDs modified according to a local git diff. 66 67 This is the *local-only* fallback used when the GitHub API path is not 68 available (e.g. no PR number or no token). 
It uses `git merge-base` 69 when possible so that only changes introduced on the head branch are 70 reported, falling back to a plain two-ref diff when the merge-base 71 cannot be determined (shallow clones without enough history). 72 73 Prefer :func:`get_modified_connectors_from_github` whenever a PR number 74 is known, as it avoids shallow-clone pitfalls entirely. 75 76 Args: 77 repo_path: Path to the Airbyte monorepo. 78 base_ref: Base git reference to compare against. 79 head_ref: Head git reference to compare. 80 81 Returns: 82 Sorted list of connector technical names. 83 """ 84 repo_path = Path(repo_path) 85 86 # Try to find the merge-base so the diff only contains changes introduced 87 # on the head branch, avoiding false positives from unrelated base-branch 88 # commits. Falls back to a plain two-ref diff when unavailable. 89 effective_base = base_ref 90 try: 91 mb_result = subprocess.run( 92 ["git", "merge-base", base_ref, head_ref], 93 cwd=repo_path, 94 capture_output=True, 95 text=True, 96 check=True, 97 timeout=30, 98 ) 99 effective_base = mb_result.stdout.strip() 100 logger.debug("Using merge-base %s for diff", effective_base) 101 except (subprocess.CalledProcessError, subprocess.TimeoutExpired): 102 logger.debug( 103 "Could not determine merge-base for %s..%s; falling back to plain diff", 104 base_ref, 105 head_ref, 106 ) 107 108 try: 109 result = subprocess.run( 110 ["git", "diff", "--name-only", effective_base, head_ref], 111 cwd=repo_path, 112 capture_output=True, 113 text=True, 114 check=True, 115 timeout=30, 116 ) 117 changed_files = result.stdout.strip().split("\n") 118 except subprocess.CalledProcessError as e: 119 raise RuntimeError(f"Failed to get git diff: {e.stderr}") from e 120 except subprocess.TimeoutExpired as e: 121 raise RuntimeError("Git diff command timed out after 30 seconds") from e 122 123 return _extract_connector_names(changed_files) 124 125 126def get_modified_connectors_from_github( 127 pr_number: int, 128 pr_owner: str, 129 
pr_repo: str, 130 gh_token: str, 131) -> list[str]: 132 """Get list of connector IDs modified in a PR via the GitHub API. 133 134 This is the preferred path when a PR number is known because it is 135 immune to shallow-clone depth issues and always returns an accurate 136 diff (only the files the PR actually changes). 137 138 Args: 139 pr_number: Pull request number. 140 pr_owner: Repository owner (e.g. `"airbytehq"`). 141 pr_repo: Repository name (e.g. `"airbyte"`). 142 gh_token: GitHub API token (must be provided explicitly). 143 144 Returns: 145 Sorted list of connector technical names. 146 147 Raises: 148 :class:`~airbyte_ops_mcp.github_api.GitHubAPIError`: If the API 149 request fails. 150 """ 151 from airbyte_ops_mcp.github_api import get_pr_changed_files 152 153 logger.info( 154 "Fetching changed files for PR %s/%s#%d via GitHub API", 155 pr_owner, 156 pr_repo, 157 pr_number, 158 ) 159 changed_files = get_pr_changed_files(pr_owner, pr_repo, pr_number, token=gh_token) 160 return _extract_connector_names(changed_files) 161 162 163@lru_cache(maxsize=128) 164def get_certified_connectors(repo_path: str | Path) -> set[str]: 165 """Get set of all certified connector IDs from metadata. 166 167 This function reads the metadata.yaml file for each connector to determine 168 which connectors have "certified" support level. 
169 170 Args: 171 repo_path: Path to the Airbyte monorepo 172 173 Returns: 174 Set of certified connector technical names 175 176 Example: 177 >>> certified = get_certified_connectors("/path/to/airbyte") 178 >>> "source-postgres" in certified 179 True 180 """ 181 repo_path = Path(repo_path) 182 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 183 184 if not connectors_dir.exists(): 185 raise ValueError(f"Connectors directory not found: {connectors_dir}") 186 187 certified_connectors = set() 188 189 # Iterate through all connector directories 190 for connector_dir in connectors_dir.iterdir(): 191 if not connector_dir.is_dir(): 192 continue 193 194 metadata_file = connector_dir / METADATA_FILE_NAME 195 if not metadata_file.exists(): 196 continue 197 198 # Read metadata to check support level 199 try: 200 import yaml 201 202 with open(metadata_file) as f: 203 metadata = yaml.safe_load(f) 204 205 support_level = metadata.get("data", {}).get("supportLevel") 206 if support_level == "certified": 207 certified_connectors.add(connector_dir.name) 208 except Exception: 209 # Skip connectors with invalid metadata 210 continue 211 212 return certified_connectors 213 214 215def get_all_connectors(repo_path: str | Path) -> set[str]: 216 """Get set of all connector IDs in the repository. 217 218 Args: 219 repo_path: Path to the Airbyte monorepo 220 221 Returns: 222 Set of all connector technical names 223 224 Example: 225 >>> all_connectors = get_all_connectors("/path/to/airbyte") 226 >>> "source-faker" in all_connectors 227 True 228 """ 229 repo_path = Path(repo_path) 230 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 231 232 if not connectors_dir.exists(): 233 raise ValueError(f"Connectors directory not found: {connectors_dir}") 234 235 return {p.name for p in connectors_dir.iterdir() if p.is_dir()} 236 237 238def get_connector_metadata( 239 repo_path: str | Path, 240 connector_name: str, 241) -> dict[str, Any] | None: 242 """Get metadata for a specific connector. 
243 244 Args: 245 repo_path: Path to the Airbyte monorepo 246 connector_name: Technical name of the connector (e.g., "source-faker") 247 248 Returns: 249 The connector's metadata dict (the 'data' section), or None if not found 250 251 Example: 252 >>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker") 253 >>> metadata.get("supportLevel") 254 'certified' 255 """ 256 repo_path = Path(repo_path) 257 connector_dir = repo_path / CONNECTOR_PATH_PREFIX / connector_name 258 metadata_file = connector_dir / METADATA_FILE_NAME 259 260 if not metadata_file.exists(): 261 return None 262 263 try: 264 with open(metadata_file) as f: 265 metadata = yaml.safe_load(f) 266 return metadata.get("data", {}) 267 except Exception: 268 return None 269 270 271def get_connectors_by_language( 272 repo_path: str | Path, 273 language: ConnectorLanguage, 274) -> set[str]: 275 """Get set of all connector IDs for a specific language. 276 277 This function reads connector directories to determine which connectors 278 use the specified language. 279 280 Args: 281 repo_path: Path to the Airbyte monorepo 282 language: Language to filter by 283 284 Returns: 285 Set of connector technical names using the specified language 286 287 Example: 288 >>> python_connectors = get_connectors_by_language( 289 ... "/path/to/airbyte", ConnectorLanguage.PYTHON 290 ... 
) 291 >>> "source-faker" in python_connectors 292 True 293 """ 294 repo_path = Path(repo_path) 295 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 296 297 if not connectors_dir.exists(): 298 raise ValueError(f"Connectors directory not found: {connectors_dir}") 299 300 language_value = language.value 301 connectors_by_language = set() 302 303 # Iterate through all connector directories 304 for connector_dir in connectors_dir.iterdir(): 305 if not connector_dir.is_dir(): 306 continue 307 308 # Determine language based on file structure 309 connector_name = connector_dir.name 310 detected_language = _detect_connector_language(connector_dir, connector_name) 311 312 if detected_language == language_value: 313 connectors_by_language.add(connector_dir.name) 314 315 return connectors_by_language 316 317 318def get_connectors_by_type( 319 repo_path: str | Path, 320 connector_type: ConnectorType, 321) -> set[str]: 322 """Get set of all connector IDs for a specific type (source or destination). 323 324 This function reads connector directories to determine which connectors 325 match the specified type based on their metadata.yaml or name prefix. 326 327 Args: 328 repo_path: Path to the Airbyte monorepo 329 connector_type: Type to filter by (source or destination) 330 331 Returns: 332 Set of connector technical names matching the specified type 333 334 Example: 335 >>> source_connectors = get_connectors_by_type( 336 ... "/path/to/airbyte", ConnectorType.SOURCE 337 ... 
) 338 >>> "source-postgres" in source_connectors 339 True 340 """ 341 repo_path = Path(repo_path) 342 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 343 344 if not connectors_dir.exists(): 345 raise ValueError(f"Connectors directory not found: {connectors_dir}") 346 347 type_value = connector_type.value 348 connectors_by_type = set() 349 350 for connector_dir in connectors_dir.iterdir(): 351 if not connector_dir.is_dir(): 352 continue 353 354 connector_name = connector_dir.name 355 356 # First try to get type from metadata.yaml 357 metadata = get_connector_metadata(repo_path, connector_name) 358 if metadata: 359 metadata_type = metadata.get("connectorType") 360 if metadata_type == type_value: 361 connectors_by_type.add(connector_name) 362 continue 363 364 # Fallback to name prefix detection 365 if connector_name.startswith(f"{type_value}-"): 366 connectors_by_type.add(connector_name) 367 368 return connectors_by_type 369 370 371def get_connectors_by_subtype( 372 repo_path: str | Path, 373 connector_subtype: ConnectorSubtype, 374) -> set[str]: 375 """Get set of all connector IDs for a specific subtype (api, database, file, etc.). 376 377 This function reads connector metadata.yaml files to determine which connectors 378 match the specified subtype. 379 380 Args: 381 repo_path: Path to the Airbyte monorepo 382 connector_subtype: Subtype to filter by (api, database, file, custom) 383 384 Returns: 385 Set of connector technical names matching the specified subtype 386 387 Example: 388 >>> database_connectors = get_connectors_by_subtype( 389 ... "/path/to/airbyte", ConnectorSubtype.DATABASE 390 ... 
) 391 >>> "source-postgres" in database_connectors 392 True 393 """ 394 repo_path = Path(repo_path) 395 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 396 397 if not connectors_dir.exists(): 398 raise ValueError(f"Connectors directory not found: {connectors_dir}") 399 400 subtype_value = connector_subtype.value 401 connectors_by_subtype = set() 402 403 for connector_dir in connectors_dir.iterdir(): 404 if not connector_dir.is_dir(): 405 continue 406 407 connector_name = connector_dir.name 408 metadata = get_connector_metadata(repo_path, connector_name) 409 410 if metadata: 411 metadata_subtype = metadata.get("connectorSubtype") 412 if metadata_subtype == subtype_value: 413 connectors_by_subtype.add(connector_name) 414 415 return connectors_by_subtype 416 417 418@lru_cache(maxsize=1024) 419def _detect_connector_language(connector_dir: Path, connector_name: str) -> str | None: 420 """Detect the language of a connector based on its file structure. 421 422 Args: 423 connector_dir: Path to the connector directory 424 connector_name: Technical name of the connector 425 426 Returns: 427 Language string (python, java, low-code, manifest-only) or None 428 """ 429 # Check for manifest-only (manifest.yaml at root) 430 if (connector_dir / "manifest.yaml").is_file(): 431 return "manifest-only" 432 433 # Check for low-code (manifest.yaml in source directory) 434 source_dir = connector_dir / connector_name.replace("-", "_") 435 if (source_dir / "manifest.yaml").is_file(): 436 return "low-code" 437 438 # Check for Python (setup.py or pyproject.toml) 439 if (connector_dir / "setup.py").is_file() or ( 440 connector_dir / "pyproject.toml" 441 ).is_file(): 442 return "python" 443 444 # Check for Java/Kotlin (src/main/java or src/main/kotlin) 445 if (connector_dir / "src" / "main" / "java").exists() or ( 446 connector_dir / "src" / "main" / "kotlin" 447 ).exists(): 448 return "java" 449 450 return None 451 452 453@lru_cache(maxsize=128) 454def 
get_connectors_with_local_cdk(repo_path: str | Path) -> set[str]: 455 """Get set of connectors using local CDK reference instead of published version. 456 457 This function detects connectors that are configured to use a local CDK 458 checkout rather than a published CDK version. The detection method differs 459 by connector language: 460 - Python: Checks for path-based dependency in pyproject.toml 461 - Java/Kotlin: Checks for useLocalCdk=true in build.gradle files 462 463 Args: 464 repo_path: Path to the Airbyte monorepo 465 466 Returns: 467 Set of connector technical names using local CDK reference 468 469 Example: 470 >>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte") 471 >>> "destination-motherduck" in local_cdk_connectors 472 True 473 """ 474 repo_path = Path(repo_path) 475 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 476 477 if not connectors_dir.exists(): 478 raise ValueError(f"Connectors directory not found: {connectors_dir}") 479 480 local_cdk_connectors = set() 481 482 # Iterate through all connector directories 483 for connector_dir in connectors_dir.iterdir(): 484 if not connector_dir.is_dir(): 485 continue 486 487 connector_name = connector_dir.name 488 detected_language = _detect_connector_language(connector_dir, connector_name) 489 490 # Check for local CDK based on language 491 if detected_language in ("python", "low-code") and _has_local_cdk_python( 492 connector_dir 493 ): 494 # Python connectors: check pyproject.toml for path-based CDK dependency 495 local_cdk_connectors.add(connector_name) 496 elif detected_language == "java" and _has_local_cdk_java(connector_dir): 497 # Java/Kotlin connectors: check build.gradle for useLocalCdk=true 498 local_cdk_connectors.add(connector_name) 499 500 return local_cdk_connectors 501 502 503def _has_local_cdk_python(connector_dir: Path) -> bool: 504 """Check if a Python connector uses local CDK reference. 
505 506 Args: 507 connector_dir: Path to the connector directory 508 509 Returns: 510 True if connector uses local CDK reference 511 """ 512 pyproject_file = connector_dir / "pyproject.toml" 513 if not pyproject_file.exists(): 514 return False 515 516 try: 517 content = pyproject_file.read_text() 518 # Look for path-based airbyte-cdk dependency 519 # Pattern: airbyte-cdk = { path = "..." } or airbyte-cdk = {path = "..."} 520 return bool(re.search(r"airbyte-cdk\s*=\s*\{[^}]*path\s*=", content)) 521 except Exception: 522 return False 523 524 525def _has_local_cdk_java(connector_dir: Path) -> bool: 526 """Check if a Java/Kotlin connector uses local CDK reference. 527 528 Mirrors the implementation from airbyte-ci/connectors/connector_ops/connector_ops/utils.py 529 530 Args: 531 connector_dir: Path to the connector directory 532 533 Returns: 534 True if connector uses local CDK reference 535 """ 536 # Check both build.gradle and build.gradle.kts 537 for build_file_name in ("build.gradle", "build.gradle.kts"): 538 build_file = connector_dir / build_file_name 539 if not build_file.exists(): 540 continue 541 542 try: 543 # Read file and strip inline comments 544 contents = "\n".join( 545 [ 546 line.split("//")[0] # Remove inline comments 547 for line in build_file.read_text().split("\n") 548 ] 549 ) 550 # Remove spaces and check for useLocalCdk=true 551 contents = contents.replace(" ", "") 552 if "useLocalCdk=true" in contents: 553 return True 554 except Exception: 555 continue 556 557 return False 558 559 560@dataclass 561class ConnectorListResult: 562 """Result of listing connectors with filters.""" 563 564 connectors: list[str] 565 count: int 566 567 568def list_connectors( 569 repo_path: str | Path, 570 certified: bool | None = None, 571 modified: bool | None = None, 572 language_filter: set[str] | None = None, 573 language_exclude: set[str] | None = None, 574 connector_type: str | None = None, 575 connector_subtype: str | None = None, 576 base_ref: str | None = None, 
577 head_ref: str | None = None, 578 pr_number: int | None = None, 579 pr_owner: str | None = None, 580 pr_repo: str | None = None, 581 gh_token: str | None = None, 582) -> ConnectorListResult: 583 """List connectors in the Airbyte monorepo with flexible filtering. 584 585 This is the core capability function that encapsulates all filtering logic. 586 Both CLI and MCP layers should use this function. 587 588 When `modified` filtering is requested and both `gh_token` and full PR 589 context (`pr_number`, `pr_owner`, `pr_repo`) are provided, the 590 GitHub API is used to determine changed files — this is the most reliable 591 method and avoids shallow-clone pitfalls. Falls back to local `git diff` 592 when the token is not provided or PR context is incomplete. 593 594 Args: 595 repo_path: Path to the Airbyte monorepo 596 certified: Filter by certification status (True=certified only, 597 False=non-certified only, None=all) 598 modified: Filter by modification status (True=modified only, 599 False=not-modified only, None=all) 600 language_filter: Set of languages to include (python, java, low-code, manifest-only) 601 language_exclude: Set of languages to exclude (mutually exclusive with language_filter) 602 connector_type: Filter by connector type (source, destination, or None for all) 603 connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all) 604 base_ref: Base git reference for modification detection (default: GIT_DEFAULT_BRANCH, e.g. "origin/master") 605 head_ref: Head git reference for modification detection (default: "HEAD") 606 pr_number: Pull request number (enables GitHub API path for modification detection) 607 pr_owner: Repository owner for PR (e.g. "airbytehq") 608 pr_repo: Repository name for PR (e.g. "airbyte") 609 gh_token: GitHub API token. When provided together with PR context, 610 the GitHub API is used instead of local git diff. 
611 612 Returns: 613 ConnectorListResult with sorted list of connector names and count 614 615 Raises: 616 ValueError: If both language_filter and language_exclude are provided 617 """ 618 # Validate mutual exclusivity of language filters 619 if language_filter is not None and language_exclude is not None: 620 raise ValueError( 621 "Cannot specify both language_filter and language_exclude. " 622 "Only one language filter parameter is accepted." 623 ) 624 625 # Start with all connectors 626 result = get_all_connectors(repo_path) 627 628 # Apply certified filter 629 if certified is not None: 630 certified_set = get_certified_connectors(repo_path) 631 if certified: 632 # Include only certified 633 result &= certified_set 634 else: 635 # Include only non-certified 636 result -= certified_set 637 638 # Apply modified filter 639 if modified is not None: 640 changed_set: set[str] | None = None 641 642 # Use GitHub API when token and full PR context are explicitly provided 643 if ( 644 gh_token is not None 645 and pr_number is not None 646 and pr_owner is not None 647 and pr_repo is not None 648 ): 649 try: 650 changed_set = set( 651 get_modified_connectors_from_github( 652 pr_number, pr_owner, pr_repo, gh_token=gh_token 653 ) 654 ) 655 except (GitHubAPIError, requests.RequestException): 656 logger.warning( 657 "GitHub API call failed for PR %s/%s#%d; " 658 "falling back to local git diff", 659 pr_owner, 660 pr_repo, 661 pr_number, 662 exc_info=True, 663 ) 664 665 # Fallback to local git diff 666 if changed_set is None: 667 base = base_ref if base_ref is not None else GIT_DEFAULT_BRANCH 668 head = head_ref if head_ref is not None else "HEAD" 669 changed_set = set(get_modified_connectors(repo_path, base, head)) 670 671 if modified: 672 # Include only modified 673 result &= changed_set 674 else: 675 # Include only not-modified 676 result -= changed_set 677 678 # Apply language include filter 679 if language_filter: 680 # Get connectors for all specified languages and union 
them 681 lang_result: set[str] = set() 682 for lang in language_filter: 683 lang_result |= get_connectors_by_language( 684 repo_path, ConnectorLanguage(lang) 685 ) 686 result &= lang_result 687 688 # Apply language exclude filter 689 if language_exclude: 690 # Get connectors for all specified languages and exclude them 691 for lang in language_exclude: 692 excluded = get_connectors_by_language(repo_path, ConnectorLanguage(lang)) 693 result -= excluded 694 695 # Apply connector type filter 696 if connector_type is not None: 697 type_set = get_connectors_by_type(repo_path, ConnectorType(connector_type)) 698 result &= type_set 699 700 # Apply connector subtype filter 701 if connector_subtype is not None: 702 subtype_set = get_connectors_by_subtype( 703 repo_path, ConnectorSubtype(connector_subtype) 704 ) 705 result &= subtype_set 706 707 return ConnectorListResult( 708 connectors=sorted(result), 709 count=len(result), 710 )
33class ConnectorSubtype(StrEnum): 34 """Connector subtypes based on data source category.""" 35 36 API = "api" 37 DATABASE = "database" 38 FILE = "file" 39 CUSTOM = "custom"
Connector subtypes based on data source category.
def get_modified_connectors(
    repo_path: str | Path,
    base_ref: str = GIT_DEFAULT_BRANCH,
    head_ref: str = "HEAD",
) -> list[str]:
    """Get list of connector IDs modified according to a local git diff.

    This is the *local-only* fallback used when the GitHub API path is not
    available (e.g. no PR number or no token). It uses `git merge-base`
    when possible so that only changes introduced on the head branch are
    reported, falling back to a plain two-ref diff when the merge-base
    cannot be determined (shallow clones without enough history).

    Prefer :func:`get_modified_connectors_from_github` whenever a PR number
    is known, as it avoids shallow-clone pitfalls entirely.

    Args:
        repo_path: Path to the Airbyte monorepo.
        base_ref: Base git reference to compare against.
        head_ref: Head git reference to compare.

    Returns:
        Sorted list of connector technical names.

    Raises:
        RuntimeError: If `git diff` fails, times out, or git cannot be
            executed at all (e.g. git not installed or `repo_path` missing).
    """
    repo_path = Path(repo_path)

    # Try to find the merge-base so the diff only contains changes introduced
    # on the head branch, avoiding false positives from unrelated base-branch
    # commits. Falls back to a plain two-ref diff when unavailable.
    effective_base = base_ref
    try:
        mb_result = subprocess.run(
            ["git", "merge-base", base_ref, head_ref],
            cwd=repo_path,
            capture_output=True,
            text=True,
            check=True,
            timeout=30,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
        # OSError covers a missing git binary or cwd; the diff below will then
        # fail as well and surface it as a RuntimeError.
        logger.debug(
            "Could not determine merge-base for %s..%s; falling back to plain diff",
            base_ref,
            head_ref,
        )
    else:
        merge_base = mb_result.stdout.strip()
        # Never pass an empty revision to git diff.
        if merge_base:
            effective_base = merge_base
            logger.debug("Using merge-base %s for diff", effective_base)

    try:
        result = subprocess.run(
            ["git", "diff", "--name-only", effective_base, head_ref],
            cwd=repo_path,
            capture_output=True,
            text=True,
            check=True,
            timeout=30,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to get git diff: {e.stderr}") from e
    except subprocess.TimeoutExpired as e:
        raise RuntimeError("Git diff command timed out after 30 seconds") from e
    except OSError as e:
        raise RuntimeError(f"Failed to run git diff: {e}") from e

    # splitlines() yields [] for empty output, where split("\n") yields [""].
    return _extract_connector_names(result.stdout.splitlines())
Get list of connector IDs modified according to a local git diff.
This is the local-only fallback used when the GitHub API path is not
available (e.g. no PR number or no token). It uses git merge-base
when possible so that only changes introduced on the head branch are
reported, falling back to a plain two-ref diff when the merge-base
cannot be determined (shallow clones without enough history).
Prefer get_modified_connectors_from_github() whenever a PR number
is known, as it avoids shallow-clone pitfalls entirely.
Arguments:
- repo_path: Path to the Airbyte monorepo.
- base_ref: Base git reference to compare against.
- head_ref: Head git reference to compare.
Returns:
Sorted list of connector technical names.
def get_modified_connectors_from_github(
    pr_number: int,
    pr_owner: str,
    pr_repo: str,
    gh_token: str,
) -> list[str]:
    """Return the connectors touched by a pull request, according to GitHub.

    Asking the GitHub API for the PR's changed files sidesteps the history
    depth limits of shallow clones and reports exactly what the PR changes,
    which makes this the preferred source whenever a PR number is known.

    Args:
        pr_number: Pull request number.
        pr_owner: Repository owner (e.g. `"airbytehq"`).
        pr_repo: Repository name (e.g. `"airbyte"`).
        gh_token: GitHub API token; it is passed through explicitly, never
            looked up implicitly.

    Returns:
        Sorted list of connector technical names.

    Raises:
        :class:`~airbyte_ops_mcp.github_api.GitHubAPIError`: If the API
            request fails.
    """
    # Imported lazily, at call time.
    from airbyte_ops_mcp.github_api import get_pr_changed_files

    logger.info(
        "Fetching changed files for PR %s/%s#%d via GitHub API",
        pr_owner,
        pr_repo,
        pr_number,
    )
    return _extract_connector_names(
        get_pr_changed_files(pr_owner, pr_repo, pr_number, token=gh_token)
    )
Get list of connector IDs modified in a PR via the GitHub API.
This is the preferred path when a PR number is known because it is immune to shallow-clone depth issues and always returns an accurate diff (only the files the PR actually changes).
Arguments:
- pr_number: Pull request number.
- pr_owner: Repository owner (e.g. "airbytehq").
- pr_repo: Repository name (e.g. "airbyte").
- gh_token: GitHub API token (must be provided explicitly).
Returns:
Sorted list of connector technical names.
Raises:
airbyte_ops_mcp.github_api.GitHubAPIError: If the API request fails.
@lru_cache(maxsize=128)
def get_certified_connectors(repo_path: str | Path) -> set[str]:
    """Get set of all certified connector IDs from metadata.

    This function reads the metadata.yaml file for each connector to determine
    which connectors have "certified" support level. Connectors with missing,
    unreadable, or malformed metadata are skipped.

    Results are memoized per `repo_path` value; the returned set is shared
    between calls, so callers must not mutate it.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of certified connector technical names

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> certified = get_certified_connectors("/path/to/airbyte")
        >>> "source-postgres" in certified
        True
    """
    repo_path = Path(repo_path)
    connectors_dir = repo_path / CONNECTOR_PATH_PREFIX

    # is_dir() rather than exists(): a stray file here would otherwise make
    # iterdir() raise NotADirectoryError instead of the documented ValueError.
    if not connectors_dir.is_dir():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    certified_connectors: set[str] = set()

    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        metadata_file = connector_dir / METADATA_FILE_NAME
        if not metadata_file.is_file():
            continue

        try:
            # Read as UTF-8 explicitly so the result does not depend on locale.
            with metadata_file.open(encoding="utf-8") as f:
                metadata = yaml.safe_load(f)
        except Exception:
            # Best-effort: skip connectors with unreadable or invalid metadata.
            logger.debug(
                "Skipping connector with invalid metadata: %s",
                metadata_file,
                exc_info=True,
            )
            continue

        # Guard against non-mapping documents / data sections.
        data = metadata.get("data") if isinstance(metadata, dict) else None
        if isinstance(data, dict) and data.get("supportLevel") == "certified":
            certified_connectors.add(connector_dir.name)

    return certified_connectors
Get set of all certified connector IDs from metadata.
This function reads the metadata.yaml file for each connector to determine which connectors have "certified" support level.
Arguments:
- repo_path: Path to the Airbyte monorepo
Returns:
Set of certified connector technical names
Example:
>>> certified = get_certified_connectors("/path/to/airbyte")
>>> "source-postgres" in certified
True
def get_all_connectors(repo_path: str | Path) -> set[str]:
    """Get set of all connector IDs in the repository.

    Every immediate sub-directory of the connectors directory counts as a
    connector; plain files are ignored.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of all connector technical names

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> all_connectors = get_all_connectors("/path/to/airbyte")
        >>> "source-faker" in all_connectors
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    # is_dir() rather than exists(): a stray file here would otherwise make
    # iterdir() raise NotADirectoryError instead of the documented ValueError.
    if not connectors_dir.is_dir():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    return {p.name for p in connectors_dir.iterdir() if p.is_dir()}
Get set of all connector IDs in the repository.
Arguments:
- repo_path: Path to the Airbyte monorepo
Returns:
Set of all connector technical names
Example:
>>> all_connectors = get_all_connectors("/path/to/airbyte")
>>> "source-faker" in all_connectors
True
def get_connector_metadata(
    repo_path: str | Path,
    connector_name: str,
) -> dict[str, Any] | None:
    """Get metadata for a specific connector.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_name: Technical name of the connector (e.g., "source-faker")

    Returns:
        The connector's metadata dict (the 'data' section; empty if the file
        has no 'data' key), or None if the file is missing, cannot be parsed,
        or is not shaped as a mapping.

    Example:
        >>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker")
        >>> metadata.get("supportLevel")
        'certified'
    """
    metadata_file = (
        Path(repo_path) / CONNECTOR_PATH_PREFIX / connector_name / METADATA_FILE_NAME
    )

    if not metadata_file.is_file():
        return None

    try:
        # Read as UTF-8 explicitly so the result does not depend on locale.
        with metadata_file.open(encoding="utf-8") as f:
            metadata = yaml.safe_load(f)
    except Exception:
        # Best-effort: callers treat None as "no usable metadata".
        logger.debug(
            "Could not load connector metadata %s", metadata_file, exc_info=True
        )
        return None

    if not isinstance(metadata, dict):
        return None
    data = metadata.get("data", {})
    # Callers call .get() on the result, so only a mapping is returned.
    return data if isinstance(data, dict) else None
Get metadata for a specific connector.
Arguments:
- repo_path: Path to the Airbyte monorepo
- connector_name: Technical name of the connector (e.g., "source-faker")
Returns:
The connector's metadata dict (the 'data' section), or None if not found
Example:
>>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker")
>>> metadata.get("supportLevel")
'certified'
def get_connectors_by_language(
    repo_path: str | Path,
    language: ConnectorLanguage,
) -> set[str]:
    """Get set of all connector IDs for a specific language.

    This function inspects each connector directory's file layout (via
    `_detect_connector_language`) to determine which connectors use the
    specified language.

    Args:
        repo_path: Path to the Airbyte monorepo
        language: Language to filter by

    Returns:
        Set of connector technical names using the specified language

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> python_connectors = get_connectors_by_language(
        ...     "/path/to/airbyte", ConnectorLanguage.PYTHON
        ... )
        >>> "source-faker" in python_connectors
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    # is_dir() rather than exists(): a stray file here would otherwise make
    # iterdir() raise NotADirectoryError instead of the documented ValueError.
    if not connectors_dir.is_dir():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    language_value = language.value
    return {
        connector_dir.name
        for connector_dir in connectors_dir.iterdir()
        if connector_dir.is_dir()
        and _detect_connector_language(connector_dir, connector_dir.name)
        == language_value
    }
Get set of all connector IDs for a specific language.
This function reads connector directories to determine which connectors use the specified language.
Arguments:
- repo_path: Path to the Airbyte monorepo
- language: Language to filter by
Returns:
Set of connector technical names using the specified language
Example:
>>> python_connectors = get_connectors_by_language(
...     "/path/to/airbyte", ConnectorLanguage.PYTHON
... )
>>> "source-faker" in python_connectors
True
def get_connectors_by_type(
    repo_path: str | Path,
    connector_type: ConnectorType,
) -> set[str]:
    """Get set of all connector IDs for a specific type (source or destination).

    The ``connectorType`` declared in a connector's metadata.yaml is
    authoritative. The directory-name prefix (``source-`` / ``destination-``)
    is consulted only when the metadata is missing or does not declare a type.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_type: Type to filter by (source or destination)

    Returns:
        Set of connector technical names matching the specified type

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> source_connectors = get_connectors_by_type(
        ...     "/path/to/airbyte", ConnectorType.SOURCE
        ... )
        >>> "source-postgres" in source_connectors
        True
    """
    repo_path = Path(repo_path)
    connectors_dir = repo_path / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    type_value = connector_type.value
    connectors_by_type: set[str] = set()

    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        connector_name = connector_dir.name
        metadata = get_connector_metadata(repo_path, connector_name)
        declared_type = metadata.get("connectorType") if metadata else None

        if declared_type is not None:
            # Metadata declares a type: trust it, even if it disagrees with the
            # directory name (previously a mismatch fell through to the prefix).
            matches = declared_type == type_value
        else:
            # No declared type: fall back to name prefix detection.
            matches = connector_name.startswith(f"{type_value}-")

        if matches:
            connectors_by_type.add(connector_name)

    return connectors_by_type
Get set of all connector IDs for a specific type (source or destination).
This function reads connector directories to determine which connectors match the specified type based on their metadata.yaml or name prefix.
Arguments:
- repo_path: Path to the Airbyte monorepo
- connector_type: Type to filter by (source or destination)
Returns:
Set of connector technical names matching the specified type
Example:
>>> source_connectors = get_connectors_by_type(
...     "/path/to/airbyte", ConnectorType.SOURCE
... )
>>> "source-postgres" in source_connectors
True
def get_connectors_by_subtype(
    repo_path: str | Path,
    connector_subtype: ConnectorSubtype,
) -> set[str]:
    """Collect connectors whose metadata declares the given ``connectorSubtype``.

    Only connectors with a non-empty metadata mapping are considered. There is
    no name-based fallback, so connectors without usable metadata never match.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_subtype: Subtype to filter by (api, database, file, custom)

    Returns:
        Set of connector technical names matching the specified subtype

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> database_connectors = get_connectors_by_subtype(
        ...     "/path/to/airbyte", ConnectorSubtype.DATABASE
        ... )
        >>> "source-postgres" in database_connectors
        True
    """
    repo_root = Path(repo_path)
    connectors_root = repo_root / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    wanted_subtype = connector_subtype.value
    matching: set[str] = set()

    for entry in connectors_root.iterdir():
        if not entry.is_dir():
            continue
        metadata = get_connector_metadata(repo_root, entry.name)
        if metadata and metadata.get("connectorSubtype") == wanted_subtype:
            matching.add(entry.name)

    return matching
Get set of all connector IDs for a specific subtype (api, database, file, etc.).
This function reads connector metadata.yaml files to determine which connectors match the specified subtype.
Arguments:
- repo_path: Path to the Airbyte monorepo
- connector_subtype: Subtype to filter by (api, database, file, custom)
Returns:
Set of connector technical names matching the specified subtype
Example:
>>> database_connectors = get_connectors_by_subtype(
...     "/path/to/airbyte", ConnectorSubtype.DATABASE
... )
>>> "source-postgres" in database_connectors
True
@lru_cache(maxsize=128)
def _scan_connectors_with_local_cdk(connectors_dir: Path) -> frozenset[str]:
    """Scan ``connectors_dir`` once and return names of connectors using a local CDK.

    Cached per (normalized) directory path. The result is a frozenset so the
    cached value cannot be mutated through any caller's reference.
    """
    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    local_cdk_connectors: set[str] = set()

    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        connector_name = connector_dir.name
        detected_language = _detect_connector_language(connector_dir, connector_name)

        if detected_language in ("python", "low-code") and _has_local_cdk_python(
            connector_dir
        ):
            # Python connectors: check pyproject.toml for path-based CDK dependency
            local_cdk_connectors.add(connector_name)
        elif detected_language == "java" and _has_local_cdk_java(connector_dir):
            # Java/Kotlin connectors: check build.gradle for useLocalCdk=true
            local_cdk_connectors.add(connector_name)

    return frozenset(local_cdk_connectors)


def get_connectors_with_local_cdk(repo_path: str | Path) -> set[str]:
    """Get set of connectors using local CDK reference instead of published version.

    This function detects connectors that are configured to use a local CDK
    checkout rather than a published CDK version. The detection method differs
    by connector language:
    - Python: Checks for path-based dependency in pyproject.toml
    - Java/Kotlin: Checks for useLocalCdk=true in build.gradle files

    Results are cached per connectors-directory path for the life of the
    process; ``str`` and ``Path`` spellings of the same path share one entry.
    Call ``get_connectors_with_local_cdk.cache_clear()`` after the checkout
    changes. Each call returns a fresh ``set``, so callers may mutate it freely.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of connector technical names using local CDK reference

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
        >>> "destination-motherduck" in local_cdk_connectors
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX
    # Copy so a caller mutating the result cannot corrupt the cached value.
    return set(_scan_connectors_with_local_cdk(connectors_dir))


# Keep the cache-management API that the previously decorated function exposed.
get_connectors_with_local_cdk.cache_clear = _scan_connectors_with_local_cdk.cache_clear  # type: ignore[attr-defined]
get_connectors_with_local_cdk.cache_info = _scan_connectors_with_local_cdk.cache_info  # type: ignore[attr-defined]
Get set of connectors using local CDK reference instead of published version.
This function detects connectors that are configured to use a local CDK checkout rather than a published CDK version. The detection method differs by connector language:
- Python: Checks for path-based dependency in pyproject.toml
- Java/Kotlin: Checks for useLocalCdk=true in build.gradle files
Arguments:
- repo_path: Path to the Airbyte monorepo
Returns:
Set of connector technical names using local CDK reference
Example:
>>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
>>> "destination-motherduck" in local_cdk_connectors
True
@dataclass
class ConnectorListResult:
    """Outcome of a filtered connector listing.

    As produced by ``list_connectors``, ``connectors`` is sorted and ``count``
    equals ``len(connectors)``. The class itself does not enforce either.
    """

    # Technical names of the connectors that passed every requested filter.
    connectors: list[str]
    # Number of matching connectors.
    count: int
Result of listing connectors with filters.
def _resolve_changed_connectors(
    repo_path: str | Path,
    base_ref: str | None,
    head_ref: str | None,
    pr_number: int | None,
    pr_owner: str | None,
    pr_repo: str | None,
    gh_token: str | None,
) -> set[str]:
    """Return the set of connector names changed by the PR or between git refs.

    Uses the GitHub API when a token and full PR context are all provided.
    Falls back to local ``git diff`` when they are not, or when the API call
    fails.
    """
    if (
        gh_token is not None
        and pr_number is not None
        and pr_owner is not None
        and pr_repo is not None
    ):
        try:
            return set(
                get_modified_connectors_from_github(
                    pr_number, pr_owner, pr_repo, gh_token=gh_token
                )
            )
        except (GitHubAPIError, requests.RequestException):
            logger.warning(
                "GitHub API call failed for PR %s/%s#%d; "
                "falling back to local git diff",
                pr_owner,
                pr_repo,
                pr_number,
                exc_info=True,
            )

    # Local git diff fallback.
    base = base_ref if base_ref is not None else GIT_DEFAULT_BRANCH
    head = head_ref if head_ref is not None else "HEAD"
    return set(get_modified_connectors(repo_path, base, head))


def list_connectors(
    repo_path: str | Path,
    certified: bool | None = None,
    modified: bool | None = None,
    language_filter: set[str] | None = None,
    language_exclude: set[str] | None = None,
    connector_type: str | None = None,
    connector_subtype: str | None = None,
    base_ref: str | None = None,
    head_ref: str | None = None,
    pr_number: int | None = None,
    pr_owner: str | None = None,
    pr_repo: str | None = None,
    gh_token: str | None = None,
) -> ConnectorListResult:
    """List connectors in the Airbyte monorepo with flexible filtering.

    This is the core capability function that encapsulates all filtering logic.
    Both CLI and MCP layers should use this function.

    When `modified` filtering is requested and both `gh_token` and full PR
    context (`pr_number`, `pr_owner`, `pr_repo`) are provided, the
    GitHub API is used to determine changed files — this is the most reliable
    method and avoids shallow-clone pitfalls. Falls back to local `git diff`
    when the token is not provided, the PR context is incomplete, or the
    GitHub API request fails.

    Args:
        repo_path: Path to the Airbyte monorepo
        certified: Filter by certification status (True=certified only,
            False=non-certified only, None=all)
        modified: Filter by modification status (True=modified only,
            False=not-modified only, None=all)
        language_filter: Set of languages to include (python, java, low-code, manifest-only)
        language_exclude: Set of languages to exclude (mutually exclusive with language_filter)
        connector_type: Filter by connector type (source, destination, or None for all)
        connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all)
        base_ref: Base git reference for the local git diff fallback
            (default: GIT_DEFAULT_BRANCH, i.e. "origin/master")
        head_ref: Head git reference for the local git diff fallback (default: "HEAD")
        pr_number: Pull request number (enables GitHub API path for modification detection)
        pr_owner: Repository owner for PR (e.g. "airbytehq")
        pr_repo: Repository name for PR (e.g. "airbyte")
        gh_token: GitHub API token. When provided together with PR context,
            the GitHub API is used instead of local git diff.

    Returns:
        ConnectorListResult with sorted list of connector names and count

    Raises:
        ValueError: If both language_filter and language_exclude are provided
            (even if empty); if a language, connector_type, or connector_subtype
            value is not a recognized enum value; or, propagated from the
            per-filter helpers, if the connectors directory does not exist.
    """
    # Validate mutual exclusivity of language filters
    if language_filter is not None and language_exclude is not None:
        raise ValueError(
            "Cannot specify both language_filter and language_exclude. "
            "Only one language filter parameter is accepted."
        )

    # Start with a private copy of all connectors. The in-place operators below
    # (&=, -=) would otherwise mutate whatever object get_all_connectors()
    # returned, corrupting it if that helper caches its result.
    result: set[str] = set(get_all_connectors(repo_path))

    # Apply certified filter
    if certified is not None:
        certified_set = get_certified_connectors(repo_path)
        if certified:
            result &= certified_set  # certified only
        else:
            result -= certified_set  # non-certified only

    # Apply modified filter
    if modified is not None:
        changed_set = _resolve_changed_connectors(
            repo_path, base_ref, head_ref, pr_number, pr_owner, pr_repo, gh_token
        )
        if modified:
            result &= changed_set  # modified only
        else:
            result -= changed_set  # not-modified only

    # Apply language include filter (union across the requested languages).
    # An empty set is treated the same as "no filter".
    if language_filter:
        lang_result: set[str] = set()
        for lang in language_filter:
            lang_result |= get_connectors_by_language(
                repo_path, ConnectorLanguage(lang)
            )
        result &= lang_result

    # Apply language exclude filter
    if language_exclude:
        for lang in language_exclude:
            result -= get_connectors_by_language(repo_path, ConnectorLanguage(lang))

    # Apply connector type filter
    if connector_type is not None:
        result &= get_connectors_by_type(repo_path, ConnectorType(connector_type))

    # Apply connector subtype filter
    if connector_subtype is not None:
        result &= get_connectors_by_subtype(
            repo_path, ConnectorSubtype(connector_subtype)
        )

    return ConnectorListResult(
        connectors=sorted(result),
        count=len(result),
    )
List connectors in the Airbyte monorepo with flexible filtering.
This is the core capability function that encapsulates all filtering logic. Both CLI and MCP layers should use this function.
When modified filtering is requested and both gh_token and full PR
context (pr_number, pr_owner, pr_repo) are provided, the
GitHub API is used to determine changed files — this is the most reliable
method and avoids shallow-clone pitfalls. Falls back to local git diff
when the token is not provided, the PR context is incomplete, or the GitHub API request fails.
Arguments:
- repo_path: Path to the Airbyte monorepo
- certified: Filter by certification status (True=certified only, False=non-certified only, None=all)
- modified: Filter by modification status (True=modified only, False=not-modified only, None=all)
- language_filter: Set of languages to include (python, java, low-code, manifest-only)
- language_exclude: Set of languages to exclude (mutually exclusive with language_filter)
- connector_type: Filter by connector type (source, destination, or None for all)
- connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all)
- base_ref: Base git reference for modification detection via the local git diff fallback (default: GIT_DEFAULT_BRANCH, i.e. "origin/master")
- head_ref: Head git reference for modification detection (default: "HEAD")
- pr_number: Pull request number (enables GitHub API path for modification detection)
- pr_owner: Repository owner for PR (e.g. "airbytehq")
- pr_repo: Repository name for PR (e.g. "airbyte")
- gh_token: GitHub API token. When provided together with PR context, the GitHub API is used instead of local git diff.
Returns:
ConnectorListResult with sorted list of connector names and count
Raises:
- ValueError: If both language_filter and language_exclude are provided (even if empty), or if a language, connector_type, or connector_subtype value is not recognized