airbyte_ops_mcp.airbyte_repo.list_connectors
Detect changed connectors in the Airbyte monorepo.
This module provides functionality to detect which connectors have been modified by comparing git diffs between branches.
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Detect changed connectors in the Airbyte monorepo. 3 4This module provides functionality to detect which connectors have been modified 5by comparing git diffs between branches. 6""" 7 8from __future__ import annotations 9 10import logging 11import re 12import subprocess 13from dataclasses import dataclass 14from enum import StrEnum 15from functools import lru_cache 16from pathlib import Path 17from typing import Any 18 19import requests 20import yaml 21 22from airbyte_ops_mcp.github_api import GitHubAPIError, get_pr_changed_files 23from airbyte_ops_mcp.registry._enums import ConnectorLanguage, ConnectorType 24 25CONNECTOR_PATH_PREFIX = "airbyte-integrations/connectors" 26METADATA_FILE_NAME = "metadata.yaml" 27GIT_DEFAULT_BRANCH = "origin/master" 28 29logger = logging.getLogger(__name__) 30 31 32class ConnectorSubtype(StrEnum): 33 """Connector subtypes based on data source category.""" 34 35 API = "api" 36 DATABASE = "database" 37 FILE = "file" 38 CUSTOM = "custom" 39 40 41def _extract_connector_names(changed_files: list[str]) -> list[str]: 42 """Extract unique connector names from a list of changed file paths. 43 44 Args: 45 changed_files: File paths relative to the repo root. 46 47 Returns: 48 Sorted list of connector technical names. 49 """ 50 changed_connectors: set[str] = set() 51 for file_path in changed_files: 52 if file_path.startswith(CONNECTOR_PATH_PREFIX + "/"): 53 # e.g. "airbyte-integrations/connectors/source-faker/..." -> "source-faker" 54 path_parts = file_path.split("/") 55 if len(path_parts) >= 3: 56 changed_connectors.add(path_parts[2]) 57 return sorted(changed_connectors) 58 59 60def get_modified_connectors( 61 repo_path: str | Path, 62 base_ref: str = GIT_DEFAULT_BRANCH, 63 head_ref: str = "HEAD", 64) -> list[str]: 65 """Get list of connector IDs modified according to a local git diff. 66 67 This is the *local-only* fallback used when the GitHub API path is not 68 available (e.g. 
no PR number or no token). It uses `git merge-base` 69 when possible so that only changes introduced on the head branch are 70 reported, falling back to a plain two-ref diff when the merge-base 71 cannot be determined (shallow clones without enough history). 72 73 Prefer `get_modified_connectors_from_github` whenever a PR number 74 is known, as it avoids shallow-clone pitfalls entirely. 75 76 Args: 77 repo_path: Path to the Airbyte monorepo. 78 base_ref: Base git reference to compare against. 79 head_ref: Head git reference to compare. 80 81 Returns: 82 Sorted list of connector technical names. 83 """ 84 repo_path = Path(repo_path) 85 86 # Try to find the merge-base so the diff only contains changes introduced 87 # on the head branch, avoiding false positives from unrelated base-branch 88 # commits. Falls back to a plain two-ref diff when unavailable. 89 effective_base = base_ref 90 try: 91 mb_result = subprocess.run( 92 ["git", "merge-base", base_ref, head_ref], 93 cwd=repo_path, 94 capture_output=True, 95 text=True, 96 check=True, 97 timeout=30, 98 ) 99 effective_base = mb_result.stdout.strip() 100 logger.debug("Using merge-base %s for diff", effective_base) 101 except (subprocess.CalledProcessError, subprocess.TimeoutExpired): 102 logger.debug( 103 "Could not determine merge-base for %s..%s; falling back to plain diff", 104 base_ref, 105 head_ref, 106 ) 107 108 try: 109 result = subprocess.run( 110 ["git", "diff", "--name-only", effective_base, head_ref], 111 cwd=repo_path, 112 capture_output=True, 113 text=True, 114 check=True, 115 timeout=30, 116 ) 117 changed_files = result.stdout.strip().split("\n") 118 except subprocess.CalledProcessError as e: 119 raise RuntimeError(f"Failed to get git diff: {e.stderr}") from e 120 except subprocess.TimeoutExpired as e: 121 raise RuntimeError("Git diff command timed out after 30 seconds") from e 122 123 return _extract_connector_names(changed_files) 124 125 126def format_github_actions_connector_matrix( 127 connectors: 
list[str], 128) -> dict[str, list[str]]: 129 """Format connector names as a GitHub Actions matrix. 130 131 GitHub Actions matrix generation needs at least one value. Return an empty 132 connector placeholder when no connectors changed so callers can pair this 133 with an `if` guard on the matrix job. 134 """ 135 return {"connector": connectors or [""]} 136 137 138def get_modified_connectors_from_github( 139 pr_number: int, 140 pr_owner: str, 141 pr_repo: str, 142 gh_token: str, 143) -> list[str]: 144 """Get list of connector IDs modified in a PR via the GitHub API. 145 146 This is the preferred path when a PR number is known because it is 147 immune to shallow-clone depth issues and always returns an accurate 148 diff (only the files the PR actually changes). 149 150 Args: 151 pr_number: Pull request number. 152 pr_owner: Repository owner (e.g. `"airbytehq"`). 153 pr_repo: Repository name (e.g. `"airbyte"`). 154 gh_token: GitHub API token (must be provided explicitly). 155 156 Returns: 157 Sorted list of connector technical names. 158 159 Raises: 160 GitHubAPIError: If the API request fails. 161 """ 162 logger.info( 163 "Fetching changed files for PR %s/%s#%d via GitHub API", 164 pr_owner, 165 pr_repo, 166 pr_number, 167 ) 168 changed_files = get_pr_changed_files(pr_owner, pr_repo, pr_number, token=gh_token) 169 return _extract_connector_names(changed_files) 170 171 172@lru_cache(maxsize=128) 173def get_certified_connectors(repo_path: str | Path) -> set[str]: 174 """Get set of all certified connector IDs from metadata. 175 176 This function reads the metadata.yaml file for each connector to determine 177 which connectors have "certified" support level. 
178 179 Args: 180 repo_path: Path to the Airbyte monorepo 181 182 Returns: 183 Set of certified connector technical names 184 185 Example: 186 >>> certified = get_certified_connectors("/path/to/airbyte") 187 >>> "source-postgres" in certified 188 True 189 """ 190 repo_path = Path(repo_path) 191 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 192 193 if not connectors_dir.exists(): 194 raise ValueError(f"Connectors directory not found: {connectors_dir}") 195 196 certified_connectors = set() 197 198 # Iterate through all connector directories 199 for connector_dir in connectors_dir.iterdir(): 200 if not connector_dir.is_dir(): 201 continue 202 203 metadata_file = connector_dir / METADATA_FILE_NAME 204 if not metadata_file.exists(): 205 continue 206 207 # Read metadata to check support level 208 try: 209 import yaml 210 211 with open(metadata_file) as f: 212 metadata = yaml.safe_load(f) 213 214 support_level = metadata.get("data", {}).get("supportLevel") 215 if support_level == "certified": 216 certified_connectors.add(connector_dir.name) 217 except Exception: 218 # Skip connectors with invalid metadata 219 continue 220 221 return certified_connectors 222 223 224def get_all_connectors(repo_path: str | Path) -> set[str]: 225 """Get set of all connector IDs in the repository. 226 227 Args: 228 repo_path: Path to the Airbyte monorepo 229 230 Returns: 231 Set of all connector technical names 232 233 Example: 234 >>> all_connectors = get_all_connectors("/path/to/airbyte") 235 >>> "source-faker" in all_connectors 236 True 237 """ 238 repo_path = Path(repo_path) 239 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 240 241 if not connectors_dir.exists(): 242 raise ValueError(f"Connectors directory not found: {connectors_dir}") 243 244 return {p.name for p in connectors_dir.iterdir() if p.is_dir()} 245 246 247def get_connector_metadata( 248 repo_path: str | Path, 249 connector_name: str, 250) -> dict[str, Any] | None: 251 """Get metadata for a specific connector. 
252 253 Args: 254 repo_path: Path to the Airbyte monorepo 255 connector_name: Technical name of the connector (e.g., "source-faker") 256 257 Returns: 258 The connector's metadata dict (the 'data' section), or None if not found 259 260 Example: 261 >>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker") 262 >>> metadata.get("supportLevel") 263 'certified' 264 """ 265 repo_path = Path(repo_path) 266 connector_dir = repo_path / CONNECTOR_PATH_PREFIX / connector_name 267 metadata_file = connector_dir / METADATA_FILE_NAME 268 269 if not metadata_file.exists(): 270 return None 271 272 try: 273 with open(metadata_file) as f: 274 metadata = yaml.safe_load(f) 275 return metadata.get("data", {}) 276 except Exception: 277 return None 278 279 280def get_connectors_by_language( 281 repo_path: str | Path, 282 language: ConnectorLanguage, 283) -> set[str]: 284 """Get set of all connector IDs for a specific language. 285 286 This function reads connector directories to determine which connectors 287 use the specified language. 288 289 Args: 290 repo_path: Path to the Airbyte monorepo 291 language: Language to filter by 292 293 Returns: 294 Set of connector technical names using the specified language 295 296 Example: 297 >>> python_connectors = get_connectors_by_language( 298 ... "/path/to/airbyte", ConnectorLanguage.PYTHON 299 ... 
) 300 >>> "source-faker" in python_connectors 301 True 302 """ 303 repo_path = Path(repo_path) 304 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 305 306 if not connectors_dir.exists(): 307 raise ValueError(f"Connectors directory not found: {connectors_dir}") 308 309 language_value = language.value 310 connectors_by_language = set() 311 312 # Iterate through all connector directories 313 for connector_dir in connectors_dir.iterdir(): 314 if not connector_dir.is_dir(): 315 continue 316 317 # Determine language based on file structure 318 connector_name = connector_dir.name 319 detected_language = _detect_connector_language(connector_dir, connector_name) 320 321 if detected_language == language_value: 322 connectors_by_language.add(connector_dir.name) 323 324 return connectors_by_language 325 326 327def get_connectors_by_type( 328 repo_path: str | Path, 329 connector_type: ConnectorType, 330) -> set[str]: 331 """Get set of all connector IDs for a specific type (source or destination). 332 333 This function reads connector directories to determine which connectors 334 match the specified type based on their metadata.yaml or name prefix. 335 336 Args: 337 repo_path: Path to the Airbyte monorepo 338 connector_type: Type to filter by (source or destination) 339 340 Returns: 341 Set of connector technical names matching the specified type 342 343 Example: 344 >>> source_connectors = get_connectors_by_type( 345 ... "/path/to/airbyte", ConnectorType.SOURCE 346 ... 
) 347 >>> "source-postgres" in source_connectors 348 True 349 """ 350 repo_path = Path(repo_path) 351 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 352 353 if not connectors_dir.exists(): 354 raise ValueError(f"Connectors directory not found: {connectors_dir}") 355 356 type_value = connector_type.value 357 connectors_by_type = set() 358 359 for connector_dir in connectors_dir.iterdir(): 360 if not connector_dir.is_dir(): 361 continue 362 363 connector_name = connector_dir.name 364 365 # First try to get type from metadata.yaml 366 metadata = get_connector_metadata(repo_path, connector_name) 367 if metadata: 368 metadata_type = metadata.get("connectorType") 369 if metadata_type == type_value: 370 connectors_by_type.add(connector_name) 371 continue 372 373 # Fallback to name prefix detection 374 if connector_name.startswith(f"{type_value}-"): 375 connectors_by_type.add(connector_name) 376 377 return connectors_by_type 378 379 380def get_connectors_by_subtype( 381 repo_path: str | Path, 382 connector_subtype: ConnectorSubtype, 383) -> set[str]: 384 """Get set of all connector IDs for a specific subtype (api, database, file, etc.). 385 386 This function reads connector metadata.yaml files to determine which connectors 387 match the specified subtype. 388 389 Args: 390 repo_path: Path to the Airbyte monorepo 391 connector_subtype: Subtype to filter by (api, database, file, custom) 392 393 Returns: 394 Set of connector technical names matching the specified subtype 395 396 Example: 397 >>> database_connectors = get_connectors_by_subtype( 398 ... "/path/to/airbyte", ConnectorSubtype.DATABASE 399 ... 
) 400 >>> "source-postgres" in database_connectors 401 True 402 """ 403 repo_path = Path(repo_path) 404 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 405 406 if not connectors_dir.exists(): 407 raise ValueError(f"Connectors directory not found: {connectors_dir}") 408 409 subtype_value = connector_subtype.value 410 connectors_by_subtype = set() 411 412 for connector_dir in connectors_dir.iterdir(): 413 if not connector_dir.is_dir(): 414 continue 415 416 connector_name = connector_dir.name 417 metadata = get_connector_metadata(repo_path, connector_name) 418 419 if metadata: 420 metadata_subtype = metadata.get("connectorSubtype") 421 if metadata_subtype == subtype_value: 422 connectors_by_subtype.add(connector_name) 423 424 return connectors_by_subtype 425 426 427@lru_cache(maxsize=1024) 428def _detect_connector_language(connector_dir: Path, connector_name: str) -> str | None: 429 """Detect the language of a connector based on its file structure. 430 431 Args: 432 connector_dir: Path to the connector directory 433 connector_name: Technical name of the connector 434 435 Returns: 436 Language string (python, java, low-code, manifest-only) or None 437 """ 438 # Check for manifest-only (manifest.yaml at root) 439 if (connector_dir / "manifest.yaml").is_file(): 440 return "manifest-only" 441 442 # Check for low-code (manifest.yaml in source directory) 443 source_dir = connector_dir / connector_name.replace("-", "_") 444 if (source_dir / "manifest.yaml").is_file(): 445 return "low-code" 446 447 # Check for Python (setup.py or pyproject.toml) 448 if (connector_dir / "setup.py").is_file() or ( 449 connector_dir / "pyproject.toml" 450 ).is_file(): 451 return "python" 452 453 # Check for Java/Kotlin (src/main/java or src/main/kotlin) 454 if (connector_dir / "src" / "main" / "java").exists() or ( 455 connector_dir / "src" / "main" / "kotlin" 456 ).exists(): 457 return "java" 458 459 return None 460 461 462@lru_cache(maxsize=128) 463def 
get_connectors_with_local_cdk(repo_path: str | Path) -> set[str]: 464 """Get set of connectors using local CDK reference instead of published version. 465 466 This function detects connectors that are configured to use a local CDK 467 checkout rather than a published CDK version. The detection method differs 468 by connector language: 469 - Python: Checks for path-based dependency in pyproject.toml 470 - Java/Kotlin: Checks for useLocalCdk=true in build.gradle files 471 472 Args: 473 repo_path: Path to the Airbyte monorepo 474 475 Returns: 476 Set of connector technical names using local CDK reference 477 478 Example: 479 >>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte") 480 >>> "destination-motherduck" in local_cdk_connectors 481 True 482 """ 483 repo_path = Path(repo_path) 484 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 485 486 if not connectors_dir.exists(): 487 raise ValueError(f"Connectors directory not found: {connectors_dir}") 488 489 local_cdk_connectors = set() 490 491 # Iterate through all connector directories 492 for connector_dir in connectors_dir.iterdir(): 493 if not connector_dir.is_dir(): 494 continue 495 496 connector_name = connector_dir.name 497 detected_language = _detect_connector_language(connector_dir, connector_name) 498 499 # Check for local CDK based on language 500 if detected_language in ("python", "low-code") and _has_local_cdk_python( 501 connector_dir 502 ): 503 # Python connectors: check pyproject.toml for path-based CDK dependency 504 local_cdk_connectors.add(connector_name) 505 elif detected_language == "java" and _has_local_cdk_java(connector_dir): 506 # Java/Kotlin connectors: check build.gradle for useLocalCdk=true 507 local_cdk_connectors.add(connector_name) 508 509 return local_cdk_connectors 510 511 512def _has_local_cdk_python(connector_dir: Path) -> bool: 513 """Check if a Python connector uses local CDK reference. 
514 515 Args: 516 connector_dir: Path to the connector directory 517 518 Returns: 519 True if connector uses local CDK reference 520 """ 521 pyproject_file = connector_dir / "pyproject.toml" 522 if not pyproject_file.exists(): 523 return False 524 525 try: 526 content = pyproject_file.read_text() 527 # Look for path-based airbyte-cdk dependency 528 # Pattern: airbyte-cdk = { path = "..." } or airbyte-cdk = {path = "..."} 529 return bool(re.search(r"airbyte-cdk\s*=\s*\{[^}]*path\s*=", content)) 530 except Exception: 531 return False 532 533 534def _has_local_cdk_java(connector_dir: Path) -> bool: 535 """Check if a Java/Kotlin connector uses local CDK reference. 536 537 Mirrors the implementation from airbyte-ci/connectors/connector_ops/connector_ops/utils.py 538 539 Args: 540 connector_dir: Path to the connector directory 541 542 Returns: 543 True if connector uses local CDK reference 544 """ 545 # Check both build.gradle and build.gradle.kts 546 for build_file_name in ("build.gradle", "build.gradle.kts"): 547 build_file = connector_dir / build_file_name 548 if not build_file.exists(): 549 continue 550 551 try: 552 # Read file and strip inline comments 553 contents = "\n".join( 554 [ 555 line.split("//")[0] # Remove inline comments 556 for line in build_file.read_text().split("\n") 557 ] 558 ) 559 # Remove spaces and check for useLocalCdk=true 560 contents = contents.replace(" ", "") 561 if "useLocalCdk=true" in contents: 562 return True 563 except Exception: 564 continue 565 566 return False 567 568 569@dataclass 570class ConnectorListResult: 571 """Result of listing connectors with filters.""" 572 573 connectors: list[str] 574 count: int 575 576 577def list_connectors( 578 repo_path: str | Path, 579 certified: bool | None = None, 580 modified: bool | None = None, 581 language_filter: set[str] | None = None, 582 language_exclude: set[str] | None = None, 583 connector_type: str | None = None, 584 connector_subtype: str | None = None, 585 base_ref: str | None = None, 
586 head_ref: str | None = None, 587 pr_number: int | None = None, 588 pr_owner: str | None = None, 589 pr_repo: str | None = None, 590 gh_token: str | None = None, 591) -> ConnectorListResult: 592 """List connectors in the Airbyte monorepo with flexible filtering. 593 594 This is the core capability function that encapsulates all filtering logic. 595 Both CLI and MCP layers should use this function. 596 597 When `modified` filtering is requested and both `gh_token` and full PR 598 context (`pr_number`, `pr_owner`, `pr_repo`) are provided, the 599 GitHub API is used to determine changed files — this is the most reliable 600 method and avoids shallow-clone pitfalls. Falls back to local `git diff` 601 when the token is not provided or PR context is incomplete. 602 603 Args: 604 repo_path: Path to the Airbyte monorepo 605 certified: Filter by certification status (True=certified only, 606 False=non-certified only, None=all) 607 modified: Filter by modification status (True=modified only, 608 False=not-modified only, None=all) 609 language_filter: Set of languages to include (python, java, low-code, manifest-only) 610 language_exclude: Set of languages to exclude (mutually exclusive with language_filter) 611 connector_type: Filter by connector type (source, destination, or None for all) 612 connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all) 613 base_ref: Base git reference for modification detection (default: GIT_DEFAULT_BRANCH, e.g. "origin/master") 614 head_ref: Head git reference for modification detection (default: "HEAD") 615 pr_number: Pull request number (enables GitHub API path for modification detection) 616 pr_owner: Repository owner for PR (e.g. "airbytehq") 617 pr_repo: Repository name for PR (e.g. "airbyte") 618 gh_token: GitHub API token. When provided together with PR context, 619 the GitHub API is used instead of local git diff. 
620 621 Returns: 622 ConnectorListResult with sorted list of connector names and count 623 624 Raises: 625 ValueError: If both language_filter and language_exclude are provided 626 """ 627 # Validate mutual exclusivity of language filters 628 if language_filter is not None and language_exclude is not None: 629 raise ValueError( 630 "Cannot specify both language_filter and language_exclude. " 631 "Only one language filter parameter is accepted." 632 ) 633 634 # Start with all connectors 635 result = get_all_connectors(repo_path) 636 637 # Apply certified filter 638 if certified is not None: 639 certified_set = get_certified_connectors(repo_path) 640 if certified: 641 # Include only certified 642 result &= certified_set 643 else: 644 # Include only non-certified 645 result -= certified_set 646 647 # Apply modified filter 648 if modified is not None: 649 changed_set: set[str] | None = None 650 651 # Use GitHub API when token and full PR context are explicitly provided 652 if ( 653 gh_token is not None 654 and pr_number is not None 655 and pr_owner is not None 656 and pr_repo is not None 657 ): 658 try: 659 changed_set = set( 660 get_modified_connectors_from_github( 661 pr_number, pr_owner, pr_repo, gh_token=gh_token 662 ) 663 ) 664 except (GitHubAPIError, requests.RequestException): 665 logger.warning( 666 "GitHub API call failed for PR %s/%s#%d; " 667 "falling back to local git diff", 668 pr_owner, 669 pr_repo, 670 pr_number, 671 exc_info=True, 672 ) 673 674 # Fallback to local git diff 675 if changed_set is None: 676 base = base_ref if base_ref is not None else GIT_DEFAULT_BRANCH 677 head = head_ref if head_ref is not None else "HEAD" 678 changed_set = set(get_modified_connectors(repo_path, base, head)) 679 680 if modified: 681 # Include only modified 682 result &= changed_set 683 else: 684 # Include only not-modified 685 result -= changed_set 686 687 # Apply language include filter 688 if language_filter: 689 # Get connectors for all specified languages and union 
them 690 lang_result: set[str] = set() 691 for lang in language_filter: 692 lang_result |= get_connectors_by_language( 693 repo_path, ConnectorLanguage(lang) 694 ) 695 result &= lang_result 696 697 # Apply language exclude filter 698 if language_exclude: 699 # Get connectors for all specified languages and exclude them 700 for lang in language_exclude: 701 excluded = get_connectors_by_language(repo_path, ConnectorLanguage(lang)) 702 result -= excluded 703 704 # Apply connector type filter 705 if connector_type is not None: 706 type_set = get_connectors_by_type(repo_path, ConnectorType(connector_type)) 707 result &= type_set 708 709 # Apply connector subtype filter 710 if connector_subtype is not None: 711 subtype_set = get_connectors_by_subtype( 712 repo_path, ConnectorSubtype(connector_subtype) 713 ) 714 result &= subtype_set 715 716 return ConnectorListResult( 717 connectors=sorted(result), 718 count=len(result), 719 )
33class ConnectorSubtype(StrEnum): 34 """Connector subtypes based on data source category.""" 35 36 API = "api" 37 DATABASE = "database" 38 FILE = "file" 39 CUSTOM = "custom"
Connector subtypes based on data source category.
def get_modified_connectors(
    repo_path: str | Path,
    base_ref: str = GIT_DEFAULT_BRANCH,
    head_ref: str = "HEAD",
) -> list[str]:
    """Get list of connector IDs modified according to a local git diff.

    This is the *local-only* fallback used when the GitHub API path is not
    available (e.g. no PR number or no token). It uses `git merge-base`
    when possible so that only changes introduced on the head branch are
    reported, falling back to a plain two-ref diff when the merge-base
    cannot be determined (shallow clones without enough history).

    Prefer `get_modified_connectors_from_github` whenever a PR number
    is known, as it avoids shallow-clone pitfalls entirely.

    Args:
        repo_path: Path to the Airbyte monorepo.
        base_ref: Base git reference to compare against.
        head_ref: Head git reference to compare.

    Returns:
        Sorted list of connector technical names.

    Raises:
        RuntimeError: If `git diff` exits non-zero or times out.
    """
    repo_path = Path(repo_path)

    # Try to find the merge-base so the diff only contains changes introduced
    # on the head branch, avoiding false positives from unrelated base-branch
    # commits. Falls back to a plain two-ref diff when unavailable.
    effective_base = base_ref
    try:
        mb_result = subprocess.run(
            ["git", "merge-base", base_ref, head_ref],
            cwd=repo_path,
            capture_output=True,
            text=True,
            check=True,
            timeout=30,
        )
        # Guard against an empty result so we never pass "" to `git diff`.
        effective_base = mb_result.stdout.strip() or base_ref
        logger.debug("Using merge-base %s for diff", effective_base)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        logger.debug(
            "Could not determine merge-base for %s..%s; falling back to plain diff",
            base_ref,
            head_ref,
        )

    try:
        # `-z` makes git emit NUL-terminated, unquoted paths. Without it, paths
        # with unusual characters are C-quoted (wrapped in double quotes) and
        # would never match the connector prefix.
        result = subprocess.run(
            ["git", "diff", "--name-only", "-z", effective_base, head_ref],
            cwd=repo_path,
            capture_output=True,
            text=True,
            check=True,
            timeout=30,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to get git diff: {e.stderr}") from e
    except subprocess.TimeoutExpired as e:
        raise RuntimeError("Git diff command timed out after 30 seconds") from e

    # The output ends with a NUL terminator, so drop the empty trailing entry
    # (and the single empty entry produced when nothing changed).
    changed_files = [path for path in result.stdout.split("\0") if path]
    return _extract_connector_names(changed_files)
Get list of connector IDs modified according to a local git diff.
This is the local-only fallback used when the GitHub API path is not
available (e.g. no PR number or no token). It uses git merge-base
when possible so that only changes introduced on the head branch are
reported, falling back to a plain two-ref diff when the merge-base
cannot be determined (shallow clones without enough history).
Prefer get_modified_connectors_from_github whenever a PR number
is known, as it avoids shallow-clone pitfalls entirely.
Arguments:
- repo_path: Path to the Airbyte monorepo.
- base_ref: Base git reference to compare against.
- head_ref: Head git reference to compare.
Returns:
Sorted list of connector technical names.
def format_github_actions_connector_matrix(
    connectors: list[str],
) -> dict[str, list[str]]:
    """Wrap connector names in the mapping shape used for a GitHub Actions matrix.

    Args:
        connectors: Connector names to place under the "connector" key.

    Returns:
        `{"connector": connectors}` when the list is non-empty (the same list
        object is reused, not copied). For an empty input the value is a
        one-element list holding an empty string, because matrix generation
        needs at least one value; callers are expected to guard the matrix
        job with an `if`.
    """
    if connectors:
        return {"connector": connectors}
    # Placeholder entry for the "nothing changed" case.
    return {"connector": [""]}
Format connector names as a GitHub Actions matrix.
GitHub Actions matrix generation needs at least one value. Return an empty
connector placeholder when no connectors changed so callers can pair this
with an if guard on the matrix job.
def get_modified_connectors_from_github(
    pr_number: int,
    pr_owner: str,
    pr_repo: str,
    gh_token: str,
) -> list[str]:
    """Resolve the connectors touched by a pull request using the GitHub API.

    The changed-file list comes from `get_pr_changed_files` and is reduced to
    connector names by `_extract_connector_names`. No local git history is
    consulted, which is why this route is preferred over the local diff when
    a PR number is known.

    Args:
        pr_number: Pull request number.
        pr_owner: Repository owner (e.g. `"airbytehq"`).
        pr_repo: Repository name (e.g. `"airbyte"`).
        gh_token: GitHub API token, forwarded as the `token` keyword.

    Returns:
        Sorted list of connector technical names.

    Raises:
        GitHubAPIError: Presumably raised by `get_pr_changed_files` when the
            request fails; nothing is caught here, so any exception from that
            call propagates to the caller.
    """
    logger.info(
        "Fetching changed files for PR %s/%s#%d via GitHub API",
        pr_owner,
        pr_repo,
        pr_number,
    )
    return _extract_connector_names(
        get_pr_changed_files(pr_owner, pr_repo, pr_number, token=gh_token)
    )
Get list of connector IDs modified in a PR via the GitHub API.
This is the preferred path when a PR number is known because it is immune to shallow-clone depth issues and always returns an accurate diff (only the files the PR actually changes).
Arguments:
- pr_number: Pull request number.
- pr_owner: Repository owner (e.g. "airbytehq").
- pr_repo: Repository name (e.g. "airbyte").
- gh_token: GitHub API token (must be provided explicitly).
Returns:
Sorted list of connector technical names.
Raises:
- GitHubAPIError: If the API request fails.
@lru_cache(maxsize=128)
def get_certified_connectors(repo_path: str | Path) -> set[str]:
    """Get set of all certified connector IDs from metadata.

    This function reads the metadata.yaml file for each connector to determine
    which connectors have "certified" support level. Connectors without a
    metadata file, or whose metadata cannot be read or is not shaped as a
    mapping, are skipped.

    The result is memoized per `repo_path` value, so the returned set is
    shared with the cache: treat it as read-only.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of certified connector technical names

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> certified = get_certified_connectors("/path/to/airbyte")
        >>> "source-postgres" in certified
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    certified_connectors: set[str] = set()

    # Iterate through all connector directories
    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        metadata_file = connector_dir / METADATA_FILE_NAME
        if not metadata_file.exists():
            continue

        # Read metadata to check support level. Only the load itself sits in
        # the try block so unrelated errors are not swallowed.
        try:
            with open(metadata_file, encoding="utf-8") as f:
                metadata = yaml.safe_load(f)
        except (OSError, ValueError, yaml.YAMLError):
            # ValueError covers UnicodeDecodeError for files that are not UTF-8.
            logger.debug(
                "Skipping connector with unreadable metadata: %s",
                metadata_file,
                exc_info=True,
            )
            continue

        # An empty or non-mapping document (or "data" entry) means there is
        # no support level to read.
        data = metadata.get("data") if isinstance(metadata, dict) else None
        if isinstance(data, dict) and data.get("supportLevel") == "certified":
            certified_connectors.add(connector_dir.name)

    return certified_connectors
