airbyte_ops_mcp.airbyte_repo.list_connectors

Detect changed connectors in the Airbyte monorepo.

This module provides functionality to detect which connectors have been modified by comparing git diffs between branches.

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Detect changed connectors in the Airbyte monorepo.
  3
  4This module provides functionality to detect which connectors have been modified
  5by comparing git diffs between branches.
  6"""
  7
  8from __future__ import annotations
  9
 10import logging
 11import re
 12import subprocess
 13from dataclasses import dataclass
 14from enum import StrEnum
 15from functools import lru_cache
 16from pathlib import Path
 17from typing import Any
 18
 19import requests
 20import yaml
 21
 22from airbyte_ops_mcp.github_api import GitHubAPIError
 23from airbyte_ops_mcp.registry._enums import ConnectorLanguage, ConnectorType
 24
 25CONNECTOR_PATH_PREFIX = "airbyte-integrations/connectors"
 26METADATA_FILE_NAME = "metadata.yaml"
 27GIT_DEFAULT_BRANCH = "origin/master"
 28
 29logger = logging.getLogger(__name__)
 30
 31
 32class ConnectorSubtype(StrEnum):
 33    """Connector subtypes based on data source category."""
 34
 35    API = "api"
 36    DATABASE = "database"
 37    FILE = "file"
 38    CUSTOM = "custom"
 39
 40
 41def _extract_connector_names(changed_files: list[str]) -> list[str]:
 42    """Extract unique connector names from a list of changed file paths.
 43
 44    Args:
 45        changed_files: File paths relative to the repo root.
 46
 47    Returns:
 48        Sorted list of connector technical names.
 49    """
 50    changed_connectors: set[str] = set()
 51    for file_path in changed_files:
 52        if file_path.startswith(CONNECTOR_PATH_PREFIX + "/"):
 53            # e.g. "airbyte-integrations/connectors/source-faker/..." -> "source-faker"
 54            path_parts = file_path.split("/")
 55            if len(path_parts) >= 3:
 56                changed_connectors.add(path_parts[2])
 57    return sorted(changed_connectors)
 58
 59
 60def get_modified_connectors(
 61    repo_path: str | Path,
 62    base_ref: str = GIT_DEFAULT_BRANCH,
 63    head_ref: str = "HEAD",
 64) -> list[str]:
 65    """Get list of connector IDs modified according to a local git diff.
 66
 67    This is the *local-only* fallback used when the GitHub API path is not
 68    available (e.g. no PR number or no token).  It uses `git merge-base`
 69    when possible so that only changes introduced on the head branch are
 70    reported, falling back to a plain two-ref diff when the merge-base
 71    cannot be determined (shallow clones without enough history).
 72
 73    Prefer :func:`get_modified_connectors_from_github` whenever a PR number
 74    is known, as it avoids shallow-clone pitfalls entirely.
 75
 76    Args:
 77        repo_path: Path to the Airbyte monorepo.
 78        base_ref: Base git reference to compare against.
 79        head_ref: Head git reference to compare.
 80
 81    Returns:
 82        Sorted list of connector technical names.
 83    """
 84    repo_path = Path(repo_path)
 85
 86    # Try to find the merge-base so the diff only contains changes introduced
 87    # on the head branch, avoiding false positives from unrelated base-branch
 88    # commits.  Falls back to a plain two-ref diff when unavailable.
 89    effective_base = base_ref
 90    try:
 91        mb_result = subprocess.run(
 92            ["git", "merge-base", base_ref, head_ref],
 93            cwd=repo_path,
 94            capture_output=True,
 95            text=True,
 96            check=True,
 97            timeout=30,
 98        )
 99        effective_base = mb_result.stdout.strip()
100        logger.debug("Using merge-base %s for diff", effective_base)
101    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
102        logger.debug(
103            "Could not determine merge-base for %s..%s; falling back to plain diff",
104            base_ref,
105            head_ref,
106        )
107
108    try:
109        result = subprocess.run(
110            ["git", "diff", "--name-only", effective_base, head_ref],
111            cwd=repo_path,
112            capture_output=True,
113            text=True,
114            check=True,
115            timeout=30,
116        )
117        changed_files = result.stdout.strip().split("\n")
118    except subprocess.CalledProcessError as e:
119        raise RuntimeError(f"Failed to get git diff: {e.stderr}") from e
120    except subprocess.TimeoutExpired as e:
121        raise RuntimeError("Git diff command timed out after 30 seconds") from e
122
123    return _extract_connector_names(changed_files)
124
125
def get_modified_connectors_from_github(
    pr_number: int,
    pr_owner: str,
    pr_repo: str,
    gh_token: str,
) -> list[str]:
    """Return the connectors touched by a pull request, using the GitHub API.

    The changed-file list comes from `get_pr_changed_files` rather than from
    a local checkout, so the result does not depend on local clone depth.
    This is the preferred path whenever a PR number is known.

    Args:
        pr_number: Pull request number.
        pr_owner: Repository owner (e.g. `"airbytehq"`).
        pr_repo: Repository name (e.g. `"airbyte"`).
        gh_token: GitHub API token (must be provided explicitly).

    Returns:
        Sorted list of connector technical names.

    Raises:
        GitHubAPIError: If the API request fails (raised by
            `airbyte_ops_mcp.github_api`).
    """
    # Imported at call time, not at module import time.
    from airbyte_ops_mcp.github_api import get_pr_changed_files

    logger.info(
        "Fetching changed files for PR %s/%s#%d via GitHub API",
        pr_owner,
        pr_repo,
        pr_number,
    )
    return _extract_connector_names(
        get_pr_changed_files(pr_owner, pr_repo, pr_number, token=gh_token)
    )
161
162
@lru_cache(maxsize=128)
def get_certified_connectors(repo_path: str | Path) -> set[str]:
    """Get set of all certified connector IDs from metadata.

    This function reads the metadata.yaml file for each connector to determine
    which connectors have "certified" support level.  Connectors without a
    metadata file, or whose metadata cannot be read or parsed, are skipped.

    Results are memoized per `repo_path` argument by `lru_cache`, and the
    returned set is the cached object itself, so callers should treat it as
    read-only.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of certified connector technical names

    Raises:
        ValueError: If the connectors directory does not exist under
            `repo_path`.

    Example:
        >>> certified = get_certified_connectors("/path/to/airbyte")
        >>> "source-postgres" in certified
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    certified_connectors: set[str] = set()

    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        metadata_file = connector_dir / METADATA_FILE_NAME
        if not metadata_file.exists():
            continue

        try:
            # Decode explicitly as UTF-8 so the result does not depend on
            # the platform's locale encoding.
            with open(metadata_file, encoding="utf-8") as f:
                metadata = yaml.safe_load(f)
            support_level = metadata.get("data", {}).get("supportLevel")
        except Exception:
            # Best-effort scan: one broken metadata file must not abort the
            # listing, but leave a trace so the skip can be diagnosed.
            logger.debug(
                "Skipping %s: could not read support level from %s",
                connector_dir.name,
                metadata_file,
                exc_info=True,
            )
            continue

        if support_level == "certified":
            certified_connectors.add(connector_dir.name)

    return certified_connectors
