airbyte_ops_mcp.registry

Registry operations for Airbyte connectors.

This package provides functionality for:

  • Reading connector metadata from the GCS registry
  • Listing connectors and versions in the registry
  • Publishing connector metadata to GCS
  • Promoting and rolling back release candidates
  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Registry operations for Airbyte connectors.
  3
  4This package provides functionality for:
  5- Reading connector metadata from the GCS registry
  6- Listing connectors and versions in the registry
  7- Publishing connector metadata to GCS
  8- Promoting and rolling back release candidates
  9"""
 10
 11from __future__ import annotations
 12
 13from airbyte_ops_mcp.registry._constants import (
 14    DEFAULT_METADATA_SERVICE_BUCKET_NAME,
 15    DEV_METADATA_SERVICE_BUCKET_NAME,
 16    LATEST_GCS_FOLDER_NAME,
 17    METADATA_FILE_NAME,
 18    METADATA_FOLDER,
 19    PROD_METADATA_SERVICE_BUCKET_NAME,
 20    RELEASE_CANDIDATE_GCS_FOLDER_NAME,
 21    SONAR_DEV_BUCKET_NAME,
 22    SONAR_PROD_BUCKET_NAME,
 23)
 24from airbyte_ops_mcp.registry._enums import (
 25    ConnectorLanguage,
 26    ConnectorType,
 27    SupportLevel,
 28)
 29from airbyte_ops_mcp.registry.audit import (
 30    AuditResult,
 31    UnpublishedConnector,
 32    find_unpublished_connectors,
 33)
 34from airbyte_ops_mcp.registry.compile import (
 35    CompileResult,
 36    PurgeLatestResult,
 37    compile_registry,
 38    purge_latest_dirs,
 39)
 40from airbyte_ops_mcp.registry.generate import (
 41    GenerateResult,
 42    generate_version_artifacts,
 43)
 44from airbyte_ops_mcp.registry.models import (
 45    ConnectorListResult,
 46    ConnectorMetadata,
 47    ConnectorPublishResult,
 48    MetadataPublishResult,
 49    RegistryEntryResult,
 50    VersionListResult,
 51)
 52from airbyte_ops_mcp.registry.operations import (
 53    get_registry_entry,
 54    get_registry_spec,
 55    list_connector_versions,
 56    list_registry_connectors,
 57    list_registry_connectors_filtered,
 58)
 59from airbyte_ops_mcp.registry.publish import (
 60    CONNECTOR_PATH_PREFIX,
 61    create_progressive_rollout_blob,
 62    delete_progressive_rollout_blob,
 63    get_connector_metadata,
 64    get_gcs_publish_path,
 65    is_valid_for_progressive_rollout,
 66    publish_connector_metadata,
 67    strip_rc_suffix,
 68)
 69from airbyte_ops_mcp.registry.publish_artifacts import (
 70    PublishArtifactsResult,
 71    publish_version_artifacts,
 72)
 73from airbyte_ops_mcp.registry.rebuild import (
 74    OutputMode,
 75    RebuildResult,
 76    rebuild_registry,
 77)
 78from airbyte_ops_mcp.registry.registry_store_base import (
 79    Registry,
 80    get_registry,
 81)
 82from airbyte_ops_mcp.registry.store import (
 83    REGISTRY_STORE_ENV_VAR,
 84    RegistryStore,
 85    StoreType,
 86    resolve_registry_store,
 87)
 88from airbyte_ops_mcp.registry.validate import (
 89    ValidateOptions,
 90    ValidationResult,
 91    validate_metadata,
 92)
 93from airbyte_ops_mcp.registry.yank import (
 94    YANK_FILE_NAME,
 95    YankResult,
 96    unyank_connector_version,
 97    yank_connector_version,
 98)
 99
100__all__ = [
101    "CONNECTOR_PATH_PREFIX",
102    "DEFAULT_METADATA_SERVICE_BUCKET_NAME",
103    "DEV_METADATA_SERVICE_BUCKET_NAME",
104    "LATEST_GCS_FOLDER_NAME",
105    "METADATA_FILE_NAME",
106    "METADATA_FOLDER",
107    "PROD_METADATA_SERVICE_BUCKET_NAME",
108    "REGISTRY_STORE_ENV_VAR",
109    "RELEASE_CANDIDATE_GCS_FOLDER_NAME",
110    "SONAR_DEV_BUCKET_NAME",
111    "SONAR_PROD_BUCKET_NAME",
112    "YANK_FILE_NAME",
113    "AuditResult",
114    "CompileResult",
115    "ConnectorLanguage",
116    "ConnectorListResult",
117    "ConnectorMetadata",
118    "ConnectorPublishResult",
119    "ConnectorType",
120    "GenerateResult",
121    "MetadataPublishResult",
122    "OutputMode",
123    "PublishArtifactsResult",
124    "PurgeLatestResult",
125    "RebuildResult",
126    "Registry",
127    "RegistryEntryResult",
128    "RegistryStore",
129    "StoreType",
130    "SupportLevel",
131    "UnpublishedConnector",
132    "ValidateOptions",
133    "ValidationResult",
134    "VersionListResult",
135    "YankResult",
136    "compile_registry",
137    "create_progressive_rollout_blob",
138    "delete_progressive_rollout_blob",
139    "find_unpublished_connectors",
140    "generate_version_artifacts",
141    "get_connector_metadata",
142    "get_gcs_publish_path",
143    "get_registry",
144    "get_registry_entry",
145    "get_registry_spec",
146    "is_valid_for_progressive_rollout",
147    "list_connector_versions",
148    "list_registry_connectors",
149    "list_registry_connectors_filtered",
150    "publish_connector_metadata",
151    "publish_version_artifacts",
152    "purge_latest_dirs",
153    "rebuild_registry",
154    "resolve_registry_store",
155    "strip_rc_suffix",
156    "unyank_connector_version",
157    "validate_metadata",
158    "yank_connector_version",
159]
CONNECTOR_PATH_PREFIX = 'airbyte-integrations/connectors'
DEFAULT_METADATA_SERVICE_BUCKET_NAME = 'dev-airbyte-cloud-connector-metadata-service-2'
DEV_METADATA_SERVICE_BUCKET_NAME = 'dev-airbyte-cloud-connector-metadata-service-2'
LATEST_GCS_FOLDER_NAME = 'latest'
METADATA_FILE_NAME = 'metadata.yaml'
METADATA_FOLDER = 'metadata'
PROD_METADATA_SERVICE_BUCKET_NAME = 'prod-airbyte-cloud-connector-metadata-service'
REGISTRY_STORE_ENV_VAR = 'AIRBYTE_REGISTRY_STORE'
RELEASE_CANDIDATE_GCS_FOLDER_NAME = 'release_candidate'
SONAR_DEV_BUCKET_NAME = 'airbyte-connector-registry-dev'
SONAR_PROD_BUCKET_NAME = 'airbyte-connector-registry'
YANK_FILE_NAME = 'version-yank.yml'
@dataclass
class AuditResult:
37@dataclass
38class AuditResult:
39    """Result of auditing which connectors have unpublished versions."""
40
41    unpublished: list[UnpublishedConnector] = field(default_factory=list)
42    checked_count: int = 0
43    skipped_archived: list[str] = field(default_factory=list)
44    skipped_rc: list[str] = field(default_factory=list)
45    skipped_disabled: list[str] = field(default_factory=list)
46    errors: list[str] = field(default_factory=list)

Result of auditing which connectors have unpublished versions.

AuditResult( unpublished: list[UnpublishedConnector] = <factory>, checked_count: int = 0, skipped_archived: list[str] = <factory>, skipped_rc: list[str] = <factory>, skipped_disabled: list[str] = <factory>, errors: list[str] = <factory>)
unpublished: list[UnpublishedConnector]
checked_count: int = 0
skipped_archived: list[str]
skipped_rc: list[str]
skipped_disabled: list[str]
errors: list[str]
@dataclass
class CompileResult:
 93@dataclass
 94class CompileResult:
 95    """Result of a registry compile operation."""
 96
 97    target: str
 98    connectors_scanned: int = 0
 99    versions_found: int = 0
100    yanked_versions: int = 0
101    latest_updated: int = 0
102    latest_already_current: int = 0
103    cloud_registry_entries: int = 0
104    oss_registry_entries: int = 0
105    composite_registry_entries: int = 0
106    version_indexes_written: int = 0
107    specs_secrets_mask_properties: int = 0
108    errors: list[str] = field(default_factory=list)
109    dry_run: bool = False
110
111    @property
112    def status(self) -> str:
113        if self.dry_run:
114            return "dry-run"
115        if self.errors:
116            return "completed-with-errors"
117        return "success"
118
119    def summary(self) -> str:
120        return (
121            f"[{self.status}] Scanned {self.connectors_scanned} connectors, "
122            f"{self.versions_found} versions ({self.yanked_versions} yanked). "
123            f"Latest updated: {self.latest_updated}, "
124            f"already current: {self.latest_already_current}. "
125            f"Registry entries: cloud={self.cloud_registry_entries}, "
126            f"oss={self.oss_registry_entries}, "
127            f"composite={self.composite_registry_entries}. "
128            f"Version indexes: {self.version_indexes_written}. "
129            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. "
130            f"Errors: {len(self.errors)}."
131        )

Result of a registry compile operation.

CompileResult( target: str, connectors_scanned: int = 0, versions_found: int = 0, yanked_versions: int = 0, latest_updated: int = 0, latest_already_current: int = 0, cloud_registry_entries: int = 0, oss_registry_entries: int = 0, composite_registry_entries: int = 0, version_indexes_written: int = 0, specs_secrets_mask_properties: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
target: str
connectors_scanned: int = 0
versions_found: int = 0
yanked_versions: int = 0
latest_updated: int = 0
latest_already_current: int = 0
cloud_registry_entries: int = 0
oss_registry_entries: int = 0
composite_registry_entries: int = 0
version_indexes_written: int = 0
specs_secrets_mask_properties: int = 0
errors: list[str]
dry_run: bool = False
status: str
111    @property
112    def status(self) -> str:
113        if self.dry_run:
114            return "dry-run"
115        if self.errors:
116            return "completed-with-errors"
117        return "success"
def summary(self) -> str:
119    def summary(self) -> str:
120        return (
121            f"[{self.status}] Scanned {self.connectors_scanned} connectors, "
122            f"{self.versions_found} versions ({self.yanked_versions} yanked). "
123            f"Latest updated: {self.latest_updated}, "
124            f"already current: {self.latest_already_current}. "
125            f"Registry entries: cloud={self.cloud_registry_entries}, "
126            f"oss={self.oss_registry_entries}, "
127            f"composite={self.composite_registry_entries}. "
128            f"Version indexes: {self.version_indexes_written}. "
129            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. "
130            f"Errors: {len(self.errors)}."
131        )
class ConnectorLanguage(enum.StrEnum):
 88class ConnectorLanguage(StrEnum):
 89    """Connector implementation languages."""
 90
 91    PYTHON = "python"
 92    JAVA = "java"
 93    LOW_CODE = "low-code"
 94    MANIFEST_ONLY = "manifest-only"
 95
 96    @classmethod
 97    def parse(cls, value: str) -> ConnectorLanguage:
 98        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
 99        try:
100            return cls(value)
101        except ValueError:
102            valid = ", ".join(f"`{m.value}`" for m in cls)
103            raise ValueError(
104                f"Unrecognized language: {value!r}. Expected one of: {valid}."
105            ) from None

Connector implementation languages.

PYTHON = <ConnectorLanguage.PYTHON: 'python'>
JAVA = <ConnectorLanguage.JAVA: 'java'>
LOW_CODE = <ConnectorLanguage.LOW_CODE: 'low-code'>
MANIFEST_ONLY = <ConnectorLanguage.MANIFEST_ONLY: 'manifest-only'>
@classmethod
def parse(cls, value: str) -> ConnectorLanguage:
 96    @classmethod
 97    def parse(cls, value: str) -> ConnectorLanguage:
 98        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
 99        try:
100            return cls(value)
101        except ValueError:
102            valid = ", ".join(f"`{m.value}`" for m in cls)
103            raise ValueError(
104                f"Unrecognized language: {value!r}. Expected one of: {valid}."
105            ) from None

Parse a string into a ConnectorLanguage, raising ValueError on mismatch.

class ConnectorListResult(pydantic.main.BaseModel):
106class ConnectorListResult(BaseModel):
107    """Result of listing connectors in the registry."""
108
109    bucket_name: str = Field(description="The GCS bucket name")
110    connector_count: int = Field(description="Number of connectors found")
111    connectors: list[str] = Field(description="List of connector names")

Result of listing connectors in the registry.

bucket_name: str = PydanticUndefined

The GCS bucket name

connector_count: int = PydanticUndefined

Number of connectors found

connectors: list[str] = PydanticUndefined

List of connector names

class ConnectorMetadata(pydantic.main.BaseModel):
16class ConnectorMetadata(BaseModel):
17    """Connector metadata from metadata.yaml.
18
19    This model represents the essential metadata about a connector
20    read from its metadata.yaml file in the Airbyte monorepo.
21    """
22
23    name: str = Field(description="The connector technical name")
24    docker_repository: str = Field(description="The Docker repository")
25    docker_image_tag: str = Field(description="The Docker image tag/version")
26    support_level: str | None = Field(
27        default=None, description="The support level (certified, community, etc.)"
28    )
29    definition_id: str | None = Field(
30        default=None, description="The connector definition ID"
31    )

Connector metadata from metadata.yaml.

This model represents the essential metadata about a connector read from its metadata.yaml file in the Airbyte monorepo.

name: str = PydanticUndefined

The connector technical name

docker_repository: str = PydanticUndefined

The Docker repository

docker_image_tag: str = PydanticUndefined

The Docker image tag/version

support_level: str | None = None

The support level (certified, community, etc.)

definition_id: str | None = None

The connector definition ID

class ConnectorPublishResult(pydantic.main.BaseModel):
34class ConnectorPublishResult(BaseModel):
35    """Result of a connector publish operation.
36
37    This model provides detailed information about the outcome of a
38    connector publish operation (apply or rollback version override).
39    """
40
41    connector: str = Field(description="The connector technical name")
42    version: str = Field(description="The connector version")
43    action: Literal["progressive-rollout-create", "progressive-rollout-cleanup"] = (
44        Field(description="The action performed")
45    )
46    status: Literal["success", "failure", "dry-run"] = Field(
47        description="The status of the operation"
48    )
49    docker_image: str | None = Field(
50        default=None, description="The Docker image name if applicable"
51    )
52    registry_updated: bool = Field(
53        default=False, description="Whether the registry was updated"
54    )
55    message: str | None = Field(default=None, description="Additional status message")
56
57    def __str__(self) -> str:
58        """Return a string representation of the publish result."""
59        status_prefix = "dry-run" if self.status == "dry-run" else self.status
60        return f"[{status_prefix}] {self.connector}:{self.version} - {self.action}"

Result of a connector publish operation.

This model provides detailed information about the outcome of a connector publish operation (apply or rollback version override).

connector: str = PydanticUndefined

The connector technical name

version: str = PydanticUndefined

The connector version

action: Literal['progressive-rollout-create', 'progressive-rollout-cleanup'] = PydanticUndefined

The action performed

status: Literal['success', 'failure', 'dry-run'] = PydanticUndefined

The status of the operation

docker_image: str | None = None

The Docker image name if applicable

registry_updated: bool = False

Whether the registry was updated

message: str | None = None

Additional status message

class ConnectorType(enum.StrEnum):
70class ConnectorType(StrEnum):
71    """Connector type: source or destination."""
72
73    SOURCE = "source"
74    DESTINATION = "destination"
75
76    @classmethod
77    def parse(cls, value: str) -> ConnectorType:
78        """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch."""
79        try:
80            return cls(value)
81        except ValueError:
82            valid = ", ".join(f"`{m.value}`" for m in cls)
83            raise ValueError(
84                f"Unrecognized connector type: {value!r}. Expected one of: {valid}."
85            ) from None

Connector type: source or destination.

SOURCE = <ConnectorType.SOURCE: 'source'>
DESTINATION = <ConnectorType.DESTINATION: 'destination'>
@classmethod
def parse(cls, value: str) -> ConnectorType:
76    @classmethod
77    def parse(cls, value: str) -> ConnectorType:
78        """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch."""
79        try:
80            return cls(value)
81        except ValueError:
82            valid = ", ".join(f"`{m.value}`" for m in cls)
83            raise ValueError(
84                f"Unrecognized connector type: {value!r}. Expected one of: {valid}."
85            ) from None

Parse a string into a ConnectorType, raising ValueError on mismatch.

@dataclass
class GenerateResult:
100@dataclass
101class GenerateResult:
102    """Result of a local artifact generation run."""
103
104    connector_name: str
105    version: str
106    docker_image: str
107    output_dir: str
108    artifacts_written: list[str] = field(default_factory=list)
109    errors: list[str] = field(default_factory=list)
110    validation_errors: list[str] = field(default_factory=list)
111    dry_run: bool = False
112
113    @property
114    def success(self) -> bool:
115        return len(self.errors) == 0 and len(self.validation_errors) == 0
116
117    def to_dict(self) -> dict[str, Any]:
118        return {
119            "connector_name": self.connector_name,
120            "version": self.version,
121            "docker_image": self.docker_image,
122            "output_dir": self.output_dir,
123            "artifacts_written": self.artifacts_written,
124            "errors": self.errors,
125            "validation_errors": self.validation_errors,
126            "dry_run": self.dry_run,
127            "success": self.success,
128        }

Result of a local artifact generation run.

GenerateResult( connector_name: str, version: str, docker_image: str, output_dir: str, artifacts_written: list[str] = <factory>, errors: list[str] = <factory>, validation_errors: list[str] = <factory>, dry_run: bool = False)
connector_name: str
version: str
docker_image: str
output_dir: str
artifacts_written: list[str]
errors: list[str]
validation_errors: list[str]
dry_run: bool = False
success: bool
113    @property
114    def success(self) -> bool:
115        return len(self.errors) == 0 and len(self.validation_errors) == 0
def to_dict(self) -> dict[str, typing.Any]:
117    def to_dict(self) -> dict[str, Any]:
118        return {
119            "connector_name": self.connector_name,
120            "version": self.version,
121            "docker_image": self.docker_image,
122            "output_dir": self.output_dir,
123            "artifacts_written": self.artifacts_written,
124            "errors": self.errors,
125            "validation_errors": self.validation_errors,
126            "dry_run": self.dry_run,
127            "success": self.success,
128        }
class MetadataPublishResult(pydantic.main.BaseModel):
63class MetadataPublishResult(BaseModel):
64    """Result of a metadata publish operation to GCS.
65
66    This model provides detailed information about the outcome of
67    publishing connector metadata to the registry.
68    """
69
70    connector_name: str = Field(description="The connector technical name")
71    version: str = Field(description="The version that was published")
72    bucket_name: str = Field(description="The GCS bucket name")
73    versioned_path: str = Field(description="The versioned GCS path")
74    latest_path: str | None = Field(
75        default=None, description="The latest GCS path if updated"
76    )
77    versioned_uploaded: bool = Field(
78        default=False, description="Whether the versioned metadata was uploaded"
79    )
80    latest_uploaded: bool = Field(
81        default=False, description="Whether the latest metadata was uploaded"
82    )
83    status: Literal["success", "dry-run", "already-up-to-date"] = Field(
84        description="The status of the operation"
85    )
86    message: str = Field(description="Status message describing the outcome")
87
88    def __str__(self) -> str:
89        """Return a string representation of the publish result."""
90        return f"[{self.status}] {self.connector_name}:{self.version} -> {self.versioned_path}"

Result of a metadata publish operation to GCS.

This model provides detailed information about the outcome of publishing connector metadata to the registry.

connector_name: str = PydanticUndefined

The connector technical name

version: str = PydanticUndefined

The version that was published

bucket_name: str = PydanticUndefined

The GCS bucket name

versioned_path: str = PydanticUndefined

The versioned GCS path

latest_path: str | None = None

The latest GCS path if updated

versioned_uploaded: bool = False

Whether the versioned metadata was uploaded

latest_uploaded: bool = False

Whether the latest metadata was uploaded

status: Literal['success', 'dry-run', 'already-up-to-date'] = PydanticUndefined

The status of the operation

message: str = PydanticUndefined

Status message describing the outcome

OutputMode = typing.Literal['local', 'gcs', 's3']
@dataclass
class PublishArtifactsResult:
46@dataclass
47class PublishArtifactsResult:
48    """Result of a version-artifacts publish operation."""
49
50    connector_name: str
51    version: str
52    target: str
53    gcs_destination: str
54    files_uploaded: list[str] = field(default_factory=list)
55    errors: list[str] = field(default_factory=list)
56    validation_errors: list[str] = field(default_factory=list)
57    dry_run: bool = False
58
59    @property
60    def success(self) -> bool:
61        return len(self.errors) == 0 and len(self.validation_errors) == 0
62
63    @property
64    def status(self) -> str:
65        if self.dry_run:
66            return "dry-run"
67        if self.errors or self.validation_errors:
68            return "completed-with-errors"
69        return "success"

Result of a version-artifacts publish operation.

PublishArtifactsResult( connector_name: str, version: str, target: str, gcs_destination: str, files_uploaded: list[str] = <factory>, errors: list[str] = <factory>, validation_errors: list[str] = <factory>, dry_run: bool = False)
connector_name: str
version: str
target: str
gcs_destination: str
files_uploaded: list[str]
errors: list[str]
validation_errors: list[str]
dry_run: bool = False
success: bool
59    @property
60    def success(self) -> bool:
61        return len(self.errors) == 0 and len(self.validation_errors) == 0
status: str
63    @property
64    def status(self) -> str:
65        if self.dry_run:
66            return "dry-run"
67        if self.errors or self.validation_errors:
68            return "completed-with-errors"
69        return "success"
@dataclass
class PurgeLatestResult:
134@dataclass
135class PurgeLatestResult:
136    """Result of a purge-latest operation."""
137
138    target: str
139    connectors_found: int = 0
140    latest_dirs_deleted: int = 0
141    errors: list[str] = field(default_factory=list)
142    dry_run: bool = False
143
144    @property
145    def status(self) -> str:
146        if self.dry_run:
147            return "dry-run"
148        if self.errors:
149            return "completed-with-errors"
150        return "success"
151
152    def summary(self) -> str:
153        return (
154            f"[{self.status}] Found {self.connectors_found} connectors, "
155            f"deleted {self.latest_dirs_deleted} latest/ directories. "
156            f"Errors: {len(self.errors)}."
157        )

Result of a purge-latest operation.

PurgeLatestResult( target: str, connectors_found: int = 0, latest_dirs_deleted: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
target: str
connectors_found: int = 0
latest_dirs_deleted: int = 0
errors: list[str]
dry_run: bool = False
status: str
144    @property
145    def status(self) -> str:
146        if self.dry_run:
147            return "dry-run"
148        if self.errors:
149            return "completed-with-errors"
150        return "success"
def summary(self) -> str:
152    def summary(self) -> str:
153        return (
154            f"[{self.status}] Found {self.connectors_found} connectors, "
155            f"deleted {self.latest_dirs_deleted} latest/ directories. "
156            f"Errors: {len(self.errors)}."
157        )
@dataclass
class RebuildResult:
43@dataclass
44class RebuildResult:
45    """Result of a registry rebuild operation."""
46
47    source_bucket: str
48    output_mode: OutputMode
49    output_root: str
50    connectors_processed: int = 0
51    blobs_copied: int = 0
52    blobs_skipped: int = 0
53    errors: list[str] = field(default_factory=list)
54    dry_run: bool = False
55
56    @property
57    def status(self) -> str:
58        """Return the status of the rebuild operation."""
59        if self.dry_run:
60            return "dry-run"
61        if self.errors:
62            return "completed-with-errors"
63        return "success"
64
65    def summary(self) -> str:
66        """Return a human-readable summary."""
67        return (
68            f"[{self.status}] Rebuilt {self.connectors_processed} connectors, "
69            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
70            f"{len(self.errors)} errors. Output: {self.output_root}"
71        )

Result of a registry rebuild operation.

RebuildResult( source_bucket: str, output_mode: Literal['local', 'gcs', 's3'], output_root: str, connectors_processed: int = 0, blobs_copied: int = 0, blobs_skipped: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
source_bucket: str
output_mode: Literal['local', 'gcs', 's3']
output_root: str
connectors_processed: int = 0
blobs_copied: int = 0
blobs_skipped: int = 0
errors: list[str]
dry_run: bool = False
status: str
56    @property
57    def status(self) -> str:
58        """Return the status of the rebuild operation."""
59        if self.dry_run:
60            return "dry-run"
61        if self.errors:
62            return "completed-with-errors"
63        return "success"

Return the status of the rebuild operation.

def summary(self) -> str:
65    def summary(self) -> str:
66        """Return a human-readable summary."""
67        return (
68            f"[{self.status}] Rebuilt {self.connectors_processed} connectors, "
69            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
70            f"{len(self.errors)} errors. Output: {self.output_root}"
71        )

Return a human-readable summary.

class Registry(abc.ABC):
 38class Registry(ABC):
 39    """A configured connector registry store.
 40
 41    A Registry is bound to a specific `airbyte_ops_mcp.registry.store.RegistryStore`
 42    (store type + env + optional prefix), and provides methods used by the CLI to
 43    read/write registry contents.
 44    """
 45
 46    def __init__(self, store: RegistryStore) -> None:
 47        self.store = store
 48
 49    @property
 50    def store_type(self) -> StoreType:
 51        return self.store.store_type
 52
 53    @property
 54    def bucket_name(self) -> str:
 55        return self.store.bucket
 56
 57    @property
 58    def prefix(self) -> str:
 59        return self.store.prefix
 60
 61    def _require_no_prefix(self, op_name: str) -> None:
 62        """Raise if this op doesn't support prefixed targets."""
 63
 64        if self.prefix:
 65            raise NotImplementedError(
 66                f"Operation '{op_name}' does not yet support store prefixes (got prefix='{self.prefix}')."
 67            )
 68
 69    # ---------------------------------------------------------------------
 70    # Read operations
 71    # ---------------------------------------------------------------------
 72
 73    @abstractmethod
 74    def list_connectors(
 75        self,
 76        *,
 77        support_level: SupportLevel | None = None,
 78        min_support_level: SupportLevel | None = None,
 79        connector_type: ConnectorType | None = None,
 80        language: ConnectorLanguage | None = None,
 81    ) -> list[str]:
 82        raise NotImplementedError(
 83            _op_not_implemented_message(self.store_type, "list_connectors")
 84        )
 85
 86    def list_connector_versions(self, connector_name: str) -> list[str]:
 87        raise NotImplementedError(
 88            _op_not_implemented_message(self.store_type, "list_connector_versions")
 89        )
 90
 91    def get_connector_metadata(
 92        self, connector_name: str, version: str = "latest"
 93    ) -> dict[str, Any]:
 94        raise NotImplementedError(
 95            _op_not_implemented_message(self.store_type, "get_connector_metadata")
 96        )
 97
 98    # ---------------------------------------------------------------------
 99    # Write / mutate operations
