airbyte_ops_mcp.airbyte_repo

Airbyte repository operations.

This package provides tools for working with the Airbyte monorepo, including:

  • Detecting changed connectors
  • Filtering connectors by certification status
  • Format checking and fixing
  • CI matrix planning
 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2"""Airbyte repository operations.
 3
 4This package provides tools for working with the Airbyte monorepo, including:
 5- Detecting changed connectors
 6- Filtering connectors by certification status
 7- Format checking and fixing
 8- CI matrix planning
 9"""
10
11from __future__ import annotations
12
13from airbyte_ops_mcp.airbyte_repo.bump_version import (
14    BumpType,
15    ChangelogEntry,
16    ChangelogParsingError,
17    ConnectorNotFoundError,
18    ConnectorVersionError,
19    InvalidVersionError,
20    VersionBumpResult,
21    VersionNotFoundError,
22    bump_connector_version,
23    strip_prerelease_suffix,
24)
25from airbyte_ops_mcp.airbyte_repo.list_connectors import (
26    ConnectorLanguage,
27    ConnectorListResult,
28    get_all_connectors,
29    get_certified_connectors,
30    get_connectors_by_language,
31    get_connectors_with_local_cdk,
32    get_modified_connectors,
33    list_connectors,
34)
35from airbyte_ops_mcp.airbyte_repo.utils import (
36    detect_env_pr_info,
37    parse_pr_info,
38    resolve_diff_range,
39)
40
41__all__ = [
42    "BumpType",
43    "ChangelogEntry",
44    "ChangelogParsingError",
45    "ConnectorLanguage",
46    "ConnectorListResult",
47    "ConnectorNotFoundError",
48    "ConnectorVersionError",
49    "InvalidVersionError",
50    "VersionBumpResult",
51    "VersionNotFoundError",
52    "bump_connector_version",
53    "detect_env_pr_info",
54    "get_all_connectors",
55    "get_certified_connectors",
56    "get_connectors_by_language",
57    "get_connectors_with_local_cdk",
58    "get_modified_connectors",
59    "list_connectors",
60    "parse_pr_info",
61    "resolve_diff_range",
62    "strip_prerelease_suffix",
63]
class BumpType(enum.StrEnum):
35class BumpType(StrEnum):
36    """Supported version bump types."""
37
38    PATCH = "patch"
39    MINOR = "minor"
40    MAJOR = "major"
41    PATCH_RC = "patch_rc"
42    MINOR_RC = "minor_rc"
43    MAJOR_RC = "major_rc"
44    RC = "rc"
45    PROMOTE = "promote"

Supported version bump types.

PATCH = <BumpType.PATCH: 'patch'>
MINOR = <BumpType.MINOR: 'minor'>
MAJOR = <BumpType.MAJOR: 'major'>
PATCH_RC = <BumpType.PATCH_RC: 'patch_rc'>
MINOR_RC = <BumpType.MINOR_RC: 'minor_rc'>
MAJOR_RC = <BumpType.MAJOR_RC: 'major_rc'>
RC = <BumpType.RC: 'rc'>
PROMOTE = <BumpType.PROMOTE: 'promote'>
@dataclass(frozen=True)
class ChangelogEntry:
 89@dataclass(frozen=True)
 90class ChangelogEntry:
 91    """A single changelog entry."""
 92
 93    date: datetime.date
 94    version: semver.Version
 95    pr_number: int | str
 96    comment: str
 97
 98    def to_markdown(self, github_repo: str = AIRBYTE_GITHUB_REPO) -> str:
 99        """Convert entry to markdown table row."""
100        return (
101            f"| {self.version} | {self.date.strftime('%Y-%m-%d')} | "
102            f"[{self.pr_number}](https://github.com/{github_repo}/pull/{self.pr_number}) | "
103            f"{self.comment} |"
104        )

A single changelog entry.

ChangelogEntry(date: datetime.date, version: semver.version.Version, pr_number: int | str, comment: str)
date: datetime.date
version: semver.version.Version
pr_number: int | str
comment: str
def to_markdown(self, github_repo: str = 'airbytehq/airbyte') -> str:
 98    def to_markdown(self, github_repo: str = AIRBYTE_GITHUB_REPO) -> str:
 99        """Convert entry to markdown table row."""
100        return (
101            f"| {self.version} | {self.date.strftime('%Y-%m-%d')} | "
102            f"[{self.pr_number}](https://github.com/{github_repo}/pull/{self.pr_number}) | "
103            f"{self.comment} |"
104        )

Convert entry to markdown table row.

class ChangelogParsingError(airbyte_ops_mcp.airbyte_repo.ConnectorVersionError):
class ChangelogParsingError(ConnectorVersionError):
    """Raised when changelog cannot be parsed."""

Raised when changelog cannot be parsed.

class ConnectorLanguage(enum.StrEnum):
 88class ConnectorLanguage(StrEnum):
 89    """Connector implementation languages."""
 90
 91    PYTHON = "python"
 92    JAVA = "java"
 93    LOW_CODE = "low-code"
 94    MANIFEST_ONLY = "manifest-only"
 95
 96    @classmethod
 97    def parse(cls, value: str) -> ConnectorLanguage:
 98        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
 99        try:
100            return cls(value)
101        except ValueError:
102            valid = ", ".join(f"`{m.value}`" for m in cls)
103            raise ValueError(
104                f"Unrecognized language: {value!r}. Expected one of: {valid}."
105            ) from None

Connector implementation languages.

PYTHON = <ConnectorLanguage.PYTHON: 'python'>
JAVA = <ConnectorLanguage.JAVA: 'java'>
LOW_CODE = <ConnectorLanguage.LOW_CODE: 'low-code'>
MANIFEST_ONLY = <ConnectorLanguage.MANIFEST_ONLY: 'manifest-only'>
@classmethod
def parse(cls, value: str) -> ConnectorLanguage:
 96    @classmethod
 97    def parse(cls, value: str) -> ConnectorLanguage:
 98        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
 99        try:
100            return cls(value)
101        except ValueError:
102            valid = ", ".join(f"`{m.value}`" for m in cls)
103            raise ValueError(
104                f"Unrecognized language: {value!r}. Expected one of: {valid}."
105            ) from None

Parse a string into a ConnectorLanguage, raising ValueError on mismatch.

@dataclass
class ConnectorListResult:
@dataclass
class ConnectorListResult:
    """Result of listing connectors with filters."""

    # Connectors that were selected by the listing.
    connectors: list[str]
    # Stored separately from `connectors`; presumably its length — nothing
    # in this class enforces that, so confirm against the producer.
    count: int

Result of listing connectors with filters.

ConnectorListResult(connectors: list[str], count: int)
connectors: list[str]
count: int
class ConnectorNotFoundError(airbyte_ops_mcp.airbyte_repo.ConnectorVersionError):
class ConnectorNotFoundError(ConnectorVersionError):
    """Raised when a connector is not found."""

Raised when a connector is not found.

class ConnectorVersionError(builtins.Exception):
class ConnectorVersionError(Exception):
    """Base exception for connector version operations."""

Base exception for connector version operations.

class InvalidVersionError(airbyte_ops_mcp.airbyte_repo.ConnectorVersionError):
class InvalidVersionError(ConnectorVersionError):
    """Raised when a version string is invalid."""

Raised when a version string is invalid.

@dataclass
class VersionBumpResult:
@dataclass
class VersionBumpResult:
    """Result of a version bump operation."""

    # Name of the connector that was bumped.
    connector: str
    # Version before the bump.
    previous_version: str
    # Version after the bump.
    new_version: str
    # Paths of the files the operation modified.
    files_modified: list[str]
    # Whether the operation ran as a dry run.
    dry_run: bool

Result of a version bump operation.

VersionBumpResult(connector: str, previous_version: str, new_version: str, files_modified: list[str], dry_run: bool)
connector: str
previous_version: str
new_version: str
files_modified: list[str]
dry_run: bool
class VersionNotFoundError(airbyte_ops_mcp.airbyte_repo.ConnectorVersionError):
class VersionNotFoundError(ConnectorVersionError):
    """Raised when version cannot be found in a file."""

Raised when version cannot be found in a file.