Get set of all certified connector IDs from metadata.
This function reads the metadata.yaml file for each connector to determine which connectors have "certified" support level.
Arguments:
- repo_path: Path to the Airbyte monorepo
Returns:
Set of certified connector technical names
Example:
>>> certified = get_certified_connectors("/path/to/airbyte")
>>> "source-postgres" in certified
True
def get_all_connectors(repo_path: str | Path) -> set[str]:
    """Collect the name of every directory under the connectors directory.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of all connector technical names (one per sub-directory; plain
        files in the connectors directory are ignored)

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> all_connectors = get_all_connectors("/path/to/airbyte")
        >>> "source-faker" in all_connectors
        True
    """
    root = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not root.exists():
        raise ValueError(f"Connectors directory not found: {root}")

    names: set[str] = set()
    for entry in root.iterdir():
        if entry.is_dir():
            names.add(entry.name)
    return names
Get set of all connector IDs in the repository.
Arguments:
- repo_path: Path to the Airbyte monorepo
Returns:
Set of all connector technical names
Example:
>>> all_connectors = get_all_connectors("/path/to/airbyte")
>>> "source-faker" in all_connectors
True
def get_connector_metadata(
    repo_path: str | Path,
    connector_name: str,
) -> dict[str, Any] | None:
    """Get metadata for a specific connector.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_name: Technical name of the connector (e.g., "source-faker")

    Returns:
        The connector's metadata dict (the 'data' section). An empty dict is
        returned when the file is a mapping without a 'data' key. None is
        returned when the file is missing, cannot be read or parsed, or when
        the document or its 'data' entry is not a mapping.

    Example:
        >>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker")
        >>> metadata.get("supportLevel")
        'certified'
    """
    metadata_file = (
        Path(repo_path) / CONNECTOR_PATH_PREFIX / connector_name / METADATA_FILE_NAME
    )

    if not metadata_file.exists():
        return None

    try:
        with open(metadata_file, encoding="utf-8") as f:
            metadata = yaml.safe_load(f)
    except (OSError, ValueError, yaml.YAMLError):
        # ValueError covers UnicodeDecodeError for files that are not UTF-8.
        logger.debug(
            "Could not load connector metadata from %s", metadata_file, exc_info=True
        )
        return None

    if not isinstance(metadata, dict):
        return None

    # Callers treat the result as a mapping, so a non-mapping "data" entry is
    # reported as "no metadata" rather than handed back as-is.
    data = metadata.get("data", {})
    return data if isinstance(data, dict) else None