100    # ---------------------------------------------------------------------
101
102    def progressive_rollout_create(
103        self,
104        repo_path: Path,
105        connector_name: str,
106        dry_run: bool = False,
107    ) -> ConnectorPublishResult:
108        raise NotImplementedError(
109            _op_not_implemented_message(self.store_type, "progressive_rollout_create")
110        )
111
112    def progressive_rollout_cleanup(
113        self,
114        repo_path: Path,
115        connector_name: str,
116        dry_run: bool = False,
117    ) -> ConnectorPublishResult:
118        raise NotImplementedError(
119            _op_not_implemented_message(self.store_type, "progressive_rollout_cleanup")
120        )
121
122    def yank(
123        self,
124        connector_name: str,
125        version: str,
126        reason: str = "",
127        dry_run: bool = False,
128    ) -> YankResult:
129        raise NotImplementedError(_op_not_implemented_message(self.store_type, "yank"))
130
131    def unyank(
132        self,
133        connector_name: str,
134        version: str,
135        dry_run: bool = False,
136    ) -> YankResult:
137        raise NotImplementedError(
138            _op_not_implemented_message(self.store_type, "unyank")
139        )
140
141    def publish_version_artifacts(
142        self,
143        connector_name: str,
144        version: str,
145        artifacts_dir: Path,
146        dry_run: bool = False,
147        with_validate: bool = True,
148    ) -> PublishArtifactsResult:
149        raise NotImplementedError(
150            _op_not_implemented_message(self.store_type, "publish_version_artifacts")
151        )
152
153    def delete_dev_latest(
154        self,
155        connector_name: list[str] | None = None,
156        dry_run: bool = False,
157    ) -> PurgeLatestResult:
158        raise NotImplementedError(
159            _op_not_implemented_message(self.store_type, "delete_dev_latest")
160        )
161
162    def compile(
163        self,
164        connector_name: list[str] | None = None,
165        dry_run: bool = False,
166        with_secrets_mask: bool = False,
167        with_legacy_migration: str | None = None,
168        force: bool = False,
169    ) -> CompileResult:
170        raise NotImplementedError(
171            _op_not_implemented_message(self.store_type, "compile")
172        )
173
174    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
175        raise NotImplementedError(
176            _op_not_implemented_message(self.store_type, "marketing_stubs_check")
177        )
178
179    def marketing_stubs_sync(
180        self,
181        repo_root: Path,
182        dry_run: bool = False,
183    ) -> dict[str, Any]:
184        raise NotImplementedError(
185            _op_not_implemented_message(self.store_type, "marketing_stubs_sync")
186        )
187
188    def mirror(
189        self,
190        output_mode: OutputMode,
191        output_path_root: str | None = None,
192        gcs_bucket: str | None = None,
193        s3_bucket: str | None = None,
194        dry_run: bool = False,
195        connector_name: list[str] | None = None,
196    ) -> RebuildResult:
197        raise NotImplementedError(
198            _op_not_implemented_message(self.store_type, "mirror")
199        )

A configured connector registry store.

A Registry is bound to a specific airbyte_ops_mcp.registry.store.RegistryStore (store type + env + optional prefix), and provides methods used by the CLI to read/write registry contents.

store
store_type: StoreType
49    @property
50    def store_type(self) -> StoreType:
51        return self.store.store_type
bucket_name: str
53    @property
54    def bucket_name(self) -> str:
55        return self.store.bucket
prefix: str
57    @property
58    def prefix(self) -> str:
59        return self.store.prefix
@abstractmethod
def list_connectors( self, *, support_level: SupportLevel | None = None, min_support_level: SupportLevel | None = None, connector_type: ConnectorType | None = None, language: ConnectorLanguage | None = None) -> list[str]:
73    @abstractmethod
74    def list_connectors(
75        self,
76        *,
77        support_level: SupportLevel | None = None,
78        min_support_level: SupportLevel | None = None,
79        connector_type: ConnectorType | None = None,
80        language: ConnectorLanguage | None = None,
81    ) -> list[str]:
82        raise NotImplementedError(
83            _op_not_implemented_message(self.store_type, "list_connectors")
84        )
def list_connector_versions(self, connector_name: str) -> list[str]:
86    def list_connector_versions(self, connector_name: str) -> list[str]:
87        raise NotImplementedError(
88            _op_not_implemented_message(self.store_type, "list_connector_versions")
89        )
def get_connector_metadata( self, connector_name: str, version: str = 'latest') -> dict[str, typing.Any]:
91    def get_connector_metadata(
92        self, connector_name: str, version: str = "latest"
93    ) -> dict[str, Any]:
94        raise NotImplementedError(
95            _op_not_implemented_message(self.store_type, "get_connector_metadata")
96        )
def progressive_rollout_create( self, repo_path: pathlib.Path, connector_name: str, dry_run: bool = False) -> ConnectorPublishResult:
102    def progressive_rollout_create(
103        self,
104        repo_path: Path,
105        connector_name: str,
106        dry_run: bool = False,
107    ) -> ConnectorPublishResult:
108        raise NotImplementedError(
109            _op_not_implemented_message(self.store_type, "progressive_rollout_create")
110        )
def progressive_rollout_cleanup( self, repo_path: pathlib.Path, connector_name: str, dry_run: bool = False) -> ConnectorPublishResult:
112    def progressive_rollout_cleanup(
113        self,
114        repo_path: Path,
115        connector_name: str,
116        dry_run: bool = False,
117    ) -> ConnectorPublishResult:
118        raise NotImplementedError(
119            _op_not_implemented_message(self.store_type, "progressive_rollout_cleanup")
120        )
def yank( self, connector_name: str, version: str, reason: str = '', dry_run: bool = False) -> YankResult:
122    def yank(
123        self,
124        connector_name: str,
125        version: str,
126        reason: str = "",
127        dry_run: bool = False,
128    ) -> YankResult:
129        raise NotImplementedError(_op_not_implemented_message(self.store_type, "yank"))
def unyank( self, connector_name: str, version: str, dry_run: bool = False) -> YankResult:
131    def unyank(
132        self,
133        connector_name: str,
134        version: str,
135        dry_run: bool = False,
136    ) -> YankResult:
137        raise NotImplementedError(
138            _op_not_implemented_message(self.store_type, "unyank")
139        )
def publish_version_artifacts( self, connector_name: str, version: str, artifacts_dir: pathlib.Path, dry_run: bool = False, with_validate: bool = True) -> PublishArtifactsResult:
141    def publish_version_artifacts(
142        self,
143        connector_name: str,
144        version: str,
145        artifacts_dir: Path,
146        dry_run: bool = False,
147        with_validate: bool = True,
148    ) -> PublishArtifactsResult:
149        raise NotImplementedError(
150            _op_not_implemented_message(self.store_type, "publish_version_artifacts")
151        )
def delete_dev_latest( self, connector_name: list[str] | None = None, dry_run: bool = False) -> PurgeLatestResult:
153    def delete_dev_latest(
154        self,
155        connector_name: list[str] | None = None,
156        dry_run: bool = False,
157    ) -> PurgeLatestResult:
158        raise NotImplementedError(
159            _op_not_implemented_message(self.store_type, "delete_dev_latest")
160        )
def compile( self, connector_name: list[str] | None = None, dry_run: bool = False, with_secrets_mask: bool = False, with_legacy_migration: str | None = None, force: bool = False) -> CompileResult:
162    def compile(
163        self,
164        connector_name: list[str] | None = None,
165        dry_run: bool = False,
166        with_secrets_mask: bool = False,
167        with_legacy_migration: str | None = None,
168        force: bool = False,
169    ) -> CompileResult:
170        raise NotImplementedError(
171            _op_not_implemented_message(self.store_type, "compile")
172        )
def marketing_stubs_check(self, repo_root: pathlib.Path) -> dict[str, typing.Any]:
174    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
175        raise NotImplementedError(
176            _op_not_implemented_message(self.store_type, "marketing_stubs_check")
177        )
def marketing_stubs_sync( self, repo_root: pathlib.Path, dry_run: bool = False) -> dict[str, typing.Any]:
179    def marketing_stubs_sync(
180        self,
181        repo_root: Path,
182        dry_run: bool = False,
183    ) -> dict[str, Any]:
184        raise NotImplementedError(
185            _op_not_implemented_message(self.store_type, "marketing_stubs_sync")
186        )
def mirror( self, output_mode: Literal['local', 'gcs', 's3'], output_path_root: str | None = None, gcs_bucket: str | None = None, s3_bucket: str | None = None, dry_run: bool = False, connector_name: list[str] | None = None) -> RebuildResult:
188    def mirror(
189        self,
190        output_mode: OutputMode,
191        output_path_root: str | None = None,
192        gcs_bucket: str | None = None,
193        s3_bucket: str | None = None,
194        dry_run: bool = False,
195        connector_name: list[str] | None = None,
196    ) -> RebuildResult:
197        raise NotImplementedError(
198            _op_not_implemented_message(self.store_type, "mirror")
199        )
class RegistryEntryResult(pydantic.main.BaseModel):
 93class RegistryEntryResult(BaseModel):
 94    """Result of reading a registry entry from GCS.
 95
 96    This model wraps the raw metadata dictionary with additional context.
 97    """
 98
 99    connector_name: str = Field(description="The connector technical name")