213
214
def get_all_connectors(repo_path: str | Path) -> set[str]:
    """Return the name of every directory found in the connectors folder.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of all connector technical names (one per sub-directory of
        `CONNECTOR_PATH_PREFIX`; plain files are ignored)

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> all_connectors = get_all_connectors("/path/to/airbyte")
        >>> "source-faker" in all_connectors
        True
    """
    connectors_root = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    names: set[str] = set()
    for entry in connectors_root.iterdir():
        if entry.is_dir():
            names.add(entry.name)
    return names
236
237
def get_connector_metadata(
    repo_path: str | Path,
    connector_name: str,
) -> dict[str, Any] | None:
    """Load the `data` section of one connector's metadata file.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_name: Technical name of the connector (e.g., "source-faker")

    Returns:
        The value stored under the top-level `data` key (an empty dict when
        the key is absent), or None when the metadata file does not exist or
        any error occurs while reading or parsing it.

    Example:
        >>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker")
        >>> metadata.get("supportLevel")
        'certified'
    """
    metadata_path = (
        Path(repo_path) / CONNECTOR_PATH_PREFIX / connector_name / METADATA_FILE_NAME
    )
    if metadata_path.exists():
        try:
            with open(metadata_path) as handle:
                document = yaml.safe_load(handle)
            return document.get("data", {})
        except Exception:
            # Unreadable or malformed metadata is reported as "not found".
            return None
    return None
269
270
def get_connectors_by_language(
    repo_path: str | Path,
    language: ConnectorLanguage,
) -> set[str]:
    """Collect the connectors whose detected language equals `language`.

    Each connector directory is classified with
    `_detect_connector_language`, which inspects the directory's file
    layout; connectors whose language cannot be detected never match.

    Args:
        repo_path: Path to the Airbyte monorepo
        language: Language to filter by

    Returns:
        Set of connector technical names using the specified language

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> python_connectors = get_connectors_by_language(
        ...     "/path/to/airbyte", ConnectorLanguage.PYTHON
        ... )
        >>> "source-faker" in python_connectors
        True
    """
    connectors_root = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    wanted = language.value
    return {
        entry.name
        for entry in connectors_root.iterdir()
        if entry.is_dir() and _detect_connector_language(entry, entry.name) == wanted
    }
316
317
def get_connectors_by_type(
    repo_path: str | Path,
    connector_type: ConnectorType,
) -> set[str]:
    """Collect the connectors of one type (source or destination).

    A connector matches when its metadata declares the requested
    `connectorType`, or, failing that, when its directory name starts with
    `"<type>-"`.

    NOTE(review): the name-prefix check also runs when the metadata declares
    a *different* type, so a connector can match on its name alone.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_type: Type to filter by (source or destination)

    Returns:
        Set of connector technical names matching the specified type

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> source_connectors = get_connectors_by_type(
        ...     "/path/to/airbyte", ConnectorType.SOURCE
        ... )
        >>> "source-postgres" in source_connectors
        True
    """
    repo_path = Path(repo_path)
    connectors_root = repo_path / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    wanted = connector_type.value
    name_prefix = f"{wanted}-"
    matches: set[str] = set()

    for entry in connectors_root.iterdir():
        if not entry.is_dir():
            continue

        name = entry.name
        metadata = get_connector_metadata(repo_path, name)

        # The metadata declaration is consulted first; the name prefix is the fallback.
        declared_match = metadata and metadata.get("connectorType") == wanted
        if declared_match or name.startswith(name_prefix):
            matches.add(name)

    return matches
369
370
def get_connectors_by_subtype(
    repo_path: str | Path,
    connector_subtype: ConnectorSubtype,
) -> set[str]:
    """Collect the connectors whose metadata declares the given subtype.

    Only the `connectorSubtype` field of each connector's metadata is
    consulted; connectors with missing or unreadable metadata never match.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_subtype: Subtype to filter by (api, database, file, custom)

    Returns:
        Set of connector technical names matching the specified subtype

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> database_connectors = get_connectors_by_subtype(
        ...     "/path/to/airbyte", ConnectorSubtype.DATABASE
        ... )
        >>> "source-postgres" in database_connectors
        True
    """
    repo_path = Path(repo_path)
    connectors_root = repo_path / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    wanted = connector_subtype.value
    return {
        entry.name
        for entry in connectors_root.iterdir()
        if entry.is_dir()
        and (metadata := get_connector_metadata(repo_path, entry.name))
        and metadata.get("connectorSubtype") == wanted
    }