def bump_connector_version( repo_path: str | pathlib.Path, connector_name: str, bump_type: Optional[Literal['patch', 'minor', 'major', 'patch_rc', 'minor_rc', 'major_rc', 'rc', 'promote']] = None, new_version: str | None = None, changelog_message: str | None = None, pr_number: int | str | None = None, dry_run: bool = False, no_changelog: bool = False, progressive_rollout_enabled: bool | None = None) -> VersionBumpResult:
def bump_connector_version(
    repo_path: str | Path,
    connector_name: str,
    bump_type: Literal[
        "patch", "minor", "major", "patch_rc", "minor_rc", "major_rc", "rc", "promote"
    ]
    | None = None,
    new_version: str | None = None,
    changelog_message: str | None = None,
    pr_number: int | str | None = None,
    dry_run: bool = False,
    no_changelog: bool = False,
    progressive_rollout_enabled: bool | None = None,
) -> VersionBumpResult:
    """Bump a connector's version across all relevant files.

    This function updates the version in:
    - metadata.yaml (always)
    - pyproject.toml (if exists, for Python connectors)
    - Documentation changelog (if changelog_message provided, doc exists, and
      no_changelog is False)

    For RC bump types (`patch_rc`, `minor_rc`, `major_rc`, `rc` from a
    stable version), this also sets `enableProgressiveRollout: true` in
    metadata so the registry compile step excludes this version from
    default/latest evaluation while the rollout is in progress.

    For `promote`, this sets `enableProgressiveRollout` to false so the
    version becomes eligible for default/latest again.

    Args:
        repo_path: Path to the Airbyte monorepo
        connector_name: Technical name of the connector (e.g., "source-github")
        bump_type: Type of version bump (patch, minor, major, patch_rc, minor_rc, major_rc, rc, promote)
        new_version: Explicit new version (overrides bump_type)
        changelog_message: Message to add to changelog (optional)
        pr_number: PR number for changelog entry (optional)
        dry_run: If True, don't actually modify files
        no_changelog: If True, skip changelog updates even if changelog_message
            is provided.  Useful for ephemeral version bumps (e.g. pre-release
            artifact generation) where the version is temporary.
        progressive_rollout_enabled: Explicit override for the
            `enableProgressiveRollout` flag in metadata.yaml.  When `True` or
            `False`, the flag is set to that value (without touching registry
            overrides) after the automatic `bump_type`-based handling has
            run, so the override wins.  When `None` (the default), only the
            automatic behaviour based on `bump_type` applies.

    Returns:
        VersionBumpResult with details of the operation

    Raises:
        ConnectorNotFoundError: If connector doesn't exist, or its
            metadata.yaml is missing
        VersionNotFoundError: If current version cannot be found
        InvalidVersionError: If version format is invalid
        ValueError: If neither bump_type nor new_version is provided, or
            bump_type is not a recognised `BumpType` value
    """
    repo_path = Path(repo_path)
    connector_path = get_connector_path(repo_path, connector_name)

    # Get current version
    current_version = get_current_version(connector_path)

    # Calculate new version
    bump_type_enum = BumpType(bump_type) if bump_type else None
    calculated_version = calculate_new_version(
        current_version, bump_type_enum, new_version
    )

    files_modified: list[str] = []

    def _mark_modified(rel_path: str) -> None:
        # Several steps below may touch the same file; list each path once.
        if rel_path not in files_modified:
            files_modified.append(rel_path)

    metadata_file = connector_path / METADATA_FILE_NAME
    if not metadata_file.exists():
        raise ConnectorNotFoundError(f"metadata.yaml not found at {metadata_file}")
    metadata_file_rel = str(metadata_file.relative_to(repo_path))

    # Update metadata.yaml
    if update_metadata_version(connector_path, calculated_version, dry_run):
        files_modified.append(metadata_file_rel)

    # Update pyproject.toml if it exists
    if update_pyproject_version(connector_path, calculated_version, dry_run):
        files_modified.append(
            f"{CONNECTOR_PATH_PREFIX}/{connector_name}/{PYPROJECT_FILE_NAME}"
        )

    # Update changelog if message provided and not explicitly skipped
    if changelog_message and not no_changelog:
        doc_path = get_connector_doc_path(repo_path, connector_name)
        if doc_path and update_changelog(
            doc_path,
            calculated_version,
            changelog_message,
            pr_number,
            dry_run,
        ):
            files_modified.append(str(doc_path.relative_to(repo_path)))

    # Progressive rollout handling

    # Automatic behaviour based on bump_type.
    # Configure progressive rollout metadata for initial RC bumps
    # (skip if current version is already an RC — flag is already set)
    is_current_rc = semver.Version.is_valid(current_version) and _is_rc(
        semver.Version.parse(current_version)
    )
    if (
        bump_type_enum in _RC_BUMP_TYPES
        and not is_current_rc
        and _setup_progressive_rollout_metadata(
            connector_path=connector_path,
            previous_ga_version=current_version,
            dry_run=dry_run,
        )
    ):
        _mark_modified(metadata_file_rel)

    # Clean up progressive rollout metadata on promote (RC → GA)
    if bump_type_enum == BumpType.PROMOTE and _cleanup_progressive_rollout_metadata(
        connector_path=connector_path, dry_run=dry_run
    ):
        _mark_modified(metadata_file_rel)

    # Explicit override — applied last so it takes precedence over
    # the automatic steps above (e.g. preview builds can force the
    # flag off even when bump_type would normally enable it).
    if progressive_rollout_enabled is not None:
        metadata = load_metadata_yaml(metadata_file)
        data = metadata.get("data", {})
        if _set_progressive_rollout_flag(data, progressive_rollout_enabled):
            # Re-attach in case "data" was absent and `data` is a fresh dict.
            metadata["data"] = data
            if not dry_run:
                write_metadata_yaml(metadata, metadata_file)
            _mark_modified(metadata_file_rel)

    return VersionBumpResult(
        connector=connector_name,
        previous_version=current_version,
        new_version=calculated_version,
        files_modified=files_modified,
        dry_run=dry_run,
    )

Bump a connector's version across all relevant files.

This function updates the version in:

  • metadata.yaml (always)
  • pyproject.toml (if exists, for Python connectors)
  • Documentation changelog (if changelog_message provided, doc exists, and no_changelog is False)

For RC bump types (patch_rc, minor_rc, major_rc, rc from a stable version), this also sets enableProgressiveRollout: true in metadata so the registry compile step excludes this version from default/latest evaluation while the rollout is in progress.

For promote, this sets enableProgressiveRollout to false so the version becomes eligible for default/latest again.