100    version: str = Field(description="The version that was read")
101    bucket_name: str = Field(description="The GCS bucket name")
102    gcs_path: str = Field(description="The GCS path that was read")
103    metadata: dict = Field(description="The raw metadata dictionary")

Result of reading a registry entry from GCS.

This model wraps the raw metadata dictionary with additional context.

connector_name: str = PydanticUndefined

The connector technical name

version: str = PydanticUndefined

The version that was read

bucket_name: str = PydanticUndefined

The GCS bucket name

gcs_path: str = PydanticUndefined

The GCS path that was read

metadata: dict = PydanticUndefined

The raw metadata dictionary

@dataclass(frozen=True, kw_only=True)
class RegistryStore:
132@dataclass(frozen=True, kw_only=True)
133class RegistryStore:
134    """Parsed store target (type + environment + optional prefix).
135
136    Examples::
137
138        RegistryStore.parse("coral:dev")
139        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
140        RegistryStore.parse("coral:dev/aj-test")
141        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
142        RegistryStore.parse("sonar:prod")
143        # -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
144    """
145
146    store_type: StoreType
147    env: str
148    prefix: str = ""
149
150    # -- Derived helpers -----------------------------------------------------
151
152    @property
153    def bucket(self) -> str:
154        """Resolve the concrete bucket name for this target."""
155        env_map = BUCKET_MAP.get(self.store_type)
156        if env_map is None:
157            raise ValueError(f"Unknown store type: {self.store_type!r}")
158        bucket_name = env_map.get(self.env)
159        if bucket_name is None:
160            raise ValueError(
161                f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
162                f"Expected one of: {', '.join(sorted(env_map))}."
163            )
164        return bucket_name
165
166    @property
167    def bucket_root(self) -> str:
168        """Bucket name with optional prefix appended (`bucket/prefix`)."""
169        if self.prefix:
170            return f"{self.bucket}/{self.prefix}"
171        return self.bucket
172
173    # -- Parsing -------------------------------------------------------------
174
175    @classmethod
176    def parse(cls, target: str) -> RegistryStore:
177        """Parse a store target string.
178
179        Accepted formats:
180
181            "coral:dev"
182
183            "coral:prod"
184            "coral:dev/aj-test100"
185            "sonar:prod"
186
187        Raises:
188            ValueError: If the string cannot be parsed or references an
189                unknown store type / environment.
190        """
191        if ":" not in target:
192            raise ValueError(
193                f"Invalid store target '{target}'. "
194                "Expected format: '<store_type>:<env>[/<prefix>]' "
195                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
196            )
197
198        store_part, env_part = target.split(":", 1)
199
200        # Validate store type
201        store_part_lower = store_part.lower()
202        valid_types = {t.value: t for t in StoreType}
203        if store_part_lower not in valid_types:
204            raise ValueError(
205                f"Unknown store type '{store_part}'. "
206                f"Expected one of: {', '.join(sorted(valid_types))}."
207            )
208        store_type = valid_types[store_part_lower]
209
210        # Split env and prefix
211        env_key, _, prefix = env_part.partition("/")
212        prefix = prefix.strip("/")
213
214        # Validate env
215        env_map = BUCKET_MAP.get(store_type, {})
216        if env_key not in env_map:
217            raise ValueError(
218                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
219                f"Expected one of: {', '.join(sorted(env_map))}."
220            )
221
222        return cls(store_type=store_type, env=env_key, prefix=prefix)

Parsed store target (type + environment + optional prefix).

Examples::

RegistryStore.parse("coral:dev")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
RegistryStore.parse("coral:dev/aj-test")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
RegistryStore.parse("sonar:prod")
# -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
RegistryStore( *, store_type: StoreType, env: str, prefix: str = '')
store_type: StoreType
env: str
prefix: str = ''
bucket: str
152    @property
153    def bucket(self) -> str:
154        """Resolve the concrete bucket name for this target."""
155        env_map = BUCKET_MAP.get(self.store_type)
156        if env_map is None:
157            raise ValueError(f"Unknown store type: {self.store_type!r}")
158        bucket_name = env_map.get(self.env)
159        if bucket_name is None:
160            raise ValueError(
161                f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
162                f"Expected one of: {', '.join(sorted(env_map))}."
163            )
164        return bucket_name

Resolve the concrete bucket name for this target.

bucket_root: str
166    @property
167    def bucket_root(self) -> str:
168        """Bucket name with optional prefix appended (`bucket/prefix`)."""
169        if self.prefix:
170            return f"{self.bucket}/{self.prefix}"
171        return self.bucket

Bucket name with optional prefix appended (bucket/prefix).

@classmethod
def parse(cls, target: str) -> RegistryStore:
175    @classmethod
176    def parse(cls, target: str) -> RegistryStore:
177        """Parse a store target string.
178
179        Accepted formats:
180
181            "coral:dev"
182
183            "coral:prod"
184            "coral:dev/aj-test100"
185            "sonar:prod"
186
187        Raises:
188            ValueError: If the string cannot be parsed or references an
189                unknown store type / environment.
190        """
191        if ":" not in target:
192            raise ValueError(
193                f"Invalid store target '{target}'. "
194                "Expected format: '<store_type>:<env>[/<prefix>]' "
195                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
196            )
197
198        store_part, env_part = target.split(":", 1)
199
200        # Validate store type
201        store_part_lower = store_part.lower()
202        valid_types = {t.value: t for t in StoreType}
203        if store_part_lower not in valid_types:
204            raise ValueError(
205                f"Unknown store type '{store_part}'. "
206                f"Expected one of: {', '.join(sorted(valid_types))}."
207            )
208        store_type = valid_types[store_part_lower]
209
210        # Split env and prefix
211        env_key, _, prefix = env_part.partition("/")
212        prefix = prefix.strip("/")
213
214        # Validate env
215        env_map = BUCKET_MAP.get(store_type, {})
216        if env_key not in env_map:
217            raise ValueError(
218                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
219                f"Expected one of: {', '.join(sorted(env_map))}."
220            )
221
222        return cls(store_type=store_type, env=env_key, prefix=prefix)

Parse a store target string.

Accepted formats:

"coral:dev"
"coral:prod"
"coral:dev/aj-test100"
"sonar:prod"

Raises:
  • ValueError: If the string cannot be parsed or references an unknown store type / environment.
class StoreType(builtins.str, enum.Enum):
 59class StoreType(str, Enum):
 60    """Registry store type identifier."""
 61
 62    SONAR = "sonar"
 63    CORAL = "coral"
 64
 65    # -- Auto-detection class methods ----------------------------------------
 66
 67    @classmethod
 68    def get_from_connector_name(cls, name: str) -> StoreType:
 69        """Infer the store type from a connector's technical name.
 70
 71        Connectors whose name starts with `source-` or `destination-` belong
 72        to the **coral** registry.  All other names belong to **sonar**.
 73
 74        Args:
 75            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).
 76
 77        Returns:
 78            The inferred `StoreType`.
 79        """
 80        if name.startswith("source-") or name.startswith("destination-"):
 81            return cls.CORAL
 82        return cls.SONAR
 83
 84    @classmethod
 85    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
 86        """Infer the store type from a repository working directory.
 87
 88        Checks for well-known directory markers:
 89
 90        * **sonar** -- `integrations/` alongside `connector-sdk/`
 91        * **coral** -- `airbyte-integrations/connectors/`
 92
 93        Args:
 94            path: Directory to inspect.  Defaults to `Path.cwd`.
 95
 96        Returns:
 97            The inferred `StoreType`, or `None` if the directory does
 98            not match any known registry repository layout.
 99        """
100        if path is None:
101            path = Path.cwd()
102
103        # Sonar markers
104        if (path / "integrations").is_dir() and (path / "connector-sdk").is_dir():
105            return cls.SONAR
106
107        # Coral markers
108        if (path / "airbyte-integrations" / "connectors").is_dir():
109            return cls.CORAL
110
111        return None

Registry store type identifier.

SONAR = <StoreType.SONAR: 'sonar'>
CORAL = <StoreType.CORAL: 'coral'>
@classmethod
def get_from_connector_name(cls, name: str) -> StoreType:
67    @classmethod
68    def get_from_connector_name(cls, name: str) -> StoreType:
69        """Infer the store type from a connector's technical name.
70
71        Connectors whose name starts with `source-` or `destination-` belong
72        to the **coral** registry.  All other names belong to **sonar**.
73
74        Args:
75            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).
76
77        Returns:
78            The inferred `StoreType`.
79        """
80        if name.startswith("source-") or name.startswith("destination-"):
81            return cls.CORAL
82        return cls.SONAR

Infer the store type from a connector's technical name.

Connectors whose name starts with source- or destination- belong to the coral registry. All other names belong to sonar.

Arguments:
  • name: Connector technical name (e.g. "source-github" or "stripe").
Returns:

The inferred StoreType.

@classmethod
def detect_from_repo_dir( cls, path: pathlib.Path | None = None) -> StoreType | None:
 84    @classmethod
 85    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
 86        """Infer the store type from a repository working directory.
 87
 88        Checks for well-known directory markers:
 89
 90        * **sonar** -- `integrations/` alongside `connector-sdk/`
 91        * **coral** -- `airbyte-integrations/connectors/`
 92
 93        Args:
 94            path: Directory to inspect.  Defaults to `Path.cwd`.
 95
 96        Returns:
 97            The inferred `StoreType`, or `None` if the directory does
 98            not match any known registry repository layout.
 99        """
100        if path is None:
101            path = Path.cwd()
102
103        # Sonar markers
104        if (path / "integrations").is_dir() and (path / "connector-sdk").is_dir():
105            return cls.SONAR
106
107        # Coral markers
108        if (path / "airbyte-integrations" / "connectors").is_dir():
109            return cls.CORAL
110
111        return None

Infer the store type from a repository working directory.

Checks for well-known directory markers:

  • sonar -- integrations/ alongside connector-sdk/
  • coral -- airbyte-integrations/connectors/
Arguments:
  • path: Directory to inspect. Defaults to Path.cwd.
Returns:

The inferred StoreType, or None if the directory does not match any known registry repository layout.

class SupportLevel(enum.StrEnum):
14class SupportLevel(StrEnum):
15    """Connector support levels ordered by precedence."""
16
17    ARCHIVED = "archived"
18    COMMUNITY = "community"
19    CERTIFIED = "certified"
20
21    @property
22    def precedence(self) -> int:
23        """Numeric precedence for ordering comparisons.
24
25        Higher values indicate higher support commitment.
26        """
27        return _SUPPORT_LEVEL_PRECEDENCE[self]
28
29    @classmethod
30    def from_precedence(cls, precedence: int) -> SupportLevel:
31        """Look up a `SupportLevel` by its numeric precedence value.
32
33        Raises `ValueError` when the precedence is not recognised.
34        """
35        for member in cls:
36            if _SUPPORT_LEVEL_PRECEDENCE[member] == precedence:
37                return member
38        valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
39        raise ValueError(
40            f"Unrecognized support-level precedence: {precedence!r}. "
41            f"Expected one of: {valid}."
42        ) from None
43
44    @classmethod
45    def parse(cls, value: str) -> SupportLevel:
46        """Parse a string into a `SupportLevel`.
47
48        Accepts a keyword (`archived`, `community`, `certified`)
49        or a legacy integer string (`100`, `200`, `300`).
50
51        Raises `ValueError` when the value is not recognised.
52        """
53        try:
54            return cls(value)
55        except ValueError:
56            pass
57        # Fallback: try interpreting as an integer precedence value.
58        try:
59            return cls.from_precedence(int(value))
60        except (ValueError, KeyError):
61            pass
62        valid_kw = ", ".join(f"`{m.value}`" for m in cls)
63        valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
64        raise ValueError(
65            f"Unrecognized support level: {value!r}. "
66            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
67        ) from None

Connector support levels ordered by precedence.

ARCHIVED = <SupportLevel.ARCHIVED: 'archived'>
COMMUNITY = <SupportLevel.COMMUNITY: 'community'>
CERTIFIED = <SupportLevel.CERTIFIED: 'certified'>
precedence: int
21    @property
22    def precedence(self) -> int:
23        """Numeric precedence for ordering comparisons.
24
25        Higher values indicate higher support commitment.
26        """
27        return _SUPPORT_LEVEL_PRECEDENCE[self]

Numeric precedence for ordering comparisons.

Higher values indicate higher support commitment.

@classmethod
def from_precedence(cls, precedence: int) -> SupportLevel:
29    @classmethod
30    def from_precedence(cls, precedence: int) -> SupportLevel:
31        """Look up a `SupportLevel` by its numeric precedence value.
32
33        Raises `ValueError` when the precedence is not recognised.
34        """
35        for member in cls:
36            if _SUPPORT_LEVEL_PRECEDENCE[member] == precedence:
37                return member
38        valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
39        raise ValueError(
40            f"Unrecognized support-level precedence: {precedence!r}. "
41            f"Expected one of: {valid}."
42        ) from None

Look up a SupportLevel by its numeric precedence value.

Raises ValueError when the precedence is not recognised.

@classmethod
def parse(cls, value: str) -> SupportLevel:
44    @classmethod
45    def parse(cls, value: str) -> SupportLevel:
46        """Parse a string into a `SupportLevel`.
47
48        Accepts a keyword (`archived`, `community`, `certified`)
49        or a legacy integer string (`100`, `200`, `300`).
50
51        Raises `ValueError` when the value is not recognised.
52        """
53        try:
54            return cls(value)
55        except ValueError:
56            pass
57        # Fallback: try interpreting as an integer precedence value.
58        try:
59            return cls.from_precedence(int(value))
60        except (ValueError, KeyError):
61            pass
62        valid_kw = ", ".join(f"`{m.value}`" for m in cls)
63        valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
64        raise ValueError(
65            f"Unrecognized support level: {value!r}. "
66            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
67        ) from None

Parse a string into a SupportLevel.

Accepts a keyword (archived, community, certified) or a legacy integer string (100, 200, 300).

Raises ValueError when the value is not recognised.

@dataclass
class UnpublishedConnector:
29@dataclass
30class UnpublishedConnector:
31    """A connector whose current local version is not published on GCS."""
32
33    connector_name: str
34    local_version: str

A connector whose current local version is not published on GCS.

UnpublishedConnector(connector_name: str, local_version: str)
connector_name: str
local_version: str
@dataclass(frozen=True)
class ValidateOptions:
37@dataclass(frozen=True)
38class ValidateOptions:
39    """Options that influence which validators run and how."""
40
41    docs_path: str | None = None
42    """Path to the connector's documentation file (for `validate_docs_path_exists`)."""
43
44    is_prerelease: bool = False
45    """Whether this is a pre-release build (skips version-decrement checks)."""

Options that influence which validators run and how.

ValidateOptions(docs_path: str | None = None, is_prerelease: bool = False)
docs_path: str | None = None

Path to the connector's documentation file (for validate_docs_path_exists).

is_prerelease: bool = False

Whether this is a pre-release build (skips version-decrement checks).

@dataclass
class ValidationResult:
48@dataclass
49class ValidationResult:
50    """Aggregate result from running all validators."""
51
52    passed: bool = True
53    errors: list[str] = field(default_factory=list)
54    validators_run: int = 0
55
56    def add_error(self, message: str) -> None:
57        self.passed = False
58        self.errors.append(message)

Aggregate result from running all validators.

ValidationResult( passed: bool = True, errors: list[str] = <factory>, validators_run: int = 0)
passed: bool = True
errors: list[str]
validators_run: int = 0
def add_error(self, message: str) -> None:
56    def add_error(self, message: str) -> None:
57        self.passed = False
58        self.errors.append(message)
class VersionListResult(pydantic.main.BaseModel):
114class VersionListResult(BaseModel):
115    """Result of listing versions for a connector."""
116
117    connector_name: str = Field(description="The connector technical name")
118    bucket_name: str = Field(description="The GCS bucket name")
119    version_count: int = Field(description="Number of versions found")
120    versions: list[str] = Field(description="List of version strings")

Result of listing versions for a connector.

connector_name: str = PydanticUndefined

The connector technical name

bucket_name: str = PydanticUndefined

The GCS bucket name

version_count: int = PydanticUndefined

Number of versions found

versions: list[str] = PydanticUndefined

List of version strings

@dataclass
class YankResult:
30@dataclass
31class YankResult:
32    """Result of a yank or unyank operation."""
33
34    connector_name: str
35    version: str
36    bucket_name: str
37    action: str  # "yank" or "unyank"
38    success: bool
39    message: str
40    dry_run: bool = False
41
42    def to_dict(self) -> dict[str, Any]:
43        """Convert to a dictionary for JSON serialization."""
44        return {
45            "connector_name": self.connector_name,
46            "version": self.version,
47            "bucket_name": self.bucket_name,
48            "action": self.action,
49            "success": self.success,
50            "message": self.message,
51            "dry_run": self.dry_run,
52        }