416
417
418@lru_cache(maxsize=1024)
419def _detect_connector_language(connector_dir: Path, connector_name: str) -> str | None:
420    """Detect the language of a connector based on its file structure.
421
422    Args:
423        connector_dir: Path to the connector directory
424        connector_name: Technical name of the connector
425
426    Returns:
427        Language string (python, java, low-code, manifest-only) or None
428    """
429    # Check for manifest-only (manifest.yaml at root)
430    if (connector_dir / "manifest.yaml").is_file():
431        return "manifest-only"
432
433    # Check for low-code (manifest.yaml in source directory)
434    source_dir = connector_dir / connector_name.replace("-", "_")
435    if (source_dir / "manifest.yaml").is_file():
436        return "low-code"
437
438    # Check for Python (setup.py or pyproject.toml)
439    if (connector_dir / "setup.py").is_file() or (
440        connector_dir / "pyproject.toml"
441    ).is_file():
442        return "python"
443
444    # Check for Java/Kotlin (src/main/java or src/main/kotlin)
445    if (connector_dir / "src" / "main" / "java").exists() or (
446        connector_dir / "src" / "main" / "kotlin"
447    ).exists():
448        return "java"
449
450    return None
451
452
@lru_cache(maxsize=128)
def get_connectors_with_local_cdk(repo_path: str | Path) -> set[str]:
    """Find connectors that reference a local CDK checkout.

    The check applied depends on the connector's detected language:

    - `python` / `low-code`: `_has_local_cdk_python` (path-based
      `airbyte-cdk` dependency in pyproject.toml)
    - `java` (Java/Kotlin): `_has_local_cdk_java` (`useLocalCdk=true` in
      a build.gradle file)

    Connectors with any other detected language are never reported.
    Results are memoized per `repo_path` argument by `lru_cache`.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of connector technical names using local CDK reference

    Raises:
        ValueError: If the connectors directory does not exist.

    Example:
        >>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
        >>> "destination-motherduck" in local_cdk_connectors
        True
    """
    connectors_root = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    found: set[str] = set()
    for entry in connectors_root.iterdir():
        if not entry.is_dir():
            continue

        language = _detect_connector_language(entry, entry.name)
        if language in ("python", "low-code"):
            uses_local_cdk = _has_local_cdk_python(entry)
        elif language == "java":
            uses_local_cdk = _has_local_cdk_java(entry)
        else:
            uses_local_cdk = False

        if uses_local_cdk:
            found.add(entry.name)

    return found
501
502
503def _has_local_cdk_python(connector_dir: Path) -> bool:
504    """Check if a Python connector uses local CDK reference.
505
506    Args:
507        connector_dir: Path to the connector directory
508
509    Returns:
510        True if connector uses local CDK reference
511    """
512    pyproject_file = connector_dir / "pyproject.toml"
513    if not pyproject_file.exists():
514        return False
515
516    try:
517        content = pyproject_file.read_text()
518        # Look for path-based airbyte-cdk dependency
519        # Pattern: airbyte-cdk = { path = "..." } or airbyte-cdk = {path = "..."}
520        return bool(re.search(r"airbyte-cdk\s*=\s*\{[^}]*path\s*=", content))
521    except Exception:
522        return False
523
524
525def _has_local_cdk_java(connector_dir: Path) -> bool:
526    """Check if a Java/Kotlin connector uses local CDK reference.
527
528    Mirrors the implementation from airbyte-ci/connectors/connector_ops/connector_ops/utils.py
529
530    Args:
531        connector_dir: Path to the connector directory
532
533    Returns:
534        True if connector uses local CDK reference
535    """
536    # Check both build.gradle and build.gradle.kts
537    for build_file_name in ("build.gradle", "build.gradle.kts"):
538        build_file = connector_dir / build_file_name
539        if not build_file.exists():
540            continue
541
542        try:
543            # Read file and strip inline comments
544            contents = "\n".join(
545                [
546                    line.split("//")[0]  # Remove inline comments
547                    for line in build_file.read_text().split("\n")
548                ]
549            )
550            # Remove spaces and check for useLocalCdk=true
551            contents = contents.replace(" ", "")
552            if "useLocalCdk=true" in contents:
553                return True
554        except Exception:
555            continue
556
557    return False
558
559
@dataclass
class ConnectorListResult:
    """Outcome of a filtered connector listing."""

    # Connector technical names that passed every requested filter
    # (`list_connectors` stores them sorted).
    connectors: list[str]
    # Number of names; `list_connectors` sets this to the size of the result set.
    count: int