Arguments:
  • repo_path: Path to the Airbyte monorepo
  • connector_name: Technical name of the connector (e.g., "source-github")
  • bump_type: Type of version bump (patch, minor, major, patch_rc, minor_rc, major_rc, rc, promote)
  • new_version: Explicit new version (overrides bump_type)
  • changelog_message: Message to add to changelog (optional)
  • pr_number: PR number for changelog entry (optional)
  • dry_run: If True, don't actually modify files
  • no_changelog: If True, skip changelog updates even if changelog_message is provided. Useful for ephemeral version bumps (e.g. pre-release artifact generation) where the version is temporary.
  • progressive_rollout_enabled: Explicit override for the enableProgressiveRollout flag in metadata.yaml. When True or False, the flag is set to that value (without touching registry overrides). When None (the default), the automatic behaviour based on bump_type is used instead.
Returns:

VersionBumpResult with details of the operation

Raises:
  • ConnectorNotFoundError: If connector doesn't exist
  • VersionNotFoundError: If current version cannot be found
  • InvalidVersionError: If version format is invalid
  • ValueError: If neither bump_type nor new_version is provided
def detect_env_pr_info() -> tuple[int | None, str | None, str | None]:
def detect_env_pr_info() -> tuple[int | None, str | None, str | None]:
    """Detect PR number, owner, and repo from GitHub Actions environment variables.

    Reads `GITHUB_REF` (e.g. "refs/pull/123/merge") for the PR number and
    `GITHUB_REPOSITORY` (e.g. "airbytehq/airbyte") for the owner and repo.

    Returns:
        Tuple of (pr_number, pr_owner, pr_repo). `pr_number` is `None` when
        `GITHUB_REF` does not start with `refs/pull/<digits>/`; `pr_owner`
        and `pr_repo` are `None` when `GITHUB_REPOSITORY` is unset or
        contains no "/".
    """
    # Try to extract PR number from GITHUB_REF (e.g., "refs/pull/123/merge")
    github_ref = os.getenv("GITHUB_REF", "")
    m = re.match(r"refs/pull/(\d+)/", github_ref)
    pr_number = int(m.group(1)) if m else None

    # Try to extract owner and repo from GITHUB_REPOSITORY (e.g., "airbytehq/airbyte").
    # Split on the first "/" only, so anything after it stays in the repo part.
    pr_owner = None
    pr_repo = None
    owner, slash, repo = os.getenv("GITHUB_REPOSITORY", "").partition("/")
    if slash:
        pr_owner, pr_repo = owner, repo

    return pr_number, pr_owner, pr_repo

Detect PR number, owner, and repo from GitHub Actions environment variables.

Returns:

Tuple of (pr_number, pr_owner, pr_repo)

def get_all_connectors(repo_path: str | pathlib.Path) -> set[str]:
def get_all_connectors(repo_path: str | Path) -> set[str]:
    """Get set of all connector IDs in the repository.

    Every sub-directory of the connectors directory counts as a connector;
    plain files are ignored.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of all connector technical names

    Raises:
        ValueError: If the connectors directory does not exist

    Example:
        >>> all_connectors = get_all_connectors("/path/to/airbyte")
        >>> "source-faker" in all_connectors
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    return {entry.name for entry in connectors_dir.iterdir() if entry.is_dir()}

Get set of all connector IDs in the repository.

Arguments:
  • repo_path: Path to the Airbyte monorepo
Returns:

Set of all connector technical names

Example:
>>> all_connectors = get_all_connectors("/path/to/airbyte")
>>> "source-faker" in all_connectors
True
@lru_cache(maxsize=128)
def get_certified_connectors(repo_path: str | pathlib.Path) -> set[str]:
@lru_cache(maxsize=128)
def get_certified_connectors(repo_path: str | Path) -> set[str]:
    """Get set of all certified connector IDs from metadata.

    This function reads the metadata.yaml file for each connector to determine
    which connectors have "certified" support level. Connectors without a
    metadata file, or whose metadata cannot be read or parsed, are skipped.

    NOTE: results are cached per `repo_path` by `lru_cache`, so every caller
    with the same argument gets the *same* set object and will not see later
    changes on disk. Copy the set before mutating it.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of certified connector technical names

    Raises:
        ValueError: If the connectors directory does not exist

    Example:
        >>> certified = get_certified_connectors("/path/to/airbyte")
        >>> "source-postgres" in certified
        True
    """
    repo_path = Path(repo_path)
    connectors_dir = repo_path / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    # Imported once, outside the loop: a missing dependency should surface as
    # an ImportError rather than be swallowed per connector and produce a
    # silently empty result.
    import yaml

    certified_connectors: set[str] = set()

    # Iterate through all connector directories
    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        metadata_file = connector_dir / METADATA_FILE_NAME
        if not metadata_file.exists():
            continue

        # Read metadata to check support level
        try:
            with open(metadata_file, encoding="utf-8") as f:
                metadata = yaml.safe_load(f)
        except (OSError, ValueError, yaml.YAMLError):
            # Skip connectors with unreadable or invalid metadata
            # (ValueError also covers UnicodeDecodeError).
            continue

        # An empty or oddly shaped document is treated as "not certified".
        data = metadata.get("data") if isinstance(metadata, dict) else None
        if isinstance(data, dict) and data.get("supportLevel") == "certified":
            certified_connectors.add(connector_dir.name)

    return certified_connectors

Get set of all certified connector IDs from metadata.

This function reads the metadata.yaml file for each connector to determine which connectors have "certified" support level.

Arguments:
  • repo_path: Path to the Airbyte monorepo
Returns:

Set of certified connector technical names

Example:
>>> certified = get_certified_connectors("/path/to/airbyte")
>>> "source-postgres" in certified
True
def get_connectors_by_language( repo_path: str | pathlib.Path, language: ConnectorLanguage) -> set[str]:
def get_connectors_by_language(
    repo_path: str | Path,
    language: ConnectorLanguage,
) -> set[str]:
    """Get set of all connector IDs for a specific language.

    This function reads connector directories to determine which connectors
    use the specified language.

    Args:
        repo_path: Path to the Airbyte monorepo
        language: Language to filter by

    Returns:
        Set of connector technical names using the specified language

    Raises:
        ValueError: If the connectors directory does not exist

    Example:
        >>> python_connectors = get_connectors_by_language(
        ...     "/path/to/airbyte", ConnectorLanguage.PYTHON
        ... )
        >>> "source-faker" in python_connectors
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    # The detected language is compared against the enum's value.
    language_value = language.value

    return {
        connector_dir.name
        for connector_dir in connectors_dir.iterdir()
        if connector_dir.is_dir()
        and _detect_connector_language(connector_dir, connector_dir.name)
        == language_value
    }