Result of a yank or unyank operation.

YankResult( connector_name: str, version: str, bucket_name: str, action: str, success: bool, message: str, dry_run: bool = False)
connector_name: str
version: str
bucket_name: str
action: str
success: bool
message: str
dry_run: bool = False
def to_dict(self) -> dict[str, typing.Any]:
42    def to_dict(self) -> dict[str, Any]:
43        """Convert to a dictionary for JSON serialization."""
44        return {
45            "connector_name": self.connector_name,
46            "version": self.version,
47            "bucket_name": self.bucket_name,
48            "action": self.action,
49            "success": self.success,
50            "message": self.message,
51            "dry_run": self.dry_run,
52        }

Convert to a dictionary for JSON serialization.

def compile_registry( *, store: RegistryStore, connector_name: list[str] | None = None, dry_run: bool = False, with_secrets_mask: bool = False, with_legacy_migration: str | None = None, force: bool = False) -> CompileResult:
def compile_registry(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
    with_secrets_mask: bool = False,
    with_legacy_migration: str | None = None,
    force: bool = False,
) -> CompileResult:
    """Compile the registry: sync latest/ dirs and write index files.

    Steps:
        1. Glob for all `metadata.yaml` to discover (connector, version) pairs.
        2. Glob for `version-yank.yml` to build the yanked set.
        3. Compute the latest GA semver per connector.
        4. Glob for `version=*` markers in `latest/` dirs for a fast check.
        5. Delete stale `latest/` dirs and recursively copy the versioned dir.
        5m. (Optional) Legacy migration: delete disabled registry entries.
        6. Write global registry JSONs and per-connector `versions.json`.
        6c. (Optional) Regenerate `specs_secrets_mask.yaml`.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only resync `latest/` directories for
            these connectors (steps 4-5).  Index rebuilds (steps 6a-6c)
            always operate on the full set of connectors so that global
            registry files remain complete.
        dry_run: If True, report what would be done without writing.
        with_secrets_mask: If True, regenerate `specs_secrets_mask.yaml`.
        with_legacy_migration: If set, run the named migration step.
            Currently supported: `"v1"` — delete `{registry_type}.json`
            files for connectors whose `registryOverrides.{registry}.enabled`
            is `false`.
        force: If True, resync all connectors' latest/ directories even if the
            existing version marker matches the computed latest version. This
            is useful when metadata content changes without a version bump.

    Returns:
        A `CompileResult` describing what was done.

    Raises:
        ValueError: If `with_legacy_migration` names an unsupported migration.
    """
    # Fail fast, before touching GCS, on an unknown migration name.
    if with_legacy_migration and with_legacy_migration not in LEGACY_MIGRATION_VERSIONS:
        raise ValueError(
            f"Unknown legacy migration version: {with_legacy_migration!r}. "
            f"Supported: {', '.join(LEGACY_MIGRATION_VERSIONS)}"
        )

    result = CompileResult(target=store.bucket_root, dry_run=dry_run)

    # A single gcsfs filesystem handle is created up front and reused by
    # every subsequent step (including the thread pools below).
    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # --- Step 1 + 2: Scan versions and yanks ---
    # Always scan ALL connectors so that index rebuilds (step 6) are complete.
    _log_progress("Step 1-2: Scanning versions and yank markers...")
    connector_versions, yanked = _scan_versions_and_yanks(
        fs,
        store=store,
        connector_name=None,
    )
    result.connectors_scanned = len(connector_versions)
    result.versions_found = sum(len(v) for v in connector_versions.values())
    result.yanked_versions = len(yanked)
    _log_progress(
        "  Found %d connectors, %d versions, %d yanked",
        result.connectors_scanned,
        result.versions_found,
        result.yanked_versions,
    )

    # --- Step 2b: Scan release candidates ---
    # NOTE(review): unlike steps 1-2 this scan honors the connector_name
    # filter, yet rc_versions also feeds the full-set index rebuilds in
    # step 6 — confirm that omitting RC info for unfiltered connectors
    # there is intended.
    _log_progress("Step 2b: Scanning for active release candidates...")
    rc_versions = _scan_release_candidates(
        fs,
        store=store,
        connector_name=connector_name,
    )
    _log_progress("  Found %d active release candidates", len(rc_versions))

    # --- Step 3: Compute latest ---
    _log_progress("Step 3: Computing latest GA version per connector...")
    latest_versions = _compute_latest_versions(
        connector_versions=connector_versions,
        yanked=yanked,
        fs=fs,
        store=store,
    )
    _log_progress("  Computed latest for %d connectors", len(latest_versions))

    # --- Step 4: Check existing latest markers ---
    # When --connector-name is set, only check/sync those connectors (steps 4-5).
    # Index rebuilds in step 6 always use the full unfiltered data.
    if connector_name:
        connector_name_set = set(connector_name)
        sync_scope = {
            c: v for c, v in latest_versions.items() if c in connector_name_set
        }
        _log_progress(
            "  --connector-name filter: syncing %d of %d connectors",
            len(sync_scope),
            len(latest_versions),
        )
    else:
        sync_scope = latest_versions

    _log_progress("Step 4: Checking existing latest/ markers...")
    # None here means "scan markers for all connectors" (unfiltered case).
    sync_scope_names = list(sync_scope) if connector_name else None
    existing_markers = _scan_latest_markers(
        fs,
        store=store,
        connector_name=sync_scope_names,
    )
    _log_progress("  Found %d existing markers", len(existing_markers))

    # A connector is "stale" when its latest/ marker is missing or does not
    # match the computed latest version (or when force is set).
    stale_connectors: list[str] = []
    for connector, expected_version in sync_scope.items():
        current_marker = existing_markers.get(connector)
        if not force and current_marker == expected_version:
            result.latest_already_current += 1
        else:
            stale_connectors.append(connector)

    _log_progress(
        "  %d connectors need latest/ update, %d already current",
        len(stale_connectors),
        result.latest_already_current,
    )

    # --- Step 5: Resync stale latest/ dirs (parallel) ---
    if stale_connectors:
        _log_progress(
            "Step 5: Syncing %d stale latest/ directories (max_workers=%d)...",
            len(stale_connectors),
            _COMPILE_SYNC_MAX_WORKERS,
        )

        def _sync_one_connector(connector: str) -> None:
            """Sync a single connector's latest/ dir."""
            version = latest_versions[connector]
            _sync_latest_dir(
                fs,
                store=store,
                connector=connector,
                version=version,
                dry_run=dry_run,
            )
            if not dry_run:
                _apply_overrides_to_latest_entry(
                    fs,
                    store=store,
                    connector=connector,
                    version=version,
                )

        sorted_stale = sorted(stale_connectors)
        with ThreadPoolExecutor(max_workers=_COMPILE_SYNC_MAX_WORKERS) as pool:
            futures = {pool.submit(_sync_one_connector, c): c for c in sorted_stale}
            for i, future in enumerate(as_completed(futures), 1):
                connector = futures[future]
                try:
                    future.result()
                    result.latest_updated += 1
                except Exception as exc:
                    error_msg = f"Failed to sync latest/ for {connector}: {exc}"
                    logger.error(error_msg)
                    result.errors.append(error_msg)
                    # Delete the (possibly partial) latest/ dir so the next
                    # compile retries this connector from scratch.
                    try:
                        _delete_latest_dir(
                            fs,
                            store=store,
                            connector=connector,
                        )
                        logger.info(
                            "Cleaned up partial latest/ for %s after failure",
                            connector,
                        )
                    except Exception as cleanup_exc:
                        logger.warning(
                            "Could not clean up latest/ for %s: %s",
                            connector,
                            cleanup_exc,
                        )
                if i % 100 == 0:
                    _log_progress("  Synced %d / %d...", i, len(sorted_stale))
    else:
        _log_progress("Step 5: All latest/ directories are current, nothing to sync.")

    # --- Step 5m: Legacy migration (optional) ---
    if with_legacy_migration == "v1":
        _log_progress(
            "Step 5m: Legacy migration v1 — deleting disabled registry entries..."
        )
        migration_deleted = _cleanup_disabled_registry_entries(
            fs,
            store=store,
            connector_versions=connector_versions,
            dry_run=dry_run,
        )
        total_deleted = sum(len(v) for v in migration_deleted.values())
        if migration_deleted:
            for conn, paths in sorted(migration_deleted.items()):
                _log_progress(
                    "  %s: %s %d files",
                    conn,
                    "would delete" if dry_run else "deleted",
                    len(paths),
                )
        _log_progress(
            "  Migration v1: %s %d files across %d connectors",
            "would delete" if dry_run else "deleted",
            total_deleted,
            len(migration_deleted),
        )

    # --- Step 6a: Compile global registry JSONs ---
    _log_progress("Step 6a: Compiling global registry JSON files...")
    all_registry_entries: list[dict[str, Any]] = []  # collected for Step 6c
    entries_by_registry_type: dict[str, list[dict[str, Any]]] = {}
    for registry_type in ("cloud", "oss"):
        entries = _compile_global_registry(
            fs,
            store=store,
            latest_versions=latest_versions,
            registry_type=registry_type,
        )

        # Inject release candidate info into entries that have active RCs.
        # For each connector with a release_candidate/metadata.yaml, read the
        # RC version's {registry_type}.json and add it under
        # releases.releaseCandidates[version] — matching the legacy format.
        if rc_versions:
            rc_entries: dict[str, dict[str, Any]] = {}
            for connector, rc_ver in rc_versions.items():
                rc_entry = _read_rc_registry_entry(
                    fs,
                    store=store,
                    connector=connector,
                    rc_version=rc_ver,
                    registry_type=registry_type,
                )
                if rc_entry:
                    docker_repo = rc_entry.get(
                        "dockerRepository",
                        f"airbyte/{connector}",
                    )
                    rc_entries[docker_repo] = {
                        "version": rc_ver,
                        "entry": rc_entry,
                    }
            if rc_entries:
                entries = _apply_release_candidates_to_entries(entries, rc_entries)
                _log_progress(
                    "  Injected %d release candidates into %s registry",
                    len(rc_entries),
                    registry_type,
                )

        all_registry_entries.extend(entries)
        entries_by_registry_type[registry_type] = entries
        registry_json = _build_global_registry_json(entries)
        entry_count = len(registry_json["sources"]) + len(registry_json["destinations"])

        if registry_type == "cloud":
            result.cloud_registry_entries = entry_count
        else:
            result.oss_registry_entries = entry_count

        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )
        else:
            content = json.dumps(registry_json, indent=2, sort_keys=True) + "\n"
            path_prefix = f"{store.prefix}/" if store.prefix else ""
            _write_gcs_blob_with_custom_ttl(
                bucket_name=store.bucket,
                blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/{registry_type}_registry.json",
                content=content,
                cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
            )
            _log_progress(
                "  Wrote %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )

    # --- Step 6a.2: Compile composite registry JSON (superset) ---
    _log_progress("Step 6a.2: Compiling composite_registry.json (superset)...")
    composite_json = _build_composite_registry_json(
        cloud_entries=entries_by_registry_type.get("cloud", []),
        oss_entries=entries_by_registry_type.get("oss", []),
    )
    composite_entry_count = len(composite_json["sources"]) + len(
        composite_json["destinations"]
    )
    result.composite_registry_entries = composite_entry_count
    if dry_run:
        _log_progress(
            "  [DRY RUN] Would write composite_registry.json (%d entries)",
            composite_entry_count,
        )
    else:
        composite_content = json.dumps(composite_json, indent=2, sort_keys=True) + "\n"
        path_prefix = f"{store.prefix}/" if store.prefix else ""
        _write_gcs_blob_with_custom_ttl(
            bucket_name=store.bucket,
            blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/composite_registry.json",
            content=composite_content,
            cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
        )
        _log_progress(
            "  Wrote composite_registry.json (%d entries)",
            composite_entry_count,
        )

    # --- Step 6b: Per-connector version indexes (parallel) ---
    _log_progress(
        "Step 6b: Writing per-connector version indexes (max_workers=%d)...",
        _COMPILE_WRITE_MAX_WORKERS,
    )
    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"
    sorted_connectors = sorted(connector_versions)

    def _write_one_version_index(connector: str) -> None:
        """Build and write a single connector's versions.json."""
        versions = connector_versions[connector]
        latest_v = latest_versions.get(connector)
        rc_v = rc_versions.get(connector)
        index = _build_version_index(
            fs,
            store=store,
            connector=connector,
            versions=versions,
            yanked=yanked,
            latest_version=latest_v,
            rc_version=rc_v,
        )
        index_path = f"{base}/{connector}/versions.json"
        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s/versions.json (%d versions)",
                connector,
                len(versions),
            )
        else:
            content = json.dumps(index, indent=2, sort_keys=True) + "\n"
            with fs.open(index_path, "w") as f:
                f.write(content)

    with ThreadPoolExecutor(max_workers=_COMPILE_WRITE_MAX_WORKERS) as pool:
        futures = {
            pool.submit(_write_one_version_index, c): c for c in sorted_connectors
        }
        for i, future in enumerate(as_completed(futures), 1):
            connector = futures[future]
            try:
                future.result()
                result.version_indexes_written += 1
            except Exception as exc:
                error_msg = f"Failed to write versions.json for {connector}: {exc}"
                logger.error(error_msg)
                result.errors.append(error_msg)
            if i % 100 == 0:
                _log_progress(
                    "  Wrote %d / %d version indexes...", i, len(sorted_connectors)
                )

    # --- Step 6c: Specs secrets mask (optional) ---
    if with_secrets_mask:
        _log_progress("Step 6c: Generating specs secrets mask...")
        # Reuse entries collected during Step 6a to avoid redundant GCS reads.
        secret_names = _extract_secret_property_names(all_registry_entries)
        sorted_names = sorted(secret_names)
        result.specs_secrets_mask_properties = len(sorted_names)
        mask_content = yaml.dump({"properties": sorted_names}, default_flow_style=False)
        mask_path = (
            f"{store.bucket_root}/{_REGISTRIES_PREFIX}/{_SPECS_SECRETS_MASK_FILENAME}"
        )

        _log_progress(
            "  Found %d secret properties: %s",
            len(sorted_names),
            ", ".join(sorted_names),
        )

        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )
        else:
            with fs.open(mask_path, "w") as f:
                f.write(mask_content)
            _log_progress(
                "  Wrote %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )

    _log_progress(result.summary())
    return result

Compile the registry: sync latest/ dirs and write index files.

Steps:
  1. Glob for all metadata.yaml to discover (connector, version) pairs.
  2. Glob for version-yank.yml to build the yanked set.
  3. Compute the latest GA semver per connector.
  4. Glob for version=* markers in latest/ dirs for a fast check.
  5. Delete stale latest/ dirs and recursively copy the versioned dir.
  5m. (Optional) Legacy migration: delete disabled registry entries.
  6. Write global registry JSONs and per-connector versions.json.
  6c. (Optional) Regenerate specs_secrets_mask.yaml.
Arguments:
  • store: Registry store (bucket + optional prefix).
  • connector_name: If provided, only resync latest/ directories for these connectors (steps 4-5). Index rebuilds (steps 6a-6c) always operate on the full set of connectors so that global registry files remain complete.
  • dry_run: If True, report what would be done without writing.
  • with_secrets_mask: If True, regenerate specs_secrets_mask.yaml.
  • with_legacy_migration: If set, run the named migration step. Currently supported: "v1" — delete {registry_type}.json files for connectors whose registryOverrides.{registry}.enabled is false.
  • force: If True, resync all connectors' latest/ directories even if the existing version marker matches the computed latest version. This is useful when metadata content changes without a version bump.
Returns:

A CompileResult describing what was done.

def create_progressive_rollout_blob( repo_path: pathlib.Path, connector_name: str, dry_run: bool = False, bucket_name: str | None = None) -> ConnectorPublishResult:
def create_progressive_rollout_blob(
    repo_path: Path,
    connector_name: str,
    dry_run: bool = False,
    bucket_name: str | None = None,
) -> ConnectorPublishResult:
    """Copy the versioned metadata blob to `release_candidate/metadata.yaml`.

    The presence of `release_candidate/metadata.yaml` in GCS signals to the
    platform that a progressive rollout is active for this connector.  The
    source blob (e.g. `1.2.3-rc.1/metadata.yaml`, or `2.1.0/metadata.yaml`
    for GA progressive rollouts) must already have been published, and the
    version declared in the connector's on-disk `metadata.yaml` must be
    eligible for progressive rollout (`-preview` builds are rejected).

    Requires `GCS_CREDENTIALS` environment variable to be set.
    """
    if not bucket_name:
        raise ValueError("bucket_name is required for progressive rollout create.")

    meta = get_connector_metadata(repo_path, connector_name)
    image_tag = meta.docker_image_tag
    repo_name = meta.docker_repository
    image_ref = f"{repo_name}:{image_tag}"

    if not is_valid_for_progressive_rollout(image_tag):
        return ConnectorPublishResult(
            connector=meta.name,
            version=image_tag,
            action="progressive-rollout-create",
            status="failure",
            docker_image=image_ref,
            registry_updated=False,
            message=(
                f"Version '{image_tag}' is not valid for progressive rollout. "
                "Preview versions are not supported."
            ),
        )

    connector_prefix = f"{METADATA_FOLDER}/{repo_name}"
    source_path = f"{connector_prefix}/{image_tag}/{METADATA_FILE_NAME}"
    marker_path = (
        f"{connector_prefix}/{RELEASE_CANDIDATE_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )

    if dry_run:
        return ConnectorPublishResult(
            connector=meta.name,
            version=image_tag,
            action="progressive-rollout-create",
            status="dry-run",
            docker_image=image_ref,
            registry_updated=False,
            message=f"Would copy {source_path} to {marker_path}",
        )

    gcs_bucket = get_gcs_storage_client().bucket(bucket_name)

    # The versioned blob must exist before a rollout can be marked active.
    source_blob = gcs_bucket.blob(source_path)
    if not source_blob.exists():
        return ConnectorPublishResult(
            connector=meta.name,
            version=image_tag,
            action="progressive-rollout-create",
            status="failure",
            docker_image=image_ref,
            registry_updated=False,
            message=(
                f"Versioned metadata not found: {source_path}. "
                "The version must be published before creating the progressive rollout marker."
            ),
        )

    # Copy the versioned metadata into place as the rollout marker.
    gcs_bucket.copy_blob(source_blob, gcs_bucket, marker_path)
    logger.info(
        "Created progressive rollout metadata blob: %s (copied from %s)",
        marker_path,
        source_path,
    )

    return ConnectorPublishResult(
        connector=meta.name,
        version=image_tag,
        action="progressive-rollout-create",
        status="success",
        docker_image=image_ref,
        registry_updated=True,
        message=(
            f"Created release_candidate/ blob for {meta.name} at {marker_path} "
            f"(copied from {source_path})."
        ),
    )

Create the release_candidate/metadata.yaml blob in GCS.

Copies the versioned metadata (e.g. 1.2.3-rc.1/metadata.yaml or 2.1.0/metadata.yaml for GA progressive rollouts) to release_candidate/metadata.yaml so the platform knows a progressive rollout is active for this connector.

The connector's metadata.yaml on disk must declare a version that is valid for progressive rollout (i.e. not a -preview build). The versioned blob must already exist in GCS (i.e. the version must have been published first).

Requires GCS_CREDENTIALS environment variable to be set.

def delete_progressive_rollout_blob( repo_path: pathlib.Path, connector_name: str, dry_run: bool = False, bucket_name: str | None = None) -> ConnectorPublishResult:
def delete_progressive_rollout_blob(
    repo_path: Path,
    connector_name: str,
    dry_run: bool = False,
    bucket_name: str | None = None,
) -> ConnectorPublishResult:
    """Remove the `release_candidate/` metadata marker blob from GCS.

    Deleting the marker is the only GCS mutation required to finalize a
    progressive rollout — for both promote and rollback.  The versioned
    blob (e.g. `1.2.3-rc.1/metadata.yaml`) is deliberately left in place
    as an audit trail of what was actually deployed during the rollout.
    The operation is idempotent: an already-missing marker is success.

    Requires `GCS_CREDENTIALS` environment variable to be set.
    """
    if not bucket_name:
        raise ValueError("bucket_name is required for progressive rollout cleanup.")

    meta = get_connector_metadata(repo_path, connector_name)
    image_tag = meta.docker_image_tag
    repo_name = meta.docker_repository
    image_ref = f"{repo_name}:{image_tag}"

    # No version-format gate here on purpose: in the promote workflow the
    # on-disk version is already bumped to GA (e.g. 1.2.3) before cleanup
    # runs.  The marker path depends only on the docker repository plus the
    # fixed RELEASE_CANDIDATE_GCS_FOLDER_NAME constant, so the on-disk
    # version is irrelevant.
    marker_path = (
        f"{METADATA_FOLDER}/{repo_name}/"
        f"{RELEASE_CANDIDATE_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )

    if dry_run:
        return ConnectorPublishResult(
            connector=meta.name,
            version=image_tag,
            action="progressive-rollout-cleanup",
            status="dry-run",
            docker_image=image_ref,
            registry_updated=False,
            message=(
                f"Would delete release_candidate/ blob for {meta.name} "
                f"(version {image_tag}) at {marker_path}"
            ),
        )

    gcs_bucket = get_gcs_storage_client().bucket(bucket_name)
    marker_blob = gcs_bucket.blob(marker_path)

    if not marker_blob.exists():
        logger.info(
            "Progressive rollout metadata already deleted (idempotent): %s", marker_path
        )
        return ConnectorPublishResult(
            connector=meta.name,
            version=image_tag,
            action="progressive-rollout-cleanup",
            status="success",
            docker_image=image_ref,
            registry_updated=False,
            message=f"Progressive rollout metadata already deleted (no-op): {marker_path}",
        )

    marker_blob.delete()
    logger.info("Deleted progressive rollout metadata blob: %s", marker_path)

    return ConnectorPublishResult(
        connector=meta.name,
        version=image_tag,
        action="progressive-rollout-cleanup",
        status="success",
        docker_image=image_ref,
        registry_updated=True,
        message=f"Deleted release_candidate/ blob for {meta.name} at {marker_path}.",
    )

Delete the release_candidate/ metadata blob from GCS.

This is the only GCS operation needed when finalizing a progressive rollout (both promote and rollback). The versioned blob (e.g. 1.2.3-rc.1/metadata.yaml) is intentionally preserved as an audit trail of what was actually deployed during the rollout.

Requires GCS_CREDENTIALS environment variable to be set.

def find_unpublished_connectors( repo_path: str | pathlib.Path, bucket_name: str, connector_names: list[str] | None = None) -> AuditResult:
 90def find_unpublished_connectors(
 91    repo_path: str | Path,
 92    bucket_name: str,
 93    connector_names: list[str] | None = None,
 94) -> AuditResult:
 95    """Find connectors whose local version is not published on GCS.
 96
 97    For each connector in the local checkout, reads `dockerImageTag` from
 98    `metadata.yaml` and checks whether `metadata/<docker-repo>/<version>/metadata.yaml`
 99    exists in the GCS bucket.  Connectors that are archived, disabled on all
100    registries, or have RC versions are skipped.
101
102    Args:
103        repo_path: Path to the Airbyte monorepo checkout.
104        bucket_name: GCS bucket name to check against.
105        connector_names: Optional list of connector names to check.
106            If `None`, discovers all connectors in the repo.
107
108    Returns:
109        An `AuditResult` containing unpublished connectors and metadata.
110    """
111    repo_path = Path(repo_path)
112    connectors_dir = repo_path / CONNECTOR_PATH_PREFIX
113
114    if not connectors_dir.exists():
115        raise ValueError(f"Connectors directory not found: {connectors_dir}")
116
117    # Discover connector names if not provided
118    if connector_names is None:
119        connector_names = sorted(
120            d.name
121            for d in connectors_dir.iterdir()
122            if d.is_dir() and (d / METADATA_FILE_NAME).exists()
123        )
124
125    result = AuditResult()
126
127    # Collect connectors and their versions first, then batch-check GCS
128    to_check: list[tuple[str, str]] = []  # (connector_name, version)
129
130    for name in connector_names:
131        metadata_path = connectors_dir / name / METADATA_FILE_NAME
132        metadata = _read_local_metadata(metadata_path)
133        if metadata is None:
134            result.errors.append(f"{name}: metadata.yaml not found or unreadable")
135            continue
136
137        if _is_archived(metadata):
138            result.skipped_archived.append(name)
139            continue
140
141        if _is_disabled_on_all_registries(metadata):
142            result.skipped_disabled.append(name)
143            continue
144
145        data = metadata.get("data", {})
146        version = data.get("dockerImageTag")
147        if not version:
148            result.errors.append(f"{name}: no dockerImageTag in metadata")
149            continue
150
151        if _is_rc_version(version):
152            result.skipped_rc.append(name)
153            continue
154
155        to_check.append((name, version))
156
157    if not to_check:
158        result.checked_count = 0
159        return result
160
161    # Check GCS for each connector version
162    storage_client = get_gcs_storage_client()
163    bucket = storage_client.bucket(bucket_name)
164
165    for name, version in to_check:
166        result.checked_count += 1
167        blob_path = f"{METADATA_FOLDER}/airbyte/{name}/{version}/{METADATA_FILE_NAME}"
168        blob = bucket.blob(blob_path)
169
170        try:
171            exists = blob.exists()
172        except Exception as e:
173            result.errors.append(f"{name}: GCS check failed: {e}")
174            continue
175
176        if not exists:
177            logger.info(
178                "Unpublished: %s version %s (checked %s)",
179                name,
180                version,
181                blob_path,
182            )
183            result.unpublished.append(
184                UnpublishedConnector(connector_name=name, local_version=version)
185            )
186
187    logger.info(
188        "Audit complete: %d checked, %d unpublished, %d archived-skipped, %d disabled-skipped, %d rc-skipped",
189        result.checked_count,
190        len(result.unpublished),
191        len(result.skipped_archived),
192        len(result.skipped_disabled),
193        len(result.skipped_rc),
194    )
195
196    return result

Find connectors whose local version is not published on GCS.

For each connector in the local checkout, reads dockerImageTag from metadata.yaml and checks whether metadata/<docker-repo>/<version>/metadata.yaml exists in the GCS bucket. Connectors that are archived, disabled on all registries, or have RC versions are skipped.

Arguments:
  • repo_path: Path to the Airbyte monorepo checkout.
  • bucket_name: GCS bucket name to check against.
  • connector_names: Optional list of connector names to check. If None, discovers all connectors in the repo.
Returns:

An AuditResult containing unpublished connectors and metadata.

def generate_version_artifacts( metadata_file: pathlib.Path, docker_image: str, output_dir: pathlib.Path | None = None, repo_root: pathlib.Path | None = None, dry_run: bool = False, with_validate: bool = True, with_dependency_dump: bool = True, with_sbom: bool = True) -> GenerateResult:
def generate_version_artifacts(
    metadata_file: Path,
    docker_image: str,
    output_dir: Path | None = None,
    repo_root: Path | None = None,
    dry_run: bool = False,
    with_validate: bool = True,
    with_dependency_dump: bool = True,
    with_sbom: bool = True,
) -> GenerateResult:
    """Generate all version artifacts for a connector release.

    Artifacts are enriched with git commit info, SBOM URL, and (when applicable)
    components SHA before writing.  Validation is run after generation by default.

    Args:
        metadata_file: Path to the connector's `metadata.yaml`.
        docker_image: Docker image to run spec against (e.g. `airbyte/source-faker:6.2.38`).
        output_dir: Directory to write artifacts to.  If `None`, a temp directory is created.
        repo_root: Root of the Airbyte repo checkout (for resolving `doc.md`).
            If `None`, inferred by walking up from `metadata_file`.
        dry_run: If `True`, report what would be generated without writing or running docker.
        with_validate: If `True` (default), run metadata validators after generation.
            Pass `False` (`--no-validate`) to skip.
        with_dependency_dump: If `True` (default), generate `dependencies.json`
            for Python connectors.  Pass `False` (`--no-dependency-dump`) to skip.
        with_sbom: If `True` (default), generate `spdx.json` (SBOM) for
            connectors.  Pass `False` (`--no-sbom`) to skip.

    Returns:
        A `GenerateResult` describing what was produced.
    """
    # --- Load metadata ---
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    raw_metadata: dict[str, Any] = yaml.safe_load(metadata_file.read_text())
    # The top-level "data" mapping holds the connector fields; default to {} if absent.
    metadata_data: dict[str, Any] = raw_metadata.get("data", {})

    connector_name = metadata_data.get("dockerRepository", "unknown").replace(
        "airbyte/", ""
    )
    version = metadata_data.get("dockerImageTag", "unknown")

    # --- Resolve output directory ---
    # NOTE: mkdtemp dirs are not auto-removed; the caller is responsible for cleanup.
    if output_dir is None:
        output_dir = Path(
            tempfile.mkdtemp(prefix=f"connector-artifacts-{connector_name}-{version}-")
        )
    output_dir.mkdir(parents=True, exist_ok=True)

    result = GenerateResult(
        connector_name=connector_name,
        version=version,
        docker_image=docker_image,
        output_dir=str(output_dir),
        dry_run=dry_run,
    )
    # Non-fatal problems below are appended to result.errors rather than raised,
    # so a single failed artifact does not abort the whole generation run.

    if dry_run:
        logger.info("[DRY RUN] Would generate artifacts to %s", output_dir)
        result.artifacts_written = [
            "metadata.yaml",
            "icon.svg",
            "doc.md",
            "cloud.json",
            "oss.json",
            "manifest.yaml (if present)",
            "components.zip (if components.py present)",
            "components.zip.sha256 (if components.py present)",
            f"version={version}",
        ]
        if with_sbom:
            result.artifacts_written.append(SBOM_FILE_NAME)
        if with_dependency_dump:
            result.artifacts_written.append("dependencies.json (if Python connector)")
        return result

    # --- Prepare metadata output ---
    metadata_out = output_dir / "metadata.yaml"
    result.artifacts_written.append("metadata.yaml")

    # --- Enrich metadata with git info *before* building registry entries so
    #     that `generated.git` propagates into `cloud.json` / `oss.json`. ---
    raw_metadata = _enrich_metadata_git_info(raw_metadata, metadata_file)

    # --- Generate SBOM from the connector Docker image ---
    sbom_generated = False
    if not with_sbom:
        logger.info("SBOM generation disabled via --no-sbom.")
    else:
        try:
            sbom_path = generate_sbom(docker_image, output_dir)
        except RuntimeError as exc:
            logger.warning("SBOM generation failed (non-fatal): %s", exc)
        except (FileNotFoundError, subprocess.TimeoutExpired):
            logger.warning("Docker not available or SBOM generation timed out.")
        else:
            result.artifacts_written.append(SBOM_FILE_NAME)
            sbom_generated = True
            logger.info("Generated SBOM: %s", sbom_path)

    # --- Enrich metadata with SBOM URL ---
    raw_metadata = _enrich_metadata_sbom_url(
        raw_metadata, sbom_generated=sbom_generated
    )

    # --- Run docker spec for cloud and oss ---
    specs: dict[str, dict[str, Any]] = {}
    for mode in VALID_REGISTRIES:
        try:
            specs[mode] = _run_docker_spec(docker_image, mode)
            logger.info("Got %s spec from docker image %s", mode, docker_image)
        except RuntimeError as exc:
            error_msg = f"Failed to get {mode} spec: {exc}"
            logger.error(error_msg)
            result.errors.append(error_msg)

    # --- Generate dependencies.json for Python connectors ---
    # This must happen *before* building registry entries so that the
    # local dependencies data can be used for packageInfo without a GCS
    # round-trip.
    local_dependencies: dict[str, Any] | None = None
    if not with_dependency_dump:
        logger.info("Dependency generation disabled via --no-dependency-dump.")
    elif _is_python_connector(metadata_data):
        logger.info("Python connector detected — generating dependencies.json")
        local_dependencies = generate_python_dependencies_file(
            metadata_data=metadata_data,
            docker_image=docker_image,
            output_dir=output_dir,
        )
        if local_dependencies is not None:
            result.artifacts_written.append(CONNECTOR_DEPENDENCY_FILE_NAME)
    else:
        logger.info(
            "Non-Python connector (%s) — skipping dependencies.json generation.",
            connector_name,
        )

    # --- Check if registryOverrides enable each registry ---
    registry_overrides = metadata_data.get("registryOverrides", {})

    # --- Generate registry entries (cloud.json, oss.json) ---
    for registry_type in VALID_REGISTRIES:
        overrides = registry_overrides.get(registry_type, {})
        enabled = overrides.get("enabled", False)
        if not enabled:
            logger.info(
                "Registry type %s is not enabled for %s, skipping %s.json generation.",
                registry_type,
                connector_name,
                registry_type,
            )
            continue

        spec = specs.get(registry_type)
        if spec is None:
            # Spec retrieval failed above; record and move on to the next registry.
            error_msg = (
                f"Cannot generate {registry_type}.json: no spec available "
                f"(docker spec for {registry_type} failed or was not run)."
            )
            result.errors.append(error_msg)
            continue

        registry_entry = _build_registry_entry(
            metadata_data,
            registry_type,
            spec,
            local_dependencies=local_dependencies,
        )

        out_path = output_dir / f"{registry_type}.json"
        out_path.write_text(
            json.dumps(registry_entry, indent=2, sort_keys=True, default=_json_serial)
            + "\n"
        )
        result.artifacts_written.append(f"{registry_type}.json")
        logger.info("Wrote %s", out_path)

    # --- Copy icon.svg (sibling of metadata.yaml in the connector directory) ---
    icon_source = metadata_file.parent / "icon.svg"
    if icon_source.is_file():
        icon_out = output_dir / "icon.svg"
        shutil.copy2(icon_source, icon_out)
        result.artifacts_written.append("icon.svg")
        logger.info("Wrote %s", icon_out)
    else:
        logger.warning("No icon.svg found at %s.", icon_source)
        result.errors.append("Icon file is missing.")

    # --- Copy doc.md (derived from documentationUrl in metadata) ---
    if repo_root is None:
        # Infer repo root by walking up from metadata_file looking for .git
        # Note: .git can be a directory (normal clone) or a file (git worktree)
        # Resolve to absolute path first so the walk-up works with relative paths.
        candidate = metadata_file.resolve().parent
        while candidate != candidate.parent:
            git_indicator = candidate / ".git"
            if git_indicator.is_dir() or git_indicator.is_file():
                repo_root = candidate
                break
            candidate = candidate.parent

    if repo_root is not None:
        doc_source = _resolve_doc_path(metadata_data, repo_root)
        if doc_source is not None and doc_source.is_file():
            doc_out = output_dir / DOC_FILE_NAME
            shutil.copy2(doc_source, doc_out)
            result.artifacts_written.append(DOC_FILE_NAME)
            logger.info("Wrote %s (from %s)", doc_out, doc_source)
        else:
            error_msg = (
                f"Documentation file not found: {doc_source}. "
                f"Derived from documentationUrl in metadata."
            )
            logger.error(error_msg)
            result.errors.append(error_msg)
    else:
        error_msg = "Cannot resolve doc.md: repo root not found."
        logger.error(error_msg)
        result.errors.append(error_msg)

    # --- Copy manifest.yaml (from connector root, if present) ---
    connector_dir = metadata_file.parent
    manifest_source = connector_dir / MANIFEST_FILE_NAME
    components_sha256: str | None = None
    if manifest_source.is_file():
        manifest_out = output_dir / MANIFEST_FILE_NAME
        shutil.copy2(manifest_source, manifest_out)
        result.artifacts_written.append(MANIFEST_FILE_NAME)
        logger.info("Wrote %s", manifest_out)

        # --- Generate components.zip if components.py exists ---
        components_source = connector_dir / COMPONENTS_PY_FILE_NAME
        if components_source.is_file():
            zip_path, sha256_path = _create_components_zip(
                manifest_path=manifest_source,
                components_path=components_source,
                output_dir=output_dir,
            )
            result.artifacts_written.append(COMPONENTS_ZIP_FILE_NAME)
            result.artifacts_written.append(COMPONENTS_ZIP_SHA256_FILE_NAME)
            logger.info("Wrote %s and %s", zip_path, sha256_path)
            # Read back the SHA256 for metadata enrichment
            components_sha256 = sha256_path.read_text().strip()
    else:
        logger.info(
            "No manifest.yaml at %s — skipping manifest artifacts.", manifest_source
        )

    # --- Enrich metadata with components SHA (after zip creation) ---
    raw_metadata = _enrich_metadata_components_sha(raw_metadata, components_sha256)

    # --- Write final enriched metadata.yaml ---
    # Use sort_keys=True to match the legacy pipeline's alphabetical key ordering.
    # After Registry 2.0 launches we are free to change the key ordering.
    metadata_out.write_text(
        yaml.dump(raw_metadata, default_flow_style=False, sort_keys=True)
    )
    logger.info("Wrote enriched %s", metadata_out)

    # --- Write version marker file (version=<semver>) ---
    # This zero-byte file is used by the compile step as a fast-check marker.
    # Including it in the generated artifacts means `latest/` gets the marker
    # for free via a recursive copy, removing the need for a separate write.
    marker_file = output_dir / f"version={version}"
    marker_file.write_bytes(b"")
    result.artifacts_written.append(f"version={version}")
    logger.info("Wrote version marker %s", marker_file)

    # --- Validate metadata (after generation) ---
    if with_validate:
        logger.info("Running post-generation validation...")
        doc_path: str | None = None
        if repo_root is not None:
            resolved = _resolve_doc_path(metadata_data, repo_root)
            doc_path = str(resolved) if resolved else None
        validation = validate_metadata(
            metadata_data=metadata_data,
            opts=ValidateOptions(docs_path=doc_path),
        )
        if not validation.passed:
            for err in validation.errors:
                logger.error("Validation error: %s", err)
            result.validation_errors = validation.errors
        else:
            logger.info("Validation passed (%d validators).", validation.validators_run)

    return result

Generate all version artifacts for a connector release.

Artifacts are enriched with git commit info, SBOM URL, and (when applicable) components SHA before writing. Validation is run after generation by default.

Arguments:
  • metadata_file: Path to the connector's metadata.yaml.
  • docker_image: Docker image to run spec against (e.g. airbyte/source-faker:6.2.38).
  • output_dir: Directory to write artifacts to. If None, a temp directory is created.
  • repo_root: Root of the Airbyte repo checkout (for resolving doc.md). If None, inferred by walking up from metadata_file.
  • dry_run: If True, report what would be generated without writing or running docker.
  • with_validate: If True (default), run metadata validators after generation. Pass False (--no-validate) to skip.
  • with_dependency_dump: If True (default), generate dependencies.json for Python connectors. Pass False (--no-dependency-dump) to skip.
  • with_sbom: If True (default), generate spdx.json (SBOM) for connectors. Pass False (--no-sbom) to skip.
Returns:

A GenerateResult describing what was produced.

def get_connector_metadata( repo_path: pathlib.Path, connector_name: str) -> ConnectorMetadata:
def get_connector_metadata(repo_path: Path, connector_name: str) -> ConnectorMetadata:
    """Load a connector's metadata.yaml from a local monorepo checkout.

    Args:
        repo_path: Path to the Airbyte monorepo.
        connector_name: The connector technical name (e.g., 'source-github').

    Returns:
        ConnectorMetadata object with the connector's metadata.

    Raises:
        FileNotFoundError: If the connector directory or metadata file doesn't exist.
    """
    connector_path = repo_path / CONNECTOR_PATH_PREFIX / connector_name
    if not connector_path.exists():
        raise FileNotFoundError(f"Connector directory not found: {connector_path}")

    metadata_path = connector_path / METADATA_FILE_NAME
    if not metadata_path.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_path}")

    parsed = yaml.safe_load(metadata_path.read_text())
    data = parsed.get("data", {})

    # Optional fields fall back to sensible defaults when absent from metadata.
    return ConnectorMetadata(
        name=connector_name,
        docker_repository=data.get("dockerRepository", f"airbyte/{connector_name}"),
        docker_image_tag=data.get("dockerImageTag", "unknown"),
        support_level=data.get("supportLevel"),
        definition_id=data.get("definitionId"),
    )

Read connector metadata from metadata.yaml.

Arguments:
  • repo_path: Path to the Airbyte monorepo.
  • connector_name: The connector technical name (e.g., 'source-github').
Returns:

ConnectorMetadata object with the connector's metadata.

Raises:
  • FileNotFoundError: If the connector directory or metadata file doesn't exist.
def get_gcs_publish_path(connector_name: str, artifact_type: str, version: str = 'latest') -> str:
def get_gcs_publish_path(
    connector_name: str,
    artifact_type: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> str:
    """Compute the GCS path for a connector artifact for publishing.

    All connectors use the airbyte/{connector_name} convention.
    """
    # Maps artifact type -> file name; insertion order drives the error message.
    artifact_files = {
        "metadata": METADATA_FILE_NAME,
        "spec": "spec.json",
        "icon": "icon.svg",
        "doc": "doc.md",
    }

    try:
        file_name = artifact_files[artifact_type]
    except KeyError:
        raise ValueError(
            f"Unknown artifact type: {artifact_type}. "
            f"Valid types are: {', '.join(artifact_files.keys())}"
        ) from None

    return f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file_name}"

Compute the GCS path for a connector artifact for publishing.

All connectors use the airbyte/{connector_name} convention.

def get_registry( store: RegistryStore) -> Registry:
def get_registry(store: RegistryStore) -> Registry:
    """Factory for obtaining the right store implementation.

    Backend modules are imported lazily so only the selected
    implementation gets loaded.
    """
    if store.store_type == StoreType.CORAL:
        from airbyte_ops_mcp.registry.coral_registry_store import CoralRegistry

        registry_cls = CoralRegistry
    elif store.store_type == StoreType.SONAR:
        from airbyte_ops_mcp.registry.sonar_registry_store import SonarRegistry

        registry_cls = SonarRegistry
    else:
        # Defensive: StoreType is an Enum, so this branch should be unreachable.
        raise ValueError(f"Unknown store type: {store.store_type}")

    return registry_cls(store)

Factory for obtaining the right store implementation.

def get_registry_entry( connector_name: str, bucket_name: str, version: str = 'latest') -> dict[str, typing.Any]:
def get_registry_entry(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch a connector's registry entry (metadata.yaml) from GCS.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The connector's metadata as a dictionary

    Raises:
        ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure
        FileNotFoundError: If the connector metadata is not found in the registry
        yaml.YAMLError: If the metadata file contains invalid YAML syntax
    """
    client = get_gcs_storage_client()
    gcs_bucket = client.bucket(bucket_name)

    # Layout: metadata/airbyte/{connector_name}/{version}/metadata.yaml
    blob_path = (
        f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{METADATA_FILE_NAME}"
    )
    logger.info(f"Reading registry entry for {connector_name} from {blob_path}")

    content = safe_read_gcs_file(gcs_bucket.blob(blob_path))
    if content is None:
        raise FileNotFoundError(
            f"Connector metadata not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    # Parse, logging parse failures before re-raising them to the caller.
    try:
        parsed = yaml.safe_load(content)
    except yaml.YAMLError as e:
        logger.error(
            "Failed to parse metadata for %s from %s: %s",
            connector_name,
            blob_path,
            e,
        )
        raise

    if parsed is None or not isinstance(parsed, dict):
        raise ValueError(f"Metadata file {blob_path} has an invalid structure")
    return parsed

Get a connector's registry entry from GCS.

Reads metadata for a connector from the registry stored in GCS.

Arguments:
  • connector_name: The connector name (e.g., "source-faker", "destination-postgres")
  • bucket_name: Name of the GCS bucket containing the registry
  • version: Version folder name (e.g., "latest", "1.2.3")
Returns:

dict: The connector's metadata as a dictionary

Raises:
  • ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure
  • FileNotFoundError: If the connector metadata is not found in the registry
  • yaml.YAMLError: If the metadata file contains invalid YAML syntax
def get_registry_spec( connector_name: str, bucket_name: str, version: str = 'latest') -> dict[str, typing.Any]:
def get_registry_spec(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch a connector's specification (spec.json) from the GCS registry.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The connector's spec as a dictionary

    Raises:
        ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
        FileNotFoundError: If the connector spec is not found in the registry
        json.JSONDecodeError: If the spec file contains invalid JSON syntax
    """
    client = get_gcs_storage_client()
    gcs_bucket = client.bucket(bucket_name)

    # Layout: metadata/airbyte/{connector_name}/{version}/spec.json
    blob_path = f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{SPEC_FILE_NAME}"
    logger.info(f"Reading spec for {connector_name} from {blob_path}")

    content = safe_read_gcs_file(gcs_bucket.blob(blob_path))
    if content is None:
        raise FileNotFoundError(
            f"Connector spec not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    # Parse, logging decode failures before re-raising them to the caller.
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError as e:
        logger.error(
            "Failed to parse spec for %s from %s: %s",
            connector_name,
            blob_path,
            e,
        )
        raise

    if parsed is None or not isinstance(parsed, dict):
        raise ValueError(
            f"Spec file for {connector_name} at {blob_path} is not a JSON object"
        )
    return parsed

Get a connector's spec from GCS.

Reads the connector specification from the registry stored in GCS.

Arguments:
  • connector_name: The connector name (e.g., "source-faker", "destination-postgres")
  • bucket_name: Name of the GCS bucket containing the registry
  • version: Version folder name (e.g., "latest", "1.2.3")
Returns:

dict: The connector's spec as a dictionary

Raises:
  • ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
  • FileNotFoundError: If the connector spec is not found in the registry
  • json.JSONDecodeError: If the spec file contains invalid JSON syntax
def is_valid_for_progressive_rollout(version: str) -> bool:
def is_valid_for_progressive_rollout(version: str) -> bool:
    """Decide whether a version string may enter a progressive rollout.

    Only `-preview` builds are rejected; every other semver string
    (GA and RC alike) passes.  The gate is deliberately loose so it can
    be tightened — or removed entirely — later without interface changes.

    Args:
        version: The version string to check.

    Returns:
        True if the version may be used in a progressive rollout,
        False if the version format is explicitly disallowed.
    """
    return version.find("-preview.") == -1

Check if a version string is valid for progressive rollout.

Currently rejects only -preview versions. All other semver versions (GA and RC alike) are accepted. This gate is intentionally kept generic so it can be tightened or turned into a no-op in the future.

Arguments:
  • version: The version string to check.
Returns:

True if the version may be used in a progressive rollout, False if the version format is explicitly disallowed.

def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]:
def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]:
    """List all published versions of a connector in the registry.

    Scans the GCS bucket to find all versions of a specific connector.

    Args:
        connector_name: The connector name (e.g., "source-faker")
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Sorted list of version strings (excluding 'latest' and 'release_candidate')

    Raises:
        ValueError: If GCS credentials are not configured
    """
    client = get_gcs_storage_client()
    gcs_bucket = client.bucket(bucket_name)

    # Match: metadata/airbyte/{connector_name}/*/metadata.yaml
    glob_pattern = f"{METADATA_FOLDER}/airbyte/{connector_name}/*/{METADATA_FILE_NAME}"
    logger.info(f"Listing versions for {connector_name} with pattern: {glob_pattern}")

    try:
        blobs = gcs_bucket.list_blobs(match_glob=glob_pattern)
    except Exception as e:
        logger.error(f"Error listing blobs in bucket {bucket_name}: {e}")
        raise

    # Blob name layout: metadata / airbyte / connector-name / version / metadata.yaml
    # so the version component sits at index 3.  'latest' and 'release_candidate'
    # are pointer folders, not real versions, and are excluded.
    versions = {
        parts[3]
        for parts in (blob.name.split("/") for blob in blobs)
        if len(parts) >= 5 and parts[3] not in ("latest", "release_candidate")
    }
    return sorted(versions)

List all versions of a connector in the registry.

Scans the GCS bucket to find all versions of a specific connector.

Arguments:
  • connector_name: The connector name (e.g., "source-faker")
  • bucket_name: Name of the GCS bucket containing the registry
Returns:

list[str]: Sorted list of version strings (excluding 'latest' and 'release_candidate')

Raises:
  • ValueError: If GCS credentials are not configured
def list_registry_connectors(bucket_name: str) -> list[str]:
def list_registry_connectors(bucket_name: str) -> list[str]:
    """List all connectors in the registry.

    Scans the GCS bucket to find all connectors that have metadata files.

    Args:
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Sorted list of connector names

    Raises:
        ValueError: If GCS credentials are not configured
    """
    client = get_gcs_storage_client()
    gcs_bucket = client.bucket(bucket_name)

    # Match: metadata/airbyte/*/latest/metadata.yaml
    glob_pattern = (
        f"{METADATA_FOLDER}/airbyte/*/{LATEST_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )
    logger.info(f"Listing connectors with pattern: {glob_pattern}")

    try:
        blobs = gcs_bucket.list_blobs(match_glob=glob_pattern)
    except Exception as e:
        logger.error(f"Error listing blobs in bucket {bucket_name}: {e}")
        raise

    # Blob name layout: metadata / airbyte / connector-name / latest / metadata.yaml
    # so the connector name sits at index 2.
    names = {
        parts[2]
        for parts in (blob.name.split("/") for blob in blobs)
        if len(parts) >= 5
    }
    return sorted(names)

List all connectors in the registry.

Scans the GCS bucket to find all connectors that have metadata files.

Arguments:
  • bucket_name: Name of the GCS bucket containing the registry
Returns:

list[str]: Sorted list of connector names

Raises:
  • ValueError: If GCS credentials are not configured
def list_registry_connectors_filtered( bucket_name: str, *, support_level: SupportLevel | None = None, min_support_level: SupportLevel | None = None, connector_type: ConnectorType | None = None, language: ConnectorLanguage | None = None, prefix: str = '') -> list[str]:
def list_registry_connectors_filtered(
    bucket_name: str,
    *,
    support_level: SupportLevel | None = None,
    min_support_level: SupportLevel | None = None,
    connector_type: ConnectorType | None = None,
    language: ConnectorLanguage | None = None,
    prefix: str = "",
) -> list[str]:
    """List connectors from the compiled cloud registry index with filtering.

    With at least one filter set, this reads the compiled `cloud_registry.json`
    index — a single JSON file holding every connector entry — which is far
    faster than globbing individual metadata blobs.

    With no filters at all, it delegates to the glob-based search, which also
    covers OSS-only connectors that are absent from the Cloud index.

    Args:
        bucket_name: Name of the GCS bucket containing the registry.
        support_level: Exact support level to match (e.g., `SupportLevel.CERTIFIED`).
        min_support_level: Minimum support level threshold. Returns connectors
            at or above this level.
        connector_type: Filter by connector type (`ConnectorType.SOURCE` or
            `ConnectorType.DESTINATION`).
        language: Filter by implementation language (e.g., `ConnectorLanguage.PYTHON`).
        prefix: Optional bucket prefix (e.g., `"aj-test100"`).

    Returns:
        Sorted list of connector technical names (e.g., `"source-github"`).

    Raises:
        ValueError: If `support_level` and `min_support_level` are both provided.
    """
    if not any((support_level, min_support_level, connector_type, language)):
        return list_registry_connectors(bucket_name=bucket_name)

    if support_level and min_support_level:
        raise ValueError(
            "Cannot specify both `support_level` and `min_support_level`. "
            "Use `support_level` for an exact match or `min_support_level` for a threshold."
        )

    entries = _read_cloud_registry_index(bucket_name=bucket_name, prefix=prefix)

    # Exact support-level match.
    if support_level:
        entries = [e for e in entries if e.get("supportLevel") == support_level]

    # Support-level threshold: keep entries whose (known) level is at or
    # above the requested floor.
    if min_support_level:
        floor = min_support_level.precedence
        known_levels = {member.value for member in SupportLevel}

        def _meets_floor(entry: dict) -> bool:
            level = entry.get("supportLevel")
            return bool(
                level
                and level in known_levels
                and SupportLevel(level).precedence >= floor
            )

        entries = [e for e in entries if _meets_floor(e)]

    # Connector-type filter: source/destination entries carry distinct
    # definition-id keys in the index.
    definition_key = {
        ConnectorType.SOURCE: "sourceDefinitionId",
        ConnectorType.DESTINATION: "destinationDefinitionId",
    }.get(connector_type)
    if definition_key is not None:
        entries = [e for e in entries if definition_key in e]

    # Language filter.
    if language:
        entries = [e for e in entries if e.get("language") == language]

    # dockerRepository is "airbyte/<name>"; strip the org prefix when present.
    repos = (entry.get("dockerRepository", "") for entry in entries)
    names = {
        repo.split("/", 1)[1] if "/" in repo else repo for repo in repos if repo
    }
    return sorted(names)

List connectors from the compiled cloud registry index with filtering.

When any filter is applied, reads the compiled cloud_registry.json index instead of globbing individual metadata blobs. This is significantly faster because the index is a single JSON file containing all connector entries.

When no filters are applied, falls back to the existing glob-based search which captures all connectors (including OSS-only connectors not in the Cloud index).

Arguments:
  • bucket_name: Name of the GCS bucket containing the registry.
  • support_level: Exact support level to match (e.g., SupportLevel.CERTIFIED).
  • min_support_level: Minimum support level threshold. Returns connectors at or above this level.
  • connector_type: Filter by connector type (ConnectorType.SOURCE or ConnectorType.DESTINATION).
  • language: Filter by implementation language (e.g., ConnectorLanguage.PYTHON).
  • prefix: Optional bucket prefix (e.g., "aj-test100").
Returns:

Sorted list of connector technical names (e.g., "source-github").

Raises:
  • ValueError: If support_level and min_support_level are both provided.
def publish_connector_metadata( connector_name: str, metadata: dict[str, typing.Any], bucket_name: str, version: str, update_latest: bool = True, dry_run: bool = False) -> MetadataPublishResult:
def publish_connector_metadata(
    connector_name: str,
    metadata: dict[str, Any],
    bucket_name: str,
    version: str,
    update_latest: bool = True,
    dry_run: bool = False,
) -> MetadataPublishResult:
    """Publish connector metadata to GCS.

    Uploads the metadata to the registry bucket at a versioned path, and optionally
    also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading
    unchanged files.

    Requires GCS_CREDENTIALS environment variable to be set.

    Args:
        connector_name: Connector name (e.g., "source-faker").
        metadata: Parsed metadata document; must be a dict containing a "data" key.
        bucket_name: Target GCS bucket name.
        version: Version string used to build the versioned blob path.
        update_latest: If True (default), also update the 'latest' pointer blob.
        dry_run: If True, report what would be uploaded without writing.

    Returns:
        A MetadataPublishResult describing what was (or would be) uploaded.

    Raises:
        ValueError: If metadata is not a dict or lacks a "data" field.
    """
    if not isinstance(metadata, dict):
        raise ValueError("Metadata must be a dictionary")

    if "data" not in metadata:
        raise ValueError("Metadata must contain 'data' field")

    # Construct GCS paths using airbyte/{connector_name} convention
    versioned_blob_path = get_gcs_publish_path(connector_name, "metadata", version)
    latest_blob_path = get_gcs_publish_path(
        connector_name, "metadata", LATEST_GCS_FOLDER_NAME
    )

    if dry_run:
        message = f"[DRY RUN] Would upload metadata to gs://{bucket_name}/{versioned_blob_path}"
        if update_latest:
            message += f" and gs://{bucket_name}/{latest_blob_path}"
        logger.info(message)
        return MetadataPublishResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            versioned_path=versioned_blob_path,
            latest_path=latest_blob_path if update_latest else None,
            versioned_uploaded=False,
            latest_uploaded=False,
            status="dry-run",
            message=message,
        )

    # Get GCS client and bucket
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # Write metadata to a temp file. Record the path *before* dumping so the
    # delete=False temp file is cleaned up even if yaml.dump raises mid-write
    # (previously the path was captured after the dump, leaking the file).
    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".yaml", delete=False
        ) as tmp_file:
            tmp_path = Path(tmp_file.name)
            yaml.dump(metadata, tmp_file)

        # Upload versioned file
        versioned_uploaded, _ = upload_file_if_changed(
            local_file_path=tmp_path,
            bucket=bucket,
            blob_path=versioned_blob_path,
            disable_cache=True,
        )

        if versioned_uploaded:
            logger.info(
                f"Uploaded metadata for {connector_name} v{version} to {versioned_blob_path}"
            )
        else:
            logger.info(
                f"Versioned metadata for {connector_name} v{version} is already up to date"
            )

        # Optionally update latest pointer
        latest_uploaded = False
        if update_latest:
            latest_uploaded, _ = upload_file_if_changed(
                local_file_path=tmp_path,
                bucket=bucket,
                blob_path=latest_blob_path,
                disable_cache=True,
            )
            if latest_uploaded:
                logger.info(f"Updated latest pointer for {connector_name}")
            else:
                logger.info(
                    f"Latest pointer for {connector_name} is already up to date"
                )
    finally:
        # Clean up temp file even if dump or upload fails
        if tmp_path is not None:
            tmp_path.unlink(missing_ok=True)

    # Determine status
    if versioned_uploaded or latest_uploaded:
        status = "success"
        message = f"Published metadata for {connector_name} v{version}"
        if versioned_uploaded:
            message += f" to {versioned_blob_path}"
        if latest_uploaded:
            message += " and updated latest"
    else:
        status = "already-up-to-date"
        message = f"Metadata for {connector_name} v{version} is already up to date"

    return MetadataPublishResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        versioned_path=versioned_blob_path,
        latest_path=latest_blob_path if update_latest else None,
        versioned_uploaded=versioned_uploaded,
        latest_uploaded=latest_uploaded,
        status=status,
        message=message,
    )