566
567
def list_connectors(
    repo_path: str | Path,
    certified: bool | None = None,
    modified: bool | None = None,
    language_filter: set[str] | None = None,
    language_exclude: set[str] | None = None,
    connector_type: str | None = None,
    connector_subtype: str | None = None,
    base_ref: str | None = None,
    head_ref: str | None = None,
    pr_number: int | None = None,
    pr_owner: str | None = None,
    pr_repo: str | None = None,
    gh_token: str | None = None,
) -> ConnectorListResult:
    """List connectors in the Airbyte monorepo with flexible filtering.

    This is the core capability function that encapsulates all filtering
    logic; both CLI and MCP layers should use it.  The listing starts from
    every connector directory and is narrowed by each filter that was
    supplied, in this order: certified, modified, language include,
    language exclude, connector type, connector subtype.

    When `modified` filtering is requested and both `gh_token` and full PR
    context (`pr_number`, `pr_owner`, `pr_repo`) are provided, the GitHub
    API is used to determine changed files.  If that call fails with
    `GitHubAPIError` or `requests.RequestException`, or if the token or PR
    context is incomplete, local `git diff` is used instead.

    Args:
        repo_path: Path to the Airbyte monorepo
        certified: Filter by certification status (True=certified only,
            False=non-certified only, None=all)
        modified: Filter by modification status (True=modified only,
            False=not-modified only, None=all)
        language_filter: Set of languages to include (python, java, low-code, manifest-only)
        language_exclude: Set of languages to exclude (mutually exclusive with language_filter)
        connector_type: Filter by connector type (source, destination, or None for all)
        connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all)
        base_ref: Base git reference for modification detection (default: GIT_DEFAULT_BRANCH, e.g. "origin/master")
        head_ref: Head git reference for modification detection (default: "HEAD")
        pr_number: Pull request number (enables GitHub API path for modification detection)
        pr_owner: Repository owner for PR (e.g. "airbytehq")
        pr_repo: Repository name for PR (e.g. "airbyte")
        gh_token: GitHub API token. When provided together with PR context,
            the GitHub API is used instead of local git diff.

    Returns:
        ConnectorListResult with sorted list of connector names and count

    Raises:
        ValueError: If both language_filter and language_exclude are provided
    """
    # The two language parameters are alternatives; reject ambiguous requests early.
    if language_filter is not None and language_exclude is not None:
        raise ValueError(
            "Cannot specify both language_filter and language_exclude. "
            "Only one language filter parameter is accepted."
        )

    selected = get_all_connectors(repo_path)

    # --- certification ---
    if certified is not None:
        certified_names = get_certified_connectors(repo_path)
        selected = selected & certified_names if certified else selected - certified_names

    # --- modification ---
    if modified is not None:
        changed: set[str] | None = None

        have_pr_context = all(
            value is not None for value in (gh_token, pr_number, pr_owner, pr_repo)
        )
        if have_pr_context:
            try:
                changed = set(
                    get_modified_connectors_from_github(
                        pr_number, pr_owner, pr_repo, gh_token=gh_token
                    )
                )
            except (GitHubAPIError, requests.RequestException):
                logger.warning(
                    "GitHub API call failed for PR %s/%s#%d; "
                    "falling back to local git diff",
                    pr_owner,
                    pr_repo,
                    pr_number,
                    exc_info=True,
                )

        # No API result (not attempted, or it failed): diff the local checkout.
        if changed is None:
            changed = set(
                get_modified_connectors(
                    repo_path,
                    GIT_DEFAULT_BRANCH if base_ref is None else base_ref,
                    "HEAD" if head_ref is None else head_ref,
                )
            )

        selected = selected & changed if modified else selected - changed

    # --- language include: keep connectors matching any listed language ---
    if language_filter:
        included: set[str] = set()
        for lang in language_filter:
            included |= get_connectors_by_language(repo_path, ConnectorLanguage(lang))
        selected = selected & included

    # --- language exclude: drop connectors matching any listed language ---
    if language_exclude:
        for lang in language_exclude:
            selected = selected - get_connectors_by_language(
                repo_path, ConnectorLanguage(lang)
            )

    # --- connector type ---
    if connector_type is not None:
        selected = selected & get_connectors_by_type(
            repo_path, ConnectorType(connector_type)
        )

    # --- connector subtype ---
    if connector_subtype is not None:
        selected = selected & get_connectors_by_subtype(
            repo_path, ConnectorSubtype(connector_subtype)
        )

    return ConnectorListResult(connectors=sorted(selected), count=len(selected))
CONNECTOR_PATH_PREFIX = 'airbyte-integrations/connectors'
METADATA_FILE_NAME = 'metadata.yaml'
GIT_DEFAULT_BRANCH = 'origin/master'
logger = <Logger airbyte_ops_mcp.airbyte_repo.list_connectors (WARNING)>
class ConnectorSubtype(enum.StrEnum):
33class ConnectorSubtype(StrEnum):
34    """Connector subtypes based on data source category."""
35
36    API = "api"
37    DATABASE = "database"
38    FILE = "file"
39    CUSTOM = "custom"

Connector subtypes based on data source category.

API = <ConnectorSubtype.API: 'api'>
DATABASE = <ConnectorSubtype.DATABASE: 'database'>
FILE = <ConnectorSubtype.FILE: 'file'>
CUSTOM = <ConnectorSubtype.CUSTOM: 'custom'>
def get_modified_connectors( repo_path: str | pathlib.Path, base_ref: str = 'origin/master', head_ref: str = 'HEAD') -> list[str]:
 61def get_modified_connectors(
 62    repo_path: str | Path,
 63    base_ref: str = GIT_DEFAULT_BRANCH,
 64    head_ref: str = "HEAD",
 65) -> list[str]:
 66    """Get list of connector IDs modified according to a local git diff.
 67
 68    This is the *local-only* fallback used when the GitHub API path is not
 69    available (e.g. no PR number or no token).  It uses `git merge-base`
 70    when possible so that only changes introduced on the head branch are
 71    reported, falling back to a plain two-ref diff when the merge-base
 72    cannot be determined (shallow clones without enough history).
 73
 74    Prefer :func:`get_modified_connectors_from_github` whenever a PR number
 75    is known, as it avoids shallow-clone pitfalls entirely.
 76
 77    Args:
 78        repo_path: Path to the Airbyte monorepo.
 79        base_ref: Base git reference to compare against.
 80        head_ref: Head git reference to compare.
 81
 82    Returns:
 83        Sorted list of connector technical names.
 84    """
 85    repo_path = Path(repo_path)
 86
 87    # Try to find the merge-base so the diff only contains changes introduced
 88    # on the head branch, avoiding false positives from unrelated base-branch
 89    # commits.  Falls back to a plain two-ref diff when unavailable.
 90    effective_base = base_ref
 91    try:
 92        mb_result = subprocess.run(
 93            ["git", "merge-base", base_ref, head_ref],
 94            cwd=repo_path,
 95            capture_output=True,
 96            text=True,
 97            check=True,
 98            timeout=30,
 99        )
