airbyte_ops_mcp.registry

Registry operations for Airbyte connectors.

This package provides functionality for:

  • Reading connector metadata from the GCS registry
  • Listing connectors and versions in the registry
  • Publishing connector metadata to GCS
  • Promoting and rolling back release candidates
  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Registry operations for Airbyte connectors.
  3
  4This package provides functionality for:
  5- Reading connector metadata from the GCS registry
  6- Listing connectors and versions in the registry
  7- Publishing connector metadata to GCS
  8- Promoting and rolling back release candidates
  9"""
 10
 11from __future__ import annotations
 12
 13from airbyte_ops_mcp.registry._constants import (
 14    DEFAULT_METADATA_SERVICE_BUCKET_NAME,
 15    DEV_METADATA_SERVICE_BUCKET_NAME,
 16    LATEST_GCS_FOLDER_NAME,
 17    METADATA_FILE_NAME,
 18    METADATA_FOLDER,
 19    PROD_METADATA_SERVICE_BUCKET_NAME,
 20    RELEASE_CANDIDATE_GCS_FOLDER_NAME,
 21    SONAR_DEV_BUCKET_NAME,
 22    SONAR_PROD_BUCKET_NAME,
 23)
 24from airbyte_ops_mcp.registry._enums import (
 25    ConnectorLanguage,
 26    ConnectorType,
 27    SupportLevel,
 28)
 29from airbyte_ops_mcp.registry.audit import (
 30    AuditResult,
 31    UnpublishedConnector,
 32    find_unpublished_connectors,
 33)
 34from airbyte_ops_mcp.registry.compile import (
 35    CompileResult,
 36    PurgeLatestResult,
 37    compile_registry,
 38    purge_latest_dirs,
 39)
 40from airbyte_ops_mcp.registry.generate import (
 41    GenerateResult,
 42    generate_version_artifacts,
 43)
 44from airbyte_ops_mcp.registry.models import (
 45    ConnectorListResult,
 46    ConnectorMetadata,
 47    ConnectorPublishResult,
 48    MetadataPublishResult,
 49    RegistryEntryResult,
 50    VersionListResult,
 51)
 52from airbyte_ops_mcp.registry.operations import (
 53    get_registry_entry,
 54    get_registry_spec,
 55    list_connector_versions,
 56    list_registry_connectors,
 57    list_registry_connectors_filtered,
 58)
 59from airbyte_ops_mcp.registry.publish import (
 60    CONNECTOR_PATH_PREFIX,
 61    create_progressive_rollout_blob,
 62    delete_progressive_rollout_blob,
 63    get_connector_metadata,
 64    get_gcs_publish_path,
 65    is_valid_for_progressive_rollout,
 66    publish_connector_metadata,
 67    strip_rc_suffix,
 68)
 69from airbyte_ops_mcp.registry.publish_artifacts import (
 70    PublishArtifactsResult,
 71    publish_version_artifacts,
 72)
 73from airbyte_ops_mcp.registry.rebuild import (
 74    OutputMode,
 75    RebuildResult,
 76    rebuild_registry,
 77)
 78from airbyte_ops_mcp.registry.registry_store_base import (
 79    Registry,
 80    get_registry,
 81)
 82from airbyte_ops_mcp.registry.store import (
 83    REGISTRY_STORE_ENV_VAR,
 84    RegistryStore,
 85    StoreType,
 86    resolve_registry_store,
 87)
 88from airbyte_ops_mcp.registry.validate import (
 89    ValidateOptions,
 90    ValidationResult,
 91    validate_metadata,
 92)
 93from airbyte_ops_mcp.registry.yank import (
 94    YANK_FILE_NAME,
 95    YankResult,
 96    unyank_connector_version,
 97    yank_connector_version,
 98)
 99
100__all__ = [
101    "CONNECTOR_PATH_PREFIX",
102    "DEFAULT_METADATA_SERVICE_BUCKET_NAME",
103    "DEV_METADATA_SERVICE_BUCKET_NAME",
104    "LATEST_GCS_FOLDER_NAME",
105    "METADATA_FILE_NAME",
106    "METADATA_FOLDER",
107    "PROD_METADATA_SERVICE_BUCKET_NAME",
108    "REGISTRY_STORE_ENV_VAR",
109    "RELEASE_CANDIDATE_GCS_FOLDER_NAME",
110    "SONAR_DEV_BUCKET_NAME",
111    "SONAR_PROD_BUCKET_NAME",
112    "YANK_FILE_NAME",
113    "AuditResult",
114    "CompileResult",
115    "ConnectorLanguage",
116    "ConnectorListResult",
117    "ConnectorMetadata",
118    "ConnectorPublishResult",
119    "ConnectorType",
120    "GenerateResult",
121    "MetadataPublishResult",
122    "OutputMode",
123    "PublishArtifactsResult",
124    "PurgeLatestResult",
125    "RebuildResult",
126    "Registry",
127    "RegistryEntryResult",
128    "RegistryStore",
129    "StoreType",
130    "SupportLevel",
131    "UnpublishedConnector",
132    "ValidateOptions",
133    "ValidationResult",
134    "VersionListResult",
135    "YankResult",
136    "compile_registry",
137    "create_progressive_rollout_blob",
138    "delete_progressive_rollout_blob",
139    "find_unpublished_connectors",
140    "generate_version_artifacts",
141    "get_connector_metadata",
142    "get_gcs_publish_path",
143    "get_registry",
144    "get_registry_entry",
145    "get_registry_spec",
146    "is_valid_for_progressive_rollout",
147    "list_connector_versions",
148    "list_registry_connectors",
149    "list_registry_connectors_filtered",
150    "publish_connector_metadata",
151    "publish_version_artifacts",
152    "purge_latest_dirs",
153    "rebuild_registry",
154    "resolve_registry_store",
155    "strip_rc_suffix",
156    "unyank_connector_version",
157    "validate_metadata",
158    "yank_connector_version",
159]
CONNECTOR_PATH_PREFIX = 'airbyte-integrations/connectors'
DEFAULT_METADATA_SERVICE_BUCKET_NAME = 'dev-airbyte-cloud-connector-metadata-service-2'
DEV_METADATA_SERVICE_BUCKET_NAME = 'dev-airbyte-cloud-connector-metadata-service-2'
LATEST_GCS_FOLDER_NAME = 'latest'
METADATA_FILE_NAME = 'metadata.yaml'
METADATA_FOLDER = 'metadata'
PROD_METADATA_SERVICE_BUCKET_NAME = 'prod-airbyte-cloud-connector-metadata-service'
REGISTRY_STORE_ENV_VAR = 'AIRBYTE_REGISTRY_STORE'
RELEASE_CANDIDATE_GCS_FOLDER_NAME = 'release_candidate'
SONAR_DEV_BUCKET_NAME = 'airbyte-connector-registry-dev'
SONAR_PROD_BUCKET_NAME = 'airbyte-connector-registry'
YANK_FILE_NAME = 'version-yank.yml'
@dataclass
class AuditResult:
@dataclass
class AuditResult:
    """Outcome of an audit for connectors that have unpublished versions.

    Each list-valued field is created through `default_factory`, so every
    instance owns its own lists and no state is shared between results.
    """

    # Connectors the audit reported as having an unpublished version.
    unpublished: list[UnpublishedConnector] = field(default_factory=list)
    # Integer counter, zero by default.
    # NOTE(review): presumably the number of connectors inspected - confirm.
    checked_count: int = 0
    # NOTE(review): the three `skipped_*` lists presumably hold connector
    # names skipped for the reason in the field name - confirm with the caller.
    skipped_archived: list[str] = field(default_factory=list)
    skipped_rc: list[str] = field(default_factory=list)
    skipped_disabled: list[str] = field(default_factory=list)
    # Error messages collected during the audit.
    errors: list[str] = field(default_factory=list)

Result of auditing which connectors have unpublished versions.

AuditResult( unpublished: list[UnpublishedConnector] = <factory>, checked_count: int = 0, skipped_archived: list[str] = <factory>, skipped_rc: list[str] = <factory>, skipped_disabled: list[str] = <factory>, errors: list[str] = <factory>)
unpublished: list[UnpublishedConnector]
checked_count: int = 0
skipped_archived: list[str]
skipped_rc: list[str]
skipped_disabled: list[str]
errors: list[str]
@dataclass
class CompileResult:
@dataclass
class CompileResult:
    """Counters and diagnostics produced by one registry compile run.

    Only `target` is required; every counter starts at zero and `errors`
    starts as a fresh empty list.
    """

    target: str
    connectors_scanned: int = 0
    versions_found: int = 0
    yanked_versions: int = 0
    latest_updated: int = 0
    latest_already_current: int = 0
    cloud_registry_entries: int = 0
    oss_registry_entries: int = 0
    composite_registry_entries: int = 0
    metrics_connector_count: int = 0
    metrics_registry_entries: int = 0
    metrics_source: str | None = None
    metrics_error: str | None = None
    version_indexes_written: int = 0
    specs_secrets_mask_properties: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Return `"dry-run"`, `"completed-with-errors"` or `"success"`.

        A dry run is reported as such even when errors were recorded.
        """
        if self.dry_run:
            return "dry-run"
        return "completed-with-errors" if self.errors else "success"

    def summary(self) -> str:
        """Return a one-line, human-readable report of all counters."""
        sentences = (
            f"[{self.status}] Scanned {self.connectors_scanned} connectors, "
            f"{self.versions_found} versions ({self.yanked_versions} yanked).",
            f"Latest updated: {self.latest_updated}, "
            f"already current: {self.latest_already_current}.",
            f"Registry entries: cloud={self.cloud_registry_entries}, "
            f"oss={self.oss_registry_entries}, "
            f"composite={self.composite_registry_entries}.",
            f"Metrics loaded for {self.metrics_connector_count} connectors, "
            f"injected into {self.metrics_registry_entries} registry entries.",
            f"Version indexes: {self.version_indexes_written}.",
            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties.",
            f"Errors: {len(self.errors)}.",
        )
        # Sentences are separated by exactly one space in the final report.
        return " ".join(sentences)

Result of a registry compile operation.

CompileResult( target: str, connectors_scanned: int = 0, versions_found: int = 0, yanked_versions: int = 0, latest_updated: int = 0, latest_already_current: int = 0, cloud_registry_entries: int = 0, oss_registry_entries: int = 0, composite_registry_entries: int = 0, metrics_connector_count: int = 0, metrics_registry_entries: int = 0, metrics_source: str | None = None, metrics_error: str | None = None, version_indexes_written: int = 0, specs_secrets_mask_properties: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
target: str
connectors_scanned: int = 0
versions_found: int = 0
yanked_versions: int = 0
latest_updated: int = 0
latest_already_current: int = 0
cloud_registry_entries: int = 0
oss_registry_entries: int = 0
composite_registry_entries: int = 0
metrics_connector_count: int = 0
metrics_registry_entries: int = 0
metrics_source: str | None = None
metrics_error: str | None = None
version_indexes_written: int = 0
specs_secrets_mask_properties: int = 0
errors: list[str]
dry_run: bool = False
status: str
124    @property
125    def status(self) -> str:
126        if self.dry_run:
127            return "dry-run"
128        if self.errors:
129            return "completed-with-errors"
130        return "success"
def summary(self) -> str:
132    def summary(self) -> str:
133        return (
134            f"[{self.status}] Scanned {self.connectors_scanned} connectors, "
135            f"{self.versions_found} versions ({self.yanked_versions} yanked). "
136            f"Latest updated: {self.latest_updated}, "
137            f"already current: {self.latest_already_current}. "
138            f"Registry entries: cloud={self.cloud_registry_entries}, "
139            f"oss={self.oss_registry_entries}, "
140            f"composite={self.composite_registry_entries}. "
141            f"Metrics loaded for {self.metrics_connector_count} connectors, "
142            f"injected into {self.metrics_registry_entries} registry entries. "
143            f"Version indexes: {self.version_indexes_written}. "
144            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. "
145            f"Errors: {len(self.errors)}."
146        )
class ConnectorLanguage(enum.StrEnum):
 88class ConnectorLanguage(StrEnum):
 89    """Connector implementation languages."""
 90
 91    PYTHON = "python"
 92    JAVA = "java"
 93    LOW_CODE = "low-code"
 94    MANIFEST_ONLY = "manifest-only"
 95
 96    @classmethod
 97    def parse(cls, value: str) -> ConnectorLanguage:
 98        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
 99        try:
100            return cls(value)
101        except ValueError:
102            valid = ", ".join(f"`{m.value}`" for m in cls)
103            raise ValueError(
104                f"Unrecognized language: {value!r}. Expected one of: {valid}."
105            ) from None

Connector implementation languages.

PYTHON = <ConnectorLanguage.PYTHON: 'python'>
JAVA = <ConnectorLanguage.JAVA: 'java'>
LOW_CODE = <ConnectorLanguage.LOW_CODE: 'low-code'>
MANIFEST_ONLY = <ConnectorLanguage.MANIFEST_ONLY: 'manifest-only'>
@classmethod
def parse(cls, value: str) -> ConnectorLanguage:
 96    @classmethod
 97    def parse(cls, value: str) -> ConnectorLanguage:
 98        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
 99        try:
100            return cls(value)
101        except ValueError:
102            valid = ", ".join(f"`{m.value}`" for m in cls)
103            raise ValueError(
104                f"Unrecognized language: {value!r}. Expected one of: {valid}."
105            ) from None

Parse a string into a ConnectorLanguage, raising ValueError on mismatch.

class ConnectorListResult(pydantic.main.BaseModel):
class ConnectorListResult(BaseModel):
    """Outcome of listing the connectors stored in a registry bucket.

    All three fields are required; none declares a default.
    """

    bucket_name: str = Field(
        description="The GCS bucket name",
    )
    connector_count: int = Field(
        description="Number of connectors found",
    )
    connectors: list[str] = Field(
        description="List of connector names",
    )

Result of listing connectors in the registry.

bucket_name: str = PydanticUndefined

The GCS bucket name

connector_count: int = PydanticUndefined

Number of connectors found

connectors: list[str] = PydanticUndefined

List of connector names

class ConnectorMetadata(pydantic.main.BaseModel):
class ConnectorMetadata(BaseModel):
    """Essential connector metadata taken from a `metadata.yaml` file.

    The model mirrors the key facts about a connector as recorded in its
    `metadata.yaml` in the Airbyte monorepo. `support_level` and
    `definition_id` are optional and default to `None`.
    """

    name: str = Field(
        description="The connector technical name",
    )
    docker_repository: str = Field(
        description="The Docker repository",
    )
    docker_image_tag: str = Field(
        description="The Docker image tag/version",
    )
    support_level: str | None = Field(
        default=None,
        description="The support level (certified, community, etc.)",
    )
    definition_id: str | None = Field(
        default=None,
        description="The connector definition ID",
    )

Connector metadata from metadata.yaml.

This model represents the essential metadata about a connector read from its metadata.yaml file in the Airbyte monorepo.

name: str = PydanticUndefined

The connector technical name

docker_repository: str = PydanticUndefined

The Docker repository

docker_image_tag: str = PydanticUndefined

The Docker image tag/version

support_level: str | None = None

The support level (certified, community, etc.)

definition_id: str | None = None

The connector definition ID

class ConnectorPublishResult(pydantic.main.BaseModel):
class ConnectorPublishResult(BaseModel):
    """Result of a connector publish operation.

    This model provides detailed information about the outcome of a
    connector publish operation. `action` is limited to
    `"progressive-rollout-create"` and `"progressive-rollout-cleanup"`,
    and `status` to `"success"`, `"failure"` and `"dry-run"`.
    """

    connector: str = Field(description="The connector technical name")
    version: str = Field(description="The connector version")
    action: Literal["progressive-rollout-create", "progressive-rollout-cleanup"] = (
        Field(description="The action performed")
    )
    status: Literal["success", "failure", "dry-run"] = Field(
        description="The status of the operation"
    )
    docker_image: str | None = Field(
        default=None, description="The Docker image name if applicable"
    )
    registry_updated: bool = Field(
        default=False, description="Whether the registry was updated"
    )
    message: str | None = Field(default=None, description="Additional status message")

    def __str__(self) -> str:
        """Return `[<status>] <connector>:<version> - <action>`."""
        # The previous conditional mapped "dry-run" to "dry-run" and every
        # other status to itself, so the status is used directly.
        return f"[{self.status}] {self.connector}:{self.version} - {self.action}"

Result of a connector publish operation.

This model provides detailed information about the outcome of a connector publish operation (apply or rollback version override).

connector: str = PydanticUndefined

The connector technical name

version: str = PydanticUndefined

The connector version

action: Literal['progressive-rollout-create', 'progressive-rollout-cleanup'] = PydanticUndefined

The action performed

status: Literal['success', 'failure', 'dry-run'] = PydanticUndefined

The status of the operation

docker_image: str | None = None

The Docker image name if applicable

registry_updated: bool = False

Whether the registry was updated

message: str | None = None

Additional status message

class ConnectorType(enum.StrEnum):
70class ConnectorType(StrEnum):
71    """Connector type: source or destination."""
72
73    SOURCE = "source"
74    DESTINATION = "destination"
75
76    @classmethod
77    def parse(cls, value: str) -> ConnectorType:
78        """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch."""
79        try:
80            return cls(value)
81        except ValueError:
82            valid = ", ".join(f"`{m.value}`" for m in cls)
83            raise ValueError(
84                f"Unrecognized connector type: {value!r}. Expected one of: {valid}."
85            ) from None

Connector type: source or destination.

SOURCE = <ConnectorType.SOURCE: 'source'>
DESTINATION = <ConnectorType.DESTINATION: 'destination'>
@classmethod
def parse(cls, value: str) -> ConnectorType:
76    @classmethod
77    def parse(cls, value: str) -> ConnectorType:
78        """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch."""
79        try:
80            return cls(value)
81        except ValueError:
82            valid = ", ".join(f"`{m.value}`" for m in cls)
83            raise ValueError(
84                f"Unrecognized connector type: {value!r}. Expected one of: {valid}."
85            ) from None

Parse a string into a ConnectorType, raising ValueError on mismatch.

@dataclass
class GenerateResult:
@dataclass
class GenerateResult:
    """Outcome of generating version artifacts locally for one connector.

    The four string fields are required; the list fields start as fresh
    empty lists and `dry_run` defaults to `False`.
    """

    connector_name: str
    version: str
    docker_image: str
    output_dir: str
    artifacts_written: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    validation_errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def success(self) -> bool:
        """True only when both `errors` and `validation_errors` are empty."""
        return not (len(self.errors) or len(self.validation_errors))

    def to_dict(self) -> dict[str, Any]:
        """Return every field plus the derived `success` flag as a dict.

        The list values are the instance's own lists, not copies.
        """
        exported = (
            "connector_name",
            "version",
            "docker_image",
            "output_dir",
            "artifacts_written",
            "errors",
            "validation_errors",
            "dry_run",
            "success",
        )
        return {key: getattr(self, key) for key in exported}

Result of a local artifact generation run.

GenerateResult( connector_name: str, version: str, docker_image: str, output_dir: str, artifacts_written: list[str] = <factory>, errors: list[str] = <factory>, validation_errors: list[str] = <factory>, dry_run: bool = False)
connector_name: str
version: str
docker_image: str
output_dir: str
artifacts_written: list[str]
errors: list[str]
validation_errors: list[str]
dry_run: bool = False
success: bool
140    @property
141    def success(self) -> bool:
142        return len(self.errors) == 0 and len(self.validation_errors) == 0
def to_dict(self) -> dict[str, typing.Any]:
144    def to_dict(self) -> dict[str, Any]:
145        return {
146            "connector_name": self.connector_name,
147            "version": self.version,
148            "docker_image": self.docker_image,
149            "output_dir": self.output_dir,
150            "artifacts_written": self.artifacts_written,
151            "errors": self.errors,
152            "validation_errors": self.validation_errors,
153            "dry_run": self.dry_run,
154            "success": self.success,
155        }
class MetadataPublishResult(pydantic.main.BaseModel):
class MetadataPublishResult(BaseModel):
    """Outcome of publishing connector metadata to GCS.

    Captures where the metadata was written, which uploads happened, and a
    status restricted to `"success"`, `"dry-run"` or `"already-up-to-date"`.
    """

    connector_name: str = Field(
        description="The connector technical name",
    )
    version: str = Field(
        description="The version that was published",
    )
    bucket_name: str = Field(
        description="The GCS bucket name",
    )
    versioned_path: str = Field(
        description="The versioned GCS path",
    )
    latest_path: str | None = Field(
        default=None,
        description="The latest GCS path if updated",
    )
    versioned_uploaded: bool = Field(
        default=False,
        description="Whether the versioned metadata was uploaded",
    )
    latest_uploaded: bool = Field(
        default=False,
        description="Whether the latest metadata was uploaded",
    )
    status: Literal["success", "dry-run", "already-up-to-date"] = Field(
        description="The status of the operation",
    )
    message: str = Field(
        description="Status message describing the outcome",
    )

    def __str__(self) -> str:
        """Return `[<status>] <connector_name>:<version> -> <versioned_path>`."""
        published = f"{self.connector_name}:{self.version}"
        return f"[{self.status}] {published} -> {self.versioned_path}"

Result of a metadata publish operation to GCS.

This model provides detailed information about the outcome of publishing connector metadata to the registry.

connector_name: str = PydanticUndefined

The connector technical name

version: str = PydanticUndefined

The version that was published

bucket_name: str = PydanticUndefined

The GCS bucket name

versioned_path: str = PydanticUndefined

The versioned GCS path

latest_path: str | None = None

The latest GCS path if updated

versioned_uploaded: bool = False

Whether the versioned metadata was uploaded

latest_uploaded: bool = False

Whether the latest metadata was uploaded

status: Literal['success', 'dry-run', 'already-up-to-date'] = PydanticUndefined

The status of the operation

message: str = PydanticUndefined

Status message describing the outcome

OutputMode = typing.Literal['local', 'gcs', 's3']
@dataclass
class PublishArtifactsResult:
@dataclass
class PublishArtifactsResult:
    """Outcome of publishing one connector version's artifacts.

    The four string fields are required; the list fields start as fresh
    empty lists and `dry_run` defaults to `False`.
    """

    connector_name: str
    version: str
    target: str
    gcs_destination: str
    files_uploaded: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    validation_errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def success(self) -> bool:
        """True only when both `errors` and `validation_errors` are empty."""
        return not (len(self.errors) or len(self.validation_errors))

    @property
    def status(self) -> str:
        """Return `"dry-run"`, `"completed-with-errors"` or `"success"`.

        A dry run is reported as such even when errors were recorded.
        """
        if self.dry_run:
            return "dry-run"
        has_problems = bool(self.errors or self.validation_errors)
        return "completed-with-errors" if has_problems else "success"

Result of a version-artifacts publish operation.

PublishArtifactsResult( connector_name: str, version: str, target: str, gcs_destination: str, files_uploaded: list[str] = <factory>, errors: list[str] = <factory>, validation_errors: list[str] = <factory>, dry_run: bool = False)
connector_name: str
version: str
target: str
gcs_destination: str
files_uploaded: list[str]
errors: list[str]
validation_errors: list[str]
dry_run: bool = False
success: bool
59    @property
60    def success(self) -> bool:
61        return len(self.errors) == 0 and len(self.validation_errors) == 0
status: str
63    @property
64    def status(self) -> str:
65        if self.dry_run:
66            return "dry-run"
67        if self.errors or self.validation_errors:
68            return "completed-with-errors"
69        return "success"
@dataclass
class PurgeLatestResult:
@dataclass
class PurgeLatestResult:
    """Outcome of a purge-latest run against one target.

    Only `target` is required; counters start at zero and `errors` starts
    as a fresh empty list.
    """

    target: str
    connectors_found: int = 0
    latest_dirs_deleted: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Return `"dry-run"`, `"completed-with-errors"` or `"success"`.

        A dry run is reported as such even when errors were recorded.
        """
        if self.dry_run:
            label = "dry-run"
        elif self.errors:
            label = "completed-with-errors"
        else:
            label = "success"
        return label

    def summary(self) -> str:
        """Return a one-line, human-readable report of the purge."""
        pieces = (
            f"[{self.status}] Found {self.connectors_found} connectors,",
            f"deleted {self.latest_dirs_deleted} latest/ directories.",
            f"Errors: {len(self.errors)}.",
        )
        return " ".join(pieces)

Result of a purge-latest operation.

PurgeLatestResult( target: str, connectors_found: int = 0, latest_dirs_deleted: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
target: str
connectors_found: int = 0
latest_dirs_deleted: int = 0
errors: list[str]
dry_run: bool = False
status: str
159    @property
160    def status(self) -> str:
161        if self.dry_run:
162            return "dry-run"
163        if self.errors:
164            return "completed-with-errors"
165        return "success"
def summary(self) -> str:
167    def summary(self) -> str:
168        return (
169            f"[{self.status}] Found {self.connectors_found} connectors, "
170            f"deleted {self.latest_dirs_deleted} latest/ directories. "
171            f"Errors: {len(self.errors)}."
172        )
@dataclass
class RebuildResult:
@dataclass
class RebuildResult:
    """Outcome of a registry rebuild operation.

    `source_bucket`, `output_mode` and `output_root` are required; counters
    start at zero and `errors` starts as a fresh empty list.
    """

    source_bucket: str
    output_mode: OutputMode
    output_root: str
    connectors_processed: int = 0
    blobs_copied: int = 0
    blobs_skipped: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Return `"dry-run"`, `"completed-with-errors"` or `"success"`.

        A dry run is reported as such even when errors were recorded.
        """
        return (
            "dry-run"
            if self.dry_run
            else "completed-with-errors"
            if self.errors
            else "success"
        )

    def summary(self) -> str:
        """Return a one-line, human-readable report of the rebuild."""
        head = f"[{self.status}] Rebuilt {self.connectors_processed} connectors"
        blobs = f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped"
        tail = f"{len(self.errors)} errors. Output: {self.output_root}"
        return f"{head}, {blobs}, {tail}"

Result of a registry rebuild operation.

RebuildResult( source_bucket: str, output_mode: Literal['local', 'gcs', 's3'], output_root: str, connectors_processed: int = 0, blobs_copied: int = 0, blobs_skipped: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
source_bucket: str
output_mode: Literal['local', 'gcs', 's3']
output_root: str
connectors_processed: int = 0
blobs_copied: int = 0
blobs_skipped: int = 0
errors: list[str]
dry_run: bool = False
status: str
56    @property
57    def status(self) -> str:
58        """Return the status of the rebuild operation."""
59        if self.dry_run:
60            return "dry-run"
61        if self.errors:
62            return "completed-with-errors"
63        return "success"

Return the status of the rebuild operation.

def summary(self) -> str:
65    def summary(self) -> str:
66        """Return a human-readable summary."""
67        return (
68            f"[{self.status}] Rebuilt {self.connectors_processed} connectors, "
69            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
70            f"{len(self.errors)} errors. Output: {self.output_root}"
71        )

Return a human-readable summary.

class Registry(abc.ABC):
 38class Registry(ABC):
 39    """A configured connector registry store.
 40
 41    A Registry is bound to a specific `airbyte_ops_mcp.registry.store.RegistryStore`
 42    (store type + env + optional prefix), and provides methods used by the CLI to
 43    read/write registry contents.
 44    """
 45
 46    def __init__(self, store: RegistryStore) -> None:
 47        self.store = store
 48
 49    @property
 50    def store_type(self) -> StoreType:
 51        return self.store.store_type
 52
 53    @property
 54    def bucket_name(self) -> str:
 55        return self.store.bucket
 56
 57    @property
 58    def prefix(self) -> str:
 59        return self.store.prefix
 60
 61    def _require_no_prefix(self, op_name: str) -> None:
 62        """Raise if this op doesn't support prefixed targets."""
 63
 64        if self.prefix:
 65            raise NotImplementedError(
 66                f"Operation '{op_name}' does not yet support store prefixes (got prefix='{self.prefix}')."
 67            )
 68
 69    # ---------------------------------------------------------------------
 70    # Read operations
 71    # ---------------------------------------------------------------------
 72
 73    @abstractmethod
 74    def list_connectors(
 75        self,
 76        *,
 77        support_level: SupportLevel | None = None,
 78        min_support_level: SupportLevel | None = None,
 79        connector_type: ConnectorType | None = None,
 80        language: ConnectorLanguage | None = None,
 81    ) -> list[str]:
 82        raise NotImplementedError(
 83            _op_not_implemented_message(self.store_type, "list_connectors")
 84        )
 85
 86    def list_connector_versions(self, connector_name: str) -> list[str]:
 87        raise NotImplementedError(
 88            _op_not_implemented_message(self.store_type, "list_connector_versions")
 89        )
 90
 91    def get_connector_metadata(
 92        self, connector_name: str, version: str = "latest"
 93    ) -> dict[str, Any]:
 94        raise NotImplementedError(
 95            _op_not_implemented_message(self.store_type, "get_connector_metadata")
 96        )
 97
 98    # ---------------------------------------------------------------------
 99    # Write / mutate operations
100    # ---------------------------------------------------------------------
101
102    def progressive_rollout_create(
103        self,
104        repo_path: Path,
105        connector_name: str,
106        dry_run: bool = False,
107    ) -> ConnectorPublishResult:
108        raise NotImplementedError(
109            _op_not_implemented_message(self.store_type, "progressive_rollout_create")
110        )
111
112    def progressive_rollout_cleanup(
113        self,
114        repo_path: Path,
115        connector_name: str,
116        dry_run: bool = False,
117    ) -> ConnectorPublishResult:
118        raise NotImplementedError(
119            _op_not_implemented_message(self.store_type, "progressive_rollout_cleanup")
120        )
121
122    def yank(
123        self,
124        connector_name: str,
125        version: str,
126        reason: str = "",
127        dry_run: bool = False,
128    ) -> YankResult:
129        raise NotImplementedError(_op_not_implemented_message(self.store_type, "yank"))
130
131    def unyank(
132        self,
133        connector_name: str,
134        version: str,
135        dry_run: bool = False,
136    ) -> YankResult:
137        raise NotImplementedError(
138            _op_not_implemented_message(self.store_type, "unyank")
139        )
140
141    def publish_version_artifacts(
142        self,
143        connector_name: str,
144        version: str,
145        artifacts_dir: Path,
146        dry_run: bool = False,
147        with_validate: bool = True,
148    ) -> PublishArtifactsResult:
149        raise NotImplementedError(
150            _op_not_implemented_message(self.store_type, "publish_version_artifacts")
151        )
152
153    def delete_dev_latest(
154        self,
155        connector_name: list[str] | None = None,
156        dry_run: bool = False,
157    ) -> PurgeLatestResult:
158        raise NotImplementedError(
159            _op_not_implemented_message(self.store_type, "delete_dev_latest")
160        )
161
162    def compile(
163        self,
164        connector_name: list[str] | None = None,
165        dry_run: bool = False,
166        with_secrets_mask: bool = False,
167        with_legacy_migration: str | None = None,
168        with_metrics: bool = True,
169        force: bool = False,
170    ) -> CompileResult:
171        raise NotImplementedError(
172            _op_not_implemented_message(self.store_type, "compile")
173        )
174
175    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
176        raise NotImplementedError(
177            _op_not_implemented_message(self.store_type, "marketing_stubs_check")
178        )
179
180    def marketing_stubs_sync(
181        self,
182        repo_root: Path,
183        dry_run: bool = False,
184    ) -> dict[str, Any]:
185        raise NotImplementedError(
186            _op_not_implemented_message(self.store_type, "marketing_stubs_sync")
187        )
188
189    def mirror(
190        self,
191        output_mode: OutputMode,
192        output_path_root: str | None = None,
193        gcs_bucket: str | None = None,
194        s3_bucket: str | None = None,
195        dry_run: bool = False,
196        connector_name: list[str] | None = None,
197    ) -> RebuildResult:
198        raise NotImplementedError(
199            _op_not_implemented_message(self.store_type, "mirror")
200        )

A configured connector registry store.

A Registry is bound to a specific airbyte_ops_mcp.registry.store.RegistryStore (store type + env + optional prefix), and provides methods used by the CLI to read/write registry contents.

store
store_type: StoreType
49    @property
50    def store_type(self) -> StoreType:
51        return self.store.store_type
bucket_name: str
53    @property
54    def bucket_name(self) -> str:
55        return self.store.bucket
prefix: str
57    @property
58    def prefix(self) -> str:
59        return self.store.prefix
@abstractmethod
def list_connectors( self, *, support_level: SupportLevel | None = None, min_support_level: SupportLevel | None = None, connector_type: ConnectorType | None = None, language: ConnectorLanguage | None = None) -> list[str]:
73    @abstractmethod
74    def list_connectors(
75        self,
76        *,
77        support_level: SupportLevel | None = None,
78        min_support_level: SupportLevel | None = None,
79        connector_type: ConnectorType | None = None,
80        language: ConnectorLanguage | None = None,
81    ) -> list[str]:
82        raise NotImplementedError(
83            _op_not_implemented_message(self.store_type, "list_connectors")
84        )
def list_connector_versions(self, connector_name: str) -> list[str]:
86    def list_connector_versions(self, connector_name: str) -> list[str]:
87        raise NotImplementedError(
88            _op_not_implemented_message(self.store_type, "list_connector_versions")
89        )
def get_connector_metadata( self, connector_name: str, version: str = 'latest') -> dict[str, typing.Any]:
91    def get_connector_metadata(
92        self, connector_name: str, version: str = "latest"
93    ) -> dict[str, Any]:
94        raise NotImplementedError(
95            _op_not_implemented_message(self.store_type, "get_connector_metadata")
96        )
def progressive_rollout_create( self, repo_path: pathlib.Path, connector_name: str, dry_run: bool = False) -> ConnectorPublishResult:
102    def progressive_rollout_create(
103        self,
104        repo_path: Path,
105        connector_name: str,
106        dry_run: bool = False,
107    ) -> ConnectorPublishResult:
108        raise NotImplementedError(
109            _op_not_implemented_message(self.store_type, "progressive_rollout_create")
110        )
def progressive_rollout_cleanup( self, repo_path: pathlib.Path, connector_name: str, dry_run: bool = False) -> ConnectorPublishResult:
112    def progressive_rollout_cleanup(
113        self,
114        repo_path: Path,
115        connector_name: str,
116        dry_run: bool = False,
117    ) -> ConnectorPublishResult:
118        raise NotImplementedError(
119            _op_not_implemented_message(self.store_type, "progressive_rollout_cleanup")
120        )
def yank( self, connector_name: str, version: str, reason: str = '', dry_run: bool = False) -> YankResult:
122    def yank(
123        self,
124        connector_name: str,
125        version: str,
126        reason: str = "",
127        dry_run: bool = False,
128    ) -> YankResult:
129        raise NotImplementedError(_op_not_implemented_message(self.store_type, "yank"))
def unyank( self, connector_name: str, version: str, dry_run: bool = False) -> YankResult:
131    def unyank(
132        self,
133        connector_name: str,
134        version: str,
135        dry_run: bool = False,
136    ) -> YankResult:
137        raise NotImplementedError(
138            _op_not_implemented_message(self.store_type, "unyank")
139        )
def publish_version_artifacts( self, connector_name: str, version: str, artifacts_dir: pathlib.Path, dry_run: bool = False, with_validate: bool = True) -> PublishArtifactsResult:
141    def publish_version_artifacts(
142        self,
143        connector_name: str,
144        version: str,
145        artifacts_dir: Path,
146        dry_run: bool = False,
147        with_validate: bool = True,
148    ) -> PublishArtifactsResult:
149        raise NotImplementedError(
150            _op_not_implemented_message(self.store_type, "publish_version_artifacts")
151        )
def delete_dev_latest( self, connector_name: list[str] | None = None, dry_run: bool = False) -> PurgeLatestResult:
153    def delete_dev_latest(
154        self,
155        connector_name: list[str] | None = None,
156        dry_run: bool = False,
157    ) -> PurgeLatestResult:
158        raise NotImplementedError(
159            _op_not_implemented_message(self.store_type, "delete_dev_latest")
160        )
def compile( self, connector_name: list[str] | None = None, dry_run: bool = False, with_secrets_mask: bool = False, with_legacy_migration: str | None = None, with_metrics: bool = True, force: bool = False) -> CompileResult:
162    def compile(
163        self,
164        connector_name: list[str] | None = None,
165        dry_run: bool = False,
166        with_secrets_mask: bool = False,
167        with_legacy_migration: str | None = None,
168        with_metrics: bool = True,
169        force: bool = False,
170    ) -> CompileResult:
171        raise NotImplementedError(
172            _op_not_implemented_message(self.store_type, "compile")
173        )
def marketing_stubs_check(self, repo_root: pathlib.Path) -> dict[str, typing.Any]:
175    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
176        raise NotImplementedError(
177            _op_not_implemented_message(self.store_type, "marketing_stubs_check")
178        )
def marketing_stubs_sync( self, repo_root: pathlib.Path, dry_run: bool = False) -> dict[str, typing.Any]:
180    def marketing_stubs_sync(
181        self,
182        repo_root: Path,
183        dry_run: bool = False,
184    ) -> dict[str, Any]:
185        raise NotImplementedError(
186            _op_not_implemented_message(self.store_type, "marketing_stubs_sync")
187        )
def mirror( self, output_mode: Literal['local', 'gcs', 's3'], output_path_root: str | None = None, gcs_bucket: str | None = None, s3_bucket: str | None = None, dry_run: bool = False, connector_name: list[str] | None = None) -> RebuildResult:
189    def mirror(
190        self,
191        output_mode: OutputMode,
192        output_path_root: str | None = None,
193        gcs_bucket: str | None = None,
194        s3_bucket: str | None = None,
195        dry_run: bool = False,
196        connector_name: list[str] | None = None,
197    ) -> RebuildResult:
198        raise NotImplementedError(
199            _op_not_implemented_message(self.store_type, "mirror")
200        )
class RegistryEntryResult(pydantic.main.BaseModel):
 93class RegistryEntryResult(BaseModel):
 94    """Result of reading a registry entry from GCS.
 95
 96    This model wraps the raw metadata dictionary with additional context.
 97    """
 98
 99    connector_name: str = Field(description="The connector technical name")
100    version: str = Field(description="The version that was read")
101    bucket_name: str = Field(description="The GCS bucket name")
102    gcs_path: str = Field(description="The GCS path that was read")
103    metadata: dict = Field(description="The raw metadata dictionary")

Result of reading a registry entry from GCS.

This model wraps the raw metadata dictionary together with the context it was read from: the connector name, the version, the GCS bucket name, and the GCS path.

connector_name: str = PydanticUndefined

The connector technical name

version: str = PydanticUndefined

The version that was read

bucket_name: str = PydanticUndefined

The GCS bucket name

gcs_path: str = PydanticUndefined

The GCS path that was read

metadata: dict = PydanticUndefined

The raw metadata dictionary

@dataclass(frozen=True, kw_only=True)
class RegistryStore:
132@dataclass(frozen=True, kw_only=True)
133class RegistryStore:
134    """Parsed store target (type + environment + optional prefix).
135
136    Examples::
137
138        RegistryStore.parse("coral:dev")
139        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
140        RegistryStore.parse("coral:dev/aj-test")
141        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
142        RegistryStore.parse("sonar:prod")
143        # -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
144    """
145
146    store_type: StoreType
147    env: str
148    prefix: str = ""
149
150    # -- Derived helpers -----------------------------------------------------
151
152    @property
153    def bucket(self) -> str:
154        """Resolve the concrete bucket name for this target."""
155        env_map = BUCKET_MAP.get(self.store_type)
156        if env_map is None:
157            raise ValueError(f"Unknown store type: {self.store_type!r}")
158        bucket_name = env_map.get(self.env)
159        if bucket_name is None:
160            raise ValueError(
161                f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
162                f"Expected one of: {', '.join(sorted(env_map))}."
163            )
164        return bucket_name
165
166    @property
167    def bucket_root(self) -> str:
168        """Bucket name with optional prefix appended (`bucket/prefix`)."""
169        if self.prefix:
170            return f"{self.bucket}/{self.prefix}"
171        return self.bucket
172
173    # -- Parsing -------------------------------------------------------------
174
175    @classmethod
176    def parse(cls, target: str) -> RegistryStore:
177        """Parse a store target string.
178
179        Accepted formats:
180
181            "coral:dev"
182
183            "coral:prod"
184            "coral:dev/aj-test100"
185            "sonar:prod"
186
187        Raises:
188            ValueError: If the string cannot be parsed or references an
189                unknown store type / environment.
190        """
191        if ":" not in target:
192            raise ValueError(
193                f"Invalid store target '{target}'. "
194                "Expected format: '<store_type>:<env>[/<prefix>]' "
195                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
196            )
197
198        store_part, env_part = target.split(":", 1)
199
200        # Validate store type
201        store_part_lower = store_part.lower()
202        valid_types = {t.value: t for t in StoreType}
203        if store_part_lower not in valid_types:
204            raise ValueError(
205                f"Unknown store type '{store_part}'. "
206                f"Expected one of: {', '.join(sorted(valid_types))}."
207            )
208        store_type = valid_types[store_part_lower]
209
210        # Split env and prefix
211        env_key, _, prefix = env_part.partition("/")
212        prefix = prefix.strip("/")
213
214        # Validate env
215        env_map = BUCKET_MAP.get(store_type, {})
216        if env_key not in env_map:
217            raise ValueError(
218                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
219                f"Expected one of: {', '.join(sorted(env_map))}."
220            )
221
222        return cls(store_type=store_type, env=env_key, prefix=prefix)

Parsed store target (type + environment + optional prefix).

Examples::

RegistryStore.parse("coral:dev")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
RegistryStore.parse("coral:dev/aj-test")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
RegistryStore.parse("sonar:prod")
# -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
RegistryStore( *, store_type: StoreType, env: str, prefix: str = '')
store_type: StoreType
env: str
prefix: str = ''
bucket: str
152    @property
153    def bucket(self) -> str:
154        """Resolve the concrete bucket name for this target."""
155        env_map = BUCKET_MAP.get(self.store_type)
156        if env_map is None:
157            raise ValueError(f"Unknown store type: {self.store_type!r}")
158        bucket_name = env_map.get(self.env)
159        if bucket_name is None:
160            raise ValueError(
161                f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
162                f"Expected one of: {', '.join(sorted(env_map))}."
163            )
164        return bucket_name

Resolve the concrete bucket name for this target.

bucket_root: str
166    @property
167    def bucket_root(self) -> str:
168        """Bucket name with optional prefix appended (`bucket/prefix`)."""
169        if self.prefix:
170            return f"{self.bucket}/{self.prefix}"
171        return self.bucket

The bucket name with the prefix appended (bucket/prefix) when a prefix is set; otherwise the bucket name alone.

@classmethod
def parse(cls, target: str) -> RegistryStore:
175    @classmethod
176    def parse(cls, target: str) -> RegistryStore:
177        """Parse a store target string.
178
179        Accepted formats:
180
181            "coral:dev"
182
183            "coral:prod"
184            "coral:dev/aj-test100"
185            "sonar:prod"
186
187        Raises:
188            ValueError: If the string cannot be parsed or references an
189                unknown store type / environment.
190        """
191        if ":" not in target:
192            raise ValueError(
193                f"Invalid store target '{target}'. "
194                "Expected format: '<store_type>:<env>[/<prefix>]' "
195                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
196            )
197
198        store_part, env_part = target.split(":", 1)
199
200        # Validate store type
201        store_part_lower = store_part.lower()
202        valid_types = {t.value: t for t in StoreType}
203        if store_part_lower not in valid_types:
204            raise ValueError(
205                f"Unknown store type '{store_part}'. "
206                f"Expected one of: {', '.join(sorted(valid_types))}."
207            )
208        store_type = valid_types[store_part_lower]
209
210        # Split env and prefix
211        env_key, _, prefix = env_part.partition("/")
212        prefix = prefix.strip("/")
213
214        # Validate env
215        env_map = BUCKET_MAP.get(store_type, {})
216        if env_key not in env_map:
217            raise ValueError(
218                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
219                f"Expected one of: {', '.join(sorted(env_map))}."
220            )
221
222        return cls(store_type=store_type, env=env_key, prefix=prefix)

Parse a store target string.

Accepted formats (general form: <store_type>:<env>[/<prefix>]):

  • "coral:dev"
  • "coral:prod"
  • "coral:dev/aj-test100"
  • "sonar:prod"

Raises:
  • ValueError: If the string cannot be parsed or references an unknown store type / environment.
class StoreType(builtins.str, enum.Enum):
 59class StoreType(str, Enum):
 60    """Registry store type identifier."""
 61
 62    SONAR = "sonar"
 63    CORAL = "coral"
 64
 65    # -- Auto-detection class methods ----------------------------------------
 66
 67    @classmethod
 68    def get_from_connector_name(cls, name: str) -> StoreType:
 69        """Infer the store type from a connector's technical name.
 70
 71        Connectors whose name starts with `source-` or `destination-` belong
 72        to the **coral** registry.  All other names belong to **sonar**.
 73
 74        Args:
 75            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).
 76
 77        Returns:
 78            The inferred `StoreType`.
 79        """
 80        if name.startswith("source-") or name.startswith("destination-"):
 81            return cls.CORAL
 82        return cls.SONAR
 83
 84    @classmethod
 85    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
 86        """Infer the store type from a repository working directory.
 87
 88        Checks for well-known directory markers:
 89
 90        * **sonar** -- `integrations/` alongside `connector-sdk/`
 91        * **coral** -- `airbyte-integrations/connectors/`
 92
 93        Args:
 94            path: Directory to inspect.  Defaults to `Path.cwd`.
 95
 96        Returns:
 97            The inferred `StoreType`, or `None` if the directory does
 98            not match any known registry repository layout.
 99        """
100        if path is None:
101            path = Path.cwd()
102
103        # Sonar markers
104        if (path / "integrations").is_dir() and (path / "connector-sdk").is_dir():
105            return cls.SONAR
106
107        # Coral markers
108        if (path / "airbyte-integrations" / "connectors").is_dir():
109            return cls.CORAL
110
111        return None

Registry store type identifier.

SONAR = <StoreType.SONAR: 'sonar'>
CORAL = <StoreType.CORAL: 'coral'>
@classmethod
def get_from_connector_name(cls, name: str) -> StoreType:
67    @classmethod
68    def get_from_connector_name(cls, name: str) -> StoreType:
69        """Infer the store type from a connector's technical name.
70
71        Connectors whose name starts with `source-` or `destination-` belong
72        to the **coral** registry.  All other names belong to **sonar**.
73
74        Args:
75            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).
76
77        Returns:
78            The inferred `StoreType`.
79        """
80        if name.startswith("source-") or name.startswith("destination-"):
81            return cls.CORAL
82        return cls.SONAR

Infer the store type from a connector's technical name.

Connectors whose name starts with source- or destination- belong to the coral registry. All other names belong to sonar.

Arguments:
  • name: Connector technical name (e.g. "source-github" or "stripe").
Returns:

The inferred StoreType.

@classmethod
def detect_from_repo_dir( cls, path: pathlib.Path | None = None) -> StoreType | None:
 84    @classmethod
 85    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
 86        """Infer the store type from a repository working directory.
 87
 88        Checks for well-known directory markers:
 89
 90        * **sonar** -- `integrations/` alongside `connector-sdk/`
 91        * **coral** -- `airbyte-integrations/connectors/`
 92
 93        Args:
 94            path: Directory to inspect.  Defaults to `Path.cwd`.
 95
 96        Returns:
 97            The inferred `StoreType`, or `None` if the directory does
 98            not match any known registry repository layout.
 99        """
100        if path is None:
101            path = Path.cwd()
102
103        # Sonar markers
104        if (path / "integrations").is_dir() and (path / "connector-sdk").is_dir():
105            return cls.SONAR
106
107        # Coral markers
108        if (path / "airbyte-integrations" / "connectors").is_dir():
109            return cls.CORAL
110
111        return None

Infer the store type from a repository working directory.

Checks for well-known directory markers:

  • sonar -- integrations/ alongside connector-sdk/
  • coral -- airbyte-integrations/connectors/
Arguments:
  • path: Directory to inspect. Defaults to the current working directory (Path.cwd()).
Returns:

The inferred StoreType, or None if the directory does not match any known registry repository layout.

class SupportLevel(enum.StrEnum):
14class SupportLevel(StrEnum):
15    """Connector support levels ordered by precedence."""
16
17    ARCHIVED = "archived"
18    COMMUNITY = "community"
19    CERTIFIED = "certified"
20
21    @property
22    def precedence(self) -> int:
23        """Numeric precedence for ordering comparisons.
24
25        Higher values indicate higher support commitment.
26        """
27        return _SUPPORT_LEVEL_PRECEDENCE[self]
28
29    @classmethod
30    def from_precedence(cls, precedence: int) -> SupportLevel:
31        """Look up a `SupportLevel` by its numeric precedence value.
32
33        Raises `ValueError` when the precedence is not recognised.
34        """
35        for member in cls:
36            if _SUPPORT_LEVEL_PRECEDENCE[member] == precedence:
37                return member
38        valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
39        raise ValueError(
40            f"Unrecognized support-level precedence: {precedence!r}. "
41            f"Expected one of: {valid}."
42        ) from None
43
44    @classmethod
45    def parse(cls, value: str) -> SupportLevel:
46        """Parse a string into a `SupportLevel`.
47
48        Accepts a keyword (`archived`, `community`, `certified`)
49        or a legacy integer string (`100`, `200`, `300`).
50
51        Raises `ValueError` when the value is not recognised.
52        """
53        try:
54            return cls(value)
55        except ValueError:
56            pass
57        # Fallback: try interpreting as an integer precedence value.
58        try:
59            return cls.from_precedence(int(value))
60        except (ValueError, KeyError):
61            pass
62        valid_kw = ", ".join(f"`{m.value}`" for m in cls)
63        valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
64        raise ValueError(
65            f"Unrecognized support level: {value!r}. "
66            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
67        ) from None

Connector support levels ordered by precedence.

ARCHIVED = <SupportLevel.ARCHIVED: 'archived'>
COMMUNITY = <SupportLevel.COMMUNITY: 'community'>
CERTIFIED = <SupportLevel.CERTIFIED: 'certified'>
precedence: int
21    @property
22    def precedence(self) -> int:
23        """Numeric precedence for ordering comparisons.
24
25        Higher values indicate higher support commitment.
26        """
27        return _SUPPORT_LEVEL_PRECEDENCE[self]

Numeric precedence for ordering comparisons.

Higher values indicate higher support commitment.

@classmethod
def from_precedence(cls, precedence: int) -> SupportLevel:
29    @classmethod
30    def from_precedence(cls, precedence: int) -> SupportLevel:
31        """Look up a `SupportLevel` by its numeric precedence value.
32
33        Raises `ValueError` when the precedence is not recognised.
34        """
35        for member in cls:
36            if _SUPPORT_LEVEL_PRECEDENCE[member] == precedence:
37                return member
38        valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
39        raise ValueError(
40            f"Unrecognized support-level precedence: {precedence!r}. "
41            f"Expected one of: {valid}."
42        ) from None

Look up a SupportLevel by its numeric precedence value.

Raises ValueError when the precedence is not recognised.

@classmethod
def parse(cls, value: str) -> SupportLevel:
44    @classmethod
45    def parse(cls, value: str) -> SupportLevel:
46        """Parse a string into a `SupportLevel`.
47
48        Accepts a keyword (`archived`, `community`, `certified`)
49        or a legacy integer string (`100`, `200`, `300`).
50
51        Raises `ValueError` when the value is not recognised.
52        """
53        try:
54            return cls(value)
55        except ValueError:
56            pass
57        # Fallback: try interpreting as an integer precedence value.
58        try:
59            return cls.from_precedence(int(value))
60        except (ValueError, KeyError):
61            pass
62        valid_kw = ", ".join(f"`{m.value}`" for m in cls)
63        valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
64        raise ValueError(
65            f"Unrecognized support level: {value!r}. "
66            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
67        ) from None

Parse a string into a SupportLevel.

Accepts a keyword (archived, community, certified) or a legacy integer string (100, 200, 300).

Raises ValueError when the value is not recognised.

@dataclass
class UnpublishedConnector:
29@dataclass
30class UnpublishedConnector:
31    """A connector whose current local version is not published on GCS."""
32
33    connector_name: str
34    local_version: str

A connector whose current local version is not published on GCS.

UnpublishedConnector(connector_name: str, local_version: str)
connector_name: str
local_version: str
@dataclass(frozen=True)
class ValidateOptions:
37@dataclass(frozen=True)
38class ValidateOptions:
39    """Options that influence which validators run and how."""
40
41    docs_path: str | None = None
42    """Path to the connector's documentation file (for `validate_docs_path_exists`)."""
43
44    is_prerelease: bool = False
45    """Whether this is a pre-release build (skips version-decrement checks)."""

Options that influence which validators run and how.

ValidateOptions(docs_path: str | None = None, is_prerelease: bool = False)
docs_path: str | None = None

Path to the connector's documentation file (for validate_docs_path_exists).

is_prerelease: bool = False

Whether this is a pre-release build (skips version-decrement checks).

@dataclass
class ValidationResult:
48@dataclass
49class ValidationResult:
50    """Aggregate result from running all validators."""
51
52    passed: bool = True
53    errors: list[str] = field(default_factory=list)
54    validators_run: int = 0
55
56    def add_error(self, message: str) -> None:
57        self.passed = False
58        self.errors.append(message)

Aggregate result from running all validators.

ValidationResult( passed: bool = True, errors: list[str] = <factory>, validators_run: int = 0)
passed: bool = True
errors: list[str]
validators_run: int = 0
def add_error(self, message: str) -> None:
56    def add_error(self, message: str) -> None:
57        self.passed = False
58        self.errors.append(message)
class VersionListResult(pydantic.main.BaseModel):
114class VersionListResult(BaseModel):
115    """Result of listing versions for a connector."""
116
117    connector_name: str = Field(description="The connector technical name")
118    bucket_name: str = Field(description="The GCS bucket name")
119    version_count: int = Field(description="Number of versions found")
120    versions: list[str] = Field(description="List of version strings")

Result of listing versions for a connector.

connector_name: str = PydanticUndefined

The connector technical name

bucket_name: str = PydanticUndefined

The GCS bucket name

version_count: int = PydanticUndefined

Number of versions found

versions: list[str] = PydanticUndefined

List of version strings

@dataclass
class YankResult:
30@dataclass
31class YankResult:
32    """Result of a yank or unyank operation."""
33
34    connector_name: str
35    version: str
36    bucket_name: str
37    action: str  # "yank" or "unyank"
38    success: bool
39    message: str
40    dry_run: bool = False
41
42    def to_dict(self) -> dict[str, Any]:
43        """Convert to a dictionary for JSON serialization."""
44        return {
45            "connector_name": self.connector_name,
46            "version": self.version,
47            "bucket_name": self.bucket_name,
48            "action": self.action,
49            "success": self.success,
50            "message": self.message,
51            "dry_run": self.dry_run,
52        }

Result of a yank or unyank operation.

YankResult( connector_name: str, version: str, bucket_name: str, action: str, success: bool, message: str, dry_run: bool = False)
connector_name: str
version: str
bucket_name: str
action: str
success: bool
message: str
dry_run: bool = False
def to_dict(self) -> dict[str, typing.Any]:
42    def to_dict(self) -> dict[str, Any]:
43        """Convert to a dictionary for JSON serialization."""
44        return {
45            "connector_name": self.connector_name,
46            "version": self.version,
47            "bucket_name": self.bucket_name,
48            "action": self.action,
49            "success": self.success,
50            "message": self.message,
51            "dry_run": self.dry_run,
52        }

Convert to a dictionary for JSON serialization.

def compile_registry( *, store: RegistryStore, connector_name: list[str] | None = None, dry_run: bool = False, with_secrets_mask: bool = False, with_legacy_migration: str | None = None, with_metrics: bool = True, force: bool = False) -> CompileResult:
def compile_registry(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
    with_secrets_mask: bool = False,
    with_legacy_migration: str | None = None,
    with_metrics: bool = True,
    force: bool = False,
) -> CompileResult:
    """Compile the registry: sync latest/ dirs and write index files.

    Steps:
        1. Glob for all `metadata.yaml` to discover (connector, version) pairs.
        2. Glob for `version-yank.yml` to build the yanked set.
        2b. Scan for active release candidates.
        3. Compute the latest GA semver per connector.
        4. Glob for `version=*` markers in `latest/` dirs for a fast check.
        5. Delete stale `latest/` dirs and recursively copy the versioned dir.
        5m. (Optional) Legacy migration: delete disabled registry entries.
        5n. (Optional) Read the latest connector metrics.
        6a. Write global registry JSONs (plus the composite registry, 6a.2).
        6b. Write per-connector `versions.json`.
        6c. (Optional) Regenerate `specs_secrets_mask.yaml`.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only resync `latest/` directories for
            these connectors (steps 4-5).  Index rebuilds (steps 6a-6c)
            always operate on the full set of connectors so that global
            registry files remain complete.
        dry_run: If True, report what would be done without writing.
        with_secrets_mask: If True, regenerate `specs_secrets_mask.yaml`.
        with_legacy_migration: If set, run the named migration step.
            Currently supported: `"v1"` — delete `{registry_type}.json`
            files for connectors whose `registryOverrides.{registry}.enabled`
            is `false`.
        with_metrics: If True, inject latest connector metrics from the
            analytics JSONL export into `generated.metrics`.
        force: If True, resync all connectors' latest/ directories even if the
            existing version marker matches the computed latest version. This
            is useful when metadata content changes without a version bump.

    Returns:
        A `CompileResult` describing what was done.

    Raises:
        ValueError: If `with_legacy_migration` is set to a value that is not
            in `LEGACY_MIGRATION_VERSIONS`.
    """
    if with_legacy_migration and with_legacy_migration not in LEGACY_MIGRATION_VERSIONS:
        raise ValueError(
            f"Unknown legacy migration version: {with_legacy_migration!r}. "
            f"Supported: {', '.join(LEGACY_MIGRATION_VERSIONS)}"
        )

    result = CompileResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # --- Step 1 + 2: Scan versions and yanks ---
    # Always scan ALL connectors so that index rebuilds (step 6) are complete.
    _log_progress("Step 1-2: Scanning versions and yank markers...")
    connector_versions, yanked = _scan_versions_and_yanks(
        fs,
        store=store,
        connector_name=None,
    )
    result.connectors_scanned = len(connector_versions)
    result.versions_found = sum(len(v) for v in connector_versions.values())
    result.yanked_versions = len(yanked)
    _log_progress(
        "  Found %d connectors, %d versions, %d yanked",
        result.connectors_scanned,
        result.versions_found,
        result.yanked_versions,
    )

    # --- Step 2b: Scan release candidates ---
    # Like steps 1-2, this scan is deliberately NOT restricted by
    # `connector_name`: `rc_versions` feeds the global registry JSONs (6a)
    # and every connector's versions.json (6b), which are always rebuilt for
    # the full connector set.  A filtered scan would silently drop RC info
    # for every connector outside the filter.
    _log_progress("Step 2b: Scanning for active release candidates...")
    rc_versions = _scan_release_candidates(
        fs,
        store=store,
        connector_name=None,
    )
    _log_progress("  Found %d active release candidates", len(rc_versions))

    # --- Step 3: Compute latest ---
    _log_progress("Step 3: Computing latest GA version per connector...")
    latest_versions = _compute_latest_versions(
        connector_versions=connector_versions,
        yanked=yanked,
        fs=fs,
        store=store,
    )
    _log_progress("  Computed latest for %d connectors", len(latest_versions))

    # --- Step 4: Check existing latest markers ---
    # When --connector-name is set, only check/sync those connectors (steps 4-5).
    # Index rebuilds in step 6 always use the full unfiltered data.
    if connector_name:
        connector_name_set = set(connector_name)
        sync_scope = {
            c: v for c, v in latest_versions.items() if c in connector_name_set
        }
        _log_progress(
            "  --connector-name filter: syncing %d of %d connectors",
            len(sync_scope),
            len(latest_versions),
        )
    else:
        sync_scope = latest_versions

    _log_progress("Step 4: Checking existing latest/ markers...")
    sync_scope_names = list(sync_scope) if connector_name else None
    existing_markers = _scan_latest_markers(
        fs,
        store=store,
        connector_name=sync_scope_names,
    )
    _log_progress("  Found %d existing markers", len(existing_markers))

    # A connector is stale when its marker differs from the computed latest
    # version (or is missing), or unconditionally when `force` is set.
    stale_connectors: list[str] = []
    for connector, expected_version in sync_scope.items():
        current_marker = existing_markers.get(connector)
        if not force and current_marker == expected_version:
            result.latest_already_current += 1
        else:
            stale_connectors.append(connector)

    _log_progress(
        "  %d connectors need latest/ update, %d already current",
        len(stale_connectors),
        result.latest_already_current,
    )

    # --- Step 5: Resync stale latest/ dirs (parallel) ---
    if stale_connectors:
        _log_progress(
            "Step 5: Syncing %d stale latest/ directories (max_workers=%d)...",
            len(stale_connectors),
            _COMPILE_SYNC_MAX_WORKERS,
        )

        def _sync_one_connector(connector: str) -> None:
            """Sync a single connector's latest/ dir."""
            version = latest_versions[connector]
            _sync_latest_dir(
                fs,
                store=store,
                connector=connector,
                version=version,
                dry_run=dry_run,
            )
            if not dry_run:
                _apply_overrides_to_latest_entry(
                    fs,
                    store=store,
                    connector=connector,
                    version=version,
                )

        sorted_stale = sorted(stale_connectors)
        with ThreadPoolExecutor(max_workers=_COMPILE_SYNC_MAX_WORKERS) as pool:
            futures = {pool.submit(_sync_one_connector, c): c for c in sorted_stale}
            for i, future in enumerate(as_completed(futures), 1):
                connector = futures[future]
                try:
                    future.result()
                    result.latest_updated += 1
                except Exception as exc:
                    error_msg = f"Failed to sync latest/ for {connector}: {exc}"
                    logger.error(error_msg)
                    result.errors.append(error_msg)
                    # Delete the (possibly partial) latest/ dir so the next
                    # compile retries this connector from scratch.  Skipped
                    # on dry runs: nothing was written, and a dry run must
                    # never delete data from the bucket.
                    if not dry_run:
                        try:
                            _delete_latest_dir(
                                fs,
                                store=store,
                                connector=connector,
                            )
                            logger.info(
                                "Cleaned up partial latest/ for %s after failure",
                                connector,
                            )
                        except Exception as cleanup_exc:
                            logger.warning(
                                "Could not clean up latest/ for %s: %s",
                                connector,
                                cleanup_exc,
                            )
                if i % 100 == 0:
                    _log_progress("  Synced %d / %d...", i, len(sorted_stale))
    else:
        _log_progress("Step 5: All latest/ directories are current, nothing to sync.")

    # --- Step 5m: Legacy migration (optional) ---
    if with_legacy_migration == "v1":
        _log_progress(
            "Step 5m: Legacy migration v1 — deleting disabled registry entries..."
        )
        migration_deleted = _cleanup_disabled_registry_entries(
            fs,
            store=store,
            connector_versions=connector_versions,
            dry_run=dry_run,
        )
        total_deleted = sum(len(v) for v in migration_deleted.values())
        if migration_deleted:
            for conn, paths in sorted(migration_deleted.items()):
                _log_progress(
                    "  %s: %s %d files",
                    conn,
                    "would delete" if dry_run else "deleted",
                    len(paths),
                )
        _log_progress(
            "  Migration v1: %s %d files across %d connectors",
            "would delete" if dry_run else "deleted",
            total_deleted,
            len(migration_deleted),
        )

    # --- Step 5n: Read latest connector metrics (optional) ---
    # Metrics are best-effort: a read failure is recorded on the result and
    # logged, but does not abort the compile.
    metrics_bundle = None
    if with_metrics and store.store_type == StoreType.CORAL:
        _log_progress("Step 5n: Reading latest connector metrics JSONL...")
        try:
            metrics_bundle = read_latest_connector_metrics()
            result.metrics_source = metrics_bundle.blob_path
            result.metrics_connector_count = metrics_bundle.connector_count
            if metrics_bundle.blob_path:
                _log_progress(
                    "  Loaded metrics for %d connectors from gs://%s",
                    metrics_bundle.connector_count,
                    metrics_bundle.blob_path,
                )
            else:
                _log_progress("  No connector metrics JSONL file found.")
        except Exception as exc:
            error_msg = f"Failed to read connector metrics JSONL: {exc}"
            logger.warning(error_msg)
            result.metrics_error = error_msg
            _log_progress("  %s", error_msg)
    elif with_metrics:
        _log_progress("Step 5n: Skipping connector metrics for non-coral registry.")
    else:
        _log_progress("Step 5n: Connector metrics injection disabled.")

    # --- Step 6a: Compile global registry JSONs ---
    _log_progress("Step 6a: Compiling global registry JSON files...")
    all_registry_entries: list[dict[str, Any]] = []  # collected for Step 6c
    entries_by_registry_type: dict[str, list[dict[str, Any]]] = {}
    for registry_type in VALID_REGISTRIES:
        entries = _compile_global_registry(
            fs,
            store=store,
            latest_versions=latest_versions,
            registry_type=registry_type,
        )

        # Inject release candidate info into entries that have active RCs.
        # For each connector with a release_candidate/metadata.yaml, read the
        # RC version's {registry_type}.json and add it under
        # releases.releaseCandidates[version] — matching the legacy format.
        if rc_versions:
            rc_entries: dict[str, dict[str, Any]] = {}
            for connector, rc_ver in rc_versions.items():
                rc_entry = _read_rc_registry_entry(
                    fs,
                    store=store,
                    connector=connector,
                    rc_version=rc_ver,
                    registry_type=registry_type,
                )
                if rc_entry:
                    docker_repo = rc_entry.get(
                        "dockerRepository",
                        f"airbyte/{connector}",
                    )
                    rc_entries[docker_repo] = {
                        "version": rc_ver,
                        "entry": rc_entry,
                    }
            if rc_entries:
                entries = _apply_release_candidates_to_entries(entries, rc_entries)
                _log_progress(
                    "  Injected %d release candidates into %s registry",
                    len(rc_entries),
                    registry_type,
                )

        if metrics_bundle is not None:
            injected = apply_metrics_to_registry_entries(entries, metrics_bundle)
            result.metrics_registry_entries += injected
            _log_progress(
                "  Injected metrics into %d %s registry entries",
                injected,
                registry_type,
            )

        all_registry_entries.extend(entries)
        entries_by_registry_type[registry_type] = entries
        registry_json = _build_global_registry_json(entries)
        entry_count = len(registry_json["sources"]) + len(registry_json["destinations"])

        if registry_type == "cloud":
            result.cloud_registry_entries = entry_count
        else:
            result.oss_registry_entries = entry_count

        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )
        else:
            content = json.dumps(registry_json, indent=2, sort_keys=True) + "\n"
            path_prefix = f"{store.prefix}/" if store.prefix else ""
            _write_gcs_blob_with_custom_ttl(
                bucket_name=store.bucket,
                blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/{registry_type}_registry.json",
                content=content,
                cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
            )
            _log_progress(
                "  Wrote %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )

    # --- Step 6a.2: Compile composite registry JSON (superset) ---
    _log_progress("Step 6a.2: Compiling composite_registry.json (superset)...")
    composite_json = _build_composite_registry_json(
        cloud_entries=entries_by_registry_type.get("cloud", []),
        oss_entries=entries_by_registry_type.get("oss", []),
    )
    composite_entry_count = len(composite_json["sources"]) + len(
        composite_json["destinations"]
    )
    result.composite_registry_entries = composite_entry_count
    if dry_run:
        _log_progress(
            "  [DRY RUN] Would write composite_registry.json (%d entries)",
            composite_entry_count,
        )
    else:
        composite_content = json.dumps(composite_json, indent=2, sort_keys=True) + "\n"
        path_prefix = f"{store.prefix}/" if store.prefix else ""
        _write_gcs_blob_with_custom_ttl(
            bucket_name=store.bucket,
            blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/composite_registry.json",
            content=composite_content,
            cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
        )
        _log_progress(
            "  Wrote composite_registry.json (%d entries)",
            composite_entry_count,
        )

    # --- Step 6b: Per-connector version indexes (parallel) ---
    _log_progress(
        "Step 6b: Writing per-connector version indexes (max_workers=%d)...",
        _COMPILE_WRITE_MAX_WORKERS,
    )
    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"
    sorted_connectors = sorted(connector_versions)

    def _write_one_version_index(connector: str) -> None:
        """Build and write a single connector's versions.json."""
        versions = connector_versions[connector]
        latest_v = latest_versions.get(connector)
        rc_v = rc_versions.get(connector)
        index = _build_version_index(
            fs,
            store=store,
            connector=connector,
            versions=versions,
            yanked=yanked,
            latest_version=latest_v,
            rc_version=rc_v,
        )
        index_path = f"{base}/{connector}/versions.json"
        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s/versions.json (%d versions)",
                connector,
                len(versions),
            )
        else:
            content = json.dumps(index, indent=2, sort_keys=True) + "\n"
            with fs.open(index_path, "w") as f:
                f.write(content)

    with ThreadPoolExecutor(max_workers=_COMPILE_WRITE_MAX_WORKERS) as pool:
        futures = {
            pool.submit(_write_one_version_index, c): c for c in sorted_connectors
        }
        for i, future in enumerate(as_completed(futures), 1):
            connector = futures[future]
            try:
                future.result()
                result.version_indexes_written += 1
            except Exception as exc:
                # One connector's failure is recorded and does not stop the
                # remaining index writes.
                error_msg = f"Failed to write versions.json for {connector}: {exc}"
                logger.error(error_msg)
                result.errors.append(error_msg)
            if i % 100 == 0:
                _log_progress(
                    "  Wrote %d / %d version indexes...", i, len(sorted_connectors)
                )

    # --- Step 6c: Specs secrets mask (optional) ---
    if with_secrets_mask:
        _log_progress("Step 6c: Generating specs secrets mask...")
        # Reuse entries collected during Step 6a to avoid redundant GCS reads.
        secret_names = _extract_secret_property_names(all_registry_entries)
        sorted_names = sorted(secret_names)
        result.specs_secrets_mask_properties = len(sorted_names)
        mask_content = yaml.dump({"properties": sorted_names}, default_flow_style=False)
        mask_path = (
            f"{store.bucket_root}/{_REGISTRIES_PREFIX}/{_SPECS_SECRETS_MASK_FILENAME}"
        )

        _log_progress(
            "  Found %d secret properties: %s",
            len(sorted_names),
            ", ".join(sorted_names),
        )

        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )
        else:
            with fs.open(mask_path, "w") as f:
                f.write(mask_content)
            _log_progress(
                "  Wrote %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )

    _log_progress(result.summary())
    return result

Compile the registry: sync latest/ dirs and write index files.

Steps:
  1. Glob for all metadata.yaml to discover (connector, version) pairs.
  2. Glob for version-yank.yml to build the yanked set.
  3. Compute the latest GA semver per connector.
  4. Glob for version=* markers in latest/ dirs for a fast check.
  5. Delete stale latest/ dirs and recursively copy the versioned dir.
  5m. (Optional) Legacy migration: delete disabled registry entries.
  6. Write global registry JSONs and per-connector versions.json.
  6c. (Optional) Regenerate specs_secrets_mask.yaml.
Arguments:
  • store: Registry store (bucket + optional prefix).
  • connector_name: If provided, only resync latest/ directories for these connectors (steps 4-5). Index rebuilds (steps 6a-6c) always operate on the full set of connectors so that global registry files remain complete.
  • dry_run: If True, report what would be done without writing.
  • with_secrets_mask: If True, regenerate specs_secrets_mask.yaml.
  • with_legacy_migration: If set, run the named migration step. Currently supported: "v1" — delete {registry_type}.json files for connectors whose registryOverrides.{registry}.enabled is false.
  • with_metrics: If True, inject latest connector metrics from the analytics JSONL export into generated.metrics.
  • force: If True, resync all connectors' latest/ directories even if the existing version marker matches the computed latest version. This is useful when metadata content changes without a version bump.
Returns:

A CompileResult describing what was done.

def create_progressive_rollout_blob( repo_path: pathlib.Path, connector_name: str, dry_run: bool = False, bucket_name: str | None = None) -> ConnectorPublishResult:
def create_progressive_rollout_blob(
    repo_path: Path,
    connector_name: str,
    dry_run: bool = False,
    bucket_name: str | None = None,
) -> ConnectorPublishResult:
    """Write the `release_candidate/metadata.yaml` marker blob to GCS.

    The marker is a copy of the already-published versioned metadata
    (e.g. `1.2.3-rc.1/metadata.yaml`, or `2.1.0/metadata.yaml` for a GA
    progressive rollout); its presence tells the platform that a
    progressive rollout is active for the connector.

    The version is read from the connector's on-disk `metadata.yaml` and
    must be valid for progressive rollout (i.e. not a `-preview` build).
    The versioned blob must already exist in GCS, meaning the version has
    been published first.

    Requires `GCS_CREDENTIALS` environment variable to be set.

    Args:
        repo_path: Path passed to `get_connector_metadata` to locate the
            connector's metadata.
        connector_name: Name of the connector to create the marker for.
        dry_run: If True, report the copy that would happen without
            contacting GCS.
        bucket_name: Target GCS bucket; required despite the `None` default.

    Returns:
        A `ConnectorPublishResult` with status `"success"`, `"dry-run"`,
        or `"failure"`.

    Raises:
        ValueError: If `bucket_name` is empty or `None`.
    """
    if not bucket_name:
        raise ValueError("bucket_name is required for progressive rollout create.")

    metadata = get_connector_metadata(repo_path, connector_name)
    version = metadata.docker_image_tag
    docker_repo = metadata.docker_repository

    def _outcome(
        status: str,
        message: str,
        registry_updated: bool = False,
    ) -> ConnectorPublishResult:
        """Build a result carrying the fields shared by every exit path."""
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-create",
            status=status,
            docker_image=f"{docker_repo}:{version}",
            registry_updated=registry_updated,
            message=message,
        )

    if not is_valid_for_progressive_rollout(version):
        return _outcome(
            "failure",
            f"Version '{version}' is not valid for progressive rollout. "
            "Preview versions are not supported.",
        )

    # Source (versioned) and destination (marker) object paths.
    connector_root = f"{METADATA_FOLDER}/{docker_repo}"
    versioned_path = f"{connector_root}/{version}/{METADATA_FILE_NAME}"
    rc_path = (
        f"{connector_root}/{RELEASE_CANDIDATE_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )

    if dry_run:
        return _outcome("dry-run", f"Would copy {versioned_path} to {rc_path}")

    bucket = get_gcs_storage_client().bucket(bucket_name)
    source_blob = bucket.blob(versioned_path)

    # The marker may only point at a version that has actually been published.
    if not source_blob.exists():
        return _outcome(
            "failure",
            f"Versioned metadata not found: {versioned_path}. "
            "The version must be published before creating the progressive rollout marker.",
        )

    # Same-bucket copy of the versioned metadata onto the marker path.
    bucket.copy_blob(source_blob, bucket, rc_path)
    logger.info(
        "Created progressive rollout metadata blob: %s (copied from %s)",
        rc_path,
        versioned_path,
    )

    return _outcome(
        "success",
        f"Created release_candidate/ blob for {metadata.name} at {rc_path} "
        f"(copied from {versioned_path}).",
        registry_updated=True,
    )

Create the release_candidate/metadata.yaml blob in GCS.

Copies the versioned metadata (e.g. 1.2.3-rc.1/metadata.yaml or 2.1.0/metadata.yaml for GA progressive rollouts) to release_candidate/metadata.yaml so the platform knows a progressive rollout is active for this connector.

The connector's metadata.yaml on disk must declare a version that is valid for progressive rollout (i.e. not a -preview build). The versioned blob must already exist in GCS (i.e. the version must have been published first).

Requires GCS_CREDENTIALS environment variable to be set.

def delete_progressive_rollout_blob( repo_path: pathlib.Path, connector_name: str, dry_run: bool = False, bucket_name: str | None = None) -> ConnectorPublishResult:
def delete_progressive_rollout_blob(
    repo_path: Path,
    connector_name: str,
    dry_run: bool = False,
    bucket_name: str | None = None,
) -> ConnectorPublishResult:
    """Remove the `release_candidate/` metadata marker blob from GCS.

    Finalizing a progressive rollout (promote or rollback) needs only this
    one GCS operation.  The versioned blob (e.g. `1.2.3-rc.1/metadata.yaml`)
    is deliberately left in place as an audit trail of what was deployed
    during the rollout.

    A marker that is already absent is reported as a successful no-op.

    Requires `GCS_CREDENTIALS` environment variable to be set.

    Args:
        repo_path: Path passed to `get_connector_metadata` to locate the
            connector's metadata.
        connector_name: Name of the connector whose marker is removed.
        dry_run: If True, report the deletion that would happen without
            contacting GCS.
        bucket_name: Target GCS bucket; required despite the `None` default.

    Returns:
        A `ConnectorPublishResult` with status `"success"` or `"dry-run"`.

    Raises:
        ValueError: If `bucket_name` is empty or `None`.
    """
    if not bucket_name:
        raise ValueError("bucket_name is required for progressive rollout cleanup.")

    metadata = get_connector_metadata(repo_path, connector_name)
    version = metadata.docker_image_tag
    docker_repo = metadata.docker_repository

    # NOTE: There is intentionally no version-format gate here.  In the
    # promote workflow the on-disk version has already been bumped to GA
    # (e.g. 1.2.3) by the time cleanup runs.  The marker path depends only
    # on docker_repo and the fixed RELEASE_CANDIDATE_GCS_FOLDER_NAME
    # constant, so the on-disk version does not matter.
    rc_path = (
        f"{METADATA_FOLDER}/{docker_repo}"
        f"/{RELEASE_CANDIDATE_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )

    def _outcome(
        status: str,
        message: str,
        registry_updated: bool = False,
    ) -> ConnectorPublishResult:
        """Build a result carrying the fields shared by every exit path."""
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-cleanup",
            status=status,
            docker_image=f"{docker_repo}:{version}",
            registry_updated=registry_updated,
            message=message,
        )

    if dry_run:
        return _outcome(
            "dry-run",
            f"Would delete release_candidate/ blob for {metadata.name} "
            f"(version {version}) at {rc_path}",
        )

    marker_blob = get_gcs_storage_client().bucket(bucket_name).blob(rc_path)

    if marker_blob.exists():
        marker_blob.delete()
        logger.info("Deleted progressive rollout metadata blob: %s", rc_path)
        return _outcome(
            "success",
            f"Deleted release_candidate/ blob for {metadata.name} at {rc_path}.",
            registry_updated=True,
        )

    # Nothing to delete: treat as an idempotent success.
    logger.info(
        "Progressive rollout metadata already deleted (idempotent): %s", rc_path
    )
    return _outcome(
        "success",
        f"Progressive rollout metadata already deleted (no-op): {rc_path}",
    )

Delete the release_candidate/ metadata blob from GCS.

This is the only GCS operation needed when finalizing a progressive rollout (both promote and rollback). The versioned blob (e.g. 1.2.3-rc.1/metadata.yaml) is intentionally preserved as an audit trail of what was actually deployed during the rollout.

Requires GCS_CREDENTIALS environment variable to be set.

def find_unpublished_connectors( repo_path: str | pathlib.Path, bucket_name: str, connector_names: list[str] | None = None) -> AuditResult:
 90def find_unpublished_connectors(
 91    repo_path: str | Path,
 92    bucket_name: str,
 93    connector_names: list[str] | None = None,
 94) -> AuditResult:
 95    """Find connectors whose local version is not published on GCS.
 96
 97    For each connector in the local checkout, reads `dockerImageTag` from
 98    `metadata.yaml` and checks whether `metadata/<docker-repo>/<version>/metadata.yaml`
 99    exists in the GCS bucket.  Connectors that are archived, disabled on all
100    registries, or have RC versions are skipped.
101
102    Args:
103        repo_path: Path to the Airbyte monorepo checkout.
104        bucket_name: GCS bucket name to check against.
105        connector_names: Optional list of connector names to check.
106            If `None`, discovers all connectors in the repo.
107
108    Returns:
109        An `AuditResult` containing unpublished connectors and metadata.
110    """
111    repo_path = Path(repo_path)
112    connectors_dir = repo_path / CONNECTOR_PATH_PREFIX
113
114    if not connectors_dir.exists():
115        raise ValueError(f"Connectors directory not found: {connectors_dir}")
116
117    # Discover connector names if not provided
118    if connector_names is None:
119        connector_names = sorted(
120            d.name
121            for d in connectors_dir.iterdir()
122            if d.is_dir() and (d / METADATA_FILE_NAME).exists()
123        )
124
125    result = AuditResult()
126
127    # Collect connectors and their versions first, then batch-check GCS
128    to_check: list[tuple[str, str]] = []  # (connector_name, version)
129
130    for name in connector_names:
131        metadata_path = connectors_dir / name / METADATA_FILE_NAME
132        metadata = _read_local_metadata(metadata_path)
133        if metadata is None:
134            result.errors.append(f"{name}: metadata.yaml not found or unreadable")
135            continue
136
137        if _is_archived(metadata):
138            result.skipped_archived.append(name)
139            continue
140
141        if _is_disabled_on_all_registries(metadata):
142            result.skipped_disabled.append(name)
143            continue
144
145        data = metadata.get("data", {})
146        version = data.get("dockerImageTag")
147        if not version:
148            result.errors.append(f"{name}: no dockerImageTag in metadata")
149            continue
150
151        if _is_rc_version(version):
152            result.skipped_rc.append(name)
153            continue
154
155        to_check.append((name, version))
156
157    if not to_check:
158        result.checked_count = 0
159        return result
160
161    # Check GCS for each connector version
162    storage_client = get_gcs_storage_client()
163    bucket = storage_client.bucket(bucket_name)
164
165    for name, version in to_check:
166        result.checked_count += 1
167        blob_path = f"{METADATA_FOLDER}/airbyte/{name}/{version}/{METADATA_FILE_NAME}"
168        blob = bucket.blob(blob_path)
169
170        try:
171            exists = blob.exists()
172        except Exception as e:
173            result.errors.append(f"{name}: GCS check failed: {e}")
174            continue
175
176        if not exists:
177            logger.info(
178                "Unpublished: %s version %s (checked %s)",
179                name,
180                version,
181                blob_path,
182            )
183            result.unpublished.append(
184                UnpublishedConnector(connector_name=name, local_version=version)
185            )
186
187    logger.info(
188        "Audit complete: %d checked, %d unpublished, %d archived-skipped, %d disabled-skipped, %d rc-skipped",
189        result.checked_count,
190        len(result.unpublished),
191        len(result.skipped_archived),
192        len(result.skipped_disabled),
193        len(result.skipped_rc),
194    )
195
196    return result

Find connectors whose local version is not published on GCS.

For each connector in the local checkout, reads dockerImageTag from metadata.yaml and checks whether metadata/<docker-repo>/<version>/metadata.yaml exists in the GCS bucket. Connectors that are archived, disabled on all registries, or have RC versions are skipped.

Arguments:
  • repo_path: Path to the Airbyte monorepo checkout.
  • bucket_name: GCS bucket name to check against.
  • connector_names: Optional list of connector names to check. If None, discovers all connectors in the repo.
Returns:

An AuditResult containing unpublished connectors and metadata.

def generate_version_artifacts( metadata_file: pathlib.Path, docker_image: str, output_dir: pathlib.Path | None = None, repo_root: pathlib.Path | None = None, dry_run: bool = False, with_validate: bool = True, with_dependency_dump: bool = True, with_sbom: bool = True) -> GenerateResult:
def generate_version_artifacts(
    metadata_file: Path,
    docker_image: str,
    output_dir: Path | None = None,
    repo_root: Path | None = None,
    dry_run: bool = False,
    with_validate: bool = True,
    with_dependency_dump: bool = True,
    with_sbom: bool = True,
) -> GenerateResult:
    """Generate all version artifacts for a connector release.

    Artifacts are enriched with git commit info, SBOM URL, and (when applicable)
    components SHA before writing.  Validation is run after generation by default.

    Several per-artifact problems (a failed docker spec run, a missing
    `icon.svg`, an unresolvable or missing doc file) are appended to
    `result.errors` instead of being raised, and validation failures are stored
    in `result.validation_errors`.  Callers should inspect the returned result
    rather than rely on exceptions alone.

    Args:
        metadata_file: Path to the connector's `metadata.yaml`.
        docker_image: Docker image to run spec against (e.g. `airbyte/source-faker:6.2.38`).
        output_dir: Directory to write artifacts to.  If `None`, a temp directory is created.
        repo_root: Root of the Airbyte repo checkout (for resolving `doc.md`).
            If `None`, inferred by walking up from `metadata_file`.
        dry_run: If `True`, report what would be generated without writing or running docker.
        with_validate: If `True` (default), run metadata validators after generation.
            Pass `False` (`--no-validate`) to skip.
        with_dependency_dump: If `True` (default), generate `dependencies.json`
            for Python connectors.  Pass `False` (`--no-dependency-dump`) to skip.
        with_sbom: If `True` (default), generate `spdx.json` (SBOM) for
            connectors.  Pass `False` (`--no-sbom`) to skip.

    Returns:
        A `GenerateResult` describing what was produced.

    Raises:
        FileNotFoundError: If `metadata_file` does not exist.
    """
    # --- Load metadata ---
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    # NOTE(review): `yaml.safe_load` returns `None` for an empty document, in
    # which case the `.get` on the next line raises AttributeError — confirm
    # that metadata files reaching this point are never empty.
    raw_metadata: dict[str, Any] = yaml.safe_load(metadata_file.read_text())
    # NOTE(review): `metadata_data` is captured here, before the `_enrich_*`
    # calls below rebind `raw_metadata`.  Whether the enrichment is visible
    # through `metadata_data` (which feeds the registry entries and the
    # validators) depends on whether those helpers mutate in place or return
    # copies — confirm against their implementations.
    metadata_data: dict[str, Any] = raw_metadata.get("data", {})

    # Strip the "airbyte/" organisation from the repository to get the bare
    # connector name; "unknown" is the placeholder when a key is missing.
    connector_name = metadata_data.get("dockerRepository", "unknown").replace(
        "airbyte/", ""
    )
    version = metadata_data.get("dockerImageTag", "unknown")

    # --- Resolve output directory ---
    # NOTE(review): this runs before the dry-run check below, so the output
    # directory (or a fresh temp directory) is created on disk even when
    # `dry_run` is True.
    if output_dir is None:
        output_dir = Path(
            tempfile.mkdtemp(prefix=f"connector-artifacts-{connector_name}-{version}-")
        )
    output_dir.mkdir(parents=True, exist_ok=True)

    result = GenerateResult(
        connector_name=connector_name,
        version=version,
        docker_image=docker_image,
        output_dir=str(output_dir),
        dry_run=dry_run,
    )

    if dry_run:
        # Report the artifact names that a real run may produce; entries marked
        # "(if ...)" are conditional and are not checked here.
        logger.info("[DRY RUN] Would generate artifacts to %s", output_dir)
        result.artifacts_written = [
            "metadata.yaml",
            "icon.svg",
            "doc.md",
            "cloud.json",
            "oss.json",
            "manifest.yaml (if present)",
            "components.zip (if components.py present)",
            "components.zip.sha256 (if components.py present)",
            f"version={version}",
        ]
        if with_sbom:
            result.artifacts_written.append(SBOM_FILE_NAME)
        if with_dependency_dump:
            result.artifacts_written.append("dependencies.json (if Python connector)")
        return result

    # --- Prepare metadata output ---
    # The artifact is recorded now, but the file itself is only written near
    # the end of the function, once all enrichment steps have run.
    metadata_out = output_dir / "metadata.yaml"
    result.artifacts_written.append("metadata.yaml")

    # --- Enrich metadata with git info *before* building registry entries so
    #     that `generated.git` propagates into `cloud.json` / `oss.json`. ---
    raw_metadata = _enrich_metadata_git_info(raw_metadata, metadata_file)

    # --- Generate SBOM from the connector Docker image ---
    # SBOM failures are deliberately non-fatal: they are logged as warnings and
    # only influence the `sbom_generated` flag passed to the enrichment step.
    sbom_generated = False
    if not with_sbom:
        logger.info("SBOM generation disabled via --no-sbom.")
    else:
        try:
            sbom_path = generate_sbom(docker_image, output_dir)
        except RuntimeError as exc:
            logger.warning("SBOM generation failed (non-fatal): %s", exc)
        except (FileNotFoundError, subprocess.TimeoutExpired):
            logger.warning("Docker not available or SBOM generation timed out.")
        else:
            result.artifacts_written.append(SBOM_FILE_NAME)
            sbom_generated = True
            logger.info("Generated SBOM: %s", sbom_path)

    # --- Enrich metadata with SBOM URL ---
    raw_metadata = _enrich_metadata_sbom_url(
        raw_metadata, sbom_generated=sbom_generated
    )

    # --- Run docker spec for cloud and oss ---
    # A failed spec run is recorded in `result.errors`; the matching registry
    # entry is then skipped further down because no spec is available for it.
    specs: dict[str, dict[str, Any]] = {}
    for mode in VALID_REGISTRIES:
        try:
            specs[mode] = _run_docker_spec(docker_image, mode)
            logger.info("Got %s spec from docker image %s", mode, docker_image)
        except RuntimeError as exc:
            error_msg = f"Failed to get {mode} spec: {exc}"
            logger.error(error_msg)
            result.errors.append(error_msg)

    # --- Generate dependencies.json for Python connectors ---
    # This must happen *before* building registry entries so that the
    # local dependencies data can be used for packageInfo without a GCS
    # round-trip.
    local_dependencies: dict[str, Any] | None = None
    if not with_dependency_dump:
        logger.info("Dependency generation disabled via --no-dependency-dump.")
    elif _is_python_connector(metadata_data):
        logger.info("Python connector detected — generating dependencies.json")
        local_dependencies = generate_python_dependencies_file(
            metadata_data=metadata_data,
            docker_image=docker_image,
            output_dir=output_dir,
        )
        # The artifact is only recorded when the helper actually returned data.
        if local_dependencies is not None:
            result.artifacts_written.append(CONNECTOR_DEPENDENCY_FILE_NAME)
    else:
        logger.info(
            "Non-Python connector (%s) — skipping dependencies.json generation.",
            connector_name,
        )

    # --- Generate registry entries (cloud.json, oss.json) ---
    for registry_type in VALID_REGISTRIES:
        if not is_registry_enabled(metadata_data, registry_type):
            logger.info(
                "Registry type %s is not enabled for %s, skipping %s.json generation.",
                registry_type,
                connector_name,
                registry_type,
            )
            continue

        spec = specs.get(registry_type)
        if spec is None:
            error_msg = (
                f"Cannot generate {registry_type}.json: no spec available "
                f"(docker spec for {registry_type} failed or was not run)."
            )
            result.errors.append(error_msg)
            continue

        registry_entry = _build_registry_entry(
            metadata_data,
            registry_type,
            spec,
            local_dependencies=local_dependencies,
        )

        # Sorted keys, two-space indent and a trailing newline give a stable
        # on-disk representation of the entry.
        out_path = output_dir / f"{registry_type}.json"
        out_path.write_text(
            json.dumps(registry_entry, indent=2, sort_keys=True, default=_json_serial)
            + "\n"
        )
        result.artifacts_written.append(f"{registry_type}.json")
        logger.info("Wrote %s", out_path)

    # --- Copy icon.svg (sibling of metadata.yaml in the connector directory) ---
    icon_source = metadata_file.parent / "icon.svg"
    if icon_source.is_file():
        icon_out = output_dir / "icon.svg"
        shutil.copy2(icon_source, icon_out)
        result.artifacts_written.append("icon.svg")
        logger.info("Wrote %s", icon_out)
    else:
        logger.warning("No icon.svg found at %s.", icon_source)
        result.errors.append("Icon file is missing.")

    # --- Copy doc.md (derived from documentationUrl in metadata) ---
    if repo_root is None:
        # Infer repo root by walking up from metadata_file looking for .git
        # Note: .git can be a directory (normal clone) or a file (git worktree)
        # Resolve to absolute path first so the walk-up works with relative paths.
        candidate = metadata_file.resolve().parent
        # The loop stops at the filesystem root, where a path equals its parent.
        while candidate != candidate.parent:
            git_indicator = candidate / ".git"
            if git_indicator.is_dir() or git_indicator.is_file():
                repo_root = candidate
                break
            candidate = candidate.parent

    if repo_root is not None:
        doc_source = _resolve_doc_path(metadata_data, repo_root)
        if doc_source is not None and doc_source.is_file():
            doc_out = output_dir / DOC_FILE_NAME
            shutil.copy2(doc_source, doc_out)
            result.artifacts_written.append(DOC_FILE_NAME)
            logger.info("Wrote %s (from %s)", doc_out, doc_source)
        else:
            # Covers both "path could not be resolved" (doc_source is None) and
            # "resolved path is not a file".
            error_msg = (
                f"Documentation file not found: {doc_source}. "
                f"Derived from documentationUrl in metadata."
            )
            logger.error(error_msg)
            result.errors.append(error_msg)
    else:
        error_msg = "Cannot resolve doc.md: repo root not found."
        logger.error(error_msg)
        result.errors.append(error_msg)

    # --- Copy manifest.yaml (from connector root, if present) ---
    connector_dir = metadata_file.parent
    manifest_source = connector_dir / MANIFEST_FILE_NAME
    components_sha256: str | None = None
    if manifest_source.is_file():
        manifest_out = output_dir / MANIFEST_FILE_NAME
        shutil.copy2(manifest_source, manifest_out)
        result.artifacts_written.append(MANIFEST_FILE_NAME)
        logger.info("Wrote %s", manifest_out)

        # --- Generate components.zip if components.py exists ---
        # Only attempted when a manifest is present (this branch).
        components_source = connector_dir / COMPONENTS_PY_FILE_NAME
        if components_source.is_file():
            zip_path, sha256_path = _create_components_zip(
                manifest_path=manifest_source,
                components_path=components_source,
                output_dir=output_dir,
            )
            result.artifacts_written.append(COMPONENTS_ZIP_FILE_NAME)
            result.artifacts_written.append(COMPONENTS_ZIP_SHA256_FILE_NAME)
            logger.info("Wrote %s and %s", zip_path, sha256_path)
            # Read back the SHA256 for metadata enrichment
            components_sha256 = sha256_path.read_text().strip()
    else:
        logger.info(
            "No manifest.yaml at %s — skipping manifest artifacts.", manifest_source
        )

    # --- Enrich metadata with components SHA (after zip creation) ---
    # `components_sha256` is still None when no components.zip was produced.
    raw_metadata = _enrich_metadata_components_sha(raw_metadata, components_sha256)

    # --- Write final enriched metadata.yaml ---
    # Use sort_keys=True to match the legacy pipeline's alphabetical key ordering.
    # After Registry 2.0 launches we are free to change the key ordering.
    metadata_out.write_text(
        yaml.dump(raw_metadata, default_flow_style=False, sort_keys=True)
    )
    logger.info("Wrote enriched %s", metadata_out)

    # --- Write version marker file (version=<semver>) ---
    # This zero-byte file is used by the compile step as a fast-check marker.
    # Including it in the generated artifacts means `latest/` gets the marker
    # for free via a recursive copy, removing the need for a separate write.
    marker_file = output_dir / f"version={version}"
    marker_file.write_bytes(b"")
    result.artifacts_written.append(f"version={version}")
    logger.info("Wrote version marker %s", marker_file)

    # --- Validate metadata (after generation) ---
    if with_validate:
        logger.info("Running post-generation validation...")
        # The doc path is resolved again here (independently of the copy step
        # above) so the validators receive it as a string, or None.
        doc_path: str | None = None
        if repo_root is not None:
            resolved = _resolve_doc_path(metadata_data, repo_root)
            doc_path = str(resolved) if resolved else None
        validation = validate_metadata(
            metadata_data=metadata_data,
            opts=ValidateOptions(docs_path=doc_path),
        )
        if not validation.passed:
            # Validation failures are reported on the result, not raised.
            for err in validation.errors:
                logger.error("Validation error: %s", err)
            result.validation_errors = validation.errors
        else:
            logger.info("Validation passed (%d validators).", validation.validators_run)

    return result

Generate all version artifacts for a connector release.

Artifacts are enriched with git commit info, SBOM URL, and (when applicable) components SHA before writing. Validation is run after generation by default.

Arguments:
  • metadata_file: Path to the connector's metadata.yaml.
  • docker_image: Docker image to run spec against (e.g. airbyte/source-faker:6.2.38).
  • output_dir: Directory to write artifacts to. If None, a temp directory is created.
  • repo_root: Root of the Airbyte repo checkout (for resolving doc.md). If None, inferred by walking up from metadata_file.
  • dry_run: If True, report what would be generated without writing or running docker.
  • with_validate: If True (default), run metadata validators after generation. Pass False (--no-validate) to skip.
  • with_dependency_dump: If True (default), generate dependencies.json for Python connectors. Pass False (--no-dependency-dump) to skip.
  • with_sbom: If True (default), generate spdx.json (SBOM) for connectors. Pass False (--no-sbom) to skip.
Returns:

A GenerateResult describing what was produced.

def get_connector_metadata( repo_path: pathlib.Path, connector_name: str) -> ConnectorMetadata:
def get_connector_metadata(repo_path: Path, connector_name: str) -> ConnectorMetadata:
    """Read connector metadata from metadata.yaml.

    Looks for the connector under `repo_path / CONNECTOR_PATH_PREFIX / connector_name`
    and builds a `ConnectorMetadata` from the `data` section of its metadata file.
    Keys missing from `data` fall back to defaults: the repository defaults to
    `airbyte/<connector_name>`, the image tag to `"unknown"`, and the support
    level and definition ID to `None`.

    Args:
        repo_path: Path to the Airbyte monorepo.
        connector_name: The connector technical name (e.g., 'source-github').

    Returns:
        ConnectorMetadata object with the connector's metadata.

    Raises:
        FileNotFoundError: If the connector directory or metadata file doesn't exist.
        ValueError: If the metadata file does not contain a YAML mapping, or its
            `data` section is present but is not a mapping.
    """
    connector_dir = repo_path / CONNECTOR_PATH_PREFIX / connector_name
    if not connector_dir.exists():
        raise FileNotFoundError(f"Connector directory not found: {connector_dir}")

    metadata_file = connector_dir / METADATA_FILE_NAME
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    # Read as UTF-8 explicitly so the result does not depend on the locale.
    with open(metadata_file, encoding="utf-8") as f:
        metadata = yaml.safe_load(f)

    # `yaml.safe_load` yields None for an empty file and may yield a list or a
    # scalar; fail with a clear message instead of an AttributeError on `.get`.
    if not isinstance(metadata, dict):
        raise ValueError(f"Metadata file {metadata_file} has an invalid structure")

    # A `data:` key with a null value is treated like a missing section.
    data = metadata.get("data") or {}
    if not isinstance(data, dict):
        raise ValueError(
            f"Metadata file {metadata_file} has an invalid structure: "
            "'data' must be a mapping"
        )

    return ConnectorMetadata(
        name=connector_name,
        docker_repository=data.get("dockerRepository", f"airbyte/{connector_name}"),
        docker_image_tag=data.get("dockerImageTag", "unknown"),
        support_level=data.get("supportLevel"),
        definition_id=data.get("definitionId"),
    )

Read connector metadata from metadata.yaml.

Arguments:
  • repo_path: Path to the Airbyte monorepo.
  • connector_name: The connector technical name (e.g., 'source-github').
Returns:

ConnectorMetadata object with the connector's metadata.

Raises:
  • FileNotFoundError: If the connector directory or metadata file doesn't exist.
def get_gcs_publish_path(connector_name: str, artifact_type: str, version: str = 'latest') -> str:
def get_gcs_publish_path(
    connector_name: str,
    artifact_type: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> str:
    """Build the GCS object path under which a connector artifact is published.

    The path always has the shape
    `<METADATA_FOLDER>/airbyte/<connector_name>/<version>/<file name>`.

    Args:
        connector_name: Connector name used as the folder below `airbyte/`.
        artifact_type: One of `"metadata"`, `"spec"`, `"icon"` or `"doc"`.
        version: Version folder name; defaults to `LATEST_GCS_FOLDER_NAME`.

    Returns:
        The object path for the requested artifact.

    Raises:
        ValueError: If `artifact_type` is not one of the supported types.
    """
    # Artifact type -> file name stored in the version folder.
    file_name_by_type = {
        "metadata": METADATA_FILE_NAME,
        "spec": "spec.json",
        "icon": "icon.svg",
        "doc": "doc.md",
    }

    file_name = file_name_by_type.get(artifact_type)
    if file_name is None:
        raise ValueError(
            f"Unknown artifact type: {artifact_type}. "
            f"Valid types are: {', '.join(file_name_by_type.keys())}"
        )

    return f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file_name}"

Compute the GCS path for a connector artifact for publishing.

All connectors use the airbyte/{connector_name} convention.

def get_registry( store: RegistryStore) -> Registry:
def get_registry(store: RegistryStore) -> Registry:
    """Return the `Registry` implementation that matches `store.store_type`.

    Args:
        store: Store description; its `store_type` selects the implementation.

    Returns:
        A `CoralRegistry` for `StoreType.CORAL` or a `SonarRegistry` for
        `StoreType.SONAR`, constructed with `store`.

    Raises:
        ValueError: If `store.store_type` matches neither known store type.
    """
    kind = store.store_type

    # The implementation modules are imported only in the branch that needs
    # them (presumably to keep imports lazy or avoid cycles — confirm).
    if kind == StoreType.CORAL:
        from airbyte_ops_mcp.registry.coral_registry_store import CoralRegistry

        registry_cls = CoralRegistry
    elif kind == StoreType.SONAR:
        from airbyte_ops_mcp.registry.sonar_registry_store import SonarRegistry

        registry_cls = SonarRegistry
    else:
        # Defensive fallback in case a new StoreType member is added later.
        raise ValueError(f"Unknown store type: {store.store_type}")

    return registry_cls(store)

Factory for obtaining the right store implementation.

def get_registry_entry( connector_name: str, bucket_name: str, version: str = 'latest') -> dict[str, typing.Any]:
def get_registry_entry(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch and parse a connector's metadata file from the GCS registry.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The parsed metadata document.

    Raises:
        ValueError: If the parsed document is not a mapping. The original
            documentation also lists missing GCS credentials as a cause
            (raised by the client helper — not visible here).
        FileNotFoundError: If no content could be read for the metadata blob.
        yaml.YAMLError: If the content is not valid YAML (logged, then re-raised).
    """
    # Layout: metadata/airbyte/{connector_name}/{version}/metadata.yaml
    blob_path = (
        f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{METADATA_FILE_NAME}"
    )
    registry_blob = get_gcs_storage_client().bucket(bucket_name).blob(blob_path)

    logger.info(f"Reading registry entry for {connector_name} from {blob_path}")

    raw_yaml = safe_read_gcs_file(registry_blob)
    if raw_yaml is None:
        raise FileNotFoundError(
            f"Connector metadata not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    # Only the parse step can raise YAMLError, so only it sits in the try.
    try:
        parsed = yaml.safe_load(raw_yaml)
    except yaml.YAMLError as parse_error:
        logger.error(
            "Failed to parse metadata for %s from %s: %s",
            connector_name,
            blob_path,
            parse_error,
        )
        raise

    # An empty document parses to None; anything but a mapping is rejected.
    if not isinstance(parsed, dict):
        raise ValueError(f"Metadata file {blob_path} has an invalid structure")
    return parsed

Get a connector's registry entry from GCS.

Reads metadata for a connector from the registry stored in GCS.

Arguments:
  • connector_name: The connector name (e.g., "source-faker", "destination-postgres")
  • bucket_name: Name of the GCS bucket containing the registry
  • version: Version folder name (e.g., "latest", "1.2.3")
Returns:

dict: The connector's metadata as a dictionary

Raises:
  • ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure
  • FileNotFoundError: If the connector metadata is not found in the registry
  • yaml.YAMLError: If the metadata file contains invalid YAML syntax
def get_registry_spec( connector_name: str, bucket_name: str, version: str = 'latest') -> dict[str, typing.Any]:
def get_registry_spec(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch and parse a connector's spec file from the GCS registry.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The parsed spec document.

    Raises:
        ValueError: If the parsed spec is not a JSON object. The original
            documentation also lists missing GCS credentials as a cause
            (raised by the client helper — not visible here).
        FileNotFoundError: If no content could be read for the spec blob.
        json.JSONDecodeError: If the content is not valid JSON (logged, then re-raised).
    """
    # Layout: metadata/airbyte/{connector_name}/{version}/spec.json
    blob_path = f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{SPEC_FILE_NAME}"
    spec_blob = get_gcs_storage_client().bucket(bucket_name).blob(blob_path)

    logger.info(f"Reading spec for {connector_name} from {blob_path}")

    raw_json = safe_read_gcs_file(spec_blob)
    if raw_json is None:
        raise FileNotFoundError(
            f"Connector spec not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    # Only the decode step can raise JSONDecodeError, so only it sits in the try.
    try:
        parsed = json.loads(raw_json)
    except json.JSONDecodeError as decode_error:
        logger.error(
            "Failed to parse spec for %s from %s: %s",
            connector_name,
            blob_path,
            decode_error,
        )
        raise

    # JSON `null`, arrays and scalars are all rejected: a spec must be an object.
    if not isinstance(parsed, dict):
        raise ValueError(
            f"Spec file for {connector_name} at {blob_path} is not a JSON object"
        )
    return parsed

Get a connector's spec from GCS.

Reads the connector specification from the registry stored in GCS.

Arguments:
  • connector_name: The connector name (e.g., "source-faker", "destination-postgres")
  • bucket_name: Name of the GCS bucket containing the registry
  • version: Version folder name (e.g., "latest", "1.2.3")
Returns:

dict: The connector's spec as a dictionary

Raises:
  • ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
  • FileNotFoundError: If the connector spec is not found in the registry
  • json.JSONDecodeError: If the spec file contains invalid JSON syntax
def is_valid_for_progressive_rollout(version: str) -> bool:
def is_valid_for_progressive_rollout(version: str) -> bool:
    """Tell whether a version string may be used in a progressive rollout.

    The only versions rejected today are those containing the `-preview.`
    marker (note the trailing dot).  Every other string, GA and RC versions
    included, is accepted.  The gate is kept deliberately generic so it can be
    tightened or turned into a no-op later.

    Args:
        version: The version string to check.

    Returns:
        True if the version may be used in a progressive rollout,
        False if it contains the disallowed preview marker.
    """
    preview_marker = "-preview."
    # `str.find` returns -1 exactly when the marker does not occur.
    return version.find(preview_marker) == -1

Check if a version string is valid for progressive rollout.

Currently rejects only versions that contain the `-preview.` marker (including the trailing dot). All other version strings (GA and RC alike) are accepted. This gate is intentionally kept generic so it can be tightened or turned into a no-op in the future.

Arguments:
  • version: The version string to check.
Returns:

True if the version may be used in a progressive rollout, False if the version format is explicitly disallowed.

def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]:
def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]:
    """List all versions of a connector in the registry.

    Scans the GCS bucket for metadata files of the given connector and collects
    the version folder each one lives in.

    Args:
        connector_name: The connector name (e.g., "source-faker")
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Version strings excluding 'latest' and 'release_candidate',
        de-duplicated and sorted as plain strings (lexicographic order, so
        "1.10.0" sorts before "1.9.0").

    Raises:
        ValueError: Listed by the original documentation for missing GCS
            credentials (raised by the client helper — not visible here).
        Exception: Any error raised while listing blobs is logged and re-raised
            unchanged.
    """
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # List all blobs matching the pattern: metadata/airbyte/{connector_name}/*/metadata.yaml
    glob_pattern = f"{METADATA_FOLDER}/airbyte/{connector_name}/*/{METADATA_FILE_NAME}"
    logger.info(f"Listing versions for {connector_name} with pattern: {glob_pattern}")

    try:
        # `list_blobs` returns a lazy iterator: the listing requests are only
        # issued while it is consumed, so the iteration itself must happen
        # inside the `try` for failures to be logged.
        blob_names = [blob.name for blob in bucket.list_blobs(match_glob=glob_pattern)]
    except Exception as e:
        logger.error(f"Error listing blobs in bucket {bucket_name}: {e}")
        raise

    # Extract versions from blob paths
    # Path format: metadata/airbyte/{connector-name}/{version}/metadata.yaml
    versions: set[str] = set()
    for blob_name in blob_names:
        path_parts = blob_name.split("/")
        # Path should be: metadata / airbyte / connector-name / version / metadata.yaml
        if len(path_parts) >= 5:
            version = path_parts[3]
            # Exclude special folders
            if version not in ("latest", "release_candidate"):
                versions.add(version)

    return sorted(versions)

List all versions of a connector in the registry.

Scans the GCS bucket to find all versions of a specific connector.

Arguments:
  • connector_name: The connector name (e.g., "source-faker")
  • bucket_name: Name of the GCS bucket containing the registry
Returns:

list[str]: Sorted list of version strings (excluding 'latest' and 'release_candidate')

Raises:
  • ValueError: If GCS credentials are not configured
def list_registry_connectors(bucket_name: str) -> list[str]:
def list_registry_connectors(bucket_name: str) -> list[str]:
    """List all connectors in the registry.

    Scans the GCS bucket for metadata files in each connector's latest-version
    folder and collects the connector folder names.

    Args:
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Sorted, de-duplicated list of connector names

    Raises:
        ValueError: Listed by the original documentation for missing GCS
            credentials (raised by the client helper — not visible here).
        Exception: Any error raised while listing blobs is logged and re-raised
            unchanged.
    """
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # List all blobs matching the pattern: metadata/airbyte/*/latest/metadata.yaml
    glob_pattern = (
        f"{METADATA_FOLDER}/airbyte/*/{LATEST_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )
    logger.info(f"Listing connectors with pattern: {glob_pattern}")

    try:
        # `list_blobs` returns a lazy iterator: the listing requests are only
        # issued while it is consumed, so the iteration itself must happen
        # inside the `try` for failures to be logged.
        blob_names = [blob.name for blob in bucket.list_blobs(match_glob=glob_pattern)]
    except Exception as e:
        logger.error(f"Error listing blobs in bucket {bucket_name}: {e}")
        raise

    # Extract connector names from blob paths
    # Path format: metadata/airbyte/{connector-name}/latest/metadata.yaml
    connector_names: set[str] = set()
    for blob_name in blob_names:
        path_parts = blob_name.split("/")
        # Path should be: metadata / airbyte / connector-name / latest / metadata.yaml
        if len(path_parts) >= 5:
            connector_names.add(path_parts[2])

    return sorted(connector_names)

List all connectors in the registry.

Scans the GCS bucket to find all connectors that have metadata files.

Arguments:
  • bucket_name: Name of the GCS bucket containing the registry
Returns:

list[str]: Sorted list of connector names

Raises:
  • ValueError: If GCS credentials are not configured
def list_registry_connectors_filtered( bucket_name: str, *, support_level: SupportLevel | None = None, min_support_level: SupportLevel | None = None, connector_type: ConnectorType | None = None, language: ConnectorLanguage | None = None, prefix: str = '') -> list[str]:
def list_registry_connectors_filtered(
    bucket_name: str,
    *,
    support_level: SupportLevel | None = None,
    min_support_level: SupportLevel | None = None,
    connector_type: ConnectorType | None = None,
    language: ConnectorLanguage | None = None,
    prefix: str = "",
) -> list[str]:
    """List registry connectors, optionally narrowed by one or more filters.

    With no filter set, this delegates to `list_registry_connectors` (the
    glob-based listing); `prefix` is not used on that path.  With at least one
    filter set, the entries returned by `_read_cloud_registry_index` are
    narrowed stage by stage and connector names are derived from each entry's
    `dockerRepository`.

    Args:
        bucket_name: Name of the GCS bucket containing the registry.
        support_level: Exact support level to match (e.g., `SupportLevel.CERTIFIED`).
        min_support_level: Minimum support level threshold. Keeps entries whose
            level is a known `SupportLevel` with precedence at or above it.
        connector_type: Filter by connector type (`ConnectorType.SOURCE` or
            `ConnectorType.DESTINATION`).
        language: Filter by implementation language (e.g., `ConnectorLanguage.PYTHON`).
        prefix: Optional bucket prefix (e.g., `"aj-test100"`), forwarded to the
            index reader.

    Returns:
        Sorted list of connector technical names (e.g., `"source-github"`).

    Raises:
        ValueError: If `support_level` and `min_support_level` are both provided.
    """
    # No filter requested: use the glob-based listing.
    if not (support_level or min_support_level or connector_type or language):
        return list_registry_connectors(bucket_name=bucket_name)

    if support_level and min_support_level:
        raise ValueError(
            "Cannot specify both `support_level` and `min_support_level`. "
            "Use `support_level` for an exact match or `min_support_level` for a threshold."
        )

    candidates = _read_cloud_registry_index(bucket_name=bucket_name, prefix=prefix)

    # Stage 1: exact support-level match.
    if support_level:
        candidates = list(
            filter(
                lambda entry: entry.get("supportLevel") == support_level,
                candidates,
            )
        )

    # Stage 2: minimum support level; entries with a missing or unrecognised
    # level are dropped.
    if min_support_level:
        minimum = min_support_level.precedence
        recognised = {member.value for member in SupportLevel}
        at_or_above = []
        for entry in candidates:
            level = entry.get("supportLevel")
            if not level or level not in recognised:
                continue
            if SupportLevel(level).precedence >= minimum:
                at_or_above.append(entry)
        candidates = at_or_above

    # Stage 3: connector type, detected by which definition-ID key is present.
    if connector_type == ConnectorType.SOURCE:
        candidates = list(
            filter(lambda entry: "sourceDefinitionId" in entry, candidates)
        )
    elif connector_type == ConnectorType.DESTINATION:
        candidates = list(
            filter(lambda entry: "destinationDefinitionId" in entry, candidates)
        )

    # Stage 4: implementation language.
    if language:
        candidates = list(
            filter(lambda entry: entry.get("language") == language, candidates)
        )

    # Derive names from dockerRepository: "airbyte/source-github" -> "source-github".
    # A repository without "/" is used as-is; an empty one is skipped.
    connector_names: set[str] = set()
    for entry in candidates:
        repository = entry.get("dockerRepository", "")
        if "/" not in repository:
            if repository:
                connector_names.add(repository)
            continue
        connector_names.add(repository.split("/", 1)[1])

    return sorted(connector_names)

List connectors from the compiled cloud registry index with filtering.

When any filter is applied, reads the compiled cloud_registry.json index instead of globbing individual metadata blobs. This is significantly faster because the index is a single JSON file containing all connector entries.

When no filters are applied, falls back to the existing glob-based search which captures all connectors (including OSS-only connectors not in the Cloud index).

Arguments:
  • bucket_name: Name of the GCS bucket containing the registry.
  • support_level: Exact support level to match (e.g., SupportLevel.CERTIFIED).
  • min_support_level: Minimum support level threshold. Returns connectors at or above this level.
  • connector_type: Filter by connector type (ConnectorType.SOURCE or ConnectorType.DESTINATION).
  • language: Filter by implementation language (e.g., ConnectorLanguage.PYTHON).
  • prefix: Optional bucket prefix (e.g., "aj-test100").
Returns:

Sorted list of connector technical names (e.g., "source-github").

Raises:
  • ValueError: If support_level and min_support_level are both provided.
def publish_connector_metadata( connector_name: str, metadata: dict[str, typing.Any], bucket_name: str, version: str, update_latest: bool = True, dry_run: bool = False) -> MetadataPublishResult:
def publish_connector_metadata(
    connector_name: str,
    metadata: dict[str, Any],
    bucket_name: str,
    version: str,
    update_latest: bool = True,
    dry_run: bool = False,
) -> MetadataPublishResult:
    """Publish connector metadata to GCS.

    Uploads the metadata to the registry bucket at a versioned path, and optionally
    also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading
    unchanged files.

    Requires GCS_CREDENTIALS environment variable to be set.

    Args:
        connector_name: Connector name used to build the GCS blob paths.
        metadata: Metadata document; must be a dict with a top-level `data` key.
            It is serialized to YAML unchanged.
        bucket_name: Name of the target GCS bucket.
        version: Version folder the metadata is published under.
        update_latest: If True (default), also upload the same file to the
            `latest` folder.
        dry_run: If True, log the intended uploads and return without creating
            a GCS client or writing anything.

    Returns:
        A `MetadataPublishResult` whose `status` is `"dry-run"`, `"success"`
        (at least one blob was uploaded) or `"already-up-to-date"`.

    Raises:
        ValueError: If `metadata` is not a dict or has no `data` key.
    """
    if not isinstance(metadata, dict):
        raise ValueError("Metadata must be a dictionary")

    if "data" not in metadata:
        raise ValueError("Metadata must contain 'data' field")

    # Construct GCS paths using airbyte/{connector_name} convention
    versioned_blob_path = get_gcs_publish_path(connector_name, "metadata", version)
    latest_blob_path = get_gcs_publish_path(
        connector_name, "metadata", LATEST_GCS_FOLDER_NAME
    )

    if dry_run:
        message = f"[DRY RUN] Would upload metadata to gs://{bucket_name}/{versioned_blob_path}"
        if update_latest:
            message += f" and gs://{bucket_name}/{latest_blob_path}"
        logger.info(message)
        return MetadataPublishResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            versioned_path=versioned_blob_path,
            latest_path=latest_blob_path if update_latest else None,
            versioned_uploaded=False,
            latest_uploaded=False,
            status="dry-run",
            message=message,
        )

    # Get GCS client and bucket
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # The temp file is created with delete=False so it can be re-opened by the
    # uploader after we close it; that makes this function responsible for
    # removing it. Everything from serialization onwards runs inside the
    # try/finally so a failing yaml.dump no longer leaves the file behind.
    tmp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False)
    tmp_path = Path(tmp_file.name)
    latest_uploaded = False
    try:
        # Closing the handle flushes the YAML to disk before it is uploaded.
        with tmp_file:
            yaml.dump(metadata, tmp_file)

        # Upload versioned file
        versioned_uploaded, _ = upload_file_if_changed(
            local_file_path=tmp_path,
            bucket=bucket,
            blob_path=versioned_blob_path,
            disable_cache=True,
        )

        if versioned_uploaded:
            logger.info(
                f"Uploaded metadata for {connector_name} v{version} to {versioned_blob_path}"
            )
        else:
            logger.info(
                f"Versioned metadata for {connector_name} v{version} is already up to date"
            )

        # Optionally update latest pointer
        if update_latest:
            latest_uploaded, _ = upload_file_if_changed(
                local_file_path=tmp_path,
                bucket=bucket,
                blob_path=latest_blob_path,
                disable_cache=True,
            )
            if latest_uploaded:
                logger.info(f"Updated latest pointer for {connector_name}")
            else:
                logger.info(
                    f"Latest pointer for {connector_name} is already up to date"
                )
    finally:
        # Clean up temp file even if serialization or upload fails
        tmp_path.unlink(missing_ok=True)

    # Determine status
    if versioned_uploaded or latest_uploaded:
        status = "success"
        message = f"Published metadata for {connector_name} v{version}"
        if versioned_uploaded:
            message += f" to {versioned_blob_path}"
        if latest_uploaded:
            message += " and updated latest"
    else:
        status = "already-up-to-date"
        message = f"Metadata for {connector_name} v{version} is already up to date"

    return MetadataPublishResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        versioned_path=versioned_blob_path,
        latest_path=latest_blob_path if update_latest else None,
        versioned_uploaded=versioned_uploaded,
        latest_uploaded=latest_uploaded,
        status=status,
        message=message,
    )

Publish connector metadata to GCS.

Uploads the metadata to the registry bucket at a versioned path, and optionally also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading unchanged files.

Requires GCS_CREDENTIALS environment variable to be set.

def publish_version_artifacts( connector_name: str, version: str, artifacts_dir: pathlib.Path, store: RegistryStore, dry_run: bool = False, with_validate: bool = True) -> PublishArtifactsResult:
def publish_version_artifacts(
    connector_name: str,
    version: str,
    artifacts_dir: Path,
    store: RegistryStore,
    dry_run: bool = False,
    with_validate: bool = True,
) -> PublishArtifactsResult:
    """Publish locally generated artifacts to a GCS registry bucket.

    Uses `gcsfs.GCSFileSystem` to upload the local *artifacts_dir* to the
    versioned path inside the target GCS bucket.

    The target GCS path is:
        `gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/`

    Before uploading, this function validates that the `connector_name`
    (derived from the connector directory) matches the `dockerRepository`
    declared in `metadata.yaml`.  A mismatch would cause the registry
    compile step to see duplicate definition-ID entries and fail.

    After the versioned upload, `dependencies.json` and the SBOM file (when
    present in *artifacts_dir*) are additionally uploaded to their own
    dedicated paths.

    Args:
        connector_name: Connector name (e.g. `source-faker`).
        version: Version string (e.g. `6.2.38`).
        artifacts_dir: Local directory containing artifacts from `generate`.
        store: Parsed store target containing bucket, prefix, and stage info.
        dry_run: If `True`, report what would be uploaded without writing.
        with_validate: If `True` (default), validate metadata before uploading.
            Pass `False` (`--no-validate`) to skip.

    Returns:
        A `PublishArtifactsResult` describing what was published. When
        validation fails or the directory holds no files, the result is
        returned early with `validation_errors` / `errors` populated and
        nothing uploaded.

    Raises:
        FileNotFoundError: If `artifacts_dir` is not an existing directory.
        ValueError: If the connector directory name does not match
            `dockerRepository` in the generated metadata.
    """
    if not artifacts_dir.is_dir():
        raise FileNotFoundError(f"Artifacts directory not found: {artifacts_dir}")

    # Fail fast if the connector directory name doesn't match dockerRepository.
    # A mismatch would publish artifacts under the wrong GCS path and corrupt
    # the registry (duplicate definition-IDs under different directory names).
    mismatch_error = _check_connector_name_matches_docker_repo(
        connector_name, artifacts_dir
    )
    if mismatch_error:
        raise ValueError(mismatch_error)

    # Build the GCS destination path
    bucket_name = store.bucket
    prefix = store.prefix
    blob_root = versioned_blob_root(
        connector_name=connector_name, version=version, store=store
    )
    # gcsfs addresses objects as "<bucket>/<key>"; the URI form is only used
    # for reporting.
    dest_path = f"{bucket_name}/{blob_root}"
    versioned_dest = f"gcs://{dest_path}"

    target_label = f"{bucket_name}/{prefix}" if prefix else bucket_name
    result = PublishArtifactsResult(
        connector_name=connector_name,
        version=version,
        target=target_label,
        gcs_destination=versioned_dest,
        dry_run=dry_run,
    )

    # --- Pre-publish validation ---
    if with_validate:
        metadata_file = artifacts_dir / "metadata.yaml"
        if metadata_file.is_file():
            raw_metadata = yaml.safe_load(metadata_file.read_text())
            metadata_data = (raw_metadata or {}).get("data", {})
            validation = validate_metadata(metadata_data=metadata_data)
            if not validation.passed:
                for err in validation.errors:
                    logger.error("Pre-publish validation error: %s", err)
                result.validation_errors = validation.errors
                return result
            logger.info(
                "Pre-publish validation passed (%d validators).",
                validation.validators_run,
            )
        else:
            logger.warning("No metadata.yaml in artifacts dir; skipping validation.")

    # Enumerate local files
    local_files = sorted(f for f in artifacts_dir.rglob("*") if f.is_file())
    if not local_files:
        result.errors.append(f"No files found in {artifacts_dir}.")
        return result

    # Relative paths double as GCS key suffixes, so always render them with
    # forward slashes (str() would yield backslashes on Windows).
    local_rel_paths = [f.relative_to(artifacts_dir).as_posix() for f in local_files]

    _log_progress(
        "Publishing %d artifacts for %s@%s → %s",
        len(local_files),
        connector_name,
        version,
        versioned_dest,
    )

    # Build references used by both dry-run and real upload paths
    deps_file = artifacts_dir / CONNECTOR_DEPENDENCY_FILE_NAME
    has_deps = deps_file.is_file()
    deps_gcs_key = dependencies_blob_path(
        connector_name=connector_name, version=version, store=store
    )

    sbom_file = artifacts_dir / SBOM_FILE_NAME
    has_sbom = sbom_file.is_file()
    sbom_gcs_key = (
        sbom_blob_path(connector_name=connector_name, version=version, store=store)
        if has_sbom
        else None
    )

    if dry_run:
        for rel in local_rel_paths:
            result.files_uploaded.append(rel)
            _log_progress("  [DRY RUN] would upload: %s", rel)
        # Report the dual-load of dependencies.json to connector_dependencies/
        if has_deps:
            result.files_uploaded.append(deps_gcs_key)
            _log_progress(
                "  [DRY RUN] would also dual-load: %s → gs://%s/%s",
                CONNECTOR_DEPENDENCY_FILE_NAME,
                bucket_name,
                deps_gcs_key,
            )
        # Report the separate sbom/ upload
        if has_sbom:
            result.files_uploaded.append(sbom_gcs_key)
            _log_progress(
                "  [DRY RUN] would also upload: %s → gs://%s/%s",
                SBOM_FILE_NAME,
                bucket_name,
                sbom_gcs_key,
            )
        return result

    # Authenticate
    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # Upload all files to the versioned path
    _log_progress("Uploading to: %s", versioned_dest)
    for local_file, rel in zip(local_files, local_rel_paths):
        fs.put(str(local_file), f"{dest_path}/{rel}")
        result.files_uploaded.append(rel)
        _log_progress("  Uploaded: %s", rel)

    # Delete remote files that don't exist locally (sync semantics)
    # NOTE(review): `ls` is a single-level listing while the local enumeration
    # is recursive; confirm how nested artifact directories should be handled.
    try:
        remote_files = fs.ls(dest_path, detail=False)
        expected = set(local_rel_paths)
        for remote_file in remote_files:
            # Skip the directory entry itself if it appears in the listing
            if remote_file == dest_path:
                continue
            # Derive the remote relative path, matching upload semantics
            if remote_file.startswith(dest_path + "/"):
                remote_rel = remote_file[len(dest_path) + 1 :]
            else:
                remote_rel = remote_file.split("/")[-1]
            if remote_rel not in expected:
                fs.rm(remote_file)
                _log_progress("  Deleted stale remote file: %s", remote_rel)
    except FileNotFoundError:
        pass  # Destination doesn't exist yet, nothing to clean

    _log_progress("Uploaded %d files to %s", len(local_files), versioned_dest)

    # --- Dual-load dependencies.json to the connector_dependencies/ path ---
    if has_deps:
        deps_remote = f"{bucket_name}/{deps_gcs_key}"
        _log_progress(
            "Dual-loading %s to gs://%s",
            CONNECTOR_DEPENDENCY_FILE_NAME,
            deps_remote,
        )
        fs.put(str(deps_file), deps_remote)
        result.files_uploaded.append(deps_gcs_key)
        _log_progress("  Uploaded %s (dual-load)", CONNECTOR_DEPENDENCY_FILE_NAME)
    else:
        logger.debug(
            "No %s in artifacts dir — skipping dual-load.",
            CONNECTOR_DEPENDENCY_FILE_NAME,
        )

    # --- Upload SBOM to the dedicated sbom/ path in GCS ---
    if has_sbom:
        sbom_gcs_uri = upload_sbom(
            sbom_path=sbom_file,
            connector_name=connector_name,
            version=version,
            store=store,
            dry_run=dry_run,
        )
        result.files_uploaded.append(sbom_gcs_key)
        _log_progress("Uploaded SBOM to dedicated path: %s", sbom_gcs_uri)
    else:
        logger.debug(
            "No %s in artifacts dir — skipping SBOM dual-load.",
            SBOM_FILE_NAME,
        )

    return result

Publish locally generated artifacts to a GCS registry bucket.

Uses gcsfs.GCSFileSystem to upload the local artifacts_dir to the versioned path inside the target GCS bucket.

The target GCS path is:

gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/

Before uploading, this function validates that the connector_name (derived from the connector directory) matches the dockerRepository declared in metadata.yaml. A mismatch would cause the registry compile step to see duplicate definition-ID entries and fail.

Arguments:
  • connector_name: Connector name (e.g. source-faker).
  • version: Version string (e.g. 6.2.38).
  • artifacts_dir: Local directory containing artifacts from generate.
  • store: Parsed store target containing bucket, prefix, and stage info.
  • dry_run: If True, report what would be uploaded without writing.
  • with_validate: If True (default), validate metadata before uploading. Pass False (--no-validate) to skip.
Returns:

A PublishArtifactsResult describing what was published.

Raises:
  • FileNotFoundError: If artifacts_dir is not an existing directory.
  • ValueError: If the connector directory name does not match dockerRepository in the generated metadata.
def purge_latest_dirs( *, store: RegistryStore, connector_name: list[str] | None = None, dry_run: bool = False) -> PurgeLatestResult:
def purge_latest_dirs(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
) -> PurgeLatestResult:
    """Delete `latest/` directories from the registry store.

    Finds the connectors that have a `latest/` subdirectory (either by
    checking the requested names or by globbing the whole store), then
    deletes each one in parallel using a thread pool.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only purge these connectors. Duplicate
            names are ignored.
        dry_run: If True, report what would be done without deleting. The
            result's `latest_dirs_deleted` then holds the number of
            directories that would have been deleted.

    Returns:
        A `PurgeLatestResult` describing what was done. Per-connector
        deletion failures are collected in `errors` rather than raised.
    """
    result = PurgeLatestResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"

    _log_progress("Discovering latest/ directories...")
    if connector_name:
        # dict.fromkeys de-duplicates while keeping first-seen order, so every
        # requested connector costs exactly one existence check, including
        # repeated names that turn out to have no latest/ directory.
        connectors_with_latest = [
            name
            for name in dict.fromkeys(connector_name)
            if fs.exists(f"{base}/{name}/latest")
        ]
    else:
        # Glob for every */latest entry and recover the connector name from
        # the first path component below the known base prefix.
        base_with_slash = f"{base}/"
        discovered: dict[str, None] = {}
        for path in fs.glob(f"{base}/*/latest"):
            if not path.startswith(base_with_slash):
                logger.warning("Could not parse latest path: %s", path)
                continue
            connector = path[len(base_with_slash) :].split("/")[0]
            if connector:
                discovered.setdefault(connector, None)
        connectors_with_latest = list(discovered)

    result.connectors_found = len(connectors_with_latest)
    _log_progress(
        "Found %d connectors with latest/ directories",
        result.connectors_found,
    )

    if not connectors_with_latest:
        _log_progress("Nothing to purge.")
        _log_progress(result.summary())
        return result

    # Work through connectors in a stable, alphabetical order.
    targets = sorted(connectors_with_latest)

    if dry_run:
        for connector in targets:
            _log_progress("  [DRY RUN] Would delete %s/latest/", connector)
        result.latest_dirs_deleted = len(targets)
        _log_progress(result.summary())
        return result

    # Delete latest/ dirs in parallel using the shared helper.
    def _delete_one(connector: str) -> str | None:
        """Delete a single connector's latest/ dir. Returns error string or None."""
        try:
            _delete_latest_dir(
                fs,
                store=store,
                connector=connector,
            )
            return None
        except Exception as exc:
            # Worker boundary: report the failure as data so one bad
            # connector does not abort the rest of the purge.
            return f"Failed to delete latest/ for {connector}: {exc}"

    _log_progress(
        "Deleting %d latest/ directories (max_workers=%d)...",
        len(targets),
        _PURGE_LATEST_MAX_WORKERS,
    )

    with ThreadPoolExecutor(max_workers=_PURGE_LATEST_MAX_WORKERS) as pool:
        futures = [pool.submit(_delete_one, connector) for connector in targets]
        for completed, future in enumerate(as_completed(futures), 1):
            error = future.result()
            if error:
                logger.error(error)
                result.errors.append(error)
            else:
                result.latest_dirs_deleted += 1
            # Periodic heartbeat; counts finished tasks, successful or not.
            if completed % 100 == 0:
                _log_progress("  Deleted %d / %d...", completed, len(targets))

    _log_progress(result.summary())
    return result

Delete all latest/ directories from the registry store.

Discovers connector directories via glob, then deletes each latest/ subdirectory in parallel using a thread pool.

Arguments:
  • store: Registry store (bucket + optional prefix).
  • connector_name: If provided, only purge these connectors.
  • dry_run: If True, report what would be done without deleting.
Returns:

A PurgeLatestResult describing what was done.

def rebuild_registry( source_bucket: str, output_mode: Literal['local', 'gcs', 's3'], output_path_root: str | None = None, gcs_bucket: str | None = None, s3_bucket: str | None = None, dry_run: bool = False, connector_name: list[str] | None = None) -> RebuildResult:
def rebuild_registry(
    source_bucket: str,
    output_mode: OutputMode,
    output_path_root: str | None = None,
    gcs_bucket: str | None = None,
    s3_bucket: str | None = None,
    dry_run: bool = False,
    connector_name: list[str] | None = None,
) -> RebuildResult:
    """Mirror the registry's metadata blobs from a source GCS bucket to a target.

    Every blob under the metadata folder of `source_bucket` (or only the blobs
    of the requested connectors) is listed and then copied to the selected
    output target.

    Supported output targets:
    - local: Write to a local directory tree.
    - gcs: Copy to a GCS bucket (must not be the prod bucket).
    - s3: Copy to an S3 bucket.

    Args:
        source_bucket: The GCS bucket to read from (typically prod).
        output_mode: Where to write: "local", "gcs", or "s3".
        output_path_root: Root path/prefix for output. For local mode, if None
            creates a temp directory. For GCS/S3, prepended to all blob paths.
        gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
        s3_bucket: Target S3 bucket name (required if output_mode="s3").
        dry_run: If True, report what would be done without writing.
        connector_name: If provided, only rebuild these connector names
            (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.

    Returns:
        RebuildResult with details of the operation.

    Raises:
        ValueError: If target is the prod bucket, or required bucket arg is missing.
    """
    targets_gcs = output_mode == "gcs" and gcs_bucket
    if targets_gcs:
        _validate_not_prod_bucket(gcs_bucket)

    # Only local mode needs its root resolved; other modes use the given
    # prefix verbatim (empty string when none was supplied).
    if output_mode == "local":
        effective_output_root = _resolve_local_output_root(output_path_root)
    else:
        effective_output_root = output_path_root or ""

    result = RebuildResult(
        source_bucket=source_bucket,
        output_mode=output_mode,
        output_root=effective_output_root,
        dry_run=dry_run,
    )

    # The source side is always GCS.
    source_fs, gcs_token = _make_source_fs()
    source_base = f"{source_bucket}/{METADATA_FOLDER}"

    # Gather the blobs to copy, either per requested connector or everything.
    if connector_name:
        _log_progress(
            "Listing blobs for %d connectors under gs://%s/...",
            len(connector_name),
            source_base,
        )
        source_paths: list[str] = []
        for requested in connector_name:
            matches = source_fs.find(f"{source_base}/airbyte/{requested}")
            source_paths.extend(matches)
            _log_progress("  %s: %d blobs", requested, len(matches))
    else:
        _log_progress("Listing all blobs under gs://%s/...", source_base)
        source_paths = source_fs.find(source_base)

    if not source_paths:
        _log_progress("No blobs found under gs://%s/", source_base)
        return result

    total_blobs = len(source_paths)
    _log_progress("Found %d blobs to process", total_blobs)

    # Create output filesystem
    output_fs, output_base = _make_output_fs(
        output_mode=output_mode,
        output_root=effective_output_root,
        gcs_bucket=gcs_bucket,
        s3_bucket=s3_bucket,
        gcs_token=gcs_token,
    )

    # Bucket-relative path for each blob; paths that do not carry the bucket
    # prefix are passed through untouched.
    bucket_prefix = f"{source_bucket}/"
    blob_relative_paths: list[str] = [
        path[len(bucket_prefix) :] if path.startswith(bucket_prefix) else path
        for path in source_paths
    ]
    # The third path component is taken as the connector name.
    connector_names: set[str] = {
        segments[2]
        for segments in (rel.split("/") for rel in blob_relative_paths)
        if len(segments) >= 3
    }

    if dry_run:
        result.blobs_copied = total_blobs
        result.connectors_processed = len(connector_names)
        _log_progress(
            "[DRY RUN] Would copy %d blobs (%d connectors)",
            total_blobs,
            len(connector_names),
        )
        _log_progress(result.summary())
        return result

    # Use GCS-native server-side copy for GCS→GCS mirrors.
    if targets_gcs:
        return _gcs_native_copy(
            source_bucket=source_bucket,
            dest_bucket_name=gcs_bucket,
            dest_prefix=effective_output_root,
            blob_relative_paths=blob_relative_paths,
            connector_names=connector_names,
            gcs_token=gcs_token,
            result=result,
            connector_name_filter=connector_name,
        )

    # Fallback: fsspec-based copy for local and S3 output modes.
    return _fsspec_copy(
        source_fs=source_fs,
        source_paths=source_paths,
        output_fs=output_fs,
        output_base=output_base,
        output_mode=output_mode,
        source_bucket=source_bucket,
        blob_relative_paths=blob_relative_paths,
        connector_names=connector_names,
        result=result,
    )

Rebuild the entire registry from a source GCS bucket to an output target.

Reads all connector metadata blobs from the source GCS bucket and copies them to the output target using fsspec for unified filesystem access.

The output targets are:

  • local: Write to a local directory tree.
  • gcs: Copy to a GCS bucket (must not be the prod bucket).
  • s3: Copy to an S3 bucket.
Arguments:
  • source_bucket: The GCS bucket to read from (typically prod).
  • output_mode: Where to write: "local", "gcs", or "s3".
  • output_path_root: Root path/prefix for output. For local mode, if None creates a temp directory. For GCS/S3, prepended to all blob paths.
  • gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
  • s3_bucket: Target S3 bucket name (required if output_mode="s3").
  • dry_run: If True, report what would be done without writing.
  • connector_name: If provided, only rebuild these connector names (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.
Returns:

RebuildResult with details of the operation.

Raises:
  • ValueError: If target is the prod bucket, or required bucket arg is missing.
def resolve_registry_store( store: str | None = None, connector_name: str | None = None, cwd: pathlib.Path | None = None, default_env: str = 'dev') -> RegistryStore:
def resolve_registry_store(
    store: str | None = None,
    connector_name: str | None = None,
    cwd: Path | None = None,
    default_env: str = "dev",
) -> RegistryStore:
    """Work out which `RegistryStore` a CLI invocation should target.

    Explicit sources win and are returned as parsed: first the `--store`
    argument, then the `AIRBYTE_REGISTRY_STORE` environment variable. Only
    when neither is set are the auto-detected store types used; these must
    all agree, otherwise a `ValueError` is raised.

    Priority (highest → lowest):

    1. **Explicit** `--store` argument (e.g. `"coral:dev"`).
    2. **Environment variable** -- `AIRBYTE_REGISTRY_STORE`.
    3. **Auto-detected** -- connector name and/or working directory.
       If both are present and disagree, a `ValueError` is raised.

    Args:
        store: Explicit store target string (e.g. `"coral:dev"`).
        connector_name: Optional connector name for auto-detection.
        cwd: Working directory for repo-based detection.
        default_env: Environment to use when auto-detecting (default `"dev"`).

    Returns:
        A fully resolved `RegistryStore`.

    Raises:
        ValueError: If no detection method succeeds, or if auto-detected
            methods produce conflicting store types.
    """
    # -- Explicit sources: --store first, then the environment variable ----
    chosen: RegistryStore | None = None
    chosen_from: str | None = None

    if store is not None:
        chosen, chosen_from = RegistryStore.parse(store), "--store"

    if chosen is None:
        env_value = os.environ.get(REGISTRY_STORE_ENV_VAR)
        if env_value:
            chosen, chosen_from = (
                RegistryStore.parse(env_value),
                REGISTRY_STORE_ENV_VAR,
            )

    # -- Auto-detected sources ---------------------------------------------
    # These are evaluated unconditionally, before the explicit result is
    # returned, and only used for the final answer when nothing explicit
    # was given.
    detected: dict[str, StoreType] = {}

    if connector_name is not None:
        detected["connector_name"] = StoreType.get_from_connector_name(
            connector_name,
        )

    repo_store_type = StoreType.detect_from_repo_dir(cwd)
    if repo_store_type is not None:
        detected["working_directory"] = repo_store_type

    # -- Explicit source present: done -------------------------------------
    if chosen is not None:
        logger.debug(
            "Using explicit store target from %s: %s:%s",
            chosen_from,
            chosen.store_type.value,
            chosen.env,
        )
        return chosen

    # -- Otherwise the auto-detections must exist and agree ----------------
    if not detected:
        raise ValueError(
            "Cannot determine registry store. "
            "Provide --store (e.g. 'coral:dev' or 'sonar:prod'), "
            f"set ${REGISTRY_STORE_ENV_VAR}, "
            "or run from a recognized repository directory."
        )

    candidates = set(detected.values())
    if len(candidates) > 1:
        detail = ", ".join(
            f"{origin}={kind.value}" for origin, kind in detected.items()
        )
        raise ValueError(
            f"Conflicting store types detected: {detail}. "
            "Provide an explicit --store to resolve the ambiguity."
        )

    # Exactly one distinct store type remains at this point.
    (resolved_type,) = candidates
    logger.info(
        "Auto-detected store type '%s' (sources: %s)",
        resolved_type.value,
        ", ".join(detected),
    )
    return RegistryStore(store_type=resolved_type, env=default_env)

Resolve a RegistryStore from CLI inputs.

All applicable detection methods are evaluated. Explicit sources (--store, then the AIRBYTE_REGISTRY_STORE env var) take priority and are returned directly. When only auto-detected sources remain, they are compared and a ValueError is raised if they disagree.

Priority (highest → lowest):

  1. Explicit --store argument (e.g. "coral:dev").
  2. Environment variable -- AIRBYTE_REGISTRY_STORE.
  3. Auto-detected -- connector name and/or working directory. If both are present and disagree, a ValueError is raised.
Arguments:
  • store: Explicit store target string (e.g. "coral:dev").
  • connector_name: Optional connector name for auto-detection.
  • cwd: Working directory for repo-based detection.
  • default_env: Environment to use when auto-detecting (default "dev").
Returns:

A fully resolved RegistryStore.

Raises:
  • ValueError: If no detection method succeeds, or if auto-detected methods produce conflicting store types.
def strip_rc_suffix(version: str) -> str:
 93def strip_rc_suffix(version: str) -> str:
 94    """Strip the release candidate suffix from a version string.
 95
 96    Args:
 97        version: The version string (e.g., '1.2.3-rc.1').
 98
 99    Returns:
100        The base version without RC suffix (e.g., '1.2.3').
101        Returns the original version if no RC suffix is present.
102    """
103    if "-rc." in version:
104        return version.split("-rc.")[0]
105    return version

Strip the release candidate suffix from a version string.

Arguments:
  • version: The version string (e.g., '1.2.3-rc.1').
Returns:

The base version without RC suffix (e.g., '1.2.3'). Returns the original version if no RC suffix is present.

def unyank_connector_version( connector_name: str, version: str, bucket_name: str, dry_run: bool = False) -> YankResult:
def unyank_connector_version(
    connector_name: str,
    version: str,
    bucket_name: str,
    dry_run: bool = False,
) -> YankResult:
    """Clear the yank marker for one version of a connector.

    The marker is the blob whose path is produced by `_get_yank_blob_path`
    (documented as `metadata/airbyte/{connector_name}/{version}/version-yank.yml`).

    Args:
        connector_name: The connector name (e.g., "source-faker").
        version: The version to unyank (e.g., "1.2.3").
        bucket_name: The GCS bucket name.
        dry_run: If True, report what would be done without deleting.

    Returns:
        A YankResult with `action="unyank"`. `success` is False when no yank
        marker exists for the version; otherwise it is True, both for a dry
        run and for an actual deletion.
    """
    marker_path = _get_yank_blob_path(connector_name, version)

    gcs_client = get_gcs_storage_client()
    marker_blob = gcs_client.bucket(bucket_name).blob(marker_path)

    # Fields that are identical in every result this function can return.
    shared_fields: dict[str, Any] = {
        "connector_name": connector_name,
        "version": version,
        "bucket_name": bucket_name,
        "action": "unyank",
    }

    # Nothing to undo when the version was never yanked.
    if not marker_blob.exists():
        return YankResult(
            **shared_fields,
            success=False,
            message=f"Version {version} of {connector_name} is not yanked.",
            dry_run=dry_run,
        )

    # Dry run: the marker exists, but it is left in place.
    if dry_run:
        return YankResult(
            **shared_fields,
            success=True,
            message=f"[DRY RUN] Would unyank {connector_name} {version}.",
            dry_run=True,
        )

    marker_blob.delete()
    logger.info("Unyanked %s version %s in %s", connector_name, version, bucket_name)

    return YankResult(
        **shared_fields,
        success=True,
        message=f"Successfully unyanked {connector_name} {version}.",
    )

Remove the yank marker from a connector version.

Deletes the version-yank.yml file at: metadata/airbyte/{connector_name}/{version}/version-yank.yml

Arguments:
  • connector_name: The connector name (e.g., "source-faker").
  • version: The version to unyank (e.g., "1.2.3").
  • bucket_name: The GCS bucket name.
  • dry_run: If True, report what would be done without deleting.
Returns:

YankResult with details of the operation.

def validate_metadata( metadata_data: dict[str, typing.Any], opts: ValidateOptions | None = None) -> ValidationResult:
def validate_metadata(
    metadata_data: dict[str, Any],
    opts: ValidateOptions | None = None,
) -> ValidationResult:
    """Run all pre-publish validators against raw `metadata.data`.

    Every callable in `PRE_PUBLISH_VALIDATORS` is invoked as
    `validator(metadata_data, opts)` and must return a `(passed, error)`
    pair. All validators are run even after one fails, so the result lists
    every problem found rather than just the first.

    Args:
        metadata_data: The `data` section of a parsed `metadata.yaml`.
        opts: Options influencing validation behaviour. A default
            `ValidateOptions()` is used when omitted.

    Returns:
        A `ValidationResult` whose `validators_run` counts the validators
        executed and which has one error added per failing validator.
    """
    if opts is None:
        opts = ValidateOptions()

    result = ValidationResult()

    for validator in PRE_PUBLISH_VALIDATORS:
        # Not every callable carries `__name__` (e.g. functools.partial),
        # so fall back to its repr instead of failing while logging.
        validator_name = getattr(validator, "__name__", repr(validator))

        result.validators_run += 1
        logger.info("Running validator: %s", validator_name)

        passed, error = validator(metadata_data, opts)
        if passed:
            continue

        # A failing validator must always be recorded: substitute a generic
        # message when it reports failure without an explanation, otherwise
        # the failure would silently vanish from the result.
        message = (
            error
            or f"Validator '{validator_name}' failed without an error message."
        )
        logger.error("Validation failed: %s", message)
        result.add_error(message)

    return result

Run all pre-publish validators against raw metadata.data.

Arguments:
  • metadata_data: The data section of a parsed metadata.yaml.
  • opts: Options influencing validation behaviour.
Returns:

A ValidationResult with aggregate pass/fail and error list.

def yank_connector_version( connector_name: str, version: str, bucket_name: str, reason: str = '', dry_run: bool = False) -> YankResult:
 65def yank_connector_version(
 66    connector_name: str,
 67    version: str,
 68    bucket_name: str,
 69    reason: str = "",
 70    dry_run: bool = False,
 71) -> YankResult:
 72    """Mark a connector version as yanked by writing a version-yank.yml marker.
 73
 74    The marker file is placed at:
 75        metadata/airbyte/{connector_name}/{version}/version-yank.yml
 76
 77    Args:
 78        connector_name: The connector name (e.g., "source-faker").
 79        version: The version to yank (e.g., "1.2.3").
 80        bucket_name: The GCS bucket name.
 81        reason: Optional reason for yanking the version.
 82        dry_run: If True, report what would be done without writing.
 83
 84    Returns:
 85        YankResult with details of the operation.
 86
 87    Raises:
 88        ValueError: If the bucket is the production bucket and no override is set,
 89            or if the version does not exist.
 90    """
 91    yank_path = _get_yank_blob_path(connector_name, version)
 92    metadata_path = _get_metadata_blob_path(connector_name, version)
 93
 94    storage_client = get_gcs_storage_client()
 95    bucket = storage_client.bucket(bucket_name)
 96
 97    # Verify the version exists
 98    metadata_blob = bucket.blob(metadata_path)
 99    if not metadata_blob.exists():
100        return YankResult(
101            connector_name=connector_name,
102            version=version,
103            bucket_name=bucket_name,
104            action="yank",
105            success=False,
106            message=f"Version {version} not found for {connector_name} in {bucket_name}.",
107            dry_run=dry_run,
108        )
109
110    # Check if already yanked
111    yank_blob = bucket.blob(yank_path)
112    if yank_blob.exists():
113        return YankResult(
114            connector_name=connector_name,
115            version=version,
116            bucket_name=bucket_name,
117            action="yank",
118            success=False,
119            message=f"Version {version} of {connector_name} is already yanked.",
120            dry_run=dry_run,
121        )
122
123    if dry_run:
124        return YankResult(
125            connector_name=connector_name,
126            version=version,
127            bucket_name=bucket_name,
128            action="yank",
129            success=True,
130            message=f"[DRY RUN] Would yank {connector_name} {version}.",
131            dry_run=True,
132        )
133
134    # Write the yank marker file
135    yank_content: dict[str, Any] = {
136        "yanked": True,
137        "yanked_at": datetime.now(tz=timezone.utc).isoformat(),
138    }
139    if reason:
140        yank_content["reason"] = reason
141
142    yank_yaml = yaml.dump(yank_content, default_flow_style=False)
143    yank_blob.upload_from_string(yank_yaml, content_type="application/x-yaml")
144
145    logger.info("Yanked %s version %s in %s", connector_name, version, bucket_name)
146
147    return YankResult(
148        connector_name=connector_name,
149        version=version,
150        bucket_name=bucket_name,
151        action="yank",
152        success=True,
153        message=f"Successfully yanked {connector_name} {version}.",
154    )

Mark a connector version as yanked by writing a version-yank.yml marker.

The marker file is placed at:

metadata/airbyte/{connector_name}/{version}/version-yank.yml

Arguments:
  • connector_name: The connector name (e.g., "source-faker").
  • version: The version to yank (e.g., "1.2.3").
  • bucket_name: The GCS bucket name.
  • reason: Optional reason for yanking the version.
  • dry_run: If True, report what would be done without writing.
Returns:

YankResult with details of the operation.

Raises:
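  • ValueError: Documented, but not raised by the function body shown above — a missing version (or an already-yanked one) is reported through a YankResult with success=False, and no production-bucket check is performed there.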