Publish connector metadata to GCS.

Uploads the metadata to the registry bucket at a versioned path, and optionally also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading unchanged files.

Requires GCS_CREDENTIALS environment variable to be set.

def publish_version_artifacts( connector_name: str, version: str, artifacts_dir: pathlib.Path, store: RegistryStore, dry_run: bool = False, with_validate: bool = True) -> PublishArtifactsResult:
def publish_version_artifacts(
    connector_name: str,
    version: str,
    artifacts_dir: Path,
    store: RegistryStore,
    dry_run: bool = False,
    with_validate: bool = True,
) -> PublishArtifactsResult:
    """Publish locally generated artifacts to a GCS registry bucket.

    Uses `gcsfs.GCSFileSystem` to upload the local *artifacts_dir* to the
    versioned path inside the target GCS bucket.

    The target GCS path is:
        `gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/`

    Before uploading, this function validates that the `connector_name`
    (derived from the connector directory) matches the `dockerRepository`
    declared in `metadata.yaml`.  A mismatch would cause the registry
    compile step to see duplicate definition-ID entries and fail.

    Args:
        connector_name: Connector name (e.g. `source-faker`).
        version: Version string (e.g. `6.2.38`).
        artifacts_dir: Local directory containing artifacts from `generate`.
        store: Parsed store target containing bucket, prefix, and stage info.
        dry_run: If `True`, report what would be uploaded without writing.
        with_validate: If `True` (default), validate metadata before uploading.
            Pass `False` (`--no-validate`) to skip.

    Returns:
        A `PublishArtifactsResult` describing what was published.

    Raises:
        ValueError: If the connector directory name does not match
            `dockerRepository` in the generated metadata.
    """
    if not artifacts_dir.is_dir():
        raise FileNotFoundError(f"Artifacts directory not found: {artifacts_dir}")

    # Fail fast if the connector directory name doesn't match dockerRepository.
    # A mismatch would publish artifacts under the wrong GCS path and corrupt
    # the registry (duplicate definition-IDs under different directory names).
    mismatch_error = _check_connector_name_matches_docker_repo(
        connector_name, artifacts_dir
    )
    if mismatch_error:
        raise ValueError(mismatch_error)

    # Build the GCS destination path
    bucket_name = store.bucket
    prefix = store.prefix
    blob_root = versioned_blob_root(
        connector_name=connector_name, version=version, store=store
    )
    versioned_dest = f"gcs://{bucket_name}/{blob_root}"

    target_label = f"{bucket_name}/{prefix}" if prefix else bucket_name
    result = PublishArtifactsResult(
        connector_name=connector_name,
        version=version,
        target=target_label,
        gcs_destination=versioned_dest,
        dry_run=dry_run,
    )

    # --- Pre-publish validation ---
    if with_validate:
        metadata_file = artifacts_dir / "metadata.yaml"
        if metadata_file.is_file():
            raw_metadata = yaml.safe_load(metadata_file.read_text())
            metadata_data = (raw_metadata or {}).get("data", {})
            validation = validate_metadata(metadata_data=metadata_data)
            if not validation.passed:
                for err in validation.errors:
                    logger.error("Pre-publish validation error: %s", err)
                result.validation_errors = validation.errors
                return result
            logger.info(
                "Pre-publish validation passed (%d validators).",
                validation.validators_run,
            )
        else:
            logger.warning("No metadata.yaml in artifacts dir; skipping validation.")

    # Enumerate local files
    local_files = sorted(f for f in artifacts_dir.rglob("*") if f.is_file())
    if not local_files:
        result.errors.append(f"No files found in {artifacts_dir}.")
        return result

    # Note: " to " separates the connector@version token from the destination
    # (previously "%s%s" ran them together in the log output).
    _log_progress(
        "Publishing %d artifacts for %s@%s to %s",
        len(local_files),
        connector_name,
        version,
        versioned_dest,
    )

    # Build references used by both dry-run and real upload paths
    deps_file = artifacts_dir / CONNECTOR_DEPENDENCY_FILE_NAME
    has_deps = deps_file.is_file()
    deps_gcs_key = dependencies_blob_path(
        connector_name=connector_name, version=version, store=store
    )

    sbom_file = artifacts_dir / SBOM_FILE_NAME
    has_sbom = sbom_file.is_file()

    if dry_run:
        for f in local_files:
            rel = f.relative_to(artifacts_dir)
            result.files_uploaded.append(str(rel))
            _log_progress("  [DRY RUN] would upload: %s", rel)
        # Report the dual-load of dependencies.json to connector_dependencies/
        if has_deps:
            result.files_uploaded.append(deps_gcs_key)
            _log_progress(
                "  [DRY RUN] would also dual-load: %s → gs://%s/%s",
                CONNECTOR_DEPENDENCY_FILE_NAME,
                bucket_name,
                deps_gcs_key,
            )
        # Report the separate sbom/ upload
        if has_sbom:
            sbom_gcs_key = sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            )
            result.files_uploaded.append(sbom_gcs_key)
            _log_progress(
                "  [DRY RUN] would also upload: %s → gs://%s/%s",
                SBOM_FILE_NAME,
                bucket_name,
                sbom_gcs_key,
            )
        return result

    # Authenticate
    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # Strip gcs:// prefix for gcsfs path
    dest_path = versioned_dest.replace("gcs://", "")

    # Upload all files to the versioned path
    _log_progress("Uploading to: %s", versioned_dest)
    for f in local_files:
        rel = f.relative_to(artifacts_dir)
        remote_path = f"{dest_path}/{rel}"
        fs.put(str(f), remote_path)
        result.files_uploaded.append(str(rel))
        _log_progress("  Uploaded: %s", rel)

    # Delete remote files that don't exist locally (sync semantics)
    try:
        remote_files = fs.ls(dest_path, detail=False)
        local_rel_paths = {str(f.relative_to(artifacts_dir)) for f in local_files}
        for remote_file in remote_files:
            # Skip the directory entry itself if it appears in the listing
            if remote_file == dest_path:
                continue
            # Derive the remote relative path, matching upload semantics
            if remote_file.startswith(dest_path + "/"):
                remote_rel = remote_file[len(dest_path) + 1 :]
            else:
                remote_rel = remote_file.split("/")[-1]
            if remote_rel not in local_rel_paths:
                fs.rm(remote_file)
                _log_progress("  Deleted stale remote file: %s", remote_rel)
    except FileNotFoundError:
        pass  # Destination doesn't exist yet, nothing to clean

    _log_progress("Uploaded %d files to %s", len(local_files), versioned_dest)

    # --- Dual-load dependencies.json to the connector_dependencies/ path ---
    if not has_deps:
        logger.debug(
            "No %s in artifacts dir — skipping dual-load.",
            CONNECTOR_DEPENDENCY_FILE_NAME,
        )
    else:
        deps_remote = f"{bucket_name}/{deps_gcs_key}"
        _log_progress(
            "Dual-loading %s to gs://%s",
            CONNECTOR_DEPENDENCY_FILE_NAME,
            deps_remote,
        )
        fs.put(str(deps_file), deps_remote)
        result.files_uploaded.append(deps_gcs_key)
        _log_progress("  Uploaded %s (dual-load)", CONNECTOR_DEPENDENCY_FILE_NAME)

    # --- Upload SBOM to the dedicated sbom/ path in GCS ---
    if not has_sbom:
        logger.debug(
            "No %s in artifacts dir — skipping SBOM dual-load.",
            SBOM_FILE_NAME,
        )
    else:
        sbom_gcs_uri = upload_sbom(
            sbom_path=sbom_file,
            connector_name=connector_name,
            version=version,
            store=store,
            dry_run=dry_run,
        )
        result.files_uploaded.append(
            sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            ),
        )
        _log_progress("Uploaded SBOM to dedicated path: %s", sbom_gcs_uri)

    return result