100        effective_base = mb_result.stdout.strip()
101        logger.debug("Using merge-base %s for diff", effective_base)
102    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
103        logger.debug(
104            "Could not determine merge-base for %s..%s; falling back to plain diff",
105            base_ref,
106            head_ref,
107        )
108
109    try:
110        result = subprocess.run(
111            ["git", "diff", "--name-only", effective_base, head_ref],
112            cwd=repo_path,
113            capture_output=True,
114            text=True,
115            check=True,
116            timeout=30,
117        )
118        changed_files = result.stdout.strip().split("\n")
119    except subprocess.CalledProcessError as e:
120        raise RuntimeError(f"Failed to get git diff: {e.stderr}") from e
121    except subprocess.TimeoutExpired as e:
122        raise RuntimeError("Git diff command timed out after 30 seconds") from e
123
124    return _extract_connector_names(changed_files)

Get list of connector IDs modified according to a local git diff.

This is the local-only fallback used when the GitHub API path is not available (e.g. no PR number or no token). It uses git merge-base when possible so that only changes introduced on the head branch are reported, falling back to a plain two-ref diff when the merge-base cannot be determined (shallow clones without enough history).

Prefer get_modified_connectors_from_github() whenever a PR number is known, as it avoids shallow-clone pitfalls entirely.

Arguments:
  • repo_path: Path to the Airbyte monorepo.
  • base_ref: Base git reference to compare against.
  • head_ref: Head git reference to compare.
Returns:

Sorted list of connector technical names.

def get_modified_connectors_from_github(pr_number: int, pr_owner: str, pr_repo: str, gh_token: str) -> list[str]:
def get_modified_connectors_from_github(
    pr_number: int,
    pr_owner: str,
    pr_repo: str,
    gh_token: str,
) -> list[str]:
    """Return the connectors touched by a pull request, using the GitHub API.

    The changed-file list comes from `get_pr_changed_files` rather than from
    a local checkout, so the result does not depend on local clone depth.
    This is the preferred path whenever a PR number is known.

    Args:
        pr_number: Pull request number.
        pr_owner: Repository owner (e.g. `"airbytehq"`).
        pr_repo: Repository name (e.g. `"airbyte"`).
        gh_token: GitHub API token (must be provided explicitly).

    Returns:
        Sorted list of connector technical names.

    Raises:
        GitHubAPIError: If the API request fails (raised by
            `airbyte_ops_mcp.github_api`).
    """
    # Imported at call time, not at module import time.
    from airbyte_ops_mcp.github_api import get_pr_changed_files

    logger.info(
        "Fetching changed files for PR %s/%s#%d via GitHub API",
        pr_owner,
        pr_repo,
        pr_number,
    )
    return _extract_connector_names(
        get_pr_changed_files(pr_owner, pr_repo, pr_number, token=gh_token)
    )

Get list of connector IDs modified in a PR via the GitHub API.

This is the preferred path when a PR number is known because it is immune to shallow-clone depth issues and always returns an accurate diff (only the files the PR actually changes).

Arguments:
  • pr_number: Pull request number.
  • pr_owner: Repository owner (e.g. "airbytehq").
  • pr_repo: Repository name (e.g. "airbyte").
  • gh_token: GitHub API token (must be provided explicitly).
Returns:

Sorted list of connector technical names.

Raises:
  • airbyte_ops_mcp.github_api.GitHubAPIError: If the API request fails.