Get metadata for a specific connector.
Arguments:
- repo_path: Path to the Airbyte monorepo
- connector_name: Technical name of the connector (e.g., "source-faker")
Returns:
The connector's metadata dict (the 'data' section), or None if not found
Example:
>>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker")
>>> metadata.get("supportLevel")
'certified'
def get_connectors_by_language(
    repo_path: str | Path,
    language: ConnectorLanguage,
) -> set[str]:
    """Return the connectors whose detected language equals ``language``.

    Every directory under the connectors directory is passed to
    ``_detect_connector_language``; a connector is kept when the detected
    value equals ``language.value``.

    Args:
        repo_path: Path to the Airbyte monorepo
        language: Language to filter by

    Returns:
        Set of connector technical names using the specified language

    Raises:
        ValueError: If the connectors directory does not exist

    Example:
        >>> python_connectors = get_connectors_by_language(
        ...     "/path/to/airbyte", ConnectorLanguage.PYTHON
        ... )
        >>> "source-faker" in python_connectors
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    wanted = language.value
    return {
        entry.name
        for entry in connectors_dir.iterdir()
        if entry.is_dir()
        and _detect_connector_language(entry, entry.name) == wanted
    }
Get set of all connector IDs for a specific language.
This function reads connector directories to determine which connectors use the specified language.
Arguments:
- repo_path: Path to the Airbyte monorepo
- language: Language to filter by
Returns:
Set of connector technical names using the specified language
Example:
>>> python_connectors = get_connectors_by_language(
...     "/path/to/airbyte", ConnectorLanguage.PYTHON
... )
>>> "source-faker" in python_connectors
True
def get_connectors_by_type(
    repo_path: str | Path,
    connector_type: ConnectorType,
) -> set[str]:
    """Return the connectors of one type (source or destination).

    A connector is included when either of these holds:
    - its metadata.yaml has a ``connectorType`` equal to the requested type
    - its directory name starts with ``"<type>-"``

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_type: Type to filter by (source or destination)

    Returns:
        Set of connector technical names matching the specified type

    Raises:
        ValueError: If the connectors directory does not exist

    Example:
        >>> source_connectors = get_connectors_by_type(
        ...     "/path/to/airbyte", ConnectorType.SOURCE
        ... )
        >>> "source-postgres" in source_connectors
        True
    """
    repo_root = Path(repo_path)
    connectors_dir = repo_root / CONNECTOR_PATH_PREFIX
    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    type_value = connector_type.value
    matches: set[str] = set()

    for entry in connectors_dir.iterdir():
        if not entry.is_dir():
            continue

        name = entry.name
        metadata = get_connector_metadata(repo_root, name)

        # The metadata declaration is consulted first; the name prefix is
        # checked whenever the declaration is missing or does not match.
        declared = bool(metadata) and metadata.get("connectorType") == type_value
        if declared or name.startswith(f"{type_value}-"):
            matches.add(name)

    return matches
Get set of all connector IDs for a specific type (source or destination).
This function reads connector directories to determine which connectors match the specified type based on their metadata.yaml or name prefix.
Arguments:
- repo_path: Path to the Airbyte monorepo
- connector_type: Type to filter by (source or destination)
Returns:
Set of connector technical names matching the specified type
Example:
>>> source_connectors = get_connectors_by_type(
...     "/path/to/airbyte", ConnectorType.SOURCE
... )
>>> "source-postgres" in source_connectors
True
def get_connectors_by_subtype(
    repo_path: str | Path,
    connector_subtype: ConnectorSubtype,
) -> set[str]:
    """Return the connectors whose metadata declares the given subtype.

    Only connectors with non-empty metadata whose ``connectorSubtype`` equals
    ``connector_subtype.value`` are included; connectors without readable
    metadata never match.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_subtype: Subtype to filter by (api, database, file, custom)

    Returns:
        Set of connector technical names matching the specified subtype

    Raises:
        ValueError: If the connectors directory does not exist

    Example:
        >>> database_connectors = get_connectors_by_subtype(
        ...     "/path/to/airbyte", ConnectorSubtype.DATABASE
        ... )
        >>> "source-postgres" in database_connectors
        True
    """
    repo_root = Path(repo_path)
    connectors_dir = repo_root / CONNECTOR_PATH_PREFIX
    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    subtype_value = connector_subtype.value
    return {
        entry.name
        for entry in connectors_dir.iterdir()
        if entry.is_dir()
        and (metadata := get_connector_metadata(repo_root, entry.name))
        and metadata.get("connectorSubtype") == subtype_value
    }
Get set of all connector IDs for a specific subtype (api, database, file, etc.).
This function reads connector metadata.yaml files to determine which connectors match the specified subtype.
Arguments:
- repo_path: Path to the Airbyte monorepo
- connector_subtype: Subtype to filter by (api, database, file, custom)
Returns:
Set of connector technical names matching the specified subtype
Example:
>>> database_connectors = get_connectors_by_subtype(
...     "/path/to/airbyte", ConnectorSubtype.DATABASE
... )
>>> "source-postgres" in database_connectors
True
@lru_cache(maxsize=128)
def _cached_local_cdk_connector_names(repo_path: str | Path) -> frozenset[str]:
    """Scan the repo for local-CDK connectors and cache the immutable result."""
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    local_cdk_connectors: set[str] = set()

    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        connector_name = connector_dir.name
        detected_language = _detect_connector_language(connector_dir, connector_name)

        # The check that applies depends on the detected language.
        if detected_language in ("python", "low-code"):
            uses_local_cdk = _has_local_cdk_python(connector_dir)
        elif detected_language == "java":
            uses_local_cdk = _has_local_cdk_java(connector_dir)
        else:
            uses_local_cdk = False

        if uses_local_cdk:
            local_cdk_connectors.add(connector_name)

    # Freeze the result so the cached value can never be altered afterwards.
    return frozenset(local_cdk_connectors)


def get_connectors_with_local_cdk(repo_path: str | Path) -> set[str]:
    """Get set of connectors using local CDK reference instead of published version.

    This function detects connectors that are configured to use a local CDK
    checkout rather than a published CDK version. The check that is applied
    depends on the language reported by ``_detect_connector_language``:
    - "python" / "low-code": ``_has_local_cdk_python`` is consulted
    - "java": ``_has_local_cdk_java`` is consulted

    The scan is cached per ``repo_path`` value (up to 128 entries). Each call
    returns a fresh ``set``, so callers may modify the result in place
    (``&=``, ``-=``, ``.add()``) without affecting later calls. The cache can
    be inspected or reset through ``get_connectors_with_local_cdk.cache_info()``
    and ``get_connectors_with_local_cdk.cache_clear()``.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of connector technical names using local CDK reference

    Raises:
        ValueError: If the connectors directory does not exist

    Example:
        >>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
        >>> "destination-motherduck" in local_cdk_connectors
        True
    """
    return set(_cached_local_cdk_connector_names(repo_path))


# Keep the cache-management hooks that ``lru_cache`` exposed on the public
# function, so existing ``cache_clear()`` / ``cache_info()`` callers still work.
get_connectors_with_local_cdk.cache_clear = (  # type: ignore[attr-defined]
    _cached_local_cdk_connector_names.cache_clear
)
get_connectors_with_local_cdk.cache_info = (  # type: ignore[attr-defined]
    _cached_local_cdk_connector_names.cache_info
)
Get set of connectors using local CDK reference instead of published version.
This function detects connectors that are configured to use a local CDK checkout rather than a published CDK version. The detection method differs by connector language:
- Python: Checks for path-based dependency in pyproject.toml
- Java/Kotlin: Checks for useLocalCdk=true in build.gradle files
Arguments:
- repo_path: Path to the Airbyte monorepo
Returns:
Set of connector technical names using local CDK reference
Example:
>>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
>>> "destination-motherduck" in local_cdk_connectors
True
@dataclass
class ConnectorListResult:
    """Outcome of a filtered connector listing.

    Attributes:
        connectors: Technical names of the connectors that passed the filters.
        count: Number of connectors reported alongside ``connectors``.
    """

    # Connector technical names, e.g. "source-faker".
    connectors: list[str]
    # How many connectors matched.
    count: int
Result of listing connectors with filters.
def list_connectors(
    repo_path: str | Path,
    certified: bool | None = None,
    modified: bool | None = None,
    language_filter: set[str] | None = None,
    language_exclude: set[str] | None = None,
    connector_type: str | None = None,
    connector_subtype: str | None = None,
    base_ref: str | None = None,
    head_ref: str | None = None,
    pr_number: int | None = None,
    pr_owner: str | None = None,
    pr_repo: str | None = None,
    gh_token: str | None = None,
) -> ConnectorListResult:
    """List connectors in the Airbyte monorepo, narrowed by optional filters.

    Starts from every connector directory and applies each requested filter
    in turn: certified, modified, language include/exclude, connector type,
    connector subtype. A filter left at ``None`` is skipped; an empty language
    set is treated the same as ``None``.

    For the `modified` filter, the GitHub API is queried when `gh_token`,
    `pr_number`, `pr_owner` and `pr_repo` are all provided. If any of them is
    missing, or the API call raises `GitHubAPIError` or
    `requests.RequestException` (logged as a warning), a local `git diff`
    between `base_ref` and `head_ref` is used instead.

    Args:
        repo_path: Path to the Airbyte monorepo
        certified: Filter by certification status (True=certified only,
            False=non-certified only, None=all)
        modified: Filter by modification status (True=modified only,
            False=not-modified only, None=all)
        language_filter: Set of languages to include (python, java, low-code, manifest-only)
        language_exclude: Set of languages to exclude (mutually exclusive with language_filter)
        connector_type: Filter by connector type (source, destination, or None for all)
        connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all)
        base_ref: Base git reference for the local diff (default: GIT_DEFAULT_BRANCH)
        head_ref: Head git reference for the local diff (default: "HEAD")
        pr_number: Pull request number (part of the PR context for the GitHub API path)
        pr_owner: Repository owner for PR (e.g. "airbytehq")
        pr_repo: Repository name for PR (e.g. "airbyte")
        gh_token: GitHub API token; required together with the PR context
            for the GitHub API path to be taken.

    Returns:
        ConnectorListResult with sorted list of connector names and count

    Raises:
        ValueError: If both language_filter and language_exclude are provided
    """
    # The two language parameters are alternatives; reject any call that sets both.
    if not (language_filter is None or language_exclude is None):
        raise ValueError(
            "Cannot specify both language_filter and language_exclude. "
            "Only one language filter parameter is accepted."
        )

    selected = get_all_connectors(repo_path)

    # --- certification -----------------------------------------------------
    if certified is not None:
        certified_names = get_certified_connectors(repo_path)
        if certified:
            selected = selected & certified_names
        else:
            selected = selected - certified_names

    # --- modification ------------------------------------------------------
    if modified is not None:
        has_pr_context = all(
            value is not None for value in (gh_token, pr_number, pr_owner, pr_repo)
        )
        changed_names: set[str] | None = None

        if has_pr_context:
            try:
                changed_names = set(
                    get_modified_connectors_from_github(
                        pr_number, pr_owner, pr_repo, gh_token=gh_token
                    )
                )
            except (GitHubAPIError, requests.RequestException):
                logger.warning(
                    "GitHub API call failed for PR %s/%s#%d; "
                    "falling back to local git diff",
                    pr_owner,
                    pr_repo,
                    pr_number,
                    exc_info=True,
                )

        # Reached when the API path was not taken or it failed above.
        if changed_names is None:
            diff_base = GIT_DEFAULT_BRANCH if base_ref is None else base_ref
            diff_head = "HEAD" if head_ref is None else head_ref
            changed_names = set(
                get_modified_connectors(repo_path, diff_base, diff_head)
            )

        if modified:
            selected = selected & changed_names
        else:
            selected = selected - changed_names

    # --- language include --------------------------------------------------
    if language_filter:
        # Keep connectors that belong to any of the requested languages.
        included: set[str] = set().union(
            *(
                get_connectors_by_language(repo_path, ConnectorLanguage(lang))
                for lang in language_filter
            )
        )
        selected = selected & included

    # --- language exclude --------------------------------------------------
    if language_exclude:
        for lang in language_exclude:
            selected = selected - get_connectors_by_language(
                repo_path, ConnectorLanguage(lang)
            )

    # --- connector type ----------------------------------------------------
    if connector_type is not None:
        selected = selected & get_connectors_by_type(
            repo_path, ConnectorType(connector_type)
        )

    # --- connector subtype -------------------------------------------------
    if connector_subtype is not None:
        selected = selected & get_connectors_by_subtype(
            repo_path, ConnectorSubtype(connector_subtype)
        )

    ordered_names = sorted(selected)
    return ConnectorListResult(connectors=ordered_names, count=len(ordered_names))
List connectors in the Airbyte monorepo with flexible filtering.
This is the core capability function that encapsulates all filtering logic. Both CLI and MCP layers should use this function.
When modified filtering is requested and both gh_token and full PR
context (pr_number, pr_owner, pr_repo) are provided, the
GitHub API is used to determine changed files — this is the most reliable
method and avoids shallow-clone pitfalls. Falls back to local git diff
when the token is not provided or PR context is incomplete.
Arguments:
- repo_path: Path to the Airbyte monorepo
- certified: Filter by certification status (True=certified only, False=non-certified only, None=all)
- modified: Filter by modification status (True=modified only, False=not-modified only, None=all)
- language_filter: Set of languages to include (python, java, low-code, manifest-only)
- language_exclude: Set of languages to exclude (mutually exclusive with language_filter)
- connector_type: Filter by connector type (source, destination, or None for all)
- connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all)
- base_ref: Base git reference for modification detection (default: GIT_DEFAULT_BRANCH, e.g. "origin/master")
- head_ref: Head git reference for modification detection (default: "HEAD")
- pr_number: Pull request number (enables GitHub API path for modification detection)
- pr_owner: Repository owner for PR (e.g. "airbytehq")
- pr_repo: Repository name for PR (e.g. "airbyte")
- gh_token: GitHub API token. When provided together with PR context, the GitHub API is used instead of local git diff.
Returns:
ConnectorListResult with sorted list of connector names and count
Raises:
- ValueError: If both language_filter and language_exclude are provided