Get set of all connector IDs for a specific language.

This function reads connector directories to determine which connectors use the specified language.

Arguments:
  • repo_path: Path to the Airbyte monorepo
  • language: Language to filter by
Returns:

Set of connector technical names using the specified language

Example:
>>> python_connectors = get_connectors_by_language(
...     "/path/to/airbyte", ConnectorLanguage.PYTHON
... )
>>> "source-faker" in python_connectors
True
@lru_cache(maxsize=128)
def get_connectors_with_local_cdk(repo_path: str | pathlib.Path) -> set[str]:
@lru_cache(maxsize=128)
def get_connectors_with_local_cdk(repo_path: str | Path) -> set[str]:
    """Get set of connectors using local CDK reference instead of published version.

    This function detects connectors that are configured to use a local CDK
    checkout rather than a published CDK version. The detection method differs
    by connector language:
    - Python: Checks for path-based dependency in pyproject.toml
    - Java/Kotlin: Checks for useLocalCdk=true in build.gradle files

    NOTE: results are cached per `repo_path` by `lru_cache`, so every caller
    with the same argument gets the *same* set object. Copy the set before
    mutating it.

    Args:
        repo_path: Path to the Airbyte monorepo

    Returns:
        Set of connector technical names using local CDK reference

    Raises:
        ValueError: If the connectors directory does not exist

    Example:
        >>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
        >>> "destination-motherduck" in local_cdk_connectors
        True
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    local_cdk_connectors: set[str] = set()

    # Iterate through all connector directories
    for connector_dir in connectors_dir.iterdir():
        if not connector_dir.is_dir():
            continue

        connector_name = connector_dir.name
        detected_language = _detect_connector_language(connector_dir, connector_name)

        # Pick the language-specific local-CDK check; other languages are
        # never reported.
        if detected_language in ("python", "low-code"):
            # Python connectors: check pyproject.toml for path-based CDK dependency
            uses_local_cdk = _has_local_cdk_python(connector_dir)
        elif detected_language == "java":
            # Java/Kotlin connectors: check build.gradle for useLocalCdk=true
            uses_local_cdk = _has_local_cdk_java(connector_dir)
        else:
            uses_local_cdk = False

        if uses_local_cdk:
            local_cdk_connectors.add(connector_name)

    return local_cdk_connectors

Get set of connectors using local CDK reference instead of published version.

This function detects connectors that are configured to use a local CDK checkout rather than a published CDK version. The detection method differs by connector language:

  • Python: Checks for path-based dependency in pyproject.toml
  • Java/Kotlin: Checks for useLocalCdk=true in build.gradle files
Arguments:
  • repo_path: Path to the Airbyte monorepo
Returns:

Set of connector technical names using local CDK reference

Example:
>>> local_cdk_connectors = get_connectors_with_local_cdk("/path/to/airbyte")
>>> "destination-motherduck" in local_cdk_connectors
True
def get_modified_connectors( repo_path: str | pathlib.Path, base_ref: str = 'origin/master', head_ref: str = 'HEAD') -> list[str]:
 61def get_modified_connectors(
 62    repo_path: str | Path,
 63    base_ref: str = GIT_DEFAULT_BRANCH,
 64    head_ref: str = "HEAD",
 65) -> list[str]:
 66    """Get list of connector IDs modified according to a local git diff.
 67
 68    This is the *local-only* fallback used when the GitHub API path is not
 69    available (e.g. no PR number or no token).  It uses `git merge-base`
 70    when possible so that only changes introduced on the head branch are
 71    reported, falling back to a plain two-ref diff when the merge-base
 72    cannot be determined (shallow clones without enough history).
 73
 74    Prefer `get_modified_connectors_from_github` whenever a PR number
 75    is known, as it avoids shallow-clone pitfalls entirely.
 76
 77    Args:
 78        repo_path: Path to the Airbyte monorepo.
 79        base_ref: Base git reference to compare against.
 80        head_ref: Head git reference to compare.
 81
 82    Returns:
 83        Sorted list of connector technical names.
 84    """
 85    repo_path = Path(repo_path)
 86
 87    # Try to find the merge-base so the diff only contains changes introduced
 88    # on the head branch, avoiding false positives from unrelated base-branch
 89    # commits.  Falls back to a plain two-ref diff when unavailable.
 90    effective_base = base_ref
 91    try:
 92        mb_result = subprocess.run(
 93            ["git", "merge-base", base_ref, head_ref],
 94            cwd=repo_path,
 95            capture_output=True,
 96            text=True,
 97            check=True,
 98            timeout=30,
 99        )
100        effective_base = mb_result.stdout.strip()
101        logger.debug("Using merge-base %s for diff", effective_base)
102    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
103        logger.debug(
104            "Could not determine merge-base for %s..%s; falling back to plain diff",
105            base_ref,
106            head_ref,
107        )
108
109    try:
110        result = subprocess.run(
111            ["git", "diff", "--name-only", effective_base, head_ref],
112            cwd=repo_path,
113            capture_output=True,
114            text=True,
115            check=True,
116            timeout=30,
117        )
118        changed_files = result.stdout.strip().split("\n")
119    except subprocess.CalledProcessError as e:
120        raise RuntimeError(f"Failed to get git diff: {e.stderr}") from e
121    except subprocess.TimeoutExpired as e:
122        raise RuntimeError("Git diff command timed out after 30 seconds") from e
123
124    return _extract_connector_names(changed_files)

Get list of connector IDs modified according to a local git diff.

This is the local-only fallback used when the GitHub API path is not available (e.g. no PR number or no token). It uses git merge-base when possible so that only changes introduced on the head branch are reported, falling back to a plain two-ref diff when the merge-base cannot be determined (shallow clones without enough history).

Prefer get_modified_connectors_from_github whenever a PR number is known, as it avoids shallow-clone pitfalls entirely.

Arguments:
  • repo_path: Path to the Airbyte monorepo.
  • base_ref: Base git reference to compare against.
  • head_ref: Head git reference to compare.
Returns:

Sorted list of connector technical names.

def list_connectors( repo_path: str | pathlib.Path, certified: bool | None = None, modified: bool | None = None, language_filter: set[str] | None = None, language_exclude: set[str] | None = None, connector_type: str | None = None, connector_subtype: str | None = None, base_ref: str | None = None, head_ref: str | None = None, pr_number: int | None = None, pr_owner: str | None = None, pr_repo: str | None = None, gh_token: str | None = None) -> ConnectorListResult:
def list_connectors(
    repo_path: str | Path,
    certified: bool | None = None,
    modified: bool | None = None,
    language_filter: set[str] | None = None,
    language_exclude: set[str] | None = None,
    connector_type: str | None = None,
    connector_subtype: str | None = None,
    base_ref: str | None = None,
    head_ref: str | None = None,
    pr_number: int | None = None,
    pr_owner: str | None = None,
    pr_repo: str | None = None,
    gh_token: str | None = None,
) -> ConnectorListResult:
    """Return the connectors of the Airbyte monorepo that pass every requested filter.

    Single entry point for connector selection; the CLI and MCP layers are
    expected to delegate here rather than re-implement filtering.

    Filters are applied in a fixed order (certified, modified, language
    include, language exclude, type, subtype) and each one narrows the set
    produced by the previous step.

    For the `modified` filter the changed-connector set comes from the
    GitHub API when `gh_token`, `pr_number`, `pr_owner` and `pr_repo` are
    all supplied.  If any of them is missing, or the API call fails, the set
    is computed from a local `git diff` between `base_ref` and `head_ref`.

    Args:
        repo_path: Path to the Airbyte monorepo
        certified: True keeps certified connectors only, False keeps
            non-certified only, None disables the filter
        modified: True keeps modified connectors only, False keeps
            unmodified only, None disables the filter
        language_filter: Languages to include (python, java, low-code, manifest-only)
        language_exclude: Languages to exclude; cannot be combined with
            language_filter
        connector_type: Connector type to keep (source, destination), or None for all
        connector_subtype: Connector subtype to keep (api, database, file,
            custom), or None for all
        base_ref: Base git reference for the local diff (default: GIT_DEFAULT_BRANCH)
        head_ref: Head git reference for the local diff (default: "HEAD")
        pr_number: Pull request number, part of the GitHub API context
        pr_owner: Repository owner of the PR (e.g. "airbytehq")
        pr_repo: Repository name of the PR (e.g. "airbyte")
        gh_token: GitHub API token; required for the GitHub API path

    Returns:
        ConnectorListResult holding the sorted connector names and their count

    Raises:
        ValueError: If both language_filter and language_exclude are provided
    """
    # The two language parameters are alternatives; reject the combination
    # before doing any repository work.
    if not (language_filter is None or language_exclude is None):
        raise ValueError(
            "Cannot specify both language_filter and language_exclude. "
            "Only one language filter parameter is accepted."
        )

    # Every filter below narrows this working set.
    selected = get_all_connectors(repo_path)

    # --- certification ---------------------------------------------------
    if certified is not None:
        certified_names = get_certified_connectors(repo_path)
        if certified:
            selected &= certified_names
        else:
            selected -= certified_names

    # --- modification ----------------------------------------------------
    if modified is not None:
        touched: set[str] | None = None

        # The GitHub API path needs the token plus the complete PR identity.
        github_context = (gh_token, pr_number, pr_owner, pr_repo)
        if all(part is not None for part in github_context):
            try:
                touched = set(
                    get_modified_connectors_from_github(
                        pr_number, pr_owner, pr_repo, gh_token=gh_token
                    )
                )
            except (GitHubAPIError, requests.RequestException):
                # Leave `touched` unset so the local diff below takes over.
                logger.warning(
                    "GitHub API call failed for PR %s/%s#%d; "
                    "falling back to local git diff",
                    pr_owner,
                    pr_repo,
                    pr_number,
                    exc_info=True,
                )

        if touched is None:
            diff_base = GIT_DEFAULT_BRANCH if base_ref is None else base_ref
            diff_head = "HEAD" if head_ref is None else head_ref
            touched = set(get_modified_connectors(repo_path, diff_base, diff_head))

        if modified:
            selected &= touched
        else:
            selected -= touched

    # --- language include: union of all requested languages ---------------
    if language_filter:
        included: set[str] = set()
        for language_name in language_filter:
            included.update(
                get_connectors_by_language(repo_path, ConnectorLanguage(language_name))
            )
        selected &= included

    # --- language exclude: drop each requested language in turn -----------
    if language_exclude:
        for language_name in language_exclude:
            selected -= get_connectors_by_language(
                repo_path, ConnectorLanguage(language_name)
            )

    # --- connector type ---------------------------------------------------
    if connector_type is not None:
        selected &= get_connectors_by_type(repo_path, ConnectorType(connector_type))

    # --- connector subtype ------------------------------------------------
    if connector_subtype is not None:
        selected &= get_connectors_by_subtype(
            repo_path, ConnectorSubtype(connector_subtype)
        )

    ordered_names = sorted(selected)
    return ConnectorListResult(
        connectors=ordered_names,
        count=len(ordered_names),
    )

List connectors in the Airbyte monorepo with flexible filtering.

This is the core capability function that encapsulates all filtering logic. Both CLI and MCP layers should use this function.

When modified filtering is requested and both gh_token and full PR context (pr_number, pr_owner, pr_repo) are provided, the GitHub API is used to determine changed files — this is the most reliable method and avoids shallow-clone pitfalls. Falls back to local git diff when the token is not provided or PR context is incomplete.

Arguments:
  • repo_path: Path to the Airbyte monorepo
  • certified: Filter by certification status (True=certified only, False=non-certified only, None=all)
  • modified: Filter by modification status (True=modified only, False=not-modified only, None=all)
  • language_filter: Set of languages to include (python, java, low-code, manifest-only)
  • language_exclude: Set of languages to exclude (mutually exclusive with language_filter)
  • connector_type: Filter by connector type (source, destination, or None for all)
  • connector_subtype: Filter by connector subtype (api, database, file, custom, or None for all)
  • base_ref: Base git reference for modification detection (default: GIT_DEFAULT_BRANCH, e.g. "origin/master")
  • head_ref: Head git reference for modification detection (default: "HEAD")
  • pr_number: Pull request number (enables GitHub API path for modification detection)
  • pr_owner: Repository owner for PR (e.g. "airbytehq")
  • pr_repo: Repository name for PR (e.g. "airbyte")
  • gh_token: GitHub API token. When provided together with PR context, the GitHub API is used instead of local git diff.
Returns:

ConnectorListResult with sorted list of connector names and count

Raises:
  • ValueError: If both language_filter and language_exclude are provided
def parse_pr_info(pr_num_or_url: str) -> tuple[int | None, str | None, str | None]:
def parse_pr_info(pr_num_or_url: str) -> tuple[int | None, str | None, str | None]:
    """Parse PR number, owner, and repo from string or URL.

    Surrounding whitespace is ignored, so values copied from a shell or an
    environment file (e.g. "123\\n") are still recognised.

    Args:
        pr_num_or_url: PR number (e.g., "123") or URL (e.g., "https://github.com/airbytehq/airbyte/pull/123").
            Anything after the PR number in a URL (such as "/files") is ignored.

    Returns:
        Tuple of (pr_number, pr_owner, pr_repo). A bare number yields
        (pr_number, None, None); unrecognised input yields (None, None, None).
    """
    candidate = pr_num_or_url.strip()

    # A bare PR number. isdecimal() is used instead of isdigit() because
    # isdigit() also accepts characters such as "²" that int() cannot parse,
    # which would raise ValueError instead of reporting "not a PR".
    if candidate.isdecimal():
        return int(candidate), None, None

    # Parse URL: https://github.com/airbytehq/airbyte-enterprise/pull/123
    m = re.match(r"https://github\.com/([^/]+)/([^/]+)/pull/(\d+)", candidate)
    if m:
        pr_owner, pr_repo, pr_number = m.groups()
        return int(pr_number), pr_owner, pr_repo

    return None, None, None

Parse PR number, owner, and repo from string or URL.

Arguments:
  • pr_num_or_url: PR number (e.g., "123") or URL (e.g., "https://github.com/airbytehq/airbyte/pull/123")
Returns:

Tuple of (pr_number, pr_owner, pr_repo)

def resolve_diff_range( pr_num_or_url: str | None) -> tuple[str, str, int | None, str | None, str | None]:
 62def resolve_diff_range(
 63    pr_num_or_url: str | None,
 64) -> tuple[str, str, int | None, str | None, str | None]:
 65    """Resolve PR info to base_ref and head_ref for git diff.
 66
 67    Args:
 68        pr_num_or_url: PR number, URL, or None to auto-detect from environment
 69
 70    Returns:
 71        Tuple of (base_ref, head_ref, pr_number, pr_owner, pr_repo)
 72    """
 73    pr_number = None
 74    pr_owner = None
 75    pr_repo = None
 76
 77    # Try to get PR info from parameter or environment
 78    if pr_num_or_url:
 79        pr_number, pr_owner, pr_repo = parse_pr_info(pr_num_or_url)
 80    else:
 81        pr_number, pr_owner, pr_repo = detect_env_pr_info()
 82
 83    # Backfill owner/repo from GITHUB_REPOSITORY when only PR number is known
 84    # (e.g. --pr 75600 without a full URL, running inside GitHub Actions)
 85    if pr_number is not None and (pr_owner is None or pr_repo is None):
 86        _env_number, env_owner, env_repo = detect_env_pr_info()
 87        if env_owner and env_repo:
 88            pr_owner = pr_owner or env_owner
 89            pr_repo = pr_repo or env_repo
 90
 91    # Determine base_ref and head_ref based on PR detection
 92    if pr_number is not None:
 93        # PR detected - use origin/{base_branch} vs HEAD (assumes CI checked out the PR)
 94        # Use GITHUB_BASE_REF if available (set by GitHub Actions for PRs)
 95        # This handles repos with different default branches (main, master, etc.)
 96        base_branch = os.getenv("GITHUB_BASE_REF") or "master"
 97        base_ref = f"origin/{base_branch}"
 98        head_ref = "HEAD"
 99    else:
100        # No PR detected - fallback to HEAD~1 vs HEAD (post-merge use case)
101        base_ref = "HEAD~1"
102        head_ref = "HEAD"
103
104    return base_ref, head_ref, pr_number, pr_owner, pr_repo

Resolve PR info to base_ref and head_ref for git diff.

Arguments:
  • pr_num_or_url: PR number, URL, or None to auto-detect from environment
Returns:

Tuple of (base_ref, head_ref, pr_number, pr_owner, pr_repo)

def strip_prerelease_suffix(version_string: str) -> str:
def strip_prerelease_suffix(version_string: str) -> str:
    """Reduce a semver string to its plain major.minor.patch form.

    Any pre-release or build metadata suffix is dropped. Handy when moving a
    version from one pre-release state to another (e.g., from -rc.1 to
    -preview.{sha}), where the new suffix must be attached to the base version.

    Args:
        version_string: A semver version string, possibly with a pre-release suffix
            (e.g., "2.23.16-rc.1", "1.2.3-preview.abc1234", "1.0.0")

    Returns:
        The base version without any pre-release suffix (e.g., "2.23.16", "1.2.3", "1.0.0")

    Raises:
        InvalidVersionError: If the version string is not valid semver
    """
    try:
        parsed = semver.Version.parse(version_string)
    except ValueError as parse_error:
        # Surface the package's own error type, keeping the original as the cause.
        raise InvalidVersionError(
            f"Cannot parse version: {version_string}"
        ) from parse_error
    else:
        base_version = parsed.finalize_version()
        return str(base_version)

Strip any pre-release suffix from a version string.

Returns the base version (major.minor.patch) without any pre-release or build metadata suffixes. This is useful when transforming a version from one pre-release state to another (e.g., from -rc.1 to -preview.{sha}).

Arguments:
  • version_string: A semver version string, possibly with a pre-release suffix (e.g., "2.23.16-rc.1", "1.2.3-preview.abc1234", "1.0.0")
Returns:

The base version without any pre-release suffix (e.g., "2.23.16", "1.2.3", "1.0.0")

Raises:
  • InvalidVersionError: If the version string is not valid semver