@lru_cache(maxsize=128)
def get_certified_connectors(repo_path: str | pathlib.Path) -> set[str]:
@lru_cache(maxsize=128)
def get_certified_connectors(repo_path: str | Path) -> set[str]:
    """Get set of all certified connector IDs from metadata.

    Reads the metadata.yaml file of every connector directory and collects
    the connectors whose `data.supportLevel` equals "certified". Directories
    without a metadata file, and connectors whose metadata cannot be read or
    parsed, are skipped (a debug message is logged for the latter).

    Results are memoized per `repo_path` argument by `lru_cache`, so repeated
    calls return the *same* set object and do not re-read the filesystem.
    Treat the returned set as read-only (copy it before mutating), and call
    `get_certified_connectors.cache_clear()` if metadata changes on disk.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of certified connector technical names

    Raises:
        ValueError: If the connectors directory does not exist under
            `repo_path`.

    Example:
        >>> certified = get_certified_connectors("/path/to/airbyte")
        >>> "source-postgres" in certified
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    certified_connectors: set[str] = set()

    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        metadata_file = connector_dir / METADATA_FILE_NAME
        if not metadata_file.exists():
            continue

        try:
            # Explicit UTF-8 so parsing does not depend on the host locale.
            with open(metadata_file, encoding="utf-8") as f:
                metadata = yaml.safe_load(f)
            support_level = metadata.get("data", {}).get("supportLevel")
        except Exception:
            # Deliberately best-effort: one broken metadata file must not
            # abort the scan, but leave a trace for debugging.
            logger.debug(
                "Skipping connector %s: unreadable or malformed %s",
                connector_dir.name,
                METADATA_FILE_NAME,
                exc_info=True,
            )
            continue

        if support_level == "certified":
            certified_connectors.add(connector_dir.name)

    return certified_connectors

Get set of all certified connector IDs from metadata.

This function reads the metadata.yaml file for each connector to determine which connectors have "certified" support level.

Arguments:
  • repo_path: Path to the Airbyte monorepo
Returns:

Set of certified connector technical names

Example:
>>> certified = get_certified_connectors("/path/to/airbyte")
>>> "source-postgres" in certified
True
def get_all_connectors(repo_path: str | pathlib.Path) -> set[str]:
def get_all_connectors(repo_path: str | Path) -> set[str]:
    """Collect the name of every connector directory in the repository.

    Every sub-directory of the connectors folder counts as a connector;
    plain files in that folder are ignored.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of all connector technical names

    Raises:
        ValueError: If the connectors directory does not exist under
            `repo_path`.

    Example:
        >>> all_connectors = get_all_connectors("/path/to/airbyte")
        >>> "source-faker" in all_connectors
        True
    """
    connectors_root = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    names: set[str] = set()
    for entry in connectors_root.iterdir():
        if entry.is_dir():
            names.add(entry.name)
    return names

Get set of all connector IDs in the repository.

Arguments:
  • repo_path: Path to the Airbyte monorepo
Returns:

Set of all connector technical names

Example:
>>> all_connectors = get_all_connectors("/path/to/airbyte")
>>> "source-faker" in all_connectors
True
def get_connector_metadata( repo_path: str | pathlib.Path, connector_name: str) -> dict[str, typing.Any] | None:
def get_connector_metadata(
    repo_path: str | Path,
    connector_name: str,
) -> dict[str, Any] | None:
    """Get metadata for a specific connector.

    Loads the connector's metadata.yaml and returns its top-level `data`
    section. The lookup is best-effort: a missing file, an unreadable file
    or a malformed document all yield `None` instead of raising (read/parse
    failures are logged at debug level).

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_name: Technical name of the connector (e.g., "source-faker")

    Returns:
        The connector's metadata dict (the 'data' section), an empty dict if
        the document has no 'data' key, or None if the metadata file is
        missing or cannot be read/parsed

    Example:
        >>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker")
        >>> metadata.get("supportLevel")
        'certified'
    """
    metadata_file = (
        Path(repo_path) / CONNECTOR_PATH_PREFIX / connector_name / METADATA_FILE_NAME
    )

    if not metadata_file.exists():
        return None

    try:
        # Explicit UTF-8 so parsing does not depend on the host locale.
        with open(metadata_file, encoding="utf-8") as f:
            metadata = yaml.safe_load(f)
        return metadata.get("data", {})
    except Exception:
        # Deliberately best-effort (callers scan every connector), but keep
        # a trace so broken metadata files can be diagnosed.
        logger.debug(
            "Could not load %s for connector %s",
            METADATA_FILE_NAME,
            connector_name,
            exc_info=True,
        )
        return None

Get metadata for a specific connector.

Arguments:
  • repo_path: Path to the Airbyte monorepo
  • connector_name: Technical name of the connector (e.g., "source-faker")
Returns:

The connector's metadata dict (the 'data' section), or None if not found

Example:
>>> metadata = get_connector_metadata("/path/to/airbyte", "source-faker")
>>> metadata.get("supportLevel")
'certified'
def get_connectors_by_language( repo_path: str | pathlib.Path, language: airbyte_ops_mcp.registry.ConnectorLanguage) -> set[str]:
def get_connectors_by_language(
    repo_path: str | Path,
    language: ConnectorLanguage,
) -> set[str]:
    """Return the connectors whose detected language equals `language`.

    Each connector directory is passed to `_detect_connector_language`; a
    connector is kept when the detected value equals `language.value`.

    Args:
        repo_path: Path to the Airbyte monorepo
        language: Language to filter by

    Returns:
        Set of connector technical names using the specified language

    Raises:
        ValueError: If the connectors directory does not exist under
            `repo_path`.

    Example:
        >>> python_connectors = get_connectors_by_language(
        ...     "/path/to/airbyte", ConnectorLanguage.PYTHON
        ... )
        >>> "source-faker" in python_connectors
        True
    """
    connectors_root = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    wanted_language = language.value
    return {
        entry.name
        for entry in connectors_root.iterdir()
        if entry.is_dir()
        and _detect_connector_language(entry, entry.name) == wanted_language
    }

Get set of all connector IDs for a specific language.

This function reads connector directories to determine which connectors use the specified language.

Arguments:
  • repo_path: Path to the Airbyte monorepo
  • language: Language to filter by
Returns:

Set of connector technical names using the specified language

Example:
>>> python_connectors = get_connectors_by_language(
...     "/path/to/airbyte", ConnectorLanguage.PYTHON
... )
>>> "source-faker" in python_connectors
True
def get_connectors_by_type( repo_path: str | pathlib.Path, connector_type: airbyte_ops_mcp.registry.ConnectorType) -> set[str]:
def get_connectors_by_type(
    repo_path: str | Path,
    connector_type: ConnectorType,
) -> set[str]:
    """Get set of all connector IDs for a specific type (source or destination).

    The type of each connector is taken from the `connectorType` field of its
    metadata.yaml. Only when that field is unavailable (no metadata file,
    unreadable metadata, or the field is absent) does the function fall back
    to the directory name prefix (`"<type>-"`). Metadata is authoritative:
    a connector whose metadata declares a different type is not matched by
    its name prefix.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_type: Type to filter by (source or destination)

    Returns:
        Set of connector technical names matching the specified type

    Raises:
        ValueError: If the connectors directory does not exist under
            `repo_path`.

    Example:
        >>> source_connectors = get_connectors_by_type(
        ...     "/path/to/airbyte", ConnectorType.SOURCE
        ... )
        >>> "source-postgres" in source_connectors
        True
    """
    repo_path = Path(repo_path)
    connectors_dir = repo_path / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    type_value = connector_type.value
    name_prefix = f"{type_value}-"
    connectors_by_type: set[str] = set()

    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        connector_name = connector_dir.name

        # Prefer the type declared in metadata.yaml when there is one.
        metadata = get_connector_metadata(repo_path, connector_name)
        declared_type = (
            metadata.get("connectorType") if isinstance(metadata, dict) else None
        )

        if declared_type is not None:
            is_match = declared_type == type_value
        else:
            # No usable declaration: fall back to name prefix detection.
            is_match = connector_name.startswith(name_prefix)

        if is_match:
            connectors_by_type.add(connector_name)

    return connectors_by_type

Get set of all connector IDs for a specific type (source or destination).

This function reads connector directories to determine which connectors match the specified type based on their metadata.yaml or name prefix.

Arguments:
  • repo_path: Path to the Airbyte monorepo
  • connector_type: Type to filter by (source or destination)
Returns:

Set of connector technical names matching the specified type

Example:
>>> source_connectors = get_connectors_by_type(
...     "/path/to/airbyte", ConnectorType.SOURCE
... )
>>> "source-postgres" in source_connectors
True
def get_connectors_by_subtype( repo_path: str | pathlib.Path, connector_subtype: ConnectorSubtype) -> set[str]:
def get_connectors_by_subtype(
    repo_path: str | Path,
    connector_subtype: ConnectorSubtype,
) -> set[str]:
    """Return the connectors whose metadata declares the given subtype.

    A connector is kept when `get_connector_metadata` returns non-empty
    metadata whose `connectorSubtype` field equals `connector_subtype.value`.
    Connectors without usable metadata are never matched.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_subtype: Subtype to filter by (api, database, file, custom)

    Returns:
        Set of connector technical names matching the specified subtype

    Raises:
        ValueError: If the connectors directory does not exist under
            `repo_path`.

    Example:
        >>> database_connectors = get_connectors_by_subtype(
        ...     "/path/to/airbyte", ConnectorSubtype.DATABASE
        ... )
        >>> "source-postgres" in database_connectors
        True
    """
    repo_root = Path(repo_path)
    connectors_root = repo_root / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    wanted_subtype = connector_subtype.value
    matching: set[str] = set()

    for entry in connectors_root.iterdir():
        if not entry.is_dir():
            continue

        name = entry.name
        data = get_connector_metadata(repo_root, name)
        if not data:
            continue

        if data.get("connectorSubtype") == wanted_subtype:
            matching.add(name)

    return matching

Get set of all connector IDs for a specific subtype (api, database, file, etc.).

This function reads connector metadata.yaml files to determine which connectors match the specified subtype.

Arguments:
  • repo_path: Path to the Airbyte monorepo
  • connector_subtype: Subtype to filter by (api, database, file, custom)
Returns:

Set of connector technical names matching the specified subtype

Example:
>>> database_connectors = get_connectors_by_subtype(
...     "/path/to/airbyte", ConnectorSubtype.DATABASE
... )
>>> "source-postgres" in database_connectors
True
@lru_cache(maxsize=128)
def get_connectors_with_local_cdk(repo_path: str | pathlib.Path) -> set[str]:
@lru_cache(maxsize=128)
def get_connectors_with_local_cdk(repo_path: str | Path) -> set[str]:
    """Return the connectors that reference a local CDK checkout.

    The language of each connector is determined with
    `_detect_connector_language`, and the matching check is applied:
    - "python" / "low-code": `_has_local_cdk_python` (path-based CDK
      dependency in pyproject.toml)
    - "java": `_has_local_cdk_java` (useLocalCdk=true in build.gradle files)

    Connectors of any other detected language are never included. Results
    are memoized per `repo_path` argument by `lru_cache`.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of connector technical names using local CDK reference

    Raises:
        ValueError: If the connectors directory does not exist under
            `repo_path`.

    Example:
        >>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
        >>> "destination-motherduck" in local_cdk_connectors
        True
    """
    connectors_root = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not connectors_root.exists():
        raise ValueError(f"Connectors directory not found: {connectors_root}")

    found: set[str] = set()

    for entry in connectors_root.iterdir():
        if not entry.is_dir():
            continue

        name = entry.name
        lang = _detect_connector_language(entry, name)

        # Pick the language-specific check; other languages have none.
        if lang in ("python", "low-code"):
            uses_local_cdk = _has_local_cdk_python(entry)
        elif lang == "java":
            uses_local_cdk = _has_local_cdk_java(entry)
        else:
            uses_local_cdk = False

        if uses_local_cdk:
            found.add(name)

    return found

Get set of connectors using local CDK reference instead of published version.

This function detects connectors that are configured to use a local CDK checkout rather than a published CDK version. The detection method differs by connector language:

  • Python: Checks for path-based dependency in pyproject.toml
  • Java/Kotlin: Checks for useLocalCdk=true in build.gradle files
Arguments:
  • repo_path: Path to the Airbyte monorepo
Returns:

Set of connector technical names using local CDK reference

Example:
>>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
>>> "destination-motherduck" in local_cdk_connectors
True
@dataclass
class ConnectorListResult:
@dataclass
class ConnectorListResult:
    """Result of listing connectors with filters.

    Plain container pairing the selected connector names with their count.
    """

    # Connector technical names; `list_connectors` fills this with a sorted list.
    connectors: list[str]
    # Number of connectors; `list_connectors` sets it to the size of the result set.
    count: int

Result of listing connectors with filters.

ConnectorListResult(connectors: list[str], count: int)
connectors: list[str]
count: int
def list_connectors( repo_path: str | pathlib.Path, certified: bool | None = None, modified: bool | None = None, language_filter: set[str] | None = None, language_exclude: set[str] | None = None, connector_type: str | None = None, connector_subtype: str | None = None, base_ref: str | None = None, head_ref: str | None = None, pr_number: int | None = None, pr_owner: str | None = None, pr_repo: str | None = None, gh_token: str | None = None) -> ConnectorListResult:
def _resolve_changed_connectors(
    repo_path: str | Path,
    base_ref: str | None,
    head_ref: str | None,
    pr_number: int | None,
    pr_owner: str | None,
    pr_repo: str | None,
    gh_token: str | None,
) -> set[str]:
    """Return the modified connectors, via the GitHub API or a local git diff."""
    # The GitHub API is only used when the token and full PR context are given.
    if (
        gh_token is not None
        and pr_number is not None
        and pr_owner is not None
        and pr_repo is not None
    ):
        try:
            return set(
                get_modified_connectors_from_github(
                    pr_number, pr_owner, pr_repo, gh_token=gh_token
                )
            )
        except (GitHubAPIError, requests.RequestException):
            logger.warning(
                "GitHub API call failed for PR %s/%s#%d; "
                "falling back to local git diff",
                pr_owner,
                pr_repo,
                pr_number,
                exc_info=True,
            )

    # Local git diff: used when the API path is unavailable or has failed.
    base = GIT_DEFAULT_BRANCH if base_ref is None else base_ref
    head = "HEAD" if head_ref is None else head_ref
    return set(get_modified_connectors(repo_path, base, head))


def list_connectors(
    repo_path: str | Path,
    certified: bool | None = None,
    modified: bool | None = None,
    language_filter: set[str] | None = None,
    language_exclude: set[str] | None = None,
    connector_type: str | None = None,
    connector_subtype: str | None = None,
    base_ref: str | None = None,
    head_ref: str | None = None,
    pr_number: int | None = None,
    pr_owner: str | None = None,
    pr_repo: str | None = None,
    gh_token: str | None = None,
) -> ConnectorListResult:
    """List connectors in the Airbyte monorepo with flexible filtering.

    Starts from every connector directory and narrows the set by applying the
    requested filters in this order: certified, modified, language include,
    language exclude, connector type, connector subtype. A filter left at
    `None` is not applied; an empty `language_filter` / `language_exclude`
    set is also treated as "no filter".

    For the `modified` filter, the GitHub API is used when `gh_token` and the
    full PR context (`pr_number`, `pr_owner`, `pr_repo`) are all provided.
    If that call fails, or the token / PR context is incomplete, the function
    falls back to a local `git diff` between `base_ref` and `head_ref`.

    Args:
        repo_path: Path to the Airbyte monorepo
        certified: Filter by certification status (True=certified only,
            False=non-certified only, None=all)
        modified: Filter by modification status (True=modified only,
            False=not-modified only, None=all)
        language_filter: Set of languages to include (python, java, low-code, manifest-only)
        language_exclude: Set of languages to exclude (mutually exclusive with language_filter)
        connector_type: Filter by connector type (source, destination, or None for all)
        connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all)
        base_ref: Base git reference for modification detection (default: GIT_DEFAULT_BRANCH, e.g. "origin/master")
        head_ref: Head git reference for modification detection (default: "HEAD")
        pr_number: Pull request number (enables GitHub API path for modification detection)
        pr_owner: Repository owner for PR (e.g. "airbytehq")
        pr_repo: Repository name for PR (e.g. "airbyte")
        gh_token: GitHub API token. When provided together with PR context,
            the GitHub API is used instead of local git diff.

    Returns:
        ConnectorListResult with sorted list of connector names and count

    Raises:
        ValueError: If both language_filter and language_exclude are provided,
            or if the connectors directory does not exist under `repo_path`.
    """
    # The two language parameters cannot be combined.
    if not (language_filter is None or language_exclude is None):
        raise ValueError(
            "Cannot specify both language_filter and language_exclude. "
            "Only one language filter parameter is accepted."
        )

    selected = get_all_connectors(repo_path)

    # Certification status.
    if certified is not None:
        certified_names = get_certified_connectors(repo_path)
        if certified:
            selected = selected & certified_names
        else:
            selected = selected - certified_names

    # Modification status.
    if modified is not None:
        changed_names = _resolve_changed_connectors(
            repo_path,
            base_ref,
            head_ref,
            pr_number,
            pr_owner,
            pr_repo,
            gh_token,
        )
        if modified:
            selected = selected & changed_names
        else:
            selected = selected - changed_names

    # Languages to keep: union over all requested languages.
    if language_filter:
        included: set[str] = set()
        for lang in language_filter:
            included.update(
                get_connectors_by_language(repo_path, ConnectorLanguage(lang))
            )
        selected = selected & included

    # Languages to drop.
    if language_exclude:
        for lang in language_exclude:
            selected = selected - get_connectors_by_language(
                repo_path, ConnectorLanguage(lang)
            )

    # Connector type (source / destination).
    if connector_type is not None:
        selected = selected & get_connectors_by_type(
            repo_path, ConnectorType(connector_type)
        )

    # Connector subtype (api / database / file / custom).
    if connector_subtype is not None:
        selected = selected & get_connectors_by_subtype(
            repo_path, ConnectorSubtype(connector_subtype)
        )

    ordered_names = sorted(selected)
    return ConnectorListResult(connectors=ordered_names, count=len(ordered_names))

List connectors in the Airbyte monorepo with flexible filtering.

This is the core capability function that encapsulates all filtering logic. Both CLI and MCP layers should use this function.

When modified filtering is requested and both gh_token and full PR context (pr_number, pr_owner, pr_repo) are provided, the GitHub API is used to determine changed files — this is the most reliable method and avoids shallow-clone pitfalls. Falls back to local git diff when the token is not provided or PR context is incomplete.

Arguments:
  • repo_path: Path to the Airbyte monorepo
  • certified: Filter by certification status (True=certified only, False=non-certified only, None=all)
  • modified: Filter by modification status (True=modified only, False=not-modified only, None=all)
  • language_filter: Set of languages to include (python, java, low-code, manifest-only)
  • language_exclude: Set of languages to exclude (mutually exclusive with language_filter)
  • connector_type: Filter by connector type (source, destination, or None for all)
  • connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all)
  • base_ref: Base git reference for modification detection (default: GIT_DEFAULT_BRANCH, e.g. "origin/master")
  • head_ref: Head git reference for modification detection (default: "HEAD")
  • pr_number: Pull request number (enables GitHub API path for modification detection)
  • pr_owner: Repository owner for PR (e.g. "airbytehq")
  • pr_repo: Repository name for PR (e.g. "airbyte")
  • gh_token: GitHub API token. When provided together with PR context, the GitHub API is used instead of local git diff.
Returns:

ConnectorListResult with sorted list of connector names and count

Raises:
  • ValueError: If both language_filter and language_exclude are provided