Publish locally generated artifacts to a GCS registry bucket.

Uses gcsfs.GCSFileSystem to upload the local artifacts_dir to the versioned path inside the target GCS bucket.

The target GCS path is:

gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/

Before uploading, this function validates that the connector_name (derived from the connector directory) matches the dockerRepository declared in metadata.yaml. A mismatch would cause the registry compile step to see duplicate definition-ID entries and fail.

Arguments:
  • connector_name: Connector name (e.g. source-faker).
  • version: Version string (e.g. 6.2.38).
  • artifacts_dir: Local directory containing artifacts from generate.
  • store: Parsed store target containing bucket, prefix, and stage info.
  • dry_run: If True, report what would be uploaded without writing.
  • with_validate: If True (default), validate metadata before uploading. Pass False (--no-validate) to skip.
Returns:

A PublishArtifactsResult describing what was published.

Raises:
  • ValueError: If the connector directory name does not match dockerRepository in the generated metadata.
def purge_latest_dirs( *, store: RegistryStore, connector_name: list[str] | None = None, dry_run: bool = False) -> PurgeLatestResult:
def purge_latest_dirs(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
) -> PurgeLatestResult:
    """Delete all `latest/` directories from the registry store.

    Discovers connector directories via glob, then deletes each
    `latest/` subdirectory in parallel using a thread pool.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only purge these connectors.
        dry_run: If True, report what would be done without deleting.

    Returns:
        A `PurgeLatestResult` describing what was done.
    """
    result = PurgeLatestResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"

    # Discover latest/ dirs by listing connector directories that contain
    # a `latest/` subdirectory.
    _log_progress("Discovering latest/ directories...")
    base_with_slash = f"{base}/"
    if connector_name:
        # Check each requested connector for a latest/ dir
        seen: set[str] = set()
        connectors_with_latest: list[str] = []
        for name in connector_name:
            if name in seen:
                continue
            # Mark as seen before the existence check so duplicate names that
            # have no latest/ dir don't trigger repeated network calls.
            seen.add(name)
            latest_path = f"{base}/{name}/latest"
            if fs.exists(latest_path):
                connectors_with_latest.append(name)
    else:
        # Glob for all connectors, then filter to those with latest/
        all_connector_dirs = fs.glob(f"{base}/*/latest")
        seen = set()
        connectors_with_latest = []
        for path in all_connector_dirs:
            # Strip the known base prefix and take the first component
            if not path.startswith(base_with_slash):
                logger.warning("Could not parse latest path: %s", path)
                continue
            relative = path[len(base_with_slash) :]
            connector = relative.split("/")[0]
            if connector and connector not in seen:
                connectors_with_latest.append(connector)
                seen.add(connector)

    result.connectors_found = len(connectors_with_latest)
    _log_progress(
        "Found %d connectors with latest/ directories",
        result.connectors_found,
    )

    if not connectors_with_latest:
        _log_progress("Nothing to purge.")
        _log_progress(result.summary())
        return result

    if dry_run:
        for connector in sorted(connectors_with_latest):
            _log_progress("  [DRY RUN] Would delete %s/latest/", connector)
        result.latest_dirs_deleted = len(connectors_with_latest)
        _log_progress(result.summary())
        return result

    # Delete latest/ dirs in parallel using the shared helper.
    def _delete_one(connector: str) -> str | None:
        """Delete a single connector's latest/ dir. Returns error string or None."""
        try:
            _delete_latest_dir(
                fs,
                store=store,
                connector=connector,
            )
            return None
        except Exception as exc:
            return f"Failed to delete latest/ for {connector}: {exc}"

    _log_progress(
        "Deleting %d latest/ directories (max_workers=%d)...",
        len(connectors_with_latest),
        _PURGE_LATEST_MAX_WORKERS,
    )

    with ThreadPoolExecutor(max_workers=_PURGE_LATEST_MAX_WORKERS) as pool:
        futures = {
            pool.submit(_delete_one, c): c for c in sorted(connectors_with_latest)
        }
        for i, future in enumerate(as_completed(futures), 1):
            connector = futures[future]
            error = future.result()
            if error:
                logger.error(error)
                result.errors.append(error)
            else:
                result.latest_dirs_deleted += 1
            if i % 100 == 0:
                _log_progress("  Deleted %d / %d...", i, len(connectors_with_latest))

    _log_progress(result.summary())
    return result

