airbyte_ops_mcp.registry

Registry operations for Airbyte connectors.

This package provides functionality for reading, listing, publishing, compiling, and validating connector registry artifacts.

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2"""Registry operations for Airbyte connectors.
  3
  4This package provides functionality for reading, listing, publishing, compiling,
  5and validating connector registry artifacts.
  6"""
  7
  8from __future__ import annotations
  9
 10from airbyte_ops_mcp.registry._constants import (
 11    DEFAULT_METADATA_SERVICE_BUCKET_NAME,
 12    DEV_METADATA_SERVICE_BUCKET_NAME,
 13    LATEST_GCS_FOLDER_NAME,
 14    METADATA_FILE_NAME,
 15    METADATA_FOLDER,
 16    PROD_METADATA_SERVICE_BUCKET_NAME,
 17    RELEASE_CANDIDATE_GCS_FOLDER_NAME,
 18    SONAR_DEV_BUCKET_NAME,
 19    SONAR_PROD_BUCKET_NAME,
 20)
 21from airbyte_ops_mcp.registry._enums import (
 22    ConnectorLanguage,
 23    ConnectorType,
 24    SupportLevel,
 25)
 26from airbyte_ops_mcp.registry.audit import (
 27    AuditResult,
 28    UnpublishedConnector,
 29    find_unpublished_connectors,
 30)
 31from airbyte_ops_mcp.registry.compile import (
 32    CompileResult,
 33    PurgeLatestResult,
 34    compile_registry,
 35    purge_latest_dirs,
 36)
 37from airbyte_ops_mcp.registry.generate import (
 38    GenerateResult,
 39    generate_version_artifacts,
 40)
 41from airbyte_ops_mcp.registry.models import (
 42    ConnectorListResult,
 43    ConnectorMetadata,
 44    MetadataPublishResult,
 45    RegistryEntryResult,
 46    VersionListResult,
 47)
 48from airbyte_ops_mcp.registry.operations import (
 49    get_registry_entry,
 50    get_registry_spec,
 51    list_connector_versions,
 52    list_registry_connectors,
 53    list_registry_connectors_filtered,
 54)
 55from airbyte_ops_mcp.registry.publish import (
 56    CONNECTOR_PATH_PREFIX,
 57    get_connector_metadata,
 58    get_gcs_publish_path,
 59    publish_connector_metadata,
 60)
 61from airbyte_ops_mcp.registry.publish_artifacts import (
 62    PublishArtifactsResult,
 63    publish_version_artifacts,
 64)
 65from airbyte_ops_mcp.registry.rebuild import (
 66    OutputMode,
 67    RebuildResult,
 68    rebuild_registry,
 69)
 70from airbyte_ops_mcp.registry.registry_store_base import (
 71    Registry,
 72    get_registry,
 73)
 74from airbyte_ops_mcp.registry.store import (
 75    REGISTRY_STORE_ENV_VAR,
 76    RegistryStore,
 77    StoreType,
 78    resolve_registry_store,
 79)
 80from airbyte_ops_mcp.registry.validate import (
 81    ValidateOptions,
 82    ValidationResult,
 83    validate_metadata,
 84)
 85from airbyte_ops_mcp.registry.yank import (
 86    YANK_FILE_NAME,
 87    YankResult,
 88    unyank_connector_version,
 89    yank_connector_version,
 90)
 91
 92__all__ = [
 93    "CONNECTOR_PATH_PREFIX",
 94    "DEFAULT_METADATA_SERVICE_BUCKET_NAME",
 95    "DEV_METADATA_SERVICE_BUCKET_NAME",
 96    "LATEST_GCS_FOLDER_NAME",
 97    "METADATA_FILE_NAME",
 98    "METADATA_FOLDER",
 99    "PROD_METADATA_SERVICE_BUCKET_NAME",
100    "REGISTRY_STORE_ENV_VAR",
101    "RELEASE_CANDIDATE_GCS_FOLDER_NAME",
102    "SONAR_DEV_BUCKET_NAME",
103    "SONAR_PROD_BUCKET_NAME",
104    "YANK_FILE_NAME",
105    "AuditResult",
106    "CompileResult",
107    "ConnectorLanguage",
108    "ConnectorListResult",
109    "ConnectorMetadata",
110    "ConnectorType",
111    "GenerateResult",
112    "MetadataPublishResult",
113    "OutputMode",
114    "PublishArtifactsResult",
115    "PurgeLatestResult",
116    "RebuildResult",
117    "Registry",
118    "RegistryEntryResult",
119    "RegistryStore",
120    "StoreType",
121    "SupportLevel",
122    "UnpublishedConnector",
123    "ValidateOptions",
124    "ValidationResult",
125    "VersionListResult",
126    "YankResult",
127    "compile_registry",
128    "find_unpublished_connectors",
129    "generate_version_artifacts",
130    "get_connector_metadata",
131    "get_gcs_publish_path",
132    "get_registry",
133    "get_registry_entry",
134    "get_registry_spec",
135    "list_connector_versions",
136    "list_registry_connectors",
137    "list_registry_connectors_filtered",
138    "publish_connector_metadata",
139    "publish_version_artifacts",
140    "purge_latest_dirs",
141    "rebuild_registry",
142    "resolve_registry_store",
143    "unyank_connector_version",
144    "validate_metadata",
145    "yank_connector_version",
146]
CONNECTOR_PATH_PREFIX = 'airbyte-integrations/connectors'
DEFAULT_METADATA_SERVICE_BUCKET_NAME = 'dev-airbyte-cloud-connector-metadata-service-2'
DEV_METADATA_SERVICE_BUCKET_NAME = 'dev-airbyte-cloud-connector-metadata-service-2'
LATEST_GCS_FOLDER_NAME = 'latest'
METADATA_FILE_NAME = 'metadata.yaml'
METADATA_FOLDER = 'metadata'
PROD_METADATA_SERVICE_BUCKET_NAME = 'prod-airbyte-cloud-connector-metadata-service'
REGISTRY_STORE_ENV_VAR = 'AIRBYTE_REGISTRY_STORE'
RELEASE_CANDIDATE_GCS_FOLDER_NAME = 'release_candidate'
SONAR_DEV_BUCKET_NAME = 'airbyte-connector-registry-dev'
SONAR_PROD_BUCKET_NAME = 'airbyte-connector-registry'
YANK_FILE_NAME = 'version-yank.yml'
@dataclass
class AuditResult:
37@dataclass
38class AuditResult:
39    """Result of auditing which connectors have unpublished versions."""
40
41    unpublished: list[UnpublishedConnector] = field(default_factory=list)
42    checked_count: int = 0
43    skipped_archived: list[str] = field(default_factory=list)
44    skipped_rc: list[str] = field(default_factory=list)
45    skipped_disabled: list[str] = field(default_factory=list)
46    errors: list[str] = field(default_factory=list)

Result of auditing which connectors have unpublished versions.

AuditResult( unpublished: list[UnpublishedConnector] = <factory>, checked_count: int = 0, skipped_archived: list[str] = <factory>, skipped_rc: list[str] = <factory>, skipped_disabled: list[str] = <factory>, errors: list[str] = <factory>)
unpublished: list[UnpublishedConnector]
checked_count: int = 0
skipped_archived: list[str]
skipped_rc: list[str]
skipped_disabled: list[str]
errors: list[str]
@dataclass
class CompileResult:
107@dataclass
108class CompileResult:
109    """Result of a registry compile operation."""
110
111    target: str
112    connectors_scanned: int = 0
113    versions_found: int = 0
114    yanked_versions: int = 0
115    latest_updated: int = 0
116    latest_already_current: int = 0
117    cloud_registry_entries: int = 0
118    oss_registry_entries: int = 0
119    composite_registry_entries: int = 0
120    metrics_connector_count: int = 0
121    metrics_registry_entries: int = 0
122    metrics_source: str | None = None
123    metrics_error: str | None = None
124    version_indexes_written: int = 0
125    specs_secrets_mask_properties: int = 0
126    errors: list[str] = field(default_factory=list)
127    dry_run: bool = False
128
129    @property
130    def status(self) -> str:
131        if self.dry_run:
132            return "dry-run"
133        if self.errors:
134            return "completed-with-errors"
135        return "success"
136
137    def summary(self) -> str:
138        return (
139            f"[{self.status}] Scanned {self.connectors_scanned} connectors, "
140            f"{self.versions_found} versions ({self.yanked_versions} yanked). "
141            f"Latest updated: {self.latest_updated}, "
142            f"already current: {self.latest_already_current}. "
143            f"Registry entries: cloud={self.cloud_registry_entries}, "
144            f"oss={self.oss_registry_entries}, "
145            f"composite={self.composite_registry_entries}. "
146            f"Metrics loaded for {self.metrics_connector_count} connectors, "
147            f"injected into {self.metrics_registry_entries} registry entries. "
148            f"Version indexes: {self.version_indexes_written}. "
149            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. "
150            f"Errors: {len(self.errors)}."
151        )

Result of a registry compile operation.

CompileResult( target: str, connectors_scanned: int = 0, versions_found: int = 0, yanked_versions: int = 0, latest_updated: int = 0, latest_already_current: int = 0, cloud_registry_entries: int = 0, oss_registry_entries: int = 0, composite_registry_entries: int = 0, metrics_connector_count: int = 0, metrics_registry_entries: int = 0, metrics_source: str | None = None, metrics_error: str | None = None, version_indexes_written: int = 0, specs_secrets_mask_properties: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
target: str
connectors_scanned: int = 0
versions_found: int = 0
yanked_versions: int = 0
latest_updated: int = 0
latest_already_current: int = 0
cloud_registry_entries: int = 0
oss_registry_entries: int = 0
composite_registry_entries: int = 0
metrics_connector_count: int = 0
metrics_registry_entries: int = 0
metrics_source: str | None = None
metrics_error: str | None = None
version_indexes_written: int = 0
specs_secrets_mask_properties: int = 0
errors: list[str]
dry_run: bool = False
status: str
129    @property
130    def status(self) -> str:
131        if self.dry_run:
132            return "dry-run"
133        if self.errors:
134            return "completed-with-errors"
135        return "success"
def summary(self) -> str:
137    def summary(self) -> str:
138        return (
139            f"[{self.status}] Scanned {self.connectors_scanned} connectors, "
140            f"{self.versions_found} versions ({self.yanked_versions} yanked). "
141            f"Latest updated: {self.latest_updated}, "
142            f"already current: {self.latest_already_current}. "
143            f"Registry entries: cloud={self.cloud_registry_entries}, "
144            f"oss={self.oss_registry_entries}, "
145            f"composite={self.composite_registry_entries}. "
146            f"Metrics loaded for {self.metrics_connector_count} connectors, "
147            f"injected into {self.metrics_registry_entries} registry entries. "
148            f"Version indexes: {self.version_indexes_written}. "
149            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. "
150            f"Errors: {len(self.errors)}."
151        )
class ConnectorLanguage(enum.StrEnum):
 88class ConnectorLanguage(StrEnum):
 89    """Connector implementation languages."""
 90
 91    PYTHON = "python"
 92    JAVA = "java"
 93    LOW_CODE = "low-code"
 94    MANIFEST_ONLY = "manifest-only"
 95
 96    @classmethod
 97    def parse(cls, value: str) -> ConnectorLanguage:
 98        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
 99        try:
100            return cls(value)
101        except ValueError:
102            valid = ", ".join(f"`{m.value}`" for m in cls)
103            raise ValueError(
104                f"Unrecognized language: {value!r}. Expected one of: {valid}."
105            ) from None

Connector implementation languages.

PYTHON = <ConnectorLanguage.PYTHON: 'python'>
JAVA = <ConnectorLanguage.JAVA: 'java'>
LOW_CODE = <ConnectorLanguage.LOW_CODE: 'low-code'>
MANIFEST_ONLY = <ConnectorLanguage.MANIFEST_ONLY: 'manifest-only'>
@classmethod
def parse(cls, value: str) -> ConnectorLanguage:
 96    @classmethod
 97    def parse(cls, value: str) -> ConnectorLanguage:
 98        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
 99        try:
100            return cls(value)
101        except ValueError:
102            valid = ", ".join(f"`{m.value}`" for m in cls)
103            raise ValueError(
104                f"Unrecognized language: {value!r}. Expected one of: {valid}."
105            ) from None

Parse a string into a ConnectorLanguage, raising ValueError on mismatch.

class ConnectorListResult(pydantic.main.BaseModel):
73class ConnectorListResult(BaseModel):
74    """Result of listing connectors in the registry."""
75
76    bucket_name: str = Field(description="The GCS bucket name")
77    connector_count: int = Field(description="Number of connectors found")
78    connectors: list[str] = Field(description="List of connector names")

Result of listing connectors in the registry.

bucket_name: str = PydanticUndefined

The GCS bucket name

connector_count: int = PydanticUndefined

Number of connectors found

connectors: list[str] = PydanticUndefined

List of connector names

class ConnectorMetadata(pydantic.main.BaseModel):
12class ConnectorMetadata(BaseModel):
13    """Connector metadata from metadata.yaml.
14
15    This model represents the essential metadata about a connector
16    read from its metadata.yaml file in the Airbyte monorepo.
17    """
18
19    name: str = Field(description="The connector technical name")
20    docker_repository: str = Field(description="The Docker repository")
21    docker_image_tag: str = Field(description="The Docker image tag/version")
22    support_level: str | None = Field(
23        default=None, description="The support level (certified, community, etc.)"
24    )
25    definition_id: str | None = Field(
26        default=None, description="The connector definition ID"
27    )

Connector metadata from metadata.yaml.

This model represents the essential metadata about a connector read from its metadata.yaml file in the Airbyte monorepo.

name: str = PydanticUndefined

The connector technical name

docker_repository: str = PydanticUndefined

The Docker repository

docker_image_tag: str = PydanticUndefined

The Docker image tag/version

support_level: str | None = None

The support level (certified, community, etc.)

definition_id: str | None = None

The connector definition ID

class ConnectorType(enum.StrEnum):
70class ConnectorType(StrEnum):
71    """Connector type: source or destination."""
72
73    SOURCE = "source"
74    DESTINATION = "destination"
75
76    @classmethod
77    def parse(cls, value: str) -> ConnectorType:
78        """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch."""
79        try:
80            return cls(value)
81        except ValueError:
82            valid = ", ".join(f"`{m.value}`" for m in cls)
83            raise ValueError(
84                f"Unrecognized connector type: {value!r}. Expected one of: {valid}."
85            ) from None

Connector type: source or destination.

SOURCE = <ConnectorType.SOURCE: 'source'>
DESTINATION = <ConnectorType.DESTINATION: 'destination'>
@classmethod
def parse(cls, value: str) -> ConnectorType:
76    @classmethod
77    def parse(cls, value: str) -> ConnectorType:
78        """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch."""
79        try:
80            return cls(value)
81        except ValueError:
82            valid = ", ".join(f"`{m.value}`" for m in cls)
83            raise ValueError(
84                f"Unrecognized connector type: {value!r}. Expected one of: {valid}."
85            ) from None

Parse a string into a ConnectorType, raising ValueError on mismatch.

@dataclass
class GenerateResult:
127@dataclass
128class GenerateResult:
129    """Result of a local artifact generation run."""
130
131    connector_name: str
132    version: str
133    docker_image: str
134    output_dir: str
135    artifacts_written: list[str] = field(default_factory=list)
136    errors: list[str] = field(default_factory=list)
137    validation_errors: list[str] = field(default_factory=list)
138    dry_run: bool = False
139
140    @property
141    def success(self) -> bool:
142        return len(self.errors) == 0 and len(self.validation_errors) == 0
143
144    def to_dict(self) -> dict[str, Any]:
145        return {
146            "connector_name": self.connector_name,
147            "version": self.version,
148            "docker_image": self.docker_image,
149            "output_dir": self.output_dir,
150            "artifacts_written": self.artifacts_written,
151            "errors": self.errors,
152            "validation_errors": self.validation_errors,
153            "dry_run": self.dry_run,
154            "success": self.success,
155        }

Result of a local artifact generation run.

GenerateResult( connector_name: str, version: str, docker_image: str, output_dir: str, artifacts_written: list[str] = <factory>, errors: list[str] = <factory>, validation_errors: list[str] = <factory>, dry_run: bool = False)
connector_name: str
version: str
docker_image: str
output_dir: str
artifacts_written: list[str]
errors: list[str]
validation_errors: list[str]
dry_run: bool = False
success: bool
140    @property
141    def success(self) -> bool:
142        return len(self.errors) == 0 and len(self.validation_errors) == 0
def to_dict(self) -> dict[str, typing.Any]:
144    def to_dict(self) -> dict[str, Any]:
145        return {
146            "connector_name": self.connector_name,
147            "version": self.version,
148            "docker_image": self.docker_image,
149            "output_dir": self.output_dir,
150            "artifacts_written": self.artifacts_written,
151            "errors": self.errors,
152            "validation_errors": self.validation_errors,
153            "dry_run": self.dry_run,
154            "success": self.success,
155        }
class MetadataPublishResult(pydantic.main.BaseModel):
30class MetadataPublishResult(BaseModel):
31    """Result of a metadata publish operation to GCS.
32
33    This model provides detailed information about the outcome of
34    publishing connector metadata to the registry.
35    """
36
37    connector_name: str = Field(description="The connector technical name")
38    version: str = Field(description="The version that was published")
39    bucket_name: str = Field(description="The GCS bucket name")
40    versioned_path: str = Field(description="The versioned GCS path")
41    latest_path: str | None = Field(
42        default=None, description="The latest GCS path if updated"
43    )
44    versioned_uploaded: bool = Field(
45        default=False, description="Whether the versioned metadata was uploaded"
46    )
47    latest_uploaded: bool = Field(
48        default=False, description="Whether the latest metadata was uploaded"
49    )
50    status: Literal["success", "dry-run", "already-up-to-date"] = Field(
51        description="The status of the operation"
52    )
53    message: str = Field(description="Status message describing the outcome")
54
55    def __str__(self) -> str:
56        """Return a string representation of the publish result."""
57        return f"[{self.status}] {self.connector_name}:{self.version} -> {self.versioned_path}"

Result of a metadata publish operation to GCS.

This model provides detailed information about the outcome of publishing connector metadata to the registry.

connector_name: str = PydanticUndefined

The connector technical name

version: str = PydanticUndefined

The version that was published

bucket_name: str = PydanticUndefined

The GCS bucket name

versioned_path: str = PydanticUndefined

The versioned GCS path

latest_path: str | None = None

The latest GCS path if updated

versioned_uploaded: bool = False

Whether the versioned metadata was uploaded

latest_uploaded: bool = False

Whether the latest metadata was uploaded

status: Literal['success', 'dry-run', 'already-up-to-date'] = PydanticUndefined

The status of the operation

message: str = PydanticUndefined

Status message describing the outcome

OutputMode = typing.Literal['local', 'gcs', 's3']
@dataclass
class PublishArtifactsResult:
51@dataclass
52class PublishArtifactsResult:
53    """Result of a version-artifacts publish operation."""
54
55    connector_name: str
56    version: str
57    target: str
58    gcs_destination: str
59    files_uploaded: list[str] = field(default_factory=list)
60    errors: list[str] = field(default_factory=list)
61    validation_errors: list[str] = field(default_factory=list)
62    dry_run: bool = False
63
64    @property
65    def success(self) -> bool:
66        return len(self.errors) == 0 and len(self.validation_errors) == 0
67
68    @property
69    def status(self) -> str:
70        if self.dry_run:
71            return "dry-run"
72        if self.errors or self.validation_errors:
73            return "completed-with-errors"
74        return "success"

Result of a version-artifacts publish operation.

PublishArtifactsResult( connector_name: str, version: str, target: str, gcs_destination: str, files_uploaded: list[str] = <factory>, errors: list[str] = <factory>, validation_errors: list[str] = <factory>, dry_run: bool = False)
connector_name: str
version: str
target: str
gcs_destination: str
files_uploaded: list[str]
errors: list[str]
validation_errors: list[str]
dry_run: bool = False
success: bool
64    @property
65    def success(self) -> bool:
66        return len(self.errors) == 0 and len(self.validation_errors) == 0
status: str
68    @property
69    def status(self) -> str:
70        if self.dry_run:
71            return "dry-run"
72        if self.errors or self.validation_errors:
73            return "completed-with-errors"
74        return "success"
@dataclass
class PurgeLatestResult:
154@dataclass
155class PurgeLatestResult:
156    """Result of a purge-latest operation."""
157
158    target: str
159    connectors_found: int = 0
160    latest_dirs_deleted: int = 0
161    errors: list[str] = field(default_factory=list)
162    dry_run: bool = False
163
164    @property
165    def status(self) -> str:
166        if self.dry_run:
167            return "dry-run"
168        if self.errors:
169            return "completed-with-errors"
170        return "success"
171
172    def summary(self) -> str:
173        return (
174            f"[{self.status}] Found {self.connectors_found} connectors, "
175            f"deleted {self.latest_dirs_deleted} latest/ directories. "
176            f"Errors: {len(self.errors)}."
177        )

Result of a purge-latest operation.

PurgeLatestResult( target: str, connectors_found: int = 0, latest_dirs_deleted: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
target: str
connectors_found: int = 0
latest_dirs_deleted: int = 0
errors: list[str]
dry_run: bool = False
status: str
164    @property
165    def status(self) -> str:
166        if self.dry_run:
167            return "dry-run"
168        if self.errors:
169            return "completed-with-errors"
170        return "success"
def summary(self) -> str:
172    def summary(self) -> str:
173        return (
174            f"[{self.status}] Found {self.connectors_found} connectors, "
175            f"deleted {self.latest_dirs_deleted} latest/ directories. "
176            f"Errors: {len(self.errors)}."
177        )
@dataclass
class RebuildResult:
43@dataclass
44class RebuildResult:
45    """Result of a registry rebuild operation."""
46
47    source_bucket: str
48    output_mode: OutputMode
49    output_root: str
50    connectors_processed: int = 0
51    blobs_copied: int = 0
52    blobs_skipped: int = 0
53    errors: list[str] = field(default_factory=list)
54    dry_run: bool = False
55
56    @property
57    def status(self) -> str:
58        """Return the status of the rebuild operation."""
59        if self.dry_run:
60            return "dry-run"
61        if self.errors:
62            return "completed-with-errors"
63        return "success"
64
65    def summary(self) -> str:
66        """Return a human-readable summary."""
67        return (
68            f"[{self.status}] Rebuilt {self.connectors_processed} connectors, "
69            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
70            f"{len(self.errors)} errors. Output: {self.output_root}"
71        )

Result of a registry rebuild operation.

RebuildResult( source_bucket: str, output_mode: Literal['local', 'gcs', 's3'], output_root: str, connectors_processed: int = 0, blobs_copied: int = 0, blobs_skipped: int = 0, errors: list[str] = <factory>, dry_run: bool = False)
source_bucket: str
output_mode: Literal['local', 'gcs', 's3']
output_root: str
connectors_processed: int = 0
blobs_copied: int = 0
blobs_skipped: int = 0
errors: list[str]
dry_run: bool = False
status: str
56    @property
57    def status(self) -> str:
58        """Return the status of the rebuild operation."""
59        if self.dry_run:
60            return "dry-run"
61        if self.errors:
62            return "completed-with-errors"
63        return "success"

Return the status of the rebuild operation.

def summary(self) -> str:
65    def summary(self) -> str:
66        """Return a human-readable summary."""
67        return (
68            f"[{self.status}] Rebuilt {self.connectors_processed} connectors, "
69            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
70            f"{len(self.errors)} errors. Output: {self.output_root}"
71        )

Return a human-readable summary.

class Registry(abc.ABC):
 40class Registry(ABC):
 41    """A configured connector registry store.
 42
 43    A Registry is bound to a specific `airbyte_ops_mcp.registry.store.RegistryStore`
 44    (store type + env + optional prefix), and provides methods used by the CLI to
 45    read/write registry contents.
 46    """
 47
 48    def __init__(self, store: RegistryStore) -> None:
 49        self.store = store
 50
 51    @property
 52    def store_type(self) -> StoreType:
 53        return self.store.store_type
 54
 55    @property
 56    def bucket_name(self) -> str:
 57        return self.store.bucket
 58
 59    @property
 60    def prefix(self) -> str:
 61        return self.store.prefix
 62
 63    def _require_no_prefix(self, op_name: str) -> None:
 64        """Raise if this op doesn't support prefixed targets."""
 65
 66        if self.prefix:
 67            raise NotImplementedError(
 68                f"Operation '{op_name}' does not yet support store prefixes (got prefix='{self.prefix}')."
 69            )
 70
 71    # ---------------------------------------------------------------------
 72    # Read operations
 73    # ---------------------------------------------------------------------
 74
 75    @abstractmethod
 76    def list_connectors(
 77        self,
 78        *,
 79        support_level: SupportLevel | None = None,
 80        min_support_level: SupportLevel | None = None,
 81        connector_type: ConnectorType | None = None,
 82        language: ConnectorLanguage | None = None,
 83    ) -> list[str]:
 84        raise NotImplementedError(
 85            _op_not_implemented_message(self.store_type, "list_connectors")
 86        )
 87
 88    def list_connector_versions(self, connector_name: str) -> list[str]:
 89        raise NotImplementedError(
 90            _op_not_implemented_message(self.store_type, "list_connector_versions")
 91        )
 92
 93    def get_connector_metadata(
 94        self, connector_name: str, version: str = "latest"
 95    ) -> dict[str, Any]:
 96        raise NotImplementedError(
 97            _op_not_implemented_message(self.store_type, "get_connector_metadata")
 98        )
 99
100    # ---------------------------------------------------------------------
101    # Write / mutate operations
102    # ---------------------------------------------------------------------
103
104    def yank(
105        self,
106        connector_name: str,
107        version: str,
108        reason: str = "",
109        approval_url: str = "",
110        dry_run: bool = False,
111    ) -> YankResult:
112        raise NotImplementedError(_op_not_implemented_message(self.store_type, "yank"))
113
114    def unyank(
115        self,
116        connector_name: str,
117        version: str,
118        dry_run: bool = False,
119    ) -> YankResult:
120        raise NotImplementedError(
121            _op_not_implemented_message(self.store_type, "unyank")
122        )
123
124    def finalize_progressive_rollout_marker(
125        self,
126        connector_name: str,
127        outcome: Literal["promoted", "aborted"],
128        version: str | None = None,
129        dry_run: bool = False,
130    ) -> ProgressiveRolloutMarkerResult:
131        raise NotImplementedError(
132            _op_not_implemented_message(
133                self.store_type,
134                "finalize_progressive_rollout_marker",
135            )
136        )
137
138    def publish_version_artifacts(
139        self,
140        connector_name: str,
141        version: str,
142        artifacts_dir: Path,
143        dry_run: bool = False,
144        with_validate: bool = True,
145    ) -> PublishArtifactsResult:
146        raise NotImplementedError(
147            _op_not_implemented_message(self.store_type, "publish_version_artifacts")
148        )
149
150    def delete_dev_latest(
151        self,
152        connector_name: list[str] | None = None,
153        dry_run: bool = False,
154    ) -> PurgeLatestResult:
155        raise NotImplementedError(
156            _op_not_implemented_message(self.store_type, "delete_dev_latest")
157        )
158
159    def compile(
160        self,
161        connector_name: list[str] | None = None,
162        dry_run: bool = False,
163        with_secrets_mask: bool = False,
164        with_legacy_migration: str | None = None,
165        with_metrics: bool = True,
166        force: bool = False,
167    ) -> CompileResult:
168        raise NotImplementedError(
169            _op_not_implemented_message(self.store_type, "compile")
170        )
171
172    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
173        raise NotImplementedError(
174            _op_not_implemented_message(self.store_type, "marketing_stubs_check")
175        )
176
177    def marketing_stubs_sync(
178        self,
179        repo_root: Path,
180        dry_run: bool = False,
181    ) -> dict[str, Any]:
182        raise NotImplementedError(
183            _op_not_implemented_message(self.store_type, "marketing_stubs_sync")
184        )
185
186    def mirror(
187        self,
188        output_mode: OutputMode,
189        output_path_root: str | None = None,
190        gcs_bucket: str | None = None,
191        s3_bucket: str | None = None,
192        dry_run: bool = False,
193        connector_name: list[str] | None = None,
194    ) -> RebuildResult:
195        raise NotImplementedError(
196            _op_not_implemented_message(self.store_type, "mirror")
197        )

A configured connector registry store.

A Registry is bound to a specific airbyte_ops_mcp.registry.store.RegistryStore (its store type, environment, and optional prefix) and provides the methods the CLI uses to read and write registry contents.

store
store_type: StoreType
51    @property
52    def store_type(self) -> StoreType:
53        return self.store.store_type
bucket_name: str
55    @property
56    def bucket_name(self) -> str:
57        return self.store.bucket
prefix: str
59    @property
60    def prefix(self) -> str:
61        return self.store.prefix
@abstractmethod
def list_connectors( self, *, support_level: SupportLevel | None = None, min_support_level: SupportLevel | None = None, connector_type: ConnectorType | None = None, language: ConnectorLanguage | None = None) -> list[str]:
75    @abstractmethod
76    def list_connectors(
77        self,
78        *,
79        support_level: SupportLevel | None = None,
80        min_support_level: SupportLevel | None = None,
81        connector_type: ConnectorType | None = None,
82        language: ConnectorLanguage | None = None,
83    ) -> list[str]:
84        raise NotImplementedError(
85            _op_not_implemented_message(self.store_type, "list_connectors")
86        )
def list_connector_versions(self, connector_name: str) -> list[str]:
88    def list_connector_versions(self, connector_name: str) -> list[str]:
89        raise NotImplementedError(
90            _op_not_implemented_message(self.store_type, "list_connector_versions")
91        )
def get_connector_metadata( self, connector_name: str, version: str = 'latest') -> dict[str, typing.Any]:
93    def get_connector_metadata(
94        self, connector_name: str, version: str = "latest"
95    ) -> dict[str, Any]:
96        raise NotImplementedError(
97            _op_not_implemented_message(self.store_type, "get_connector_metadata")
98        )
def yank( self, connector_name: str, version: str, reason: str = '', approval_url: str = '', dry_run: bool = False) -> YankResult:
104    def yank(
105        self,
106        connector_name: str,
107        version: str,
108        reason: str = "",
109        approval_url: str = "",
110        dry_run: bool = False,
111    ) -> YankResult:
112        raise NotImplementedError(_op_not_implemented_message(self.store_type, "yank"))
def unyank( self, connector_name: str, version: str, dry_run: bool = False) -> YankResult:
114    def unyank(
115        self,
116        connector_name: str,
117        version: str,
118        dry_run: bool = False,
119    ) -> YankResult:
120        raise NotImplementedError(
121            _op_not_implemented_message(self.store_type, "unyank")
122        )
def finalize_progressive_rollout_marker( self, connector_name: str, outcome: Literal['promoted', 'aborted'], version: str | None = None, dry_run: bool = False) -> airbyte_ops_mcp.registry.progressive_rollout_marker.ProgressiveRolloutMarkerResult:
124    def finalize_progressive_rollout_marker(
125        self,
126        connector_name: str,
127        outcome: Literal["promoted", "aborted"],
128        version: str | None = None,
129        dry_run: bool = False,
130    ) -> ProgressiveRolloutMarkerResult:
131        raise NotImplementedError(
132            _op_not_implemented_message(
133                self.store_type,
134                "finalize_progressive_rollout_marker",
135            )
136        )
def publish_version_artifacts( self, connector_name: str, version: str, artifacts_dir: pathlib.Path, dry_run: bool = False, with_validate: bool = True) -> PublishArtifactsResult:
138    def publish_version_artifacts(
139        self,
140        connector_name: str,
141        version: str,
142        artifacts_dir: Path,
143        dry_run: bool = False,
144        with_validate: bool = True,
145    ) -> PublishArtifactsResult:
146        raise NotImplementedError(
147            _op_not_implemented_message(self.store_type, "publish_version_artifacts")
148        )
def delete_dev_latest( self, connector_name: list[str] | None = None, dry_run: bool = False) -> PurgeLatestResult:
150    def delete_dev_latest(
151        self,
152        connector_name: list[str] | None = None,
153        dry_run: bool = False,
154    ) -> PurgeLatestResult:
155        raise NotImplementedError(
156            _op_not_implemented_message(self.store_type, "delete_dev_latest")
157        )
def compile( self, connector_name: list[str] | None = None, dry_run: bool = False, with_secrets_mask: bool = False, with_legacy_migration: str | None = None, with_metrics: bool = True, force: bool = False) -> CompileResult:
159    def compile(
160        self,
161        connector_name: list[str] | None = None,
162        dry_run: bool = False,
163        with_secrets_mask: bool = False,
164        with_legacy_migration: str | None = None,
165        with_metrics: bool = True,
166        force: bool = False,
167    ) -> CompileResult:
168        raise NotImplementedError(
169            _op_not_implemented_message(self.store_type, "compile")
170        )
def marketing_stubs_check(self, repo_root: pathlib.Path) -> dict[str, typing.Any]:
172    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
173        raise NotImplementedError(
174            _op_not_implemented_message(self.store_type, "marketing_stubs_check")
175        )
def marketing_stubs_sync( self, repo_root: pathlib.Path, dry_run: bool = False) -> dict[str, typing.Any]:
177    def marketing_stubs_sync(
178        self,
179        repo_root: Path,
180        dry_run: bool = False,
181    ) -> dict[str, Any]:
182        raise NotImplementedError(
183            _op_not_implemented_message(self.store_type, "marketing_stubs_sync")
184        )
def mirror( self, output_mode: Literal['local', 'gcs', 's3'], output_path_root: str | None = None, gcs_bucket: str | None = None, s3_bucket: str | None = None, dry_run: bool = False, connector_name: list[str] | None = None) -> RebuildResult:
186    def mirror(
187        self,
188        output_mode: OutputMode,
189        output_path_root: str | None = None,
190        gcs_bucket: str | None = None,
191        s3_bucket: str | None = None,
192        dry_run: bool = False,
193        connector_name: list[str] | None = None,
194    ) -> RebuildResult:
195        raise NotImplementedError(
196            _op_not_implemented_message(self.store_type, "mirror")
197        )
class RegistryEntryResult(pydantic.main.BaseModel):
60class RegistryEntryResult(BaseModel):
61    """Result of reading a registry entry from GCS.
62
63    This model wraps the raw metadata dictionary with additional context.
64    """
65
66    connector_name: str = Field(description="The connector technical name")
67    version: str = Field(description="The version that was read")
68    bucket_name: str = Field(description="The GCS bucket name")
69    gcs_path: str = Field(description="The GCS path that was read")
70    metadata: dict = Field(description="The raw metadata dictionary")

Result of reading a registry entry from GCS.

This model wraps the raw metadata dictionary with additional context.

connector_name: str = PydanticUndefined

The connector technical name

version: str = PydanticUndefined

The version that was read

bucket_name: str = PydanticUndefined

The GCS bucket name

gcs_path: str = PydanticUndefined

The GCS path that was read

metadata: dict = PydanticUndefined

The raw metadata dictionary

@dataclass(frozen=True, kw_only=True)
class RegistryStore:
132@dataclass(frozen=True, kw_only=True)
133class RegistryStore:
134    """Parsed store target (type + environment + optional prefix).
135
136    Examples::
137
138        RegistryStore.parse("coral:dev")
139        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
140        RegistryStore.parse("coral:dev/aj-test")
141        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
142        RegistryStore.parse("sonar:prod")
143        # -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
144    """
145
146    store_type: StoreType
147    env: str
148    prefix: str = ""
149
150    # -- Derived helpers -----------------------------------------------------
151
152    @property
153    def bucket(self) -> str:
154        """Resolve the concrete bucket name for this target."""
155        env_map = BUCKET_MAP.get(self.store_type)
156        if env_map is None:
157            raise ValueError(f"Unknown store type: {self.store_type!r}")
158        bucket_name = env_map.get(self.env)
159        if bucket_name is None:
160            raise ValueError(
161                f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
162                f"Expected one of: {', '.join(sorted(env_map))}."
163            )
164        return bucket_name
165
166    @property
167    def bucket_root(self) -> str:
168        """Bucket name with optional prefix appended (`bucket/prefix`)."""
169        if self.prefix:
170            return f"{self.bucket}/{self.prefix}"
171        return self.bucket
172
173    # -- Parsing -------------------------------------------------------------
174
175    @classmethod
176    def parse(cls, target: str) -> RegistryStore:
177        """Parse a store target string.
178
179        Accepted formats:
180
181            "coral:dev"
182
183            "coral:prod"
184            "coral:dev/aj-test100"
185            "sonar:prod"
186
187        Raises:
188            ValueError: If the string cannot be parsed or references an
189                unknown store type / environment.
190        """
191        if ":" not in target:
192            raise ValueError(
193                f"Invalid store target '{target}'. "
194                "Expected format: '<store_type>:<env>[/<prefix>]' "
195                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
196            )
197
198        store_part, env_part = target.split(":", 1)
199
200        # Validate store type
201        store_part_lower = store_part.lower()
202        valid_types = {t.value: t for t in StoreType}
203        if store_part_lower not in valid_types:
204            raise ValueError(
205                f"Unknown store type '{store_part}'. "
206                f"Expected one of: {', '.join(sorted(valid_types))}."
207            )
208        store_type = valid_types[store_part_lower]
209
210        # Split env and prefix
211        env_key, _, prefix = env_part.partition("/")
212        prefix = prefix.strip("/")
213
214        # Validate env
215        env_map = BUCKET_MAP.get(store_type, {})
216        if env_key not in env_map:
217            raise ValueError(
218                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
219                f"Expected one of: {', '.join(sorted(env_map))}."
220            )
221
222        return cls(store_type=store_type, env=env_key, prefix=prefix)

Parsed store target (type + environment + optional prefix).

Examples::

RegistryStore.parse("coral:dev")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
RegistryStore.parse("coral:dev/aj-test")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
RegistryStore.parse("sonar:prod")
# -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
RegistryStore( *, store_type: StoreType, env: str, prefix: str = '')
store_type: StoreType
env: str
prefix: str = ''
bucket: str
152    @property
153    def bucket(self) -> str:
154        """Resolve the concrete bucket name for this target."""
155        env_map = BUCKET_MAP.get(self.store_type)
156        if env_map is None:
157            raise ValueError(f"Unknown store type: {self.store_type!r}")
158        bucket_name = env_map.get(self.env)
159        if bucket_name is None:
160            raise ValueError(
161                f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
162                f"Expected one of: {', '.join(sorted(env_map))}."
163            )
164        return bucket_name

Resolve the concrete bucket name for this target.

bucket_root: str
166    @property
167    def bucket_root(self) -> str:
168        """Bucket name with optional prefix appended (`bucket/prefix`)."""
169        if self.prefix:
170            return f"{self.bucket}/{self.prefix}"
171        return self.bucket

Bucket name with optional prefix appended (bucket/prefix).

@classmethod
def parse(cls, target: str) -> RegistryStore:
175    @classmethod
176    def parse(cls, target: str) -> RegistryStore:
177        """Parse a store target string.
178
179        Accepted formats:
180
181            "coral:dev"
182
183            "coral:prod"
184            "coral:dev/aj-test100"
185            "sonar:prod"
186
187        Raises:
188            ValueError: If the string cannot be parsed or references an
189                unknown store type / environment.
190        """
191        if ":" not in target:
192            raise ValueError(
193                f"Invalid store target '{target}'. "
194                "Expected format: '<store_type>:<env>[/<prefix>]' "
195                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
196            )
197
198        store_part, env_part = target.split(":", 1)
199
200        # Validate store type
201        store_part_lower = store_part.lower()
202        valid_types = {t.value: t for t in StoreType}
203        if store_part_lower not in valid_types:
204            raise ValueError(
205                f"Unknown store type '{store_part}'. "
206                f"Expected one of: {', '.join(sorted(valid_types))}."
207            )
208        store_type = valid_types[store_part_lower]
209
210        # Split env and prefix
211        env_key, _, prefix = env_part.partition("/")
212        prefix = prefix.strip("/")
213
214        # Validate env
215        env_map = BUCKET_MAP.get(store_type, {})
216        if env_key not in env_map:
217            raise ValueError(
218                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
219                f"Expected one of: {', '.join(sorted(env_map))}."
220            )
221
222        return cls(store_type=store_type, env=env_key, prefix=prefix)

Parse a store target string.

Accepted formats (general form: '<store_type>:<env>[/<prefix>]'):

  • "coral:dev"
  • "coral:prod"
  • "coral:dev/aj-test100"
  • "sonar:prod"

Raises:
  • ValueError: If the string cannot be parsed or references an unknown store type / environment.
class StoreType(builtins.str, enum.Enum):
 59class StoreType(str, Enum):
 60    """Registry store type identifier."""
 61
 62    SONAR = "sonar"
 63    CORAL = "coral"
 64
 65    # -- Auto-detection class methods ----------------------------------------
 66
 67    @classmethod
 68    def get_from_connector_name(cls, name: str) -> StoreType:
 69        """Infer the store type from a connector's technical name.
 70
 71        Connectors whose name starts with `source-` or `destination-` belong
 72        to the **coral** registry.  All other names belong to **sonar**.
 73
 74        Args:
 75            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).
 76
 77        Returns:
 78            The inferred `StoreType`.
 79        """
 80        if name.startswith("source-") or name.startswith("destination-"):
 81            return cls.CORAL
 82        return cls.SONAR
 83
 84    @classmethod
 85    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
 86        """Infer the store type from a repository working directory.
 87
 88        Checks for well-known directory markers:
 89
 90        * **sonar** -- `integrations/` alongside `connector-sdk/`
 91        * **coral** -- `airbyte-integrations/connectors/`
 92
 93        Args:
 94            path: Directory to inspect.  Defaults to `Path.cwd`.
 95
 96        Returns:
 97            The inferred `StoreType`, or `None` if the directory does
 98            not match any known registry repository layout.
 99        """
100        if path is None:
101            path = Path.cwd()
102
103        # Sonar markers
104        if (path / "integrations").is_dir() and (path / "connector-sdk").is_dir():
105            return cls.SONAR
106
107        # Coral markers
108        if (path / "airbyte-integrations" / "connectors").is_dir():
109            return cls.CORAL
110
111        return None

Registry store type identifier.

SONAR = <StoreType.SONAR: 'sonar'>
CORAL = <StoreType.CORAL: 'coral'>
@classmethod
def get_from_connector_name(cls, name: str) -> StoreType:
67    @classmethod
68    def get_from_connector_name(cls, name: str) -> StoreType:
69        """Infer the store type from a connector's technical name.
70
71        Connectors whose name starts with `source-` or `destination-` belong
72        to the **coral** registry.  All other names belong to **sonar**.
73
74        Args:
75            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).
76
77        Returns:
78            The inferred `StoreType`.
79        """
80        if name.startswith("source-") or name.startswith("destination-"):
81            return cls.CORAL
82        return cls.SONAR

Infer the store type from a connector's technical name.

Connectors whose name starts with source- or destination- belong to the coral registry. All other names belong to sonar.

Arguments:
  • name: Connector technical name (e.g. "source-github" or "stripe").
Returns:

The inferred StoreType.

@classmethod
def detect_from_repo_dir( cls, path: pathlib.Path | None = None) -> StoreType | None:
 84    @classmethod
 85    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
 86        """Infer the store type from a repository working directory.
 87
 88        Checks for well-known directory markers:
 89
 90        * **sonar** -- `integrations/` alongside `connector-sdk/`
 91        * **coral** -- `airbyte-integrations/connectors/`
 92
 93        Args:
 94            path: Directory to inspect.  Defaults to `Path.cwd`.
 95
 96        Returns:
 97            The inferred `StoreType`, or `None` if the directory does
 98            not match any known registry repository layout.
 99        """
100        if path is None:
101            path = Path.cwd()
102
103        # Sonar markers
104        if (path / "integrations").is_dir() and (path / "connector-sdk").is_dir():
105            return cls.SONAR
106
107        # Coral markers
108        if (path / "airbyte-integrations" / "connectors").is_dir():
109            return cls.CORAL
110
111        return None

Infer the store type from a repository working directory.

Checks for well-known directory markers:

  • sonar -- integrations/ alongside connector-sdk/
  • coral -- airbyte-integrations/connectors/
Arguments:
  • path: Directory to inspect. Defaults to the current working directory (Path.cwd()).
Returns:

The inferred StoreType, or None if the directory does not match any known registry repository layout.

class SupportLevel(enum.StrEnum):
14class SupportLevel(StrEnum):
15    """Connector support levels ordered by precedence."""
16
17    ARCHIVED = "archived"
18    COMMUNITY = "community"
19    CERTIFIED = "certified"
20
21    @property
22    def precedence(self) -> int:
23        """Numeric precedence for ordering comparisons.
24
25        Higher values indicate higher support commitment.
26        """
27        return _SUPPORT_LEVEL_PRECEDENCE[self]
28
29    @classmethod
30    def from_precedence(cls, precedence: int) -> SupportLevel:
31        """Look up a `SupportLevel` by its numeric precedence value.
32
33        Raises `ValueError` when the precedence is not recognised.
34        """
35        for member in cls:
36            if _SUPPORT_LEVEL_PRECEDENCE[member] == precedence:
37                return member
38        valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
39        raise ValueError(
40            f"Unrecognized support-level precedence: {precedence!r}. "
41            f"Expected one of: {valid}."
42        ) from None
43
44    @classmethod
45    def parse(cls, value: str) -> SupportLevel:
46        """Parse a string into a `SupportLevel`.
47
48        Accepts a keyword (`archived`, `community`, `certified`)
49        or a legacy integer string (`100`, `200`, `300`).
50
51        Raises `ValueError` when the value is not recognised.
52        """
53        try:
54            return cls(value)
55        except ValueError:
56            pass
57        # Fallback: try interpreting as an integer precedence value.
58        try:
59            return cls.from_precedence(int(value))
60        except (ValueError, KeyError):
61            pass
62        valid_kw = ", ".join(f"`{m.value}`" for m in cls)
63        valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
64        raise ValueError(
65            f"Unrecognized support level: {value!r}. "
66            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
67        ) from None

Connector support levels ordered by precedence.

ARCHIVED = <SupportLevel.ARCHIVED: 'archived'>
COMMUNITY = <SupportLevel.COMMUNITY: 'community'>
CERTIFIED = <SupportLevel.CERTIFIED: 'certified'>
precedence: int
21    @property
22    def precedence(self) -> int:
23        """Numeric precedence for ordering comparisons.
24
25        Higher values indicate higher support commitment.
26        """
27        return _SUPPORT_LEVEL_PRECEDENCE[self]

Numeric precedence for ordering comparisons.

Higher values indicate higher support commitment.

@classmethod
def from_precedence(cls, precedence: int) -> SupportLevel:
29    @classmethod
30    def from_precedence(cls, precedence: int) -> SupportLevel:
31        """Look up a `SupportLevel` by its numeric precedence value.
32
33        Raises `ValueError` when the precedence is not recognised.
34        """
35        for member in cls:
36            if _SUPPORT_LEVEL_PRECEDENCE[member] == precedence:
37                return member
38        valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
39        raise ValueError(
40            f"Unrecognized support-level precedence: {precedence!r}. "
41            f"Expected one of: {valid}."
42        ) from None

Look up a SupportLevel by its numeric precedence value.

Raises ValueError when the precedence is not recognised.

@classmethod
def parse(cls, value: str) -> SupportLevel:
44    @classmethod
45    def parse(cls, value: str) -> SupportLevel:
46        """Parse a string into a `SupportLevel`.
47
48        Accepts a keyword (`archived`, `community`, `certified`)
49        or a legacy integer string (`100`, `200`, `300`).
50
51        Raises `ValueError` when the value is not recognised.
52        """
53        try:
54            return cls(value)
55        except ValueError:
56            pass
57        # Fallback: try interpreting as an integer precedence value.
58        try:
59            return cls.from_precedence(int(value))
60        except (ValueError, KeyError):
61            pass
62        valid_kw = ", ".join(f"`{m.value}`" for m in cls)
63        valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
64        raise ValueError(
65            f"Unrecognized support level: {value!r}. "
66            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
67        ) from None

Parse a string into a SupportLevel.

Accepts a keyword (archived, community, certified) or a legacy integer string (100, 200, 300).

Raises ValueError when the value is not recognised.

@dataclass
class UnpublishedConnector:
29@dataclass
30class UnpublishedConnector:
31    """A connector whose current local version is not published on GCS."""
32
33    connector_name: str
34    local_version: str

A connector whose current local version is not published on GCS.

UnpublishedConnector(connector_name: str, local_version: str)
connector_name: str
local_version: str
@dataclass(frozen=True)
class ValidateOptions:
37@dataclass(frozen=True)
38class ValidateOptions:
39    """Options that influence which validators run and how."""
40
41    docs_path: str | None = None
42    """Path to the connector's documentation file (for `validate_docs_path_exists`)."""
43
44    is_prerelease: bool = False
45    """Whether this is a pre-release build (skips version-decrement checks)."""

Options that influence which validators run and how.

ValidateOptions(docs_path: str | None = None, is_prerelease: bool = False)
docs_path: str | None = None

Path to the connector's documentation file (for validate_docs_path_exists).

is_prerelease: bool = False

Whether this is a pre-release build (skips version-decrement checks).

@dataclass
class ValidationResult:
48@dataclass
49class ValidationResult:
50    """Aggregate result from running all validators."""
51
52    passed: bool = True
53    errors: list[str] = field(default_factory=list)
54    validators_run: int = 0
55
56    def add_error(self, message: str) -> None:
57        self.passed = False
58        self.errors.append(message)

Aggregate result from running all validators.

ValidationResult( passed: bool = True, errors: list[str] = <factory>, validators_run: int = 0)
passed: bool = True
errors: list[str]
validators_run: int = 0
def add_error(self, message: str) -> None:
56    def add_error(self, message: str) -> None:
57        self.passed = False
58        self.errors.append(message)
class VersionListResult(pydantic.main.BaseModel):
81class VersionListResult(BaseModel):
82    """Result of listing versions for a connector."""
83
84    connector_name: str = Field(description="The connector technical name")
85    bucket_name: str = Field(description="The GCS bucket name")
86    version_count: int = Field(description="Number of versions found")
87    versions: list[str] = Field(description="List of version strings")

Result of listing versions for a connector.

connector_name: str = PydanticUndefined

The connector technical name

bucket_name: str = PydanticUndefined

The GCS bucket name

version_count: int = PydanticUndefined

Number of versions found

versions: list[str] = PydanticUndefined

List of version strings

@dataclass
class YankResult:
29@dataclass
30class YankResult:
31    """Result of a yank or unyank operation."""
32
33    connector_name: str
34    version: str
35    bucket_name: str
36    action: str  # "yank" or "unyank"
37    success: bool
38    message: str
39    dry_run: bool = False
40
41    def to_dict(self) -> dict[str, Any]:
42        """Convert to a dictionary for JSON serialization."""
43        return {
44            "connector_name": self.connector_name,
45            "version": self.version,
46            "bucket_name": self.bucket_name,
47            "action": self.action,
48            "success": self.success,
49            "message": self.message,
50            "dry_run": self.dry_run,
51        }

Result of a yank or unyank operation.

YankResult( connector_name: str, version: str, bucket_name: str, action: str, success: bool, message: str, dry_run: bool = False)
connector_name: str
version: str
bucket_name: str
action: str
success: bool
message: str
dry_run: bool = False
def to_dict(self) -> dict[str, typing.Any]:
41    def to_dict(self) -> dict[str, Any]:
42        """Convert to a dictionary for JSON serialization."""
43        return {
44            "connector_name": self.connector_name,
45            "version": self.version,
46            "bucket_name": self.bucket_name,
47            "action": self.action,
48            "success": self.success,
49            "message": self.message,
50            "dry_run": self.dry_run,
51        }

Convert to a dictionary for JSON serialization.

def compile_registry( *, store: RegistryStore, connector_name: list[str] | None = None, dry_run: bool = False, with_secrets_mask: bool = False, with_legacy_migration: str | None = None, with_metrics: bool = True, force: bool = False) -> CompileResult:
def compile_registry(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
    with_secrets_mask: bool = False,
    with_legacy_migration: str | None = None,
    with_metrics: bool = True,
    force: bool = False,
) -> CompileResult:
    """Compile the registry: sync latest/ dirs and write index files.

    Steps:
        1. Glob for all `metadata.yaml` to discover (connector, version) pairs.
        2. Glob for active marker files.
        3. Compute the latest GA semver per connector.
        4. Compute active release candidates from versioned markers.
        5. Glob for `version=*` markers in `latest/` dirs for a fast check.
        6. Delete stale `latest/` dirs and recursively copy the versioned dir.
        7. Synthesize missing registry entries from pinned latest overrides.
        8. (Optional) Legacy migration: delete disabled registry entries.
        9. (Optional) Read latest connector metrics.
        10. Write global registry JSONs.
        11. Write composite registry JSON.
        12. Write per-connector `versions.json`.
        13. (Optional) Regenerate `specs_secrets_mask.yaml`.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only check and resync `latest/`
            directories for these connectors (steps 5-7).  Version scanning
            (steps 1-4), legacy migration (step 8) and index rebuilds
            (steps 10-13) always operate on the full set of connectors so
            that global registry files remain complete.
        dry_run: If True, report what would be done without writing to or
            deleting from the bucket.  This includes the cleanup of a
            partially synced `latest/` dir after a sync failure, which is
            only reported in dry-run mode.
        with_secrets_mask: If True, regenerate `specs_secrets_mask.yaml`.
        with_legacy_migration: If set, run the named migration step.
            Currently supported: `"v1"` — delete `{registry_type}.json`
            files for connectors whose `registryOverrides.{registry}.enabled`
            is `false`.
        with_metrics: If True, inject latest connector metrics from the
            analytics JSONL export into `generated.metrics`.  Only applied
            to coral stores.
        force: If True, resync all connectors' latest/ directories even if the
            existing version marker matches the computed latest version. This
            is useful when metadata content changes without a version bump.

    Returns:
        A `CompileResult` describing what was done.  Per-connector failures
        are collected in `CompileResult.errors` rather than raised.

    Raises:
        ValueError: If `with_legacy_migration` is not a supported version.
    """
    if with_legacy_migration and with_legacy_migration not in LEGACY_MIGRATION_VERSIONS:
        raise ValueError(
            f"Unknown legacy migration version: {with_legacy_migration!r}. "
            f"Supported: {', '.join(LEGACY_MIGRATION_VERSIONS)}"
        )

    result = CompileResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # --- Steps 1 and 2: Scan versions and active markers ---
    # Always scan ALL connectors so that index rebuilds are complete.
    _log_progress("Step 1-2: Scanning versions and active markers...")
    connector_versions, yanked, progressive_rollouts = _scan_versions_and_markers(
        fs,
        store=store,
        connector_name=None,
    )
    result.connectors_scanned = len(connector_versions)
    result.versions_found = sum(len(v) for v in connector_versions.values())
    result.yanked_versions = len(yanked)
    _log_progress(
        "  Found %d connectors, %d versions, %d yanked",
        result.connectors_scanned,
        result.versions_found,
        result.yanked_versions,
    )
    _log_progress("  Found %d progressive rollout markers", len(progressive_rollouts))

    # --- Step 3: Compute latest ---
    _log_progress("Step 3: Computing latest GA version per connector...")
    latest_versions = _compute_latest_versions(
        connector_versions=connector_versions,
        yanked=yanked,
        progressive_rollouts=progressive_rollouts,
    )
    _log_progress("  Computed latest for %d connectors", len(latest_versions))

    # --- Step 4: Compute release candidates ---
    _log_progress("Step 4: Computing active release candidates...")
    rc_versions = _compute_release_candidates(
        connector_versions=connector_versions,
        yanked=yanked,
        progressive_rollouts=progressive_rollouts,
    )
    _log_progress("  Computed %d active release candidates", len(rc_versions))

    # --- Step 5: Check existing latest markers ---
    # When --connector-name is set, only check/sync those connectors (steps 5-7).
    # Index rebuilds always use the full unfiltered data.
    if connector_name:
        connector_name_set = set(connector_name)
        sync_scope = {
            c: v for c, v in latest_versions.items() if c in connector_name_set
        }
        _log_progress(
            "  --connector-name filter: syncing %d of %d connectors",
            len(sync_scope),
            len(latest_versions),
        )
    else:
        sync_scope = latest_versions

    _log_progress("Step 5: Checking existing latest/ markers...")
    sync_scope_names = list(sync_scope) if connector_name else None
    existing_markers = _scan_latest_markers(
        fs,
        store=store,
        connector_name=sync_scope_names,
    )
    _log_progress("  Found %d existing markers", len(existing_markers))

    stale_connectors: list[str] = []
    pinned_override_synthesis_connectors: list[str] = []
    for connector, expected_version in sync_scope.items():
        current_marker = existing_markers.get(connector)
        if force or current_marker != expected_version:
            stale_connectors.append(connector)
            continue

        # latest/ is on the right version, but may still lack registry
        # entries that must be synthesized from pinned overrides (step 7).
        if _requires_pinned_override_synthesis(
            fs,
            store=store,
            connector=connector,
            version=expected_version,
        ):
            pinned_override_synthesis_connectors.append(connector)
            continue

        result.latest_already_current += 1

    _log_progress(
        "  %d connectors need latest/ update, %d already current",
        len(stale_connectors),
        result.latest_already_current,
    )
    if pinned_override_synthesis_connectors:
        _log_progress(
            "  %d connectors need pinned-override entries synthesized",
            len(pinned_override_synthesis_connectors),
        )

    # --- Step 6: Resync stale latest/ dirs (parallel) ---
    if stale_connectors:
        _log_progress(
            "Step 6: Syncing %d stale latest/ directories (max_workers=%d)...",
            len(stale_connectors),
            _COMPILE_SYNC_MAX_WORKERS,
        )

        def _sync_one_connector(connector: str) -> None:
            """Sync a single connector's latest/ dir."""
            version = latest_versions[connector]
            _sync_latest_dir(
                fs,
                store=store,
                connector=connector,
                version=version,
                dry_run=dry_run,
            )
            if not dry_run:
                _apply_overrides_to_latest_entry(
                    fs,
                    store=store,
                    connector=connector,
                    version=version,
                )

        sorted_stale = sorted(stale_connectors)
        with ThreadPoolExecutor(max_workers=_COMPILE_SYNC_MAX_WORKERS) as pool:
            futures = {pool.submit(_sync_one_connector, c): c for c in sorted_stale}
            # Results are consumed on this thread only, so mutating `result`
            # here needs no locking.
            for i, future in enumerate(as_completed(futures), 1):
                connector = futures[future]
                try:
                    future.result()
                    result.latest_updated += 1
                except Exception as exc:
                    error_msg = f"Failed to sync latest/ for {connector}: {exc}"
                    logger.error(error_msg)
                    result.errors.append(error_msg)
                    if dry_run:
                        # A dry run must never mutate the bucket: nothing was
                        # written, so there is nothing partial to delete.
                        _log_progress(
                            "  [DRY RUN] Would clean up partial latest/ for %s",
                            connector,
                        )
                    else:
                        # Delete the (possibly partial) latest/ dir so the next
                        # compile retries this connector from scratch.
                        try:
                            _delete_latest_dir(
                                fs,
                                store=store,
                                connector=connector,
                            )
                            logger.info(
                                "Cleaned up partial latest/ for %s after failure",
                                connector,
                            )
                        except Exception as cleanup_exc:
                            logger.warning(
                                "Could not clean up latest/ for %s: %s",
                                connector,
                                cleanup_exc,
                            )
                if i % 100 == 0:
                    _log_progress("  Synced %d / %d...", i, len(sorted_stale))
    else:
        _log_progress("Step 6: All latest/ directories are current, nothing to sync.")

    # --- Step 7: Synthesize missing latest entries from pinned overrides ---
    if pinned_override_synthesis_connectors:
        _log_progress(
            "Step 7: Synthesizing %d latest/ registry entries from pinned overrides...",
            len(pinned_override_synthesis_connectors),
        )
        for connector in sorted(pinned_override_synthesis_connectors):
            if dry_run:
                _log_progress(
                    "  [DRY RUN] Would synthesize pinned latest entries for %s",
                    connector,
                )
                result.latest_updated += 1
                continue
            try:
                _apply_overrides_to_latest_entry(
                    fs,
                    store=store,
                    connector=connector,
                    version=sync_scope[connector],
                )
                result.latest_updated += 1
            except Exception as exc:
                error_msg = (
                    f"Failed to synthesize pinned latest entries for {connector}: {exc}"
                )
                logger.error(error_msg)
                result.errors.append(error_msg)

    # --- Step 8: Legacy migration (optional) ---
    if with_legacy_migration == "v1":
        _log_progress(
            "Step 8: Legacy migration v1 — deleting disabled registry entries..."
        )
        migration_deleted = _cleanup_disabled_registry_entries(
            fs,
            store=store,
            connector_versions=connector_versions,
            dry_run=dry_run,
        )
        total_deleted = sum(len(v) for v in migration_deleted.values())
        if migration_deleted:
            for conn, paths in sorted(migration_deleted.items()):
                _log_progress(
                    "  %s: %s %d files",
                    conn,
                    "would delete" if dry_run else "deleted",
                    len(paths),
                )
        _log_progress(
            "  Migration v1: %s %d files across %d connectors",
            "would delete" if dry_run else "deleted",
            total_deleted,
            len(migration_deleted),
        )

    # --- Step 9: Read latest connector metrics (optional) ---
    metrics_bundle = None
    if with_metrics and store.store_type == StoreType.CORAL:
        _log_progress("Step 9: Reading latest connector metrics JSONL...")
        try:
            metrics_bundle = read_latest_connector_metrics()
            result.metrics_source = metrics_bundle.blob_path
            result.metrics_connector_count = metrics_bundle.connector_count
            if metrics_bundle.blob_path:
                _log_progress(
                    "  Loaded metrics for %d connectors from gs://%s",
                    metrics_bundle.connector_count,
                    metrics_bundle.blob_path,
                )
            else:
                _log_progress("  No connector metrics JSONL file found.")
        except Exception as exc:
            # Metrics are best-effort: record the failure and keep compiling.
            error_msg = f"Failed to read connector metrics JSONL: {exc}"
            logger.warning(error_msg)
            result.metrics_error = error_msg
            _log_progress("  %s", error_msg)
    elif with_metrics:
        _log_progress("Step 9: Skipping connector metrics for non-coral registry.")
    else:
        _log_progress("Step 9: Connector metrics injection disabled.")

    # --- Step 10: Compile global registry JSONs ---
    _log_progress("Step 10: Compiling global registry JSON files...")
    all_registry_entries: list[dict[str, Any]] = []  # collected for Step 13
    entries_by_registry_type: dict[str, list[dict[str, Any]]] = {}
    for registry_type in VALID_REGISTRIES:
        entries = _compile_global_registry(
            fs,
            store=store,
            latest_versions=latest_versions,
            registry_type=registry_type,
        )

        # Inject release candidate info into entries that have active RCs.
        if rc_versions:
            rc_entries: dict[str, dict[str, Any]] = {}
            for connector, rc_ver in rc_versions.items():
                rc_entry = _read_rc_registry_entry(
                    fs,
                    store=store,
                    connector=connector,
                    rc_version=rc_ver,
                    registry_type=registry_type,
                )
                if rc_entry:
                    docker_repo = rc_entry.get(
                        "dockerRepository",
                        f"airbyte/{connector}",
                    )
                    rc_entries[docker_repo] = {
                        "version": rc_ver,
                        "entry": rc_entry,
                    }
            if rc_entries:
                entries = _apply_release_candidates_to_entries(entries, rc_entries)
                _log_progress(
                    "  Injected %d release candidates into %s registry",
                    len(rc_entries),
                    registry_type,
                )

        if metrics_bundle is not None:
            injected = apply_metrics_to_registry_entries(entries, metrics_bundle)
            result.metrics_registry_entries += injected
            _log_progress(
                "  Injected metrics into %d %s registry entries",
                injected,
                registry_type,
            )

        all_registry_entries.extend(entries)
        entries_by_registry_type[registry_type] = entries
        registry_json = _build_global_registry_json(entries)
        entry_count = len(registry_json["sources"]) + len(registry_json["destinations"])

        if registry_type == "cloud":
            result.cloud_registry_entries = entry_count
        else:
            result.oss_registry_entries = entry_count

        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )
        else:
            content = json.dumps(registry_json, indent=2, sort_keys=True) + "\n"
            path_prefix = f"{store.prefix}/" if store.prefix else ""
            _write_gcs_blob_with_custom_ttl(
                bucket_name=store.bucket,
                blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/{registry_type}_registry.json",
                content=content,
                cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
            )
            _log_progress(
                "  Wrote %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )

    # --- Step 11: Compile composite registry JSON (superset) ---
    _log_progress("Step 11: Compiling composite_registry.json (superset)...")
    composite_json = _build_composite_registry_json(
        cloud_entries=entries_by_registry_type.get("cloud", []),
        oss_entries=entries_by_registry_type.get("oss", []),
    )
    composite_entry_count = len(composite_json["sources"]) + len(
        composite_json["destinations"]
    )
    result.composite_registry_entries = composite_entry_count
    if dry_run:
        _log_progress(
            "  [DRY RUN] Would write composite_registry.json (%d entries)",
            composite_entry_count,
        )
    else:
        composite_content = json.dumps(composite_json, indent=2, sort_keys=True) + "\n"
        path_prefix = f"{store.prefix}/" if store.prefix else ""
        _write_gcs_blob_with_custom_ttl(
            bucket_name=store.bucket,
            blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/composite_registry.json",
            content=composite_content,
            cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
        )
        _log_progress(
            "  Wrote composite_registry.json (%d entries)",
            composite_entry_count,
        )

    # --- Step 12: Per-connector version indexes (parallel) ---
    _log_progress(
        "Step 12: Writing per-connector version indexes (max_workers=%d)...",
        _COMPILE_WRITE_MAX_WORKERS,
    )
    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"
    sorted_connectors = sorted(connector_versions)

    def _write_one_version_index(connector: str) -> None:
        """Build and write a single connector's versions.json."""
        versions = connector_versions[connector]
        latest_v = latest_versions.get(connector)
        rc_v = rc_versions.get(connector)
        index = _build_version_index(
            fs,
            store=store,
            connector=connector,
            versions=versions,
            yanked=yanked,
            latest_version=latest_v,
            rc_version=rc_v,
        )
        index_path = f"{base}/{connector}/versions.json"
        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s/versions.json (%d versions)",
                connector,
                len(versions),
            )
        else:
            content = json.dumps(index, indent=2, sort_keys=True) + "\n"
            with fs.open(index_path, "w") as f:
                f.write(content)

    with ThreadPoolExecutor(max_workers=_COMPILE_WRITE_MAX_WORKERS) as pool:
        futures = {
            pool.submit(_write_one_version_index, c): c for c in sorted_connectors
        }
        for i, future in enumerate(as_completed(futures), 1):
            connector = futures[future]
            try:
                future.result()
                result.version_indexes_written += 1
            except Exception as exc:
                error_msg = f"Failed to write versions.json for {connector}: {exc}"
                logger.error(error_msg)
                result.errors.append(error_msg)
            if i % 100 == 0:
                _log_progress(
                    "  Wrote %d / %d version indexes...", i, len(sorted_connectors)
                )

    # --- Step 13: Specs secrets mask (optional) ---
    if with_secrets_mask:
        _log_progress("Step 13: Generating specs secrets mask...")
        # Reuse entries collected during Step 10 to avoid redundant GCS reads.
        secret_names = _extract_secret_property_names(all_registry_entries)
        sorted_names = sorted(secret_names)
        result.specs_secrets_mask_properties = len(sorted_names)
        mask_content = yaml.dump({"properties": sorted_names}, default_flow_style=False)
        mask_path = (
            f"{store.bucket_root}/{_REGISTRIES_PREFIX}/{_SPECS_SECRETS_MASK_FILENAME}"
        )

        _log_progress(
            "  Found %d secret properties: %s",
            len(sorted_names),
            ", ".join(sorted_names),
        )

        if dry_run:
            _log_progress(
                "  [DRY RUN] Would write %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )
        else:
            with fs.open(mask_path, "w") as f:
                f.write(mask_content)
            _log_progress(
                "  Wrote %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )

    _log_progress(result.summary())
    return result

Compile the registry: sync latest/ dirs and write index files.

Steps:
  1. Glob for all metadata.yaml to discover (connector, version) pairs.
  2. Glob for active marker files.
  3. Compute the latest GA semver per connector.
  4. Compute active release candidates from versioned markers.
  5. Glob for version=* markers in latest/ dirs for a fast check.
  6. Delete stale latest/ dirs and recursively copy the versioned dir.
  7. Synthesize missing registry entries from pinned latest overrides.
  8. (Optional) Legacy migration: delete disabled registry entries.
  9. (Optional) Read latest connector metrics.
  10. Write global registry JSONs.
  11. Write composite registry JSON.
  12. Write per-connector versions.json.
  13. (Optional) Regenerate specs_secrets_mask.yaml.
Arguments:
  • store: Registry store (bucket + optional prefix).
  • connector_name: If provided, only check and resync latest/ directories for these connectors (steps 5-7). Version scanning (steps 1-4), legacy migration (step 8) and index rebuilds (steps 10-13) always operate on the full set of connectors so that global registry files remain complete.
  • dry_run: If True, report what would be done without writing.
  • with_secrets_mask: If True, regenerate specs_secrets_mask.yaml.
  • with_legacy_migration: If set, run the named migration step. Currently supported: "v1" — delete {registry_type}.json files for connectors whose registryOverrides.{registry}.enabled is false.
  • with_metrics: If True, inject latest connector metrics from the analytics JSONL export into generated.metrics.
  • force: If True, resync all connectors' latest/ directories even if the existing version marker matches the computed latest version. This is useful when metadata content changes without a version bump.
Returns:

A CompileResult describing what was done.

def find_unpublished_connectors( repo_path: str | pathlib.Path, bucket_name: str, connector_names: list[str] | None = None) -> AuditResult:
 90def find_unpublished_connectors(
 91    repo_path: str | Path,
 92    bucket_name: str,
 93    connector_names: list[str] | None = None,
 94) -> AuditResult:
 95    """Find connectors whose local version is not published on GCS.
 96
 97    For each connector in the local checkout, reads `dockerImageTag` from
 98    `metadata.yaml` and checks whether `metadata/<docker-repo>/<version>/metadata.yaml`
 99    exists in the GCS bucket.  Connectors that are archived, disabled on all
100    registries, or have RC versions are skipped.
101
102    Args:
103        repo_path: Path to the Airbyte monorepo checkout.
104        bucket_name: GCS bucket name to check against.
105        connector_names: Optional list of connector names to check.
106            If `None`, discovers all connectors in the repo.
107
108    Returns:
109        An `AuditResult` containing unpublished connectors and metadata.
110    """
111    repo_path = Path(repo_path)
112    connectors_dir = repo_path / CONNECTOR_PATH_PREFIX
113
114    if not connectors_dir.exists():
115        raise ValueError(f"Connectors directory not found: {connectors_dir}")
116
117    # Discover connector names if not provided
118    if connector_names is None:
119        connector_names = sorted(
120            d.name
121            for d in connectors_dir.iterdir()
122            if d.is_dir() and (d / METADATA_FILE_NAME).exists()
123        )
124
125    result = AuditResult()
126
127    # Collect connectors and their versions first, then batch-check GCS
128    to_check: list[tuple[str, str]] = []  # (connector_name, version)
129
130    for name in connector_names:
131        metadata_path = connectors_dir / name / METADATA_FILE_NAME
132        metadata = _read_local_metadata(metadata_path)
133        if metadata is None:
134            result.errors.append(f"{name}: metadata.yaml not found or unreadable")
135            continue
136
137        if _is_archived(metadata):
138            result.skipped_archived.append(name)
139            continue
140
141        if _is_disabled_on_all_registries(metadata):
142            result.skipped_disabled.append(name)
143            continue
144
145        data = metadata.get("data", {})
146        version = data.get("dockerImageTag")
147        if not version:
148            result.errors.append(f"{name}: no dockerImageTag in metadata")
149            continue
150
151        if _is_rc_version(version):
152            result.skipped_rc.append(name)
153            continue
154
155        to_check.append((name, version))
156
157    if not to_check:
158        result.checked_count = 0
159        return result
160
161    # Check GCS for each connector version
162    storage_client = get_gcs_storage_client()
163    bucket = storage_client.bucket(bucket_name)
164
165    for name, version in to_check:
166        result.checked_count += 1
167        blob_path = f"{METADATA_FOLDER}/airbyte/{name}/{version}/{METADATA_FILE_NAME}"
168        blob = bucket.blob(blob_path)
169
170        try:
171            exists = blob.exists()
172        except Exception as e:
173            result.errors.append(f"{name}: GCS check failed: {e}")
174            continue
175
176        if not exists:
177            logger.info(
178                "Unpublished: %s version %s (checked %s)",
179                name,
180                version,
181                blob_path,
182            )
183            result.unpublished.append(
184                UnpublishedConnector(connector_name=name, local_version=version)
185            )
186
187    logger.info(
188        "Audit complete: %d checked, %d unpublished, %d archived-skipped, %d disabled-skipped, %d rc-skipped",
189        result.checked_count,
190        len(result.unpublished),
191        len(result.skipped_archived),
192        len(result.skipped_disabled),
193        len(result.skipped_rc),
194    )
195
196    return result

Find connectors whose local version is not published on GCS.

For each connector in the local checkout, reads dockerImageTag from metadata.yaml and checks whether metadata/<docker-repo>/<version>/metadata.yaml exists in the GCS bucket. Connectors that are archived, disabled on all registries, or have RC versions are skipped.

Arguments:
  • repo_path: Path to the Airbyte monorepo checkout.
  • bucket_name: GCS bucket name to check against.
  • connector_names: Optional list of connector names to check. If None, discovers all connectors in the repo.
Returns:

An AuditResult containing unpublished connectors and metadata.

def generate_version_artifacts( metadata_file: pathlib.Path, docker_image: str, output_dir: pathlib.Path | None = None, repo_root: pathlib.Path | None = None, dry_run: bool = False, with_validate: bool = True, with_dependency_dump: bool = True, with_sbom: bool = True) -> GenerateResult:
def generate_version_artifacts(
    metadata_file: Path,
    docker_image: str,
    output_dir: Path | None = None,
    repo_root: Path | None = None,
    dry_run: bool = False,
    with_validate: bool = True,
    with_dependency_dump: bool = True,
    with_sbom: bool = True,
) -> GenerateResult:
    """Generate all version artifacts for a connector release.

    Artifacts are enriched with git commit info, SBOM URL, and (when applicable)
    components SHA before writing.  Validation is run after generation by default.

    Args:
        metadata_file: Path to the connector's `metadata.yaml`.
        docker_image: Docker image to run spec against (e.g. `airbyte/source-faker:6.2.38`).
        output_dir: Directory to write artifacts to.  If `None`, a temp directory is created.
        repo_root: Root of the Airbyte repo checkout (for resolving `doc.md`).
            If `None`, inferred by walking up from `metadata_file`.
        dry_run: If `True`, report what would be generated without writing or running docker.
            A caller-supplied `output_dir` is not created in this mode.
        with_validate: If `True` (default), run metadata validators after generation.
            Pass `False` (`--no-validate`) to skip.
        with_dependency_dump: If `True` (default), generate `dependencies.json`
            for Python connectors.  Pass `False` (`--no-dependency-dump`) to skip.
        with_sbom: If `True` (default), generate `spdx.json` (SBOM) for
            connectors.  Pass `False` (`--no-sbom`) to skip.

    Returns:
        A `GenerateResult` describing what was produced.  Non-fatal problems
        (failed docker spec, missing icon or docs) are collected in
        `result.errors`; validator failures go to `result.validation_errors`.

    Raises:
        FileNotFoundError: If `metadata_file` does not exist.
        ValueError: If `metadata_file` does not contain a YAML mapping.
    """
    # --- Load metadata ---
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    # Read as UTF-8 explicitly: the platform default encoding is not
    # guaranteed to be UTF-8, and metadata may contain non-ASCII text.
    loaded = yaml.safe_load(metadata_file.read_text(encoding="utf-8"))
    # safe_load yields None for an empty file (or a scalar/list for a
    # malformed one); fail clearly instead of with an AttributeError below.
    if not isinstance(loaded, dict):
        raise ValueError(
            f"Metadata file {metadata_file} does not contain a YAML mapping."
        )
    raw_metadata: dict[str, Any] = loaded
    metadata_data: dict[str, Any] = raw_metadata.get("data", {})

    connector_name = metadata_data.get("dockerRepository", "unknown").replace(
        "airbyte/", ""
    )
    version = metadata_data.get("dockerImageTag", "unknown")

    # --- Resolve output directory ---
    if output_dir is None:
        # mkdtemp creates the directory itself.
        output_dir = Path(
            tempfile.mkdtemp(prefix=f"connector-artifacts-{connector_name}-{version}-")
        )
    elif not dry_run:
        # Only create a caller-supplied directory for a real run; a dry run
        # must not write to the caller's filesystem.
        output_dir.mkdir(parents=True, exist_ok=True)

    result = GenerateResult(
        connector_name=connector_name,
        version=version,
        docker_image=docker_image,
        output_dir=str(output_dir),
        dry_run=dry_run,
    )

    if dry_run:
        logger.info("[DRY RUN] Would generate artifacts to %s", output_dir)
        result.artifacts_written = [
            "metadata.yaml",
            "icon.svg",
            "doc.md",
            "cloud.json",
            "oss.json",
            "manifest.yaml (if present)",
            "components.zip (if components.py present)",
            "components.zip.sha256 (if components.py present)",
            f"version={version}",
        ]
        if with_sbom:
            result.artifacts_written.append(SBOM_FILE_NAME)
        if with_dependency_dump:
            result.artifacts_written.append("dependencies.json (if Python connector)")
        return result

    # --- Prepare metadata output ---
    # The file itself is written near the end, after all enrichment steps.
    metadata_out = output_dir / "metadata.yaml"
    result.artifacts_written.append("metadata.yaml")

    # --- Enrich metadata with git info *before* building registry entries so
    #     that `generated.git` propagates into `cloud.json` / `oss.json`. ---
    # NOTE(review): registry entries are built from `metadata_data`, which was
    # taken from the *un-enriched* document above.  The propagation described
    # here only happens if the enrich helpers mutate `raw_metadata["data"]` in
    # place rather than returning a copy — confirm.
    raw_metadata = _enrich_metadata_git_info(raw_metadata, metadata_file)

    # --- Generate SBOM from the connector Docker image ---
    sbom_generated = False
    if not with_sbom:
        logger.info("SBOM generation disabled via --no-sbom.")
    else:
        try:
            sbom_path = generate_sbom(docker_image, output_dir)
        except RuntimeError as exc:
            logger.warning("SBOM generation failed (non-fatal): %s", exc)
        except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
            # Non-fatal as well, but keep the cause for troubleshooting.
            logger.warning(
                "Docker not available or SBOM generation timed out: %s", exc
            )
        else:
            result.artifacts_written.append(SBOM_FILE_NAME)
            sbom_generated = True
            logger.info("Generated SBOM: %s", sbom_path)

    # --- Enrich metadata with SBOM URL ---
    raw_metadata = _enrich_metadata_sbom_url(
        raw_metadata, sbom_generated=sbom_generated
    )

    # --- Run docker spec for cloud and oss ---
    # A failure for one mode is recorded and only blocks that mode's registry entry.
    specs: dict[str, dict[str, Any]] = {}
    for mode in VALID_REGISTRIES:
        try:
            specs[mode] = _run_docker_spec(docker_image, mode)
            logger.info("Got %s spec from docker image %s", mode, docker_image)
        except RuntimeError as exc:
            error_msg = f"Failed to get {mode} spec: {exc}"
            logger.error(error_msg)
            result.errors.append(error_msg)

    # --- Generate dependencies.json for Python connectors ---
    # This must happen *before* building registry entries so that the
    # local dependencies data can be used for packageInfo without a GCS
    # round-trip.
    local_dependencies: dict[str, Any] | None = None
    if not with_dependency_dump:
        logger.info("Dependency generation disabled via --no-dependency-dump.")
    elif _is_python_connector(metadata_data):
        logger.info("Python connector detected — generating dependencies.json")
        local_dependencies = generate_python_dependencies_file(
            metadata_data=metadata_data,
            docker_image=docker_image,
            output_dir=output_dir,
        )
        if local_dependencies is not None:
            result.artifacts_written.append(CONNECTOR_DEPENDENCY_FILE_NAME)
    else:
        logger.info(
            "Non-Python connector (%s) — skipping dependencies.json generation.",
            connector_name,
        )

    # --- Generate registry entries (cloud.json, oss.json) ---
    for registry_type in VALID_REGISTRIES:
        if not is_registry_enabled(metadata_data, registry_type):
            logger.info(
                "Registry type %s is not enabled for %s, skipping %s.json generation.",
                registry_type,
                connector_name,
                registry_type,
            )
            continue

        spec = specs.get(registry_type)
        if spec is None:
            error_msg = (
                f"Cannot generate {registry_type}.json: no spec available "
                f"(docker spec for {registry_type} failed or was not run)."
            )
            result.errors.append(error_msg)
            continue

        registry_entry = _build_registry_entry(
            metadata_data,
            registry_type,
            spec,
            local_dependencies=local_dependencies,
        )

        out_path = output_dir / f"{registry_type}.json"
        out_path.write_text(
            json.dumps(registry_entry, indent=2, sort_keys=True, default=_json_serial)
            + "\n"
        )
        result.artifacts_written.append(f"{registry_type}.json")
        logger.info("Wrote %s", out_path)

    # --- Copy icon.svg (sibling of metadata.yaml in the connector directory) ---
    icon_source = metadata_file.parent / "icon.svg"
    if icon_source.is_file():
        icon_out = output_dir / "icon.svg"
        shutil.copy2(icon_source, icon_out)
        result.artifacts_written.append("icon.svg")
        logger.info("Wrote %s", icon_out)
    else:
        logger.warning("No icon.svg found at %s.", icon_source)
        result.errors.append("Icon file is missing.")

    # --- Copy doc.md (derived from documentationUrl in metadata) ---
    # Resolved once here and reused by post-generation validation below.
    doc_source: Path | None = None
    if repo_root is None:
        # Infer repo root by walking up from metadata_file looking for .git
        # Note: .git can be a directory (normal clone) or a file (git worktree)
        # Resolve to absolute path first so the walk-up works with relative paths.
        candidate = metadata_file.resolve().parent
        while candidate != candidate.parent:
            git_indicator = candidate / ".git"
            if git_indicator.is_dir() or git_indicator.is_file():
                repo_root = candidate
                break
            candidate = candidate.parent

    if repo_root is not None:
        doc_source = _resolve_doc_path(metadata_data, repo_root)
        if doc_source is not None and doc_source.is_file():
            doc_out = output_dir / DOC_FILE_NAME
            shutil.copy2(doc_source, doc_out)
            result.artifacts_written.append(DOC_FILE_NAME)
            logger.info("Wrote %s (from %s)", doc_out, doc_source)
        else:
            if doc_source is None:
                # Avoid the misleading "Documentation file not found: None".
                error_msg = (
                    "Cannot resolve doc.md: no documentation path could be "
                    "derived from documentationUrl in metadata."
                )
            else:
                error_msg = (
                    f"Documentation file not found: {doc_source}. "
                    f"Derived from documentationUrl in metadata."
                )
            logger.error(error_msg)
            result.errors.append(error_msg)
    else:
        error_msg = "Cannot resolve doc.md: repo root not found."
        logger.error(error_msg)
        result.errors.append(error_msg)

    # --- Copy manifest.yaml (from connector root, if present) ---
    connector_dir = metadata_file.parent
    manifest_source = connector_dir / MANIFEST_FILE_NAME
    components_sha256: str | None = None
    if manifest_source.is_file():
        manifest_out = output_dir / MANIFEST_FILE_NAME
        shutil.copy2(manifest_source, manifest_out)
        result.artifacts_written.append(MANIFEST_FILE_NAME)
        logger.info("Wrote %s", manifest_out)

        # --- Generate components.zip if components.py exists ---
        components_source = connector_dir / COMPONENTS_PY_FILE_NAME
        if components_source.is_file():
            zip_path, sha256_path = _create_components_zip(
                manifest_path=manifest_source,
                components_path=components_source,
                output_dir=output_dir,
            )
            result.artifacts_written.append(COMPONENTS_ZIP_FILE_NAME)
            result.artifacts_written.append(COMPONENTS_ZIP_SHA256_FILE_NAME)
            logger.info("Wrote %s and %s", zip_path, sha256_path)
            # Read back the SHA256 for metadata enrichment
            components_sha256 = sha256_path.read_text().strip()
    else:
        logger.info(
            "No manifest.yaml at %s — skipping manifest artifacts.", manifest_source
        )

    # --- Enrich metadata with components SHA (after zip creation) ---
    raw_metadata = _enrich_metadata_components_sha(raw_metadata, components_sha256)

    # --- Write final enriched metadata.yaml ---
    # Use sort_keys=True to match the legacy pipeline's alphabetical key ordering.
    # After Registry 2.0 launches we are free to change the key ordering.
    metadata_out.write_text(
        yaml.dump(raw_metadata, default_flow_style=False, sort_keys=True)
    )
    logger.info("Wrote enriched %s", metadata_out)

    # --- Write version marker file (version=<semver>) ---
    # This zero-byte file is used by the compile step as a fast-check marker.
    # Including it in the generated artifacts means `latest/` gets the marker
    # for free via a recursive copy, removing the need for a separate write.
    marker_file = output_dir / f"version={version}"
    marker_file.write_bytes(b"")
    result.artifacts_written.append(f"version={version}")
    logger.info("Wrote version marker %s", marker_file)

    # --- Validate metadata (after generation) ---
    if with_validate:
        logger.info("Running post-generation validation...")
        doc_path = str(doc_source) if doc_source else None
        validation = validate_metadata(
            metadata_data=metadata_data,
            opts=ValidateOptions(docs_path=doc_path),
        )
        if not validation.passed:
            for err in validation.errors:
                logger.error("Validation error: %s", err)
            result.validation_errors = validation.errors
        else:
            logger.info("Validation passed (%d validators).", validation.validators_run)

    return result

Generate all version artifacts for a connector release.

Artifacts are enriched with git commit info, SBOM URL, and (when applicable) components SHA before writing. Validation is run after generation by default.

Arguments:
  • metadata_file: Path to the connector's metadata.yaml.
  • docker_image: Docker image to run spec against (e.g. airbyte/source-faker:6.2.38).
  • output_dir: Directory to write artifacts to. If None, a temp directory is created.
  • repo_root: Root of the Airbyte repo checkout (for resolving doc.md). If None, inferred by walking up from metadata_file.
  • dry_run: If True, report what would be generated without writing or running docker.
  • with_validate: If True (default), run metadata validators after generation. Pass False (--no-validate) to skip.
  • with_dependency_dump: If True (default), generate dependencies.json for Python connectors. Pass False (--no-dependency-dump) to skip.
  • with_sbom: If True (default), generate spdx.json (SBOM) for connectors. Pass False (--no-sbom) to skip.
Returns:

A GenerateResult describing what was produced.

def get_connector_metadata( repo_path: pathlib.Path, connector_name: str) -> ConnectorMetadata:
def get_connector_metadata(repo_path: Path, connector_name: str) -> ConnectorMetadata:
    """Read connector metadata from metadata.yaml.

    Args:
        repo_path: Path to the Airbyte monorepo.
        connector_name: The connector technical name (e.g., 'source-github').

    Returns:
        ConnectorMetadata object with the connector's metadata.  Fields absent
        from the `data` section fall back to defaults: `airbyte/<name>` for the
        repository, `"unknown"` for the image tag, and `None` for support level
        and definition id.

    Raises:
        FileNotFoundError: If the connector directory or metadata file doesn't exist.
        ValueError: If the metadata file (or its `data` section) is not a YAML
            mapping, e.g. because the file is empty.
    """
    connector_dir = repo_path / CONNECTOR_PATH_PREFIX / connector_name
    if not connector_dir.exists():
        raise FileNotFoundError(f"Connector directory not found: {connector_dir}")

    metadata_file = connector_dir / METADATA_FILE_NAME
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    # Read as UTF-8 explicitly; the platform default encoding is not
    # guaranteed to be UTF-8.
    with open(metadata_file, encoding="utf-8") as f:
        metadata = yaml.safe_load(f)

    # safe_load returns None for an empty document (and may return a scalar or
    # list for malformed files); report that clearly instead of crashing with
    # an AttributeError on `.get`.
    if not isinstance(metadata, dict):
        raise ValueError(f"Metadata file {metadata_file} is not a YAML mapping")

    # `data:` with no value parses as None; treat it like a missing section.
    data = metadata.get("data") or {}
    if not isinstance(data, dict):
        raise ValueError(f"'data' section in {metadata_file} is not a YAML mapping")

    return ConnectorMetadata(
        name=connector_name,
        docker_repository=data.get("dockerRepository", f"airbyte/{connector_name}"),
        docker_image_tag=data.get("dockerImageTag", "unknown"),
        support_level=data.get("supportLevel"),
        definition_id=data.get("definitionId"),
    )

Read connector metadata from metadata.yaml.

Arguments:
  • repo_path: Path to the Airbyte monorepo.
  • connector_name: The connector technical name (e.g., 'source-github').
Returns:

ConnectorMetadata object with the connector's metadata.

Raises:
  • FileNotFoundError: If the connector directory or metadata file doesn't exist.
def get_gcs_publish_path(connector_name: str, artifact_type: str, version: str = 'latest') -> str:
def get_gcs_publish_path(
    connector_name: str,
    artifact_type: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> str:
    """Return the GCS object path under which a connector artifact is published.

    Every connector lives under the `airbyte/{connector_name}` prefix, so the
    result has the shape
    `{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file_name}`.

    Args:
        connector_name: Connector technical name (e.g. `source-faker`).
        artifact_type: One of `metadata`, `spec`, `icon`, or `doc`.
        version: Version folder name; defaults to the latest folder.

    Raises:
        ValueError: If `artifact_type` is not one of the supported types.
    """
    file_name_for = {
        "metadata": METADATA_FILE_NAME,
        "spec": "spec.json",
        "icon": "icon.svg",
        "doc": "doc.md",
    }

    try:
        file_name = file_name_for[artifact_type]
    except KeyError:
        supported = ", ".join(file_name_for)
        raise ValueError(
            f"Unknown artifact type: {artifact_type}. "
            f"Valid types are: {supported}"
        ) from None

    return f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file_name}"

Compute the GCS path for a connector artifact for publishing.

All connectors use the airbyte/{connector_name} convention.

def get_registry( store: RegistryStore) -> Registry:
def get_registry(store: RegistryStore) -> Registry:
    """Build the `Registry` implementation that matches `store.store_type`.

    Coral stores map to `CoralRegistry` and Sonar stores to `SonarRegistry`.
    Each backend module is imported inside its own branch, so only the
    selected backend is imported.

    Raises:
        ValueError: If the store type is not recognised.
    """
    kind = store.store_type

    if kind == StoreType.CORAL:
        from airbyte_ops_mcp.registry.coral_registry_store import CoralRegistry

        registry: Registry = CoralRegistry(store)
    elif kind == StoreType.SONAR:
        from airbyte_ops_mcp.registry.sonar_registry_store import SonarRegistry

        registry = SonarRegistry(store)
    else:
        # defensive: StoreType is an Enum, but keep this for readability
        raise ValueError(f"Unknown store type: {store.store_type}")

    return registry

Factory for obtaining the right store implementation.

def get_registry_entry( connector_name: str, bucket_name: str, version: str = 'latest') -> dict[str, typing.Any]:
def get_registry_entry(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch and parse a connector's `metadata.yaml` from the GCS registry.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The connector's metadata as a dictionary

    Raises:
        ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure
        FileNotFoundError: If the connector metadata is not found in the registry
        yaml.YAMLError: If the metadata file contains invalid YAML syntax
    """
    bucket = get_gcs_storage_client().bucket(bucket_name)

    # Registry layout: metadata/airbyte/{connector_name}/{version}/metadata.yaml
    blob_path = (
        f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{METADATA_FILE_NAME}"
    )
    blob = bucket.blob(blob_path)
    logger.info("Reading registry entry for %s from %s", connector_name, blob_path)

    content = safe_read_gcs_file(blob)
    if content is None:
        raise FileNotFoundError(
            f"Connector metadata not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    try:
        metadata = yaml.safe_load(content)
    except yaml.YAMLError as parse_error:
        logger.error(
            "Failed to parse metadata for %s from %s: %s",
            connector_name,
            blob_path,
            parse_error,
        )
        raise

    # An empty document parses to None; scalars and lists are rejected too.
    if not isinstance(metadata, dict):
        raise ValueError(f"Metadata file {blob_path} has an invalid structure")
    return metadata

Get a connector's registry entry from GCS.

Reads metadata for a connector from the registry stored in GCS.

Arguments:
  • connector_name: The connector name (e.g., "source-faker", "destination-postgres")
  • bucket_name: Name of the GCS bucket containing the registry
  • version: Version folder name (e.g., "latest", "1.2.3")
Returns:

dict: The connector's metadata as a dictionary

Raises:
  • ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure
  • FileNotFoundError: If the connector metadata is not found in the registry
  • yaml.YAMLError: If the metadata file contains invalid YAML syntax
def get_registry_spec( connector_name: str, bucket_name: str, version: str = 'latest') -> dict[str, typing.Any]:
def get_registry_spec(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch and parse a connector's `spec.json` from the GCS registry.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The connector's spec as a dictionary

    Raises:
        ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
        FileNotFoundError: If the connector spec is not found in the registry
        json.JSONDecodeError: If the spec file contains invalid JSON syntax
    """
    bucket = get_gcs_storage_client().bucket(bucket_name)

    # Registry layout: metadata/airbyte/{connector_name}/{version}/spec.json
    blob_path = f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{SPEC_FILE_NAME}"
    blob = bucket.blob(blob_path)
    logger.info("Reading spec for %s from %s", connector_name, blob_path)

    content = safe_read_gcs_file(blob)
    if content is None:
        raise FileNotFoundError(
            f"Connector spec not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    try:
        spec = json.loads(content)
    except json.JSONDecodeError as parse_error:
        logger.error(
            "Failed to parse spec for %s from %s: %s",
            connector_name,
            blob_path,
            parse_error,
        )
        raise

    # JSON `null`, arrays and scalars are all rejected: callers expect an object.
    if not isinstance(spec, dict):
        raise ValueError(
            f"Spec file for {connector_name} at {blob_path} is not a JSON object"
        )
    return spec

Get a connector's spec from GCS.

Reads the connector specification from the registry stored in GCS.

Arguments:
  • connector_name: The connector name (e.g., "source-faker", "destination-postgres")
  • bucket_name: Name of the GCS bucket containing the registry
  • version: Version folder name (e.g., "latest", "1.2.3")
Returns:

dict: The connector's spec as a dictionary

Raises:
  • ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
  • FileNotFoundError: If the connector spec is not found in the registry
  • json.JSONDecodeError: If the spec file contains invalid JSON syntax
def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]:
def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]:
    """List all versions of a connector in the registry.

    Scans the GCS bucket for `metadata/airbyte/{connector_name}/*/metadata.yaml`
    and returns the version folder names found.

    Args:
        connector_name: The connector name (e.g., "source-faker")
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Version strings in plain lexicographic order (so "1.10.0"
        sorts before "1.9.0"), excluding 'latest' and 'release_candidate'.

    Raises:
        ValueError: If GCS credentials are not configured
    """
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # List all blobs matching the pattern: metadata/airbyte/{connector_name}/*/metadata.yaml
    glob_pattern = f"{METADATA_FOLDER}/airbyte/{connector_name}/*/{METADATA_FILE_NAME}"
    logger.info("Listing versions for %s with pattern: %s", connector_name, glob_pattern)

    # `list_blobs` returns a lazily-paged iterator: the GCS requests (and any
    # errors they raise) happen during iteration.  The iteration therefore
    # has to be inside the `try`, or listing failures would bypass this log.
    try:
        blob_names = [
            blob.name for blob in bucket.list_blobs(match_glob=glob_pattern)
        ]
    except Exception as e:
        logger.error("Error listing blobs in bucket %s: %s", bucket_name, e)
        raise

    # Path format: metadata/airbyte/{connector-name}/{version}/metadata.yaml
    special_folders = {"latest", "release_candidate"}
    versions: set[str] = set()
    for blob_name in blob_names:
        path_parts = blob_name.split("/")
        if len(path_parts) >= 5 and path_parts[3] not in special_folders:
            versions.add(path_parts[3])

    return sorted(versions)

List all versions of a connector in the registry.

Scans the GCS bucket to find all versions of a specific connector.

Arguments:
  • connector_name: The connector name (e.g., "source-faker")
  • bucket_name: Name of the GCS bucket containing the registry
Returns:

list[str]: Sorted list of version strings (excluding 'latest' and 'release_candidate')

Raises:
  • ValueError: If GCS credentials are not configured
def list_registry_connectors(bucket_name: str) -> list[str]:
def list_registry_connectors(bucket_name: str) -> list[str]:
    """List all connectors in the registry.

    Scans the GCS bucket for connectors that have a metadata file in their
    `latest` folder (`metadata/airbyte/*/latest/metadata.yaml`).

    Args:
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Sorted list of connector names

    Raises:
        ValueError: If GCS credentials are not configured
    """
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # List all blobs matching the pattern: metadata/airbyte/*/latest/metadata.yaml
    glob_pattern = (
        f"{METADATA_FOLDER}/airbyte/*/{LATEST_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )
    logger.info("Listing connectors with pattern: %s", glob_pattern)

    # `list_blobs` returns a lazily-paged iterator: the GCS requests (and any
    # errors they raise) happen during iteration.  The iteration therefore
    # has to be inside the `try`, or listing failures would bypass this log.
    try:
        blob_names = [
            blob.name for blob in bucket.list_blobs(match_glob=glob_pattern)
        ]
    except Exception as e:
        logger.error("Error listing blobs in bucket %s: %s", bucket_name, e)
        raise

    # Path format: metadata/airbyte/{connector-name}/latest/metadata.yaml
    connector_names: set[str] = set()
    for blob_name in blob_names:
        path_parts = blob_name.split("/")
        if len(path_parts) >= 5:
            connector_names.add(path_parts[2])

    return sorted(connector_names)

List all connectors in the registry.

Scans the GCS bucket to find all connectors that have a metadata file in their latest folder.

Arguments:
  • bucket_name: Name of the GCS bucket containing the registry
Returns:

list[str]: Sorted list of connector names

Raises:
  • ValueError: If GCS credentials are not configured
def list_registry_connectors_filtered( bucket_name: str, *, support_level: SupportLevel | None = None, min_support_level: SupportLevel | None = None, connector_type: ConnectorType | None = None, language: ConnectorLanguage | None = None, prefix: str = '') -> list[str]:
def list_registry_connectors_filtered(
    bucket_name: str,
    *,
    support_level: SupportLevel | None = None,
    min_support_level: SupportLevel | None = None,
    connector_type: ConnectorType | None = None,
    language: ConnectorLanguage | None = None,
    prefix: str = "",
) -> list[str]:
    """List connectors from the compiled cloud registry index with filtering.

    When any filter is applied, reads the compiled `cloud_registry.json` index
    instead of globbing individual metadata blobs. This is significantly faster
    because the index is a single JSON file containing all connector entries.

    When no filters are applied, falls back to the existing glob-based search
    which captures all connectors (including OSS-only connectors not in the
    Cloud index). On that path `prefix` is not applied.

    Args:
        bucket_name: Name of the GCS bucket containing the registry.
        support_level: Exact support level to match (e.g., `SupportLevel.CERTIFIED`).
        min_support_level: Minimum support level threshold. Returns connectors
            at or above this level.
        connector_type: Filter by connector type (`ConnectorType.SOURCE` or
            `ConnectorType.DESTINATION`).
        language: Filter by implementation language (e.g., `ConnectorLanguage.PYTHON`).
        prefix: Optional bucket prefix (e.g., `"aj-test100"`). Only used when
            at least one filter is given.

    Returns:
        Sorted list of connector technical names (e.g., `"source-github"`).

    Raises:
        ValueError: If `support_level` and `min_support_level` are both provided.
    """
    # Decide "was a filter given?" with `is not None`: the truthiness of an
    # enum member depends on its value type, not on whether it was passed.
    filters = (support_level, min_support_level, connector_type, language)
    has_filters = any(f is not None for f in filters)

    if not has_filters:
        return list_registry_connectors(bucket_name=bucket_name)

    if support_level is not None and min_support_level is not None:
        raise ValueError(
            "Cannot specify both `support_level` and `min_support_level`. "
            "Use `support_level` for an exact match or `min_support_level` for a threshold."
        )

    entries = _read_cloud_registry_index(bucket_name=bucket_name, prefix=prefix)

    # Index entries hold raw enum values (they are converted with
    # `SupportLevel(...)` below), so exact matches compare against `.value`.
    # That works whether or not the enum subclasses `str`; raw values passed by
    # callers are used as-is.
    if support_level is not None:
        wanted_level = getattr(support_level, "value", support_level)
        entries = [e for e in entries if e.get("supportLevel") == wanted_level]

    # Apply min_support_level threshold; unknown/missing levels never qualify.
    if min_support_level is not None:
        threshold = min_support_level.precedence
        known_levels = {m.value for m in SupportLevel}
        entries = [
            e
            for e in entries
            if e.get("supportLevel")
            and e["supportLevel"] in known_levels
            and SupportLevel(e["supportLevel"]).precedence >= threshold
        ]

    # Apply connector_type filter: the definition-id key identifies the kind.
    if connector_type == ConnectorType.SOURCE:
        entries = [e for e in entries if "sourceDefinitionId" in e]
    elif connector_type == ConnectorType.DESTINATION:
        entries = [e for e in entries if "destinationDefinitionId" in e]

    # Apply language filter (same raw-value comparison as support_level).
    if language is not None:
        wanted_language = getattr(language, "value", language)
        entries = [e for e in entries if e.get("language") == wanted_language]

    # Extract connector names from dockerRepository (e.g., "airbyte/source-github" -> "source-github")
    names: set[str] = set()
    for entry in entries:
        docker_repo = entry.get("dockerRepository", "")
        if "/" in docker_repo:
            names.add(docker_repo.split("/", 1)[1])
        elif docker_repo:
            names.add(docker_repo)

    return sorted(names)

List connectors from the compiled cloud registry index with filtering.

When any filter is applied, reads the compiled cloud_registry.json index instead of globbing individual metadata blobs. This is significantly faster because the index is a single JSON file containing all connector entries.

When no filters are applied, falls back to the existing glob-based search which captures all connectors (including OSS-only connectors not in the Cloud index). In that case, prefix is not applied.

Arguments:
  • bucket_name: Name of the GCS bucket containing the registry.
  • support_level: Exact support level to match (e.g., SupportLevel.CERTIFIED).
  • min_support_level: Minimum support level threshold. Returns connectors at or above this level.
  • connector_type: Filter by connector type (ConnectorType.SOURCE or ConnectorType.DESTINATION).
  • language: Filter by implementation language (e.g., ConnectorLanguage.PYTHON).
  • prefix: Optional bucket prefix (e.g., "aj-test100").
Returns:

Sorted list of connector technical names (e.g., "source-github").

Raises:
  • ValueError: If support_level and min_support_level are both provided.
def publish_connector_metadata(
    connector_name: str,
    metadata: dict[str, Any],
    bucket_name: str,
    version: str,
    update_latest: bool = True,
    dry_run: bool = False,
) -> MetadataPublishResult:
    """Publish connector metadata to GCS.

    Uploads the metadata to the registry bucket at a versioned path, and optionally
    also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading
    unchanged files.

    Requires GCS_CREDENTIALS environment variable to be set.

    Args:
        connector_name: Connector name (e.g. `source-faker`); used to build the
            `airbyte/{connector_name}` blob paths.
        metadata: Metadata document to publish. Must be a dict containing a
            `data` key; it is serialized to YAML with `yaml.dump`.
        bucket_name: Name of the target GCS bucket.
        version: Version folder to publish under.
        update_latest: If `True` (default), also upload to the `latest` folder.
        dry_run: If `True`, report what would be uploaded without writing.

    Returns:
        A `MetadataPublishResult` whose `status` is `"dry-run"`, `"success"`
        (at least one blob was uploaded), or `"already-up-to-date"`.

    Raises:
        ValueError: If `metadata` is not a dict or has no `data` field.
    """
    if not isinstance(metadata, dict):
        raise ValueError("Metadata must be a dictionary")

    if "data" not in metadata:
        raise ValueError("Metadata must contain 'data' field")

    # Construct GCS paths using airbyte/{connector_name} convention
    versioned_blob_path = get_gcs_publish_path(connector_name, "metadata", version)
    latest_blob_path = get_gcs_publish_path(
        connector_name, "metadata", LATEST_GCS_FOLDER_NAME
    )

    if dry_run:
        message = f"[DRY RUN] Would upload metadata to gs://{bucket_name}/{versioned_blob_path}"
        if update_latest:
            message += f" and gs://{bucket_name}/{latest_blob_path}"
        logger.info(message)
        return MetadataPublishResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            versioned_path=versioned_blob_path,
            latest_path=latest_blob_path if update_latest else None,
            versioned_uploaded=False,
            latest_uploaded=False,
            status="dry-run",
            message=message,
        )

    # Get GCS client and bucket
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # The temp file is created with delete=False, so nothing removes it
    # automatically. Record its path *before* serializing so the `finally`
    # below cleans it up even when `yaml.dump` itself fails.
    tmp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False)
    tmp_path = Path(tmp_file.name)
    latest_uploaded = False
    try:
        # Closing the file (end of `with`) flushes it before it is uploaded.
        with tmp_file:
            yaml.dump(metadata, tmp_file)

        # Upload versioned file
        versioned_uploaded, _ = upload_file_if_changed(
            local_file_path=tmp_path,
            bucket=bucket,
            blob_path=versioned_blob_path,
            disable_cache=True,
        )

        if versioned_uploaded:
            logger.info(
                f"Uploaded metadata for {connector_name} v{version} to {versioned_blob_path}"
            )
        else:
            logger.info(
                f"Versioned metadata for {connector_name} v{version} is already up to date"
            )

        # Optionally update latest pointer
        if update_latest:
            latest_uploaded, _ = upload_file_if_changed(
                local_file_path=tmp_path,
                bucket=bucket,
                blob_path=latest_blob_path,
                disable_cache=True,
            )
            if latest_uploaded:
                logger.info(f"Updated latest pointer for {connector_name}")
            else:
                logger.info(
                    f"Latest pointer for {connector_name} is already up to date"
                )
    finally:
        # Clean up temp file even if serialization or upload fails
        tmp_path.unlink(missing_ok=True)

    # Determine status
    if versioned_uploaded or latest_uploaded:
        status = "success"
        message = f"Published metadata for {connector_name} v{version}"
        if versioned_uploaded:
            message += f" to {versioned_blob_path}"
        if latest_uploaded:
            message += " and updated latest"
    else:
        status = "already-up-to-date"
        message = f"Metadata for {connector_name} v{version} is already up to date"

    return MetadataPublishResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        versioned_path=versioned_blob_path,
        latest_path=latest_blob_path if update_latest else None,
        versioned_uploaded=versioned_uploaded,
        latest_uploaded=latest_uploaded,
        status=status,
        message=message,
    )

Publish connector metadata to GCS.

Uploads the metadata to the registry bucket at a versioned path, and optionally also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading unchanged files.

Requires the GCS_CREDENTIALS environment variable to be set.

Raises:
  • ValueError: If metadata is not a dictionary or has no 'data' field.

def publish_version_artifacts(
    connector_name: str,
    version: str,
    artifacts_dir: Path,
    store: RegistryStore,
    dry_run: bool = False,
    with_validate: bool = True,
) -> PublishArtifactsResult:
    """Publish locally generated artifacts to a GCS registry bucket.

    Uses `gcsfs.GCSFileSystem` to upload the local *artifacts_dir* to the
    versioned path inside the target GCS bucket. Remote files under that
    versioned path that no longer exist locally are deleted afterwards
    (registry state marker files are always preserved).

    The target GCS path is:
        `gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/`

    Before uploading, this function validates that the `connector_name`
    (derived from the connector directory) matches the `dockerRepository`
    declared in `metadata.yaml`.  A mismatch would cause the registry
    compile step to see duplicate definition-ID entries and fail.

    If present, the connector dependency file and the SBOM are additionally
    uploaded to their dedicated locations, and a progressive-rollout marker
    is written when the metadata enables progressive rollout.

    If pre-publish validation fails, the errors are logged and stored on the
    returned result's `validation_errors`, and nothing is uploaded.

    Args:
        connector_name: Connector name (e.g. `source-faker`).
        version: Version string (e.g. `6.2.38`).
        artifacts_dir: Local directory containing artifacts from `generate`.
        store: Parsed store target containing bucket, prefix, and stage info.
        dry_run: If `True`, report what would be uploaded without writing.
        with_validate: If `True` (default), validate metadata before uploading.
            Pass `False` (`--no-validate`) to skip.

    Returns:
        A `PublishArtifactsResult` describing what was published.

    Raises:
        FileNotFoundError: If `artifacts_dir` is not an existing directory.
        ValueError: If the connector directory name does not match
            `dockerRepository` in the generated metadata.
    """
    if not artifacts_dir.is_dir():
        raise FileNotFoundError(f"Artifacts directory not found: {artifacts_dir}")

    # Fail fast if the connector directory name doesn't match dockerRepository.
    # A mismatch would publish artifacts under the wrong GCS path and corrupt
    # the registry (duplicate definition-IDs under different directory names).
    mismatch_error = _check_connector_name_matches_docker_repo(
        connector_name, artifacts_dir
    )
    if mismatch_error:
        raise ValueError(mismatch_error)

    # Build the GCS destination path
    bucket_name = store.bucket
    prefix = store.prefix
    blob_root = versioned_blob_root(
        connector_name=connector_name, version=version, store=store
    )
    versioned_dest = f"gcs://{bucket_name}/{blob_root}"

    target_label = f"{bucket_name}/{prefix}" if prefix else bucket_name
    should_publish_rollout_marker = _metadata_enables_progressive_rollout(artifacts_dir)
    result = PublishArtifactsResult(
        connector_name=connector_name,
        version=version,
        target=target_label,
        gcs_destination=versioned_dest,
        dry_run=dry_run,
    )

    # --- Pre-publish validation ---
    if with_validate:
        metadata_file = artifacts_dir / "metadata.yaml"
        if metadata_file.is_file():
            raw_metadata = yaml.safe_load(metadata_file.read_text())
            metadata_data = (raw_metadata or {}).get("data", {})
            validation = validate_metadata(metadata_data=metadata_data)
            if not validation.passed:
                for err in validation.errors:
                    logger.error("Pre-publish validation error: %s", err)
                result.validation_errors = validation.errors
                return result
            logger.info(
                "Pre-publish validation passed (%d validators).",
                validation.validators_run,
            )
        else:
            logger.warning("No metadata.yaml in artifacts dir; skipping validation.")

    # Enumerate local files
    local_files = sorted(f for f in artifacts_dir.rglob("*") if f.is_file())
    if not local_files:
        result.errors.append(f"No files found in {artifacts_dir}.")
        return result

    _log_progress(
        "Publishing %d artifacts for %s@%s to %s",
        len(local_files),
        connector_name,
        version,
        versioned_dest,
    )

    # Build references used by both dry-run and real upload paths
    deps_file = artifacts_dir / CONNECTOR_DEPENDENCY_FILE_NAME
    has_deps = deps_file.is_file()
    deps_gcs_key = dependencies_blob_path(
        connector_name=connector_name, version=version, store=store
    )

    sbom_file = artifacts_dir / SBOM_FILE_NAME
    has_sbom = sbom_file.is_file()
    rollout_marker_path = f"{blob_root}/{PROGRESSIVE_ROLLOUT_MARKER_FILE}"

    if dry_run:
        for f in local_files:
            rel = f.relative_to(artifacts_dir)
            result.files_uploaded.append(str(rel))
            _log_progress("  [DRY RUN] would upload: %s", rel)
        # Report the dual-load of dependencies.json to connector_dependencies/
        if has_deps:
            result.files_uploaded.append(deps_gcs_key)
            _log_progress(
                "  [DRY RUN] would also dual-load: %s → gs://%s/%s",
                CONNECTOR_DEPENDENCY_FILE_NAME,
                bucket_name,
                deps_gcs_key,
            )
        # Report the separate sbom/ upload
        if has_sbom:
            sbom_gcs_key = sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            )
            result.files_uploaded.append(sbom_gcs_key)
            _log_progress(
                "  [DRY RUN] would also upload: %s → gs://%s/%s",
                SBOM_FILE_NAME,
                bucket_name,
                sbom_gcs_key,
            )
        if should_publish_rollout_marker:
            result.files_uploaded.append(PROGRESSIVE_ROLLOUT_MARKER_FILE)
            _log_progress(
                "  [DRY RUN] would write active marker: gs://%s/%s",
                bucket_name,
                rollout_marker_path,
            )
        return result

    # Authenticate
    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # Strip gcs:// prefix for gcsfs path
    dest_path = versioned_dest.removeprefix("gcs://")

    # Upload all files to the versioned path. GCS object names always use "/"
    # separators, so build remote keys with as_posix() regardless of local OS.
    _log_progress("Uploading to: %s", versioned_dest)
    for f in local_files:
        rel = f.relative_to(artifacts_dir)
        remote_path = f"{dest_path}/{rel.as_posix()}"
        fs.put(str(f), remote_path)
        result.files_uploaded.append(str(rel))
        _log_progress("  Uploaded: %s", rel)

    # Delete remote files that don't exist locally (sync semantics).
    # `fs.find` lists recursively, matching the recursive `rglob` upload above;
    # a flat `fs.ls` would return nested directories as single entries whose
    # relative path never matches a local file.
    try:
        remote_files = fs.find(dest_path)
    except FileNotFoundError:
        remote_files = []  # Destination doesn't exist yet, nothing to clean
    local_rel_paths = {f.relative_to(artifacts_dir).as_posix() for f in local_files}
    for remote_file in remote_files:
        # Skip the directory entry itself if it appears in the listing
        if remote_file == dest_path:
            continue
        # Derive the remote relative path, matching upload semantics
        if remote_file.startswith(dest_path + "/"):
            remote_rel = remote_file[len(dest_path) + 1 :]
        else:
            remote_rel = remote_file.split("/")[-1]
        if is_registry_state_marker_file(Path(remote_rel).name):
            continue
        if remote_rel in local_rel_paths:
            continue
        try:
            fs.rm(remote_file)
        except FileNotFoundError:
            # Already gone; don't let one missing file abort the whole cleanup.
            continue
        _log_progress("  Deleted stale remote file: %s", remote_rel)

    _log_progress("Uploaded %d files to %s", len(local_files), versioned_dest)

    if should_publish_rollout_marker:
        marker_remote = f"{bucket_name}/{rollout_marker_path}"
        with fs.open(marker_remote, "w") as marker_file:
            marker_file.write(_progressive_rollout_marker_content())
        result.files_uploaded.append(PROGRESSIVE_ROLLOUT_MARKER_FILE)
        _log_progress("Wrote active marker: gs://%s", marker_remote)

    # --- Dual-load dependencies.json to the connector_dependencies/ path ---
    if not has_deps:
        logger.debug(
            "No %s in artifacts dir — skipping dual-load.",
            CONNECTOR_DEPENDENCY_FILE_NAME,
        )
    else:
        deps_remote = f"{bucket_name}/{deps_gcs_key}"
        _log_progress(
            "Dual-loading %s to gs://%s",
            CONNECTOR_DEPENDENCY_FILE_NAME,
            deps_remote,
        )
        fs.put(str(deps_file), deps_remote)
        result.files_uploaded.append(deps_gcs_key)
        _log_progress("  Uploaded %s (dual-load)", CONNECTOR_DEPENDENCY_FILE_NAME)

    # --- Upload SBOM to the dedicated sbom/ path in GCS ---
    if not has_sbom:
        logger.debug(
            "No %s in artifacts dir — skipping SBOM dual-load.",
            SBOM_FILE_NAME,
        )
    else:
        sbom_gcs_uri = upload_sbom(
            sbom_path=sbom_file,
            connector_name=connector_name,
            version=version,
            store=store,
            dry_run=dry_run,
        )
        result.files_uploaded.append(
            sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            ),
        )
        _log_progress("Uploaded SBOM to dedicated path: %s", sbom_gcs_uri)

    return result

Publish locally generated artifacts to a GCS registry bucket.

Uses gcsfs.GCSFileSystem to upload the local artifacts_dir to the versioned path inside the target GCS bucket.

The target GCS path is:

gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/

Before uploading, this function validates that the connector_name (derived from the connector directory) matches the dockerRepository declared in metadata.yaml. A mismatch would cause the registry compile step to see duplicate definition-ID entries and fail.

Arguments:
  • connector_name: Connector name (e.g. source-faker).
  • version: Version string (e.g. 6.2.38).
  • artifacts_dir: Local directory containing artifacts from generate.
  • store: Parsed store target containing bucket, prefix, and stage info.
  • dry_run: If True, report what would be uploaded without writing.
  • with_validate: If True (default), validate metadata before uploading. Pass False (--no-validate) to skip.
Returns:

A PublishArtifactsResult describing what was published.

Raises:
  • FileNotFoundError: If artifacts_dir does not exist or is not a directory.
  • ValueError: If the connector directory name does not match dockerRepository in the generated metadata.
def purge_latest_dirs(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
) -> PurgeLatestResult:
    """Delete all `latest/` directories from the registry store.

    When `connector_name` is given, each requested connector (duplicates
    ignored) is checked for a `latest/` subdirectory; otherwise every
    connector with a `latest/` subdirectory is discovered via a glob. The
    discovered `latest/` subdirectories are then deleted in parallel using a
    thread pool. Per-connector failures are collected in the result rather
    than raised.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only purge these connectors.
        dry_run: If True, report what would be done without deleting.

    Returns:
        A `PurgeLatestResult` describing what was done.
    """
    result = PurgeLatestResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"

    # Discover connectors that have a `latest/` subdirectory.
    _log_progress("Discovering latest/ directories...")
    connectors_with_latest: list[str] = []
    if connector_name:
        # dict.fromkeys de-duplicates while preserving request order, so each
        # requested connector is probed with `fs.exists` at most once.
        for name in dict.fromkeys(connector_name):
            if fs.exists(f"{base}/{name}/latest"):
                connectors_with_latest.append(name)
    else:
        # Glob for all connectors, then filter to those with latest/
        base_with_slash = f"{base}/"
        seen: set[str] = set()
        for path in fs.glob(f"{base}/*/latest"):
            # Strip the known base prefix and take the first component
            if not path.startswith(base_with_slash):
                logger.warning("Could not parse latest path: %s", path)
                continue
            connector = path[len(base_with_slash) :].split("/")[0]
            if connector and connector not in seen:
                connectors_with_latest.append(connector)
                seen.add(connector)

    result.connectors_found = len(connectors_with_latest)
    _log_progress(
        "Found %d connectors with latest/ directories",
        result.connectors_found,
    )

    if not connectors_with_latest:
        _log_progress("Nothing to purge.")
        _log_progress(result.summary())
        return result

    if dry_run:
        for connector in sorted(connectors_with_latest):
            _log_progress("  [DRY RUN] Would delete %s/latest/", connector)
        result.latest_dirs_deleted = len(connectors_with_latest)
        _log_progress(result.summary())
        return result

    # Delete latest/ dirs in parallel using the shared helper.
    def _delete_one(connector: str) -> str | None:
        """Delete a single connector's latest/ dir. Returns error string or None."""
        try:
            _delete_latest_dir(
                fs,
                store=store,
                connector=connector,
            )
            return None
        except Exception as exc:
            # Best-effort: report the failure and keep purging the others.
            return f"Failed to delete latest/ for {connector}: {exc}"

    _log_progress(
        "Deleting %d latest/ directories (max_workers=%d)...",
        len(connectors_with_latest),
        _PURGE_LATEST_MAX_WORKERS,
    )

    with ThreadPoolExecutor(max_workers=_PURGE_LATEST_MAX_WORKERS) as pool:
        futures = [
            pool.submit(_delete_one, c) for c in sorted(connectors_with_latest)
        ]
        for i, future in enumerate(as_completed(futures), 1):
            error = future.result()
            if error:
                logger.error(error)
                result.errors.append(error)
            else:
                result.latest_dirs_deleted += 1
            if i % 100 == 0:
                _log_progress("  Deleted %d / %d...", i, len(connectors_with_latest))

    _log_progress(result.summary())
    return result

Delete all latest/ directories from the registry store.

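Discovers connectors that have a latest/ subdirectory (via a glob, or by checking each requested connector directly when connector_name is given), then deletes each latest/ subdirectory in parallel using a thread pool.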

Arguments:
  • store: Registry store (bucket + optional prefix).
  • connector_name: If provided, only purge these connectors.
  • dry_run: If True, report what would be done without deleting.
Returns:

A PurgeLatestResult describing what was done.

def rebuild_registry(
    source_bucket: str,
    output_mode: OutputMode,
    output_path_root: str | None = None,
    gcs_bucket: str | None = None,
    s3_bucket: str | None = None,
    dry_run: bool = False,
    connector_name: list[str] | None = None,
) -> RebuildResult:
    """Rebuild the entire registry from a source GCS bucket to an output target.

    Reads all connector metadata blobs from the source GCS bucket and copies
    them to the output target using fsspec for unified filesystem access.

    The output targets are:
    - local: Write to a local directory tree.
    - gcs: Copy to a GCS bucket (must not be the prod bucket).
    - s3: Copy to an S3 bucket.

    Args:
        source_bucket: The GCS bucket to read from (typically prod).
        output_mode: Where to write: "local", "gcs", or "s3".
        output_path_root: Root path/prefix for output. For local mode, if None
            creates a temp directory. For GCS/S3, prepended to all blob paths.
        gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
        s3_bucket: Target S3 bucket name (required if output_mode="s3").
        dry_run: If True, report what would be done without writing.
        connector_name: If provided, only rebuild these connector names
            (e.g. ["source-faker", "destination-bigquery"]). Duplicate names
            are ignored. If None (or empty), rebuilds all.

    Returns:
        RebuildResult with details of the operation. If no source blobs are
        found, the result is returned without copying anything.

    Raises:
        ValueError: If target is the prod bucket, or required bucket arg is missing.
    """
    if output_mode == "gcs" and gcs_bucket:
        _validate_not_prod_bucket(gcs_bucket)

    # Resolve output root for local mode
    effective_output_root = output_path_root or ""
    if output_mode == "local":
        effective_output_root = _resolve_local_output_root(output_path_root)

    result = RebuildResult(
        source_bucket=source_bucket,
        output_mode=output_mode,
        output_root=effective_output_root,
        dry_run=dry_run,
    )

    # Create source filesystem (always GCS)
    source_fs, gcs_token = _make_source_fs()
    source_base = f"{source_bucket}/{METADATA_FOLDER}"

    # List files under metadata/ in the source bucket
    if connector_name:
        # De-duplicate (preserving order) so a repeated name doesn't list and
        # copy the same blobs twice or inflate the blob/connector counts.
        requested = list(dict.fromkeys(connector_name))
        _log_progress(
            "Listing blobs for %d connectors under gs://%s/...",
            len(requested),
            source_base,
        )
        source_paths: list[str] = []
        for name in requested:
            connector_prefix = f"{source_base}/airbyte/{name}"
            found = source_fs.find(connector_prefix)
            source_paths.extend(found)
            _log_progress("  %s: %d blobs", name, len(found))
    else:
        _log_progress("Listing all blobs under gs://%s/...", source_base)
        source_paths = source_fs.find(source_base)

    if not source_paths:
        _log_progress("No blobs found under gs://%s/", source_base)
        return result

    total_blobs = len(source_paths)
    _log_progress("Found %d blobs to process", total_blobs)

    # Create output filesystem
    output_fs, output_base = _make_output_fs(
        output_mode=output_mode,
        output_root=effective_output_root,
        gcs_bucket=gcs_bucket,
        s3_bucket=s3_bucket,
        gcs_token=gcs_token,
    )

    # Collect connector names and compute bucket-relative paths for all blobs.
    # Relative paths look like "metadata/airbyte/<connector>/...", so the
    # connector name is the third path component.
    bucket_prefix = f"{source_bucket}/"
    blob_relative_paths: list[str] = []
    connector_names: set[str] = set()

    for source_path in source_paths:
        relative_path = source_path
        if source_path.startswith(bucket_prefix):
            relative_path = source_path[len(bucket_prefix) :]
        blob_relative_paths.append(relative_path)

        parts = relative_path.split("/")
        if len(parts) >= 3:
            connector_names.add(parts[2])

    if dry_run:
        result.blobs_copied = total_blobs
        result.connectors_processed = len(connector_names)
        _log_progress(
            "[DRY RUN] Would copy %d blobs (%d connectors)",
            total_blobs,
            len(connector_names),
        )
        _log_progress(result.summary())
        return result

    # Use GCS-native server-side copy for GCS→GCS mirrors.
    if output_mode == "gcs" and gcs_bucket:
        result = _gcs_native_copy(
            source_bucket=source_bucket,
            dest_bucket_name=gcs_bucket,
            dest_prefix=effective_output_root,
            blob_relative_paths=blob_relative_paths,
            connector_names=connector_names,
            gcs_token=gcs_token,
            result=result,
            connector_name_filter=connector_name,
        )
        return result

    # Fallback: fsspec-based copy for local and S3 output modes.
    result = _fsspec_copy(
        source_fs=source_fs,
        source_paths=source_paths,
        output_fs=output_fs,
        output_base=output_base,
        output_mode=output_mode,
        source_bucket=source_bucket,
        blob_relative_paths=blob_relative_paths,
        connector_names=connector_names,
        result=result,
    )
    return result

Rebuild the registry (all connectors, or only those listed in connector_name) from a source GCS bucket to an output target.

Reads all connector metadata blobs from the source GCS bucket and copies them to the output target using fsspec for unified filesystem access.

The output targets are:

  • local: Write to a local directory tree.
  • gcs: Copy to a GCS bucket (must not be the prod bucket).
  • s3: Copy to an S3 bucket.
Arguments:
  • source_bucket: The GCS bucket to read from (typically prod).
  • output_mode: Where to write: "local", "gcs", or "s3".
  • output_path_root: Root path/prefix for output. For local mode, if None creates a temp directory. For GCS/S3, prepended to all blob paths.
  • gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
  • s3_bucket: Target S3 bucket name (required if output_mode="s3").
  • dry_run: If True, report what would be done without writing.
  • connector_name: If provided, only rebuild these connector names (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.
Returns:

RebuildResult with details of the operation.

Raises:
  • ValueError: If target is the prod bucket, or required bucket arg is missing.
def resolve_registry_store(
    store: str | None = None,
    connector_name: str | None = None,
    cwd: Path | None = None,
    default_env: str = "dev",
) -> RegistryStore:
    """Resolve a `RegistryStore` from CLI inputs.

    Every detection method is evaluated up front. An explicit source wins
    outright: the `--store` argument first, then the `AIRBYTE_REGISTRY_STORE`
    environment variable. Without an explicit source, the auto-detected store
    types (from the connector name and from the working directory) must all
    agree, otherwise a `ValueError` is raised.

    Priority (highest → lowest):

    1. **Explicit** `--store` argument (e.g. `"coral:dev"`).
    2. **Environment variable** -- `AIRBYTE_REGISTRY_STORE`.
    3. **Auto-detected** -- connector name and/or working directory.
       If both are present and disagree, a `ValueError` is raised.

    Args:
        store: Explicit store target string (e.g. `"coral:dev"`).
        connector_name: Optional connector name used for auto-detection.
        cwd: Working directory used for repo-based detection.
        default_env: Environment assigned to an auto-detected store
            (default `"dev"`).

    Returns:
        A fully resolved `RegistryStore`.

    Raises:
        ValueError: If nothing could be detected, or if the auto-detected
            sources disagree on the store type.
    """
    # Explicit sources, highest priority first.
    chosen: RegistryStore | None = None
    chosen_from: str | None = None
    if store is not None:
        chosen, chosen_from = RegistryStore.parse(store), "--store"

    env_value = os.environ.get(REGISTRY_STORE_ENV_VAR)
    if chosen is None and env_value:
        chosen, chosen_from = RegistryStore.parse(env_value), REGISTRY_STORE_ENV_VAR

    # Auto-detected sources. These are evaluated even when an explicit source
    # exists, but are only consulted when there is none.
    detected: dict[str, StoreType] = {}
    if connector_name is not None:
        detected["connector_name"] = StoreType.get_from_connector_name(
            connector_name,
        )
    repo_type = StoreType.detect_from_repo_dir(cwd)
    if repo_type is not None:
        detected["working_directory"] = repo_type

    if chosen is not None:
        logger.debug(
            "Using explicit store target from %s: %s:%s",
            chosen_from,
            chosen.store_type.value,
            chosen.env,
        )
        return chosen

    if not detected:
        raise ValueError(
            "Cannot determine registry store. "
            "Provide --store (e.g. 'coral:dev' or 'sonar:prod'), "
            f"set ${REGISTRY_STORE_ENV_VAR}, "
            "or run from a recognized repository directory."
        )

    unique_types = set(detected.values())
    if len(unique_types) > 1:
        summary = ", ".join(
            f"{source}={kind.value}" for source, kind in detected.items()
        )
        raise ValueError(
            f"Conflicting store types detected: {summary}. "
            "Provide an explicit --store to resolve the ambiguity."
        )

    (resolved_type,) = unique_types
    logger.info(
        "Auto-detected store type '%s' (sources: %s)",
        resolved_type.value,
        ", ".join(detected),
    )
    return RegistryStore(store_type=resolved_type, env=default_env)

Resolve a RegistryStore from CLI inputs.

All applicable detection methods are evaluated. Explicit sources (--store, then the AIRBYTE_REGISTRY_STORE env var) take priority and are returned directly. When only auto-detected sources remain, they are compared and a ValueError is raised if they disagree.

Priority (highest → lowest):

  1. Explicit --store argument (e.g. "coral:dev").
  2. Environment variable -- AIRBYTE_REGISTRY_STORE.
  3. Auto-detected -- connector name and/or working directory. If both are present and disagree, a ValueError is raised.
Arguments:
  • store: Explicit store target string (e.g. "coral:dev").
  • connector_name: Optional connector name for auto-detection.
  • cwd: Working directory for repo-based detection.
  • default_env: Environment to use when auto-detecting (default "dev").
Returns:

A fully resolved RegistryStore.

Raises:
  • ValueError: If no detection method succeeds, or if auto-detected methods produce conflicting store types.
def unyank_connector_version( connector_name: str, version: str, bucket_name: str, dry_run: bool = False) -> YankResult:
def unyank_connector_version(
    connector_name: str,
    version: str,
    bucket_name: str,
    dry_run: bool = False,
) -> YankResult:
    """Rename the active yank marker to an unyanked audit marker.

    Moves the active version-yank.yml marker at:
        metadata/airbyte/{connector_name}/{version}/version-yank.yml
    to:
        metadata/airbyte/{connector_name}/{version}/version-unyanked-yyyymmdd.yml

    The move is performed as a copy of the marker to its new name followed by
    a delete of the original marker.

    Args:
        connector_name: The connector name (e.g., "source-faker").
        version: The version to unyank (e.g., "1.2.3").
        bucket_name: The GCS bucket name.
        dry_run: If True, report what would be done without writing.

    Returns:
        YankResult with details of the operation. ``success`` is False (and
        nothing is changed) when the version has no active yank marker.
    """
    yank_path = _get_yank_blob_path(connector_name, version)

    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # Check if yank marker exists
    yank_blob = bucket.blob(yank_path)
    if not yank_blob.exists():
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="unyank",
            success=False,
            message=f"Version {version} of {connector_name} is not yanked.",
            dry_run=dry_run,
        )

    if dry_run:
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="unyank",
            success=True,
            message=f"[DRY RUN] Would unyank {connector_name} {version}.",
            dry_run=True,
        )

    # Swap only the trailing marker file name. A plain str.replace() over the
    # whole path would also rewrite any earlier occurrence of YANK_FILE_NAME.
    unyanked_name = unyanked_marker_file()
    if yank_path.endswith(YANK_FILE_NAME):
        unyanked_path = yank_path[: -len(YANK_FILE_NAME)] + unyanked_name
    else:
        # NOTE(review): _get_yank_blob_path is expected to end with
        # YANK_FILE_NAME; keep the previous substitution as a fallback.
        unyanked_path = yank_path.replace(YANK_FILE_NAME, unyanked_name)

    # NOTE(review): if unyanked_marker_file() is date-based (per the
    # "yyyymmdd" naming above), a second unyank on the same day will
    # overwrite the earlier audit marker -- confirm this is acceptable.
    bucket.copy_blob(yank_blob, bucket, new_name=unyanked_path)
    # Delete only after the copy succeeded so the audit record is never lost.
    yank_blob.delete()

    logger.info("Unyanked %s version %s in %s", connector_name, version, bucket_name)

    return YankResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        action="unyank",
        success=True,
        message=f"Successfully unyanked {connector_name} {version}.",
    )

Rename the active yank marker to an unyanked audit marker.

Moves the active version-yank.yml marker from metadata/airbyte/{connector_name}/{version}/version-yank.yml to metadata/airbyte/{connector_name}/{version}/version-unyanked-yyyymmdd.yml.

Arguments:
  • connector_name: The connector name (e.g., "source-faker").
  • version: The version to unyank (e.g., "1.2.3").
  • bucket_name: The GCS bucket name.
  • dry_run: If True, report what would be done without writing.
Returns:

YankResult with details of the operation.

def validate_metadata( metadata_data: dict[str, typing.Any], opts: ValidateOptions | None = None) -> ValidationResult:
def validate_metadata(
    metadata_data: dict[str, Any],
    opts: ValidateOptions | None = None,
) -> ValidationResult:
    """Run all pre-publish validators against raw `metadata.data`.

    Every validator in ``PRE_PUBLISH_VALIDATORS`` is run (there is no early
    exit), so the result collects every failure rather than only the first.

    Args:
        metadata_data: The `data` section of a parsed `metadata.yaml`.
        opts: Options influencing validation behaviour. A default
            ``ValidateOptions()`` is used when omitted.

    Returns:
        A `ValidationResult` with aggregate pass/fail and error list.
    """
    if opts is None:
        opts = ValidateOptions()

    result = ValidationResult()

    for validator in PRE_PUBLISH_VALIDATORS:
        result.validators_run += 1
        logger.info("Running validator: %s", validator.__name__)
        passed, error = validator(metadata_data, opts)
        if passed:
            continue
        # A validator that reports failure without a message must still be
        # recorded; otherwise the failure is silently dropped and the overall
        # result would look successful.
        if not error:
            error = f"Validator {validator.__name__} failed without an error message."
        logger.error("Validation failed: %s", error)
        result.add_error(error)

    return result

Run all pre-publish validators against raw metadata.data.

Arguments:
  • metadata_data: The data section of a parsed metadata.yaml.
  • opts: Options influencing validation behaviour.
Returns:

A ValidationResult with aggregate pass/fail and error list.

def yank_connector_version( connector_name: str, version: str, bucket_name: str, reason: str = '', approval_url: str = '', dry_run: bool = False) -> YankResult:
 64def yank_connector_version(
 65    connector_name: str,
 66    version: str,
 67    bucket_name: str,
 68    reason: str = "",
 69    approval_url: str = "",
 70    dry_run: bool = False,
 71) -> YankResult:
 72    """Mark a connector version as yanked by writing a version-yank.yml marker.
 73
 74    The marker file is placed at:
 75        metadata/airbyte/{connector_name}/{version}/version-yank.yml
 76
 77    Args:
 78        connector_name: The connector name (e.g., "source-faker").
 79        version: The version to yank (e.g., "1.2.3").
 80        bucket_name: The GCS bucket name.
 81        reason: Optional reason for yanking the version.
 82        approval_url: Optional approval evidence URL to record in the marker.
 83        dry_run: If True, report what would be done without writing.
 84
 85    Returns:
 86        YankResult with details of the operation.
 87
 88    Raises:
 89        ValueError: If the bucket is the production bucket and no override is set,
 90            or if the version does not exist.
 91    """
 92    yank_path = _get_yank_blob_path(connector_name, version)
 93    metadata_path = _get_metadata_blob_path(connector_name, version)
 94
 95    storage_client = get_gcs_storage_client()
 96    bucket = storage_client.bucket(bucket_name)
 97
 98    # Verify the version exists
 99    metadata_blob = bucket.blob(metadata_path)
100    if not metadata_blob.exists():
101        return YankResult(
102            connector_name=connector_name,
103            version=version,
104            bucket_name=bucket_name,
105            action="yank",
106            success=False,
107            message=f"Version {version} not found for {connector_name} in {bucket_name}.",
108            dry_run=dry_run,
109        )
110
111    # Check if already yanked
112    yank_blob = bucket.blob(yank_path)
113    if yank_blob.exists():
114        return YankResult(
115            connector_name=connector_name,
116            version=version,
117            bucket_name=bucket_name,
118            action="yank",
119            success=False,
120            message=f"Version {version} of {connector_name} is already yanked.",
121            dry_run=dry_run,
122        )
123
124    if dry_run:
125        return YankResult(
126            connector_name=connector_name,
127            version=version,
128            bucket_name=bucket_name,
129            action="yank",
130            success=True,
131            message=f"[DRY RUN] Would yank {connector_name} {version}.",
132            dry_run=True,
133        )
134
135    # Write the yank marker file
136    yank_content: dict[str, Any] = {
137        "yanked": True,
138        "yanked_at": datetime.now(tz=timezone.utc).isoformat(),
139    }
140    if reason:
141        yank_content["reason"] = reason
142    if approval_url:
143        yank_content["approval_url"] = approval_url
144
145    yank_yaml = yaml.dump(yank_content, default_flow_style=False)
146    yank_blob.upload_from_string(yank_yaml, content_type="application/x-yaml")
147
148    logger.info("Yanked %s version %s in %s", connector_name, version, bucket_name)
149
150    return YankResult(
151        connector_name=connector_name,
152        version=version,
153        bucket_name=bucket_name,
154        action="yank",
155        success=True,
156        message=f"Successfully yanked {connector_name} {version}.",
157    )

Mark a connector version as yanked by writing a version-yank.yml marker.

The marker file is placed at:

metadata/airbyte/{connector_name}/{version}/version-yank.yml

Arguments:
  • connector_name: The connector name (e.g., "source-faker").
  • version: The version to yank (e.g., "1.2.3").
  • bucket_name: The GCS bucket name.
  • reason: Optional reason for yanking the version.
  • approval_url: Optional approval evidence URL to record in the marker.
  • dry_run: If True, report what would be done without writing.
Returns:

YankResult with details of the operation.

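Note:
  • Failures (version not found, version already yanked) are reported via the returned YankResult with success=False rather than raised as ValueError. This function performs no production-bucket check itself; any such guard must be applied by the caller.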