Delete all latest/ directories from the registry store.

Discovers connector directories via glob, then deletes each latest/ subdirectory in parallel using a thread pool.

Arguments:
  • store: Registry store (bucket + optional prefix).
  • connector_name: If provided, only purge these connectors.
  • dry_run: If True, report what would be done without deleting.
Returns:

A PurgeLatestResult describing what was done.

def rebuild_registry( source_bucket: str, output_mode: Literal['local', 'gcs', 's3'], output_path_root: str | None = None, gcs_bucket: str | None = None, s3_bucket: str | None = None, dry_run: bool = False, connector_name: list[str] | None = None) -> RebuildResult:
def rebuild_registry(
    source_bucket: str,
    output_mode: OutputMode,
    output_path_root: str | None = None,
    gcs_bucket: str | None = None,
    s3_bucket: str | None = None,
    dry_run: bool = False,
    connector_name: list[str] | None = None,
) -> RebuildResult:
    """Mirror the registry from a source GCS bucket to a chosen output target.

    Every connector metadata blob under `metadata/` in the source bucket is
    listed and copied to the output, using fsspec for unified filesystem
    access.  Supported outputs:

    - local: Write to a local directory tree.
    - gcs: Copy to a GCS bucket (must not be the prod bucket).
    - s3: Copy to an S3 bucket.

    Args:
        source_bucket: The GCS bucket to read from (typically prod).
        output_mode: Where to write: "local", "gcs", or "s3".
        output_path_root: Root path/prefix for output. For local mode, if None
            creates a temp directory. For GCS/S3, prepended to all blob paths.
        gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
        s3_bucket: Target S3 bucket name (required if output_mode="s3").
        dry_run: If True, report what would be done without writing.
        connector_name: If provided, only rebuild these connector names
            (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.

    Returns:
        RebuildResult with details of the operation.

    Raises:
        ValueError: If target is the prod bucket, or required bucket arg is missing.
    """
    # Guard against accidentally mirroring onto the production bucket.
    if output_mode == "gcs" and gcs_bucket:
        _validate_not_prod_bucket(gcs_bucket)

    # Local mode may need a freshly created temp directory as its root.
    resolved_root = output_path_root or ""
    if output_mode == "local":
        resolved_root = _resolve_local_output_root(output_path_root)

    outcome = RebuildResult(
        source_bucket=source_bucket,
        output_mode=output_mode,
        output_root=resolved_root,
        dry_run=dry_run,
    )

    # The source side is always GCS.
    source_fs, gcs_token = _make_source_fs()
    source_base = f"{source_bucket}/{METADATA_FOLDER}"

    # Discover the blobs to copy: either per requested connector, or all of
    # metadata/ when no filter was given.
    if connector_name:
        _log_progress(
            "Listing blobs for %d connectors under gs://%s/...",
            len(connector_name),
            source_base,
        )
        discovered: list[str] = []
        for requested in connector_name:
            matches = source_fs.find(f"{source_base}/airbyte/{requested}")
            discovered.extend(matches)
            _log_progress("  %s: %d blobs", requested, len(matches))
    else:
        _log_progress("Listing all blobs under gs://%s/...", source_base)
        discovered = source_fs.find(source_base)

    if not discovered:
        _log_progress("No blobs found under gs://%s/", source_base)
        return outcome

    blob_count = len(discovered)
    _log_progress("Found %d blobs to process", blob_count)

    # Build the output filesystem handle.
    output_fs, output_base = _make_output_fs(
        output_mode=output_mode,
        output_root=resolved_root,
        gcs_bucket=gcs_bucket,
        s3_bucket=s3_bucket,
        gcs_token=gcs_token,
    )

    # Strip the leading "<bucket>/" from each path and record the connector
    # segment (metadata/airbyte/<connector>/...) for counting.
    strip_prefix = f"{source_bucket}/"
    relative_paths: list[str] = []
    seen_connectors: set[str] = set()
    for blob_path in discovered:
        rel = (
            blob_path[len(strip_prefix) :]
            if blob_path.startswith(strip_prefix)
            else blob_path
        )
        relative_paths.append(rel)
        segments = rel.split("/")
        if len(segments) >= 3:
            seen_connectors.add(segments[2])

    if dry_run:
        outcome.blobs_copied = blob_count
        outcome.connectors_processed = len(seen_connectors)
        _log_progress(
            "[DRY RUN] Would copy %d blobs (%d connectors)",
            blob_count,
            len(seen_connectors),
        )
        _log_progress(outcome.summary())
        return outcome

    # GCS→GCS mirrors use the server-side copy API instead of fsspec.
    if output_mode == "gcs" and gcs_bucket:
        return _gcs_native_copy(
            source_bucket=source_bucket,
            dest_bucket_name=gcs_bucket,
            dest_prefix=resolved_root,
            blob_relative_paths=relative_paths,
            connector_names=seen_connectors,
            gcs_token=gcs_token,
            result=outcome,
            connector_name_filter=connector_name,
        )

    # Local and S3 targets go through the generic fsspec copy path.
    return _fsspec_copy(
        source_fs=source_fs,
        source_paths=discovered,
        output_fs=output_fs,
        output_base=output_base,
        output_mode=output_mode,
        source_bucket=source_bucket,
        blob_relative_paths=relative_paths,
        connector_names=seen_connectors,
        result=outcome,
    )

Rebuild the entire registry from a source GCS bucket to an output target.

Reads all connector metadata blobs from the source GCS bucket and copies them to the output target using fsspec for unified filesystem access.

The output targets are:

  • local: Write to a local directory tree.
  • gcs: Copy to a GCS bucket (must not be the prod bucket).
  • s3: Copy to an S3 bucket.
Arguments:
  • source_bucket: The GCS bucket to read from (typically prod).
  • output_mode: Where to write: "local", "gcs", or "s3".
  • output_path_root: Root path/prefix for output. For local mode, if None creates a temp directory. For GCS/S3, prepended to all blob paths.
  • gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
  • s3_bucket: Target S3 bucket name (required if output_mode="s3").
  • dry_run: If True, report what would be done without writing.
  • connector_name: If provided, only rebuild these connector names (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.
Returns:

RebuildResult with details of the operation.

Raises:
  • ValueError: If target is the prod bucket, or required bucket arg is missing.
def resolve_registry_store( store: str | None = None, connector_name: str | None = None, cwd: pathlib.Path | None = None, default_env: str = 'dev') -> RegistryStore:
def resolve_registry_store(
    store: str | None = None,
    connector_name: str | None = None,
    cwd: Path | None = None,
    default_env: str = "dev",
) -> RegistryStore:
    """Resolve a `RegistryStore` from CLI inputs.

    Every applicable detection method is evaluated up front.  An explicit
    source (`--store`, then the `AIRBYTE_REGISTRY_STORE` env var) wins
    outright and is returned as-is.  With no explicit source, the
    auto-detected results must agree or a `ValueError` is raised.

    Priority (highest → lowest):

    1. **Explicit** `--store` argument (e.g. `"coral:dev"`).
    2. **Environment variable** -- `AIRBYTE_REGISTRY_STORE`.
    3. **Auto-detected** -- connector name and/or working directory.
       If both are present and disagree, a `ValueError` is raised.

    Args:
        store: Explicit store target string (e.g. `"coral:dev"`).
        connector_name: Optional connector name for auto-detection.
        cwd: Working directory for repo-based detection.
        default_env: Environment to use when auto-detecting (default `"dev"`).

    Returns:
        A fully resolved `RegistryStore`.

    Raises:
        ValueError: If no detection method succeeds, or if auto-detected
            methods produce conflicting store types.
    """
    # -- Explicit sources (no conflict checking needed) --------------------
    chosen: RegistryStore | None = None
    chosen_origin: str | None = None

    if store is not None:
        chosen = RegistryStore.parse(store)
        chosen_origin = "--store"
    else:
        env_value = os.environ.get(REGISTRY_STORE_ENV_VAR)
        if env_value:
            chosen = RegistryStore.parse(env_value)
            chosen_origin = REGISTRY_STORE_ENV_VAR

    # -- Auto-detected sources ---------------------------------------------
    # These run even when an explicit source exists (matching the original
    # evaluation order) so detection errors surface consistently.
    detected: dict[str, StoreType] = {}

    if connector_name is not None:
        detected["connector_name"] = StoreType.get_from_connector_name(
            connector_name,
        )

    repo_type = StoreType.detect_from_repo_dir(cwd)
    if repo_type is not None:
        detected["working_directory"] = repo_type

    # -- An explicit source short-circuits the auto-detection logic --------
    if chosen is not None:
        logger.debug(
            "Using explicit store target from %s: %s:%s",
            chosen_origin,
            chosen.store_type.value,
            chosen.env,
        )
        return chosen

    # -- Resolve purely from auto-detections -------------------------------
    if not detected:
        raise ValueError(
            "Cannot determine registry store. "
            "Provide --store (e.g. 'coral:dev' or 'sonar:prod'), "
            f"set ${REGISTRY_STORE_ENV_VAR}, "
            "or run from a recognized repository directory."
        )

    unique_types = set(detected.values())

    if len(unique_types) > 1:
        detail = ", ".join(f"{src}={st.value}" for src, st in detected.items())
        raise ValueError(
            f"Conflicting store types detected: {detail}. "
            "Provide an explicit --store to resolve the ambiguity."
        )

    store_type = unique_types.pop()
    logger.info(
        "Auto-detected store type '%s' (sources: %s)",
        store_type.value,
        ", ".join(detected),
    )
    return RegistryStore(store_type=store_type, env=default_env)

Resolve a RegistryStore from CLI inputs.

All applicable detection methods are evaluated. Explicit sources (--store, then the AIRBYTE_REGISTRY_STORE env var) take priority and are returned directly. When only auto-detected sources remain, they are compared and a ValueError is raised if they disagree.

Priority (highest → lowest):

  1. Explicit --store argument (e.g. "coral:dev").
  2. Environment variable -- AIRBYTE_REGISTRY_STORE.
  3. Auto-detected -- connector name and/or working directory. If both are present and disagree, a ValueError is raised.
Arguments:
  • store: Explicit store target string (e.g. "coral:dev").
  • connector_name: Optional connector name for auto-detection.
  • cwd: Working directory for repo-based detection.
  • default_env: Environment to use when auto-detecting (default "dev").
Returns:

A fully resolved RegistryStore.

Raises:
  • ValueError: If no detection method succeeds, or if auto-detected methods produce conflicting store types.
def strip_rc_suffix(version: str) -> str:
 93def strip_rc_suffix(version: str) -> str:
 94    """Strip the release candidate suffix from a version string.
 95
 96    Args:
 97        version: The version string (e.g., '1.2.3-rc.1').
 98
 99    Returns:
100        The base version without RC suffix (e.g., '1.2.3').
101        Returns the original version if no RC suffix is present.
102    """
103    if "-rc." in version:
104        return version.split("-rc.")[0]
105    return version

Strip the release candidate suffix from a version string.

Arguments:
  • version: The version string (e.g., '1.2.3-rc.1').
Returns:

The base version without RC suffix (e.g., '1.2.3'). Returns the original version if no RC suffix is present.

def unyank_connector_version( connector_name: str, version: str, bucket_name: str, dry_run: bool = False) -> YankResult:
def unyank_connector_version(
    connector_name: str,
    version: str,
    bucket_name: str,
    dry_run: bool = False,
) -> YankResult:
    """Remove the yank marker from a connector version.

    Deletes the version-yank.yml file at:
        metadata/airbyte/{connector_name}/{version}/version-yank.yml

    Args:
        connector_name: The connector name (e.g., "source-faker").
        version: The version to unyank (e.g., "1.2.3").
        bucket_name: The GCS bucket name.
        dry_run: If True, report what would be done without deleting.

    Returns:
        YankResult with details of the operation.
    """
    marker_path = _get_yank_blob_path(connector_name, version)
    gcs_bucket = get_gcs_storage_client().bucket(bucket_name)
    marker_blob = gcs_bucket.blob(marker_path)

    # Fields shared by every result branch.
    common = {
        "connector_name": connector_name,
        "version": version,
        "bucket_name": bucket_name,
        "action": "unyank",
    }

    # Nothing to remove: the version carries no yank marker.
    if not marker_blob.exists():
        return YankResult(
            success=False,
            message=f"Version {version} of {connector_name} is not yanked.",
            dry_run=dry_run,
            **common,
        )

    if dry_run:
        return YankResult(
            success=True,
            message=f"[DRY RUN] Would unyank {connector_name} {version}.",
            dry_run=True,
            **common,
        )

    # Delete the yank marker for real.
    marker_blob.delete()

    logger.info("Unyanked %s version %s in %s", connector_name, version, bucket_name)

    return YankResult(
        success=True,
        message=f"Successfully unyanked {connector_name} {version}.",
        **common,
    )

Remove the yank marker from a connector version.

Deletes the version-yank.yml file at: metadata/airbyte/{connector_name}/{version}/version-yank.yml

Arguments:
  • connector_name: The connector name (e.g., "source-faker").
  • version: The version to unyank (e.g., "1.2.3").
  • bucket_name: The GCS bucket name.
  • dry_run: If True, report what would be done without deleting.
Returns:

YankResult with details of the operation.

def validate_metadata( metadata_data: dict[str, typing.Any], opts: ValidateOptions | None = None) -> ValidationResult:
def validate_metadata(
    metadata_data: dict[str, Any],
    opts: ValidateOptions | None = None,
) -> ValidationResult:
    """Run all pre-publish validators against raw `metadata.data`.

    Args:
        metadata_data: The `data` section of a parsed `metadata.yaml`.
        opts: Options influencing validation behaviour. Defaults to a
            fresh `ValidateOptions()` when omitted.

    Returns:
        A `ValidationResult` with aggregate pass/fail and error list.
    """
    if opts is None:
        opts = ValidateOptions()

    result = ValidationResult()

    for validator in PRE_PUBLISH_VALIDATORS:
        result.validators_run += 1
        logger.info("Running validator: %s", validator.__name__)
        passed, error = validator(metadata_data, opts)
        if not passed:
            # BUG FIX: previously `if not passed and error:` silently dropped
            # a failing validator that returned a falsy error message, making
            # the overall result look successful. Synthesize a message so a
            # failure is always recorded.
            message = error or f"Validator {validator.__name__} failed without an error message."
            logger.error("Validation failed: %s", message)
            result.add_error(message)

    return result

Run all pre-publish validators against raw metadata.data.

Arguments:
  • metadata_data: The data section of a parsed metadata.yaml.
  • opts: Options influencing validation behaviour.
Returns:

A ValidationResult with aggregate pass/fail and error list.

def yank_connector_version( connector_name: str, version: str, bucket_name: str, reason: str = '', dry_run: bool = False) -> YankResult:
 65def yank_connector_version(
 66    connector_name: str,
 67    version: str,
 68    bucket_name: str,
 69    reason: str = "",
 70    dry_run: bool = False,
 71) -> YankResult:
 72    """Mark a connector version as yanked by writing a version-yank.yml marker.
 73
 74    The marker file is placed at:
 75        metadata/airbyte/{connector_name}/{version}/version-yank.yml
 76
 77    Args:
 78        connector_name: The connector name (e.g., "source-faker").
 79        version: The version to yank (e.g., "1.2.3").
 80        bucket_name: The GCS bucket name.
 81        reason: Optional reason for yanking the version.
 82        dry_run: If True, report what would be done without writing.
 83
 84    Returns:
 85        YankResult with details of the operation.
 86
 87    Raises:
 88        ValueError: If the bucket is the production bucket and no override is set,
 89            or if the version does not exist.
 90    """
 91    yank_path = _get_yank_blob_path(connector_name, version)
 92    metadata_path = _get_metadata_blob_path(connector_name, version)
 93
 94    storage_client = get_gcs_storage_client()
 95    bucket = storage_client.bucket(bucket_name)
 96
 97    # Verify the version exists
 98    metadata_blob = bucket.blob(metadata_path)
 99    if not metadata_blob.exists():
100        return YankResult(
101            connector_name=connector_name,
102            version=version,
103            bucket_name=bucket_name,
104            action="yank",
105            success=False,
106            message=f"Version {version} not found for {connector_name} in {bucket_name}.",
107            dry_run=dry_run,
108        )
109
110    # Check if already yanked
111    yank_blob = bucket.blob(yank_path)
112    if yank_blob.exists():
113        return YankResult(
114            connector_name=connector_name,
115            version=version,
116            bucket_name=bucket_name,
117            action="yank",
118            success=False,
119            message=f"Version {version} of {connector_name} is already yanked.",
120            dry_run=dry_run,
121        )
122
123    if dry_run:
124        return YankResult(
125            connector_name=connector_name,
126            version=version,
127            bucket_name=bucket_name,
128            action="yank",
129            success=True,
130            message=f"[DRY RUN] Would yank {connector_name} {version}.",
131            dry_run=True,
132        )
133
134    # Write the yank marker file
135    yank_content: dict[str, Any] = {
136        "yanked": True,
137        "yanked_at": datetime.now(tz=timezone.utc).isoformat(),
138    }
139    if reason:
140        yank_content["reason"] = reason
141
142    yank_yaml = yaml.dump(yank_content, default_flow_style=False)
143    yank_blob.upload_from_string(yank_yaml, content_type="application/x-yaml")
144
145    logger.info("Yanked %s version %s in %s", connector_name, version, bucket_name)
146
147    return YankResult(
148        connector_name=connector_name,
149        version=version,
150        bucket_name=bucket_name,
151        action="yank",
152        success=True,
153        message=f"Successfully yanked {connector_name} {version}.",
154    )

Mark a connector version as yanked by writing a version-yank.yml marker.

The marker file is placed at:

metadata/airbyte/{connector_name}/{version}/version-yank.yml

Arguments:
  • connector_name: The connector name (e.g., "source-faker").
  • version: The version to yank (e.g., "1.2.3").
  • bucket_name: The GCS bucket name.
  • reason: Optional reason for yanking the version.
  • dry_run: If True, report what would be done without writing.
Returns:

YankResult with details of the operation.

Raises:
  • ValueError: If the bucket is the production bucket and no override is set. (A version that does not exist does not raise; it is reported as a YankResult with success=False.)