airbyte_ops_mcp.registry
Registry operations for Airbyte connectors.
This package provides functionality for:
- Reading connector metadata from the GCS registry
- Listing connectors and versions in the registry
- Publishing connector metadata to GCS
- Promoting and rolling back release candidates
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Registry operations for Airbyte connectors.

This package provides functionality for:
- Reading connector metadata from the GCS registry
- Listing connectors and versions in the registry
- Publishing connector metadata to GCS
- Promoting and rolling back release candidates
"""

from __future__ import annotations

# Bucket names and well-known GCS path constants.
from airbyte_ops_mcp.registry._constants import (
    DEFAULT_METADATA_SERVICE_BUCKET_NAME,
    DEV_METADATA_SERVICE_BUCKET_NAME,
    LATEST_GCS_FOLDER_NAME,
    METADATA_FILE_NAME,
    METADATA_FOLDER,
    PROD_METADATA_SERVICE_BUCKET_NAME,
    RELEASE_CANDIDATE_GCS_FOLDER_NAME,
    SONAR_DEV_BUCKET_NAME,
    SONAR_PROD_BUCKET_NAME,
)
# Enumerations shared across registry operations.
from airbyte_ops_mcp.registry._enums import (
    ConnectorLanguage,
    ConnectorType,
    SupportLevel,
)
# Audit: find connectors with unpublished versions.
from airbyte_ops_mcp.registry.audit import (
    AuditResult,
    UnpublishedConnector,
    find_unpublished_connectors,
)
# Compile: build registry artifacts and purge stale latest/ dirs.
from airbyte_ops_mcp.registry.compile import (
    CompileResult,
    PurgeLatestResult,
    compile_registry,
    purge_latest_dirs,
)
# Generate: produce per-version artifacts locally.
from airbyte_ops_mcp.registry.generate import (
    GenerateResult,
    generate_version_artifacts,
)
# Pydantic result models returned by the public operations.
from airbyte_ops_mcp.registry.models import (
    ConnectorListResult,
    ConnectorMetadata,
    ConnectorPublishResult,
    MetadataPublishResult,
    RegistryEntryResult,
    VersionListResult,
)
# Read operations against the registry.
from airbyte_ops_mcp.registry.operations import (
    get_registry_entry,
    get_registry_spec,
    list_connector_versions,
    list_registry_connectors,
    list_registry_connectors_filtered,
)
# Publish: write metadata and progressive-rollout markers to GCS.
from airbyte_ops_mcp.registry.publish import (
    CONNECTOR_PATH_PREFIX,
    create_progressive_rollout_blob,
    delete_progressive_rollout_blob,
    get_connector_metadata,
    get_gcs_publish_path,
    is_valid_for_progressive_rollout,
    publish_connector_metadata,
    strip_rc_suffix,
)
# Publish pre-generated version artifacts.
from airbyte_ops_mcp.registry.publish_artifacts import (
    PublishArtifactsResult,
    publish_version_artifacts,
)
# Rebuild/mirror registry contents between stores.
from airbyte_ops_mcp.registry.rebuild import (
    OutputMode,
    RebuildResult,
    rebuild_registry,
)
# Abstract registry-store interface and factory.
from airbyte_ops_mcp.registry.registry_store_base import (
    Registry,
    get_registry,
)
# Store-target parsing (e.g. "coral:dev/prefix").
from airbyte_ops_mcp.registry.store import (
    REGISTRY_STORE_ENV_VAR,
    RegistryStore,
    StoreType,
    resolve_registry_store,
)
# Metadata validation.
from airbyte_ops_mcp.registry.validate import (
    ValidateOptions,
    ValidationResult,
    validate_metadata,
)
# Yank/unyank specific connector versions.
from airbyte_ops_mcp.registry.yank import (
    YANK_FILE_NAME,
    YankResult,
    unyank_connector_version,
    yank_connector_version,
)

# Public API: constants first, then classes, then functions (each group
# alphabetized). Keep in sync with the imports above.
__all__ = [
    "CONNECTOR_PATH_PREFIX",
    "DEFAULT_METADATA_SERVICE_BUCKET_NAME",
    "DEV_METADATA_SERVICE_BUCKET_NAME",
    "LATEST_GCS_FOLDER_NAME",
    "METADATA_FILE_NAME",
    "METADATA_FOLDER",
    "PROD_METADATA_SERVICE_BUCKET_NAME",
    "REGISTRY_STORE_ENV_VAR",
    "RELEASE_CANDIDATE_GCS_FOLDER_NAME",
    "SONAR_DEV_BUCKET_NAME",
    "SONAR_PROD_BUCKET_NAME",
    "YANK_FILE_NAME",
    "AuditResult",
    "CompileResult",
    "ConnectorLanguage",
    "ConnectorListResult",
    "ConnectorMetadata",
    "ConnectorPublishResult",
    "ConnectorType",
    "GenerateResult",
    "MetadataPublishResult",
    "OutputMode",
    "PublishArtifactsResult",
    "PurgeLatestResult",
    "RebuildResult",
    "Registry",
    "RegistryEntryResult",
    "RegistryStore",
    "StoreType",
    "SupportLevel",
    "UnpublishedConnector",
    "ValidateOptions",
    "ValidationResult",
    "VersionListResult",
    "YankResult",
    "compile_registry",
    "create_progressive_rollout_blob",
    "delete_progressive_rollout_blob",
    "find_unpublished_connectors",
    "generate_version_artifacts",
    "get_connector_metadata",
    "get_gcs_publish_path",
    "get_registry",
    "get_registry_entry",
    "get_registry_spec",
    "is_valid_for_progressive_rollout",
    "list_connector_versions",
    "list_registry_connectors",
    "list_registry_connectors_filtered",
    "publish_connector_metadata",
    "publish_version_artifacts",
    "purge_latest_dirs",
    "rebuild_registry",
    "resolve_registry_store",
    "strip_rc_suffix",
    "unyank_connector_version",
    "validate_metadata",
    "yank_connector_version",
]
@dataclass
class AuditResult:
    """Result of auditing which connectors have unpublished versions."""

    # Connectors found to have at least one unpublished version.
    unpublished: list[UnpublishedConnector] = field(default_factory=list)
    # Total number of connectors examined during the audit.
    checked_count: int = 0
    # Names of connectors skipped from the audit, bucketed by reason
    # (archived / release-candidate / disabled — presumably derived from
    # connector metadata; confirm against the audit implementation).
    skipped_archived: list[str] = field(default_factory=list)
    skipped_rc: list[str] = field(default_factory=list)
    skipped_disabled: list[str] = field(default_factory=list)
    # Error messages collected while auditing individual connectors.
    errors: list[str] = field(default_factory=list)
Result of auditing which connectors have unpublished versions.
@dataclass
class CompileResult:
    """Result of a registry compile operation."""

    target: str
    connectors_scanned: int = 0
    versions_found: int = 0
    yanked_versions: int = 0
    latest_updated: int = 0
    latest_already_current: int = 0
    cloud_registry_entries: int = 0
    oss_registry_entries: int = 0
    composite_registry_entries: int = 0
    version_indexes_written: int = 0
    specs_secrets_mask_properties: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Outcome label: dry-run takes precedence, then error state."""
        if self.dry_run:
            return "dry-run"
        return "completed-with-errors" if self.errors else "success"

    def summary(self) -> str:
        """Return a one-line human-readable summary of the compile run."""
        pieces = [
            f"[{self.status}] Scanned {self.connectors_scanned} connectors, ",
            f"{self.versions_found} versions ({self.yanked_versions} yanked). ",
            f"Latest updated: {self.latest_updated}, ",
            f"already current: {self.latest_already_current}. ",
            f"Registry entries: cloud={self.cloud_registry_entries}, ",
            f"oss={self.oss_registry_entries}, ",
            f"composite={self.composite_registry_entries}. ",
            f"Version indexes: {self.version_indexes_written}. ",
            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. ",
            f"Errors: {len(self.errors)}.",
        ]
        return "".join(pieces)
Result of a registry compile operation.
    def summary(self) -> str:
        """Return a one-line, human-readable summary of the compile run."""
        # Each f-string segment ends with its own separator so the
        # concatenated result reads as a single sentence.
        return (
            f"[{self.status}] Scanned {self.connectors_scanned} connectors, "
            f"{self.versions_found} versions ({self.yanked_versions} yanked). "
            f"Latest updated: {self.latest_updated}, "
            f"already current: {self.latest_already_current}. "
            f"Registry entries: cloud={self.cloud_registry_entries}, "
            f"oss={self.oss_registry_entries}, "
            f"composite={self.composite_registry_entries}. "
            f"Version indexes: {self.version_indexes_written}. "
            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. "
            f"Errors: {len(self.errors)}."
        )
88class ConnectorLanguage(StrEnum): 89 """Connector implementation languages.""" 90 91 PYTHON = "python" 92 JAVA = "java" 93 LOW_CODE = "low-code" 94 MANIFEST_ONLY = "manifest-only" 95 96 @classmethod 97 def parse(cls, value: str) -> ConnectorLanguage: 98 """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch.""" 99 try: 100 return cls(value) 101 except ValueError: 102 valid = ", ".join(f"`{m.value}`" for m in cls) 103 raise ValueError( 104 f"Unrecognized language: {value!r}. Expected one of: {valid}." 105 ) from None
Connector implementation languages.
    @classmethod
    def parse(cls, value: str) -> ConnectorLanguage:
        """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch."""
        try:
            return cls(value)
        except ValueError:
            # Re-raise with a friendlier message listing accepted values;
            # `from None` hides the internal enum-lookup traceback.
            valid = ", ".join(f"`{m.value}`" for m in cls)
            raise ValueError(
                f"Unrecognized language: {value!r}. Expected one of: {valid}."
            ) from None
Parse a string into a ConnectorLanguage, raising ValueError on mismatch.
class ConnectorListResult(BaseModel):
    """Result of listing connectors in the registry.

    Returned by the list operations; `connector_count` is the length of
    `connectors`.
    """

    bucket_name: str = Field(description="The GCS bucket name")
    connector_count: int = Field(description="Number of connectors found")
    connectors: list[str] = Field(description="List of connector names")
Result of listing connectors in the registry.
class ConnectorMetadata(BaseModel):
    """Connector metadata from metadata.yaml.

    This model represents the essential metadata about a connector
    read from its metadata.yaml file in the Airbyte monorepo.
    """

    name: str = Field(description="The connector technical name")
    docker_repository: str = Field(description="The Docker repository")
    docker_image_tag: str = Field(description="The Docker image tag/version")
    # Optional fields: absent from some metadata.yaml files, hence None defaults.
    support_level: str | None = Field(
        default=None, description="The support level (certified, community, etc.)"
    )
    definition_id: str | None = Field(
        default=None, description="The connector definition ID"
    )
Connector metadata from metadata.yaml.
This model represents the essential metadata about a connector read from its metadata.yaml file in the Airbyte monorepo.
class ConnectorPublishResult(BaseModel):
    """Result of a connector publish operation.

    This model provides detailed information about the outcome of a
    connector publish operation (apply or rollback version override).
    """

    connector: str = Field(description="The connector technical name")
    version: str = Field(description="The connector version")
    action: Literal["progressive-rollout-create", "progressive-rollout-cleanup"] = (
        Field(description="The action performed")
    )
    status: Literal["success", "failure", "dry-run"] = Field(
        description="The status of the operation"
    )
    docker_image: str | None = Field(
        default=None, description="The Docker image name if applicable"
    )
    registry_updated: bool = Field(
        default=False, description="Whether the registry was updated"
    )
    message: str | None = Field(default=None, description="Additional status message")

    def __str__(self) -> str:
        """Return a string representation of the publish result."""
        # Fix: the previous implementation special-cased "dry-run" but mapped
        # it back to itself ('"dry-run" if status == "dry-run" else status'),
        # so the conditional was a no-op. Use the status directly.
        return f"[{self.status}] {self.connector}:{self.version} - {self.action}"
Result of a connector publish operation.
This model provides detailed information about the outcome of a connector publish operation (apply or rollback version override).
70class ConnectorType(StrEnum): 71 """Connector type: source or destination.""" 72 73 SOURCE = "source" 74 DESTINATION = "destination" 75 76 @classmethod 77 def parse(cls, value: str) -> ConnectorType: 78 """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch.""" 79 try: 80 return cls(value) 81 except ValueError: 82 valid = ", ".join(f"`{m.value}`" for m in cls) 83 raise ValueError( 84 f"Unrecognized connector type: {value!r}. Expected one of: {valid}." 85 ) from None
Connector type: source or destination.
    @classmethod
    def parse(cls, value: str) -> ConnectorType:
        """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch."""
        try:
            return cls(value)
        except ValueError:
            # Re-raise with a friendlier message listing accepted values;
            # `from None` hides the internal enum-lookup traceback.
            valid = ", ".join(f"`{m.value}`" for m in cls)
            raise ValueError(
                f"Unrecognized connector type: {value!r}. Expected one of: {valid}."
            ) from None
Parse a string into a ConnectorType, raising ValueError on mismatch.
@dataclass
class GenerateResult:
    """Result of a local artifact generation run."""

    connector_name: str
    version: str
    docker_image: str
    output_dir: str
    artifacts_written: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    validation_errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def success(self) -> bool:
        """True when neither error list has any entries."""
        return not self.errors and not self.validation_errors

    def to_dict(self) -> dict[str, Any]:
        """Serialize all fields plus the derived `success` flag to a plain dict."""
        # Walk declared fields so key order always matches declaration order;
        # values are the live attribute references, not copies.
        payload: dict[str, Any] = {
            f.name: getattr(self, f.name) for f in fields(self)
        }
        payload["success"] = self.success
        return payload
Result of a local artifact generation run.
    def to_dict(self) -> dict[str, Any]:
        """Serialize all fields plus the derived `success` flag to a plain dict."""
        return {
            "connector_name": self.connector_name,
            "version": self.version,
            "docker_image": self.docker_image,
            "output_dir": self.output_dir,
            "artifacts_written": self.artifacts_written,
            "errors": self.errors,
            "validation_errors": self.validation_errors,
            "dry_run": self.dry_run,
            # Derived property, included for convenience of JSON consumers.
            "success": self.success,
        }
class MetadataPublishResult(BaseModel):
    """Result of a metadata publish operation to GCS.

    This model provides detailed information about the outcome of
    publishing connector metadata to the registry.
    """

    connector_name: str = Field(description="The connector technical name")
    version: str = Field(description="The version that was published")
    bucket_name: str = Field(description="The GCS bucket name")
    versioned_path: str = Field(description="The versioned GCS path")
    # latest_path is None when the latest/ pointer was not touched.
    latest_path: str | None = Field(
        default=None, description="The latest GCS path if updated"
    )
    versioned_uploaded: bool = Field(
        default=False, description="Whether the versioned metadata was uploaded"
    )
    latest_uploaded: bool = Field(
        default=False, description="Whether the latest metadata was uploaded"
    )
    status: Literal["success", "dry-run", "already-up-to-date"] = Field(
        description="The status of the operation"
    )
    message: str = Field(description="Status message describing the outcome")

    def __str__(self) -> str:
        """Return a string representation of the publish result."""
        return f"[{self.status}] {self.connector_name}:{self.version} -> {self.versioned_path}"
Result of a metadata publish operation to GCS.
This model provides detailed information about the outcome of publishing connector metadata to the registry.
@dataclass
class PublishArtifactsResult:
    """Result of a version-artifacts publish operation."""

    connector_name: str
    version: str
    target: str
    gcs_destination: str
    files_uploaded: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    validation_errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def success(self) -> bool:
        """True when neither error list has any entries."""
        return not (self.errors or self.validation_errors)

    @property
    def status(self) -> str:
        """Outcome label; dry-run takes precedence over error state."""
        if self.dry_run:
            return "dry-run"
        return "success" if self.success else "completed-with-errors"
Result of a version-artifacts publish operation.
@dataclass
class PurgeLatestResult:
    """Result of a purge-latest operation."""

    target: str
    connectors_found: int = 0
    latest_dirs_deleted: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Outcome label; dry-run takes precedence over error state."""
        if self.dry_run:
            return "dry-run"
        return "completed-with-errors" if self.errors else "success"

    def summary(self) -> str:
        """Return a one-line human-readable summary of the purge run."""
        head = f"[{self.status}] Found {self.connectors_found} connectors, "
        body = f"deleted {self.latest_dirs_deleted} latest/ directories. "
        tail = f"Errors: {len(self.errors)}."
        return head + body + tail
Result of a purge-latest operation.
@dataclass
class RebuildResult:
    """Result of a registry rebuild operation."""

    source_bucket: str
    output_mode: OutputMode
    output_root: str
    connectors_processed: int = 0
    blobs_copied: int = 0
    blobs_skipped: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Return the status of the rebuild operation."""
        if self.dry_run:
            return "dry-run"
        return "completed-with-errors" if self.errors else "success"

    def summary(self) -> str:
        """Return a human-readable summary."""
        counts = (
            f"{self.connectors_processed} connectors, "
            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
            f"{len(self.errors)} errors"
        )
        return f"[{self.status}] Rebuilt {counts}. Output: {self.output_root}"
Result of a registry rebuild operation.
    @property
    def status(self) -> str:
        """Return the status of the rebuild operation."""
        # dry-run takes precedence: errors recorded during a dry run
        # still report as "dry-run".
        if self.dry_run:
            return "dry-run"
        if self.errors:
            return "completed-with-errors"
        return "success"
Return the status of the rebuild operation.
    def summary(self) -> str:
        """Return a human-readable summary."""
        # One-line report: status, counters, then the output location.
        return (
            f"[{self.status}] Rebuilt {self.connectors_processed} connectors, "
            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
            f"{len(self.errors)} errors. Output: {self.output_root}"
        )
Return a human-readable summary.
class Registry(ABC):
    """A configured connector registry store.

    A Registry is bound to a specific `airbyte_ops_mcp.registry.store.RegistryStore`
    (store type + env + optional prefix), and provides methods used by the CLI to
    read/write registry contents.

    Every operation has a default implementation that raises
    `NotImplementedError`; concrete store subclasses override the ones they
    support. Only `list_connectors` is abstract and therefore mandatory.
    """

    def __init__(self, store: RegistryStore) -> None:
        self.store = store

    @property
    def store_type(self) -> StoreType:
        """Store type of the bound target."""
        return self.store.store_type

    @property
    def bucket_name(self) -> str:
        """Concrete bucket name resolved from the bound store target."""
        return self.store.bucket

    @property
    def prefix(self) -> str:
        """Optional path prefix within the bucket (empty string when unset)."""
        return self.store.prefix

    def _require_no_prefix(self, op_name: str) -> None:
        """Raise if this op doesn't support prefixed targets."""

        if self.prefix:
            raise NotImplementedError(
                f"Operation '{op_name}' does not yet support store prefixes (got prefix='{self.prefix}')."
            )

    # ---------------------------------------------------------------------
    # Read operations
    # ---------------------------------------------------------------------

    @abstractmethod
    def list_connectors(
        self,
        *,
        support_level: SupportLevel | None = None,
        min_support_level: SupportLevel | None = None,
        connector_type: ConnectorType | None = None,
        language: ConnectorLanguage | None = None,
    ) -> list[str]:
        """List connector names, optionally filtered; subclasses must override."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "list_connectors")
        )

    def list_connector_versions(self, connector_name: str) -> list[str]:
        """List known versions for one connector."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "list_connector_versions")
        )

    def get_connector_metadata(
        self, connector_name: str, version: str = "latest"
    ) -> dict[str, Any]:
        """Fetch raw metadata for a connector version ("latest" by default)."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "get_connector_metadata")
        )

    # ---------------------------------------------------------------------
    # Write / mutate operations
    # ---------------------------------------------------------------------

    def progressive_rollout_create(
        self,
        repo_path: Path,
        connector_name: str,
        dry_run: bool = False,
    ) -> ConnectorPublishResult:
        """Create a progressive-rollout marker for a connector."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "progressive_rollout_create")
        )

    def progressive_rollout_cleanup(
        self,
        repo_path: Path,
        connector_name: str,
        dry_run: bool = False,
    ) -> ConnectorPublishResult:
        """Remove a progressive-rollout marker for a connector."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "progressive_rollout_cleanup")
        )

    def yank(
        self,
        connector_name: str,
        version: str,
        reason: str = "",
        dry_run: bool = False,
    ) -> YankResult:
        """Mark a connector version as yanked."""
        raise NotImplementedError(_op_not_implemented_message(self.store_type, "yank"))

    def unyank(
        self,
        connector_name: str,
        version: str,
        dry_run: bool = False,
    ) -> YankResult:
        """Remove the yank marker from a connector version."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "unyank")
        )

    def publish_version_artifacts(
        self,
        connector_name: str,
        version: str,
        artifacts_dir: Path,
        dry_run: bool = False,
        with_validate: bool = True,
    ) -> PublishArtifactsResult:
        """Upload pre-generated version artifacts from a local directory."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "publish_version_artifacts")
        )

    def delete_dev_latest(
        self,
        connector_name: list[str] | None = None,
        dry_run: bool = False,
    ) -> PurgeLatestResult:
        """Delete dev latest/ directories (all connectors when None)."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "delete_dev_latest")
        )

    def compile(
        self,
        connector_name: list[str] | None = None,
        dry_run: bool = False,
        with_secrets_mask: bool = False,
        with_legacy_migration: str | None = None,
        force: bool = False,
    ) -> CompileResult:
        """Compile registry artifacts (all connectors when None)."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "compile")
        )

    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
        """Check marketing stubs against the repo; returns a report dict."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "marketing_stubs_check")
        )

    def marketing_stubs_sync(
        self,
        repo_root: Path,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """Sync marketing stubs from the repo; returns a report dict."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "marketing_stubs_sync")
        )

    def mirror(
        self,
        output_mode: OutputMode,
        output_path_root: str | None = None,
        gcs_bucket: str | None = None,
        s3_bucket: str | None = None,
        dry_run: bool = False,
        connector_name: list[str] | None = None,
    ) -> RebuildResult:
        """Mirror registry contents to another location."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "mirror")
        )
A configured connector registry store.
A Registry is bound to a specific airbyte_ops_mcp.registry.store.RegistryStore
(store type + env + optional prefix), and provides methods used by the CLI to
read/write registry contents.
    @abstractmethod
    def list_connectors(
        self,
        *,
        support_level: SupportLevel | None = None,
        min_support_level: SupportLevel | None = None,
        connector_type: ConnectorType | None = None,
        language: ConnectorLanguage | None = None,
    ) -> list[str]:
        """List connector names, optionally filtered; subclasses must override."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "list_connectors")
        )
    def publish_version_artifacts(
        self,
        connector_name: str,
        version: str,
        artifacts_dir: Path,
        dry_run: bool = False,
        with_validate: bool = True,
    ) -> PublishArtifactsResult:
        """Upload pre-generated version artifacts; not supported by this base class."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "publish_version_artifacts")
        )
    def compile(
        self,
        connector_name: list[str] | None = None,
        dry_run: bool = False,
        with_secrets_mask: bool = False,
        with_legacy_migration: str | None = None,
        force: bool = False,
    ) -> CompileResult:
        """Compile registry artifacts; not supported by this base class."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "compile")
        )
    def mirror(
        self,
        output_mode: OutputMode,
        output_path_root: str | None = None,
        gcs_bucket: str | None = None,
        s3_bucket: str | None = None,
        dry_run: bool = False,
        connector_name: list[str] | None = None,
    ) -> RebuildResult:
        """Mirror registry contents; not supported by this base class."""
        raise NotImplementedError(
            _op_not_implemented_message(self.store_type, "mirror")
        )
class RegistryEntryResult(BaseModel):
    """Result of reading a registry entry from GCS.

    This model wraps the raw metadata dictionary with additional context.
    """

    connector_name: str = Field(description="The connector technical name")
    version: str = Field(description="The version that was read")
    bucket_name: str = Field(description="The GCS bucket name")
    gcs_path: str = Field(description="The GCS path that was read")
    # Raw, unvalidated metadata exactly as stored in the registry.
    metadata: dict = Field(description="The raw metadata dictionary")
Result of reading a registry entry from GCS.
This model wraps the raw metadata dictionary with additional context.
@dataclass(frozen=True, kw_only=True)
class RegistryStore:
    """Parsed store target (type + environment + optional prefix).

    Examples::

        RegistryStore.parse("coral:dev")
        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
        RegistryStore.parse("coral:dev/aj-test")
        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
        RegistryStore.parse("sonar:prod")
        # -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
    """

    store_type: StoreType
    env: str
    prefix: str = ""

    # -- Derived helpers -----------------------------------------------------

    @property
    def bucket(self) -> str:
        """Resolve the concrete bucket name for this target."""
        buckets_for_type = BUCKET_MAP.get(self.store_type)
        if buckets_for_type is None:
            raise ValueError(f"Unknown store type: {self.store_type!r}")
        resolved = buckets_for_type.get(self.env)
        if resolved is not None:
            return resolved
        raise ValueError(
            f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
            f"Expected one of: {', '.join(sorted(buckets_for_type))}."
        )

    @property
    def bucket_root(self) -> str:
        """Bucket name with optional prefix appended (`bucket/prefix`)."""
        return f"{self.bucket}/{self.prefix}" if self.prefix else self.bucket

    # -- Parsing -------------------------------------------------------------

    @classmethod
    def parse(cls, target: str) -> RegistryStore:
        """Parse a store target string.

        Accepted formats:

            "coral:dev"
            "coral:prod"
            "coral:dev/aj-test100"
            "sonar:prod"

        Raises:
            ValueError: If the string cannot be parsed or references an
                unknown store type / environment.
        """
        if ":" not in target:
            raise ValueError(
                f"Invalid store target '{target}'. "
                "Expected format: '<store_type>:<env>[/<prefix>]' "
                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
            )

        type_token, _, remainder = target.partition(":")

        # Resolve the store type, case-insensitively.
        by_value = {member.value: member for member in StoreType}
        normalized = type_token.lower()
        if normalized not in by_value:
            raise ValueError(
                f"Unknown store type '{type_token}'. "
                f"Expected one of: {', '.join(sorted(by_value))}."
            )
        store_type = by_value[normalized]

        # Everything after the first "/" is an optional prefix.
        env_key, _, raw_prefix = remainder.partition("/")
        prefix = raw_prefix.strip("/")

        # Validate the environment against the known buckets for this type.
        env_map = BUCKET_MAP.get(store_type, {})
        if env_key not in env_map:
            raise ValueError(
                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
                f"Expected one of: {', '.join(sorted(env_map))}."
            )

        return cls(store_type=store_type, env=env_key, prefix=prefix)
Parsed store target (type + environment + optional prefix).
Examples::
RegistryStore.parse("coral:dev")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
RegistryStore.parse("coral:dev/aj-test")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
RegistryStore.parse("sonar:prod")
# -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
    @property
    def bucket(self) -> str:
        """Resolve the concrete bucket name for this target."""
        # Two-level lookup: store type -> env map, then env -> bucket name.
        env_map = BUCKET_MAP.get(self.store_type)
        if env_map is None:
            raise ValueError(f"Unknown store type: {self.store_type!r}")
        bucket_name = env_map.get(self.env)
        if bucket_name is None:
            raise ValueError(
                f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
                f"Expected one of: {', '.join(sorted(env_map))}."
            )
        return bucket_name
Resolve the concrete bucket name for this target.
    @property
    def bucket_root(self) -> str:
        """Bucket name with optional prefix appended (`bucket/prefix`)."""
        if self.prefix:
            return f"{self.bucket}/{self.prefix}"
        return self.bucket
Bucket name with optional prefix appended (bucket/prefix).
    @classmethod
    def parse(cls, target: str) -> RegistryStore:
        """Parse a store target string.

        Accepted formats:

            "coral:dev"
            "coral:prod"
            "coral:dev/aj-test100"
            "sonar:prod"

        Raises:
            ValueError: If the string cannot be parsed or references an
                unknown store type / environment.
        """
        if ":" not in target:
            raise ValueError(
                f"Invalid store target '{target}'. "
                "Expected format: '<store_type>:<env>[/<prefix>]' "
                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
            )

        store_part, env_part = target.split(":", 1)

        # Validate store type
        store_part_lower = store_part.lower()
        valid_types = {t.value: t for t in StoreType}
        if store_part_lower not in valid_types:
            raise ValueError(
                f"Unknown store type '{store_part}'. "
                f"Expected one of: {', '.join(sorted(valid_types))}."
            )
        store_type = valid_types[store_part_lower]

        # Split env and prefix
        env_key, _, prefix = env_part.partition("/")
        prefix = prefix.strip("/")

        # Validate env
        env_map = BUCKET_MAP.get(store_type, {})
        if env_key not in env_map:
            raise ValueError(
                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
                f"Expected one of: {', '.join(sorted(env_map))}."
            )

        return cls(store_type=store_type, env=env_key, prefix=prefix)
Parse a store target string.
Accepted formats:
"coral:dev"
"coral:prod" "coral:dev/aj-test100" "sonar:prod"
Raises:
- ValueError: If the string cannot be parsed or references an unknown store type / environment.
class StoreType(str, Enum):
    """Registry store type identifier."""

    SONAR = "sonar"
    CORAL = "coral"

    # -- Auto-detection class methods ----------------------------------------

    @classmethod
    def get_from_connector_name(cls, name: str) -> StoreType:
        """Infer the store type from a connector's technical name.

        Connectors whose name starts with `source-` or `destination-` belong
        to the **coral** registry. All other names belong to **sonar**.

        Args:
            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).

        Returns:
            The inferred `StoreType`.
        """
        is_coral = name.startswith(("source-", "destination-"))
        return cls.CORAL if is_coral else cls.SONAR

    @classmethod
    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
        """Infer the store type from a repository working directory.

        Checks for well-known directory markers:

        * **sonar** -- `integrations/` alongside `connector-sdk/`
        * **coral** -- `airbyte-integrations/connectors/`

        Args:
            path: Directory to inspect. Defaults to `Path.cwd`.

        Returns:
            The inferred `StoreType`, or `None` if the directory does
            not match any known registry repository layout.
        """
        base = Path.cwd() if path is None else path

        # Sonar layout: both marker directories must be present.
        if (base / "integrations").is_dir() and (base / "connector-sdk").is_dir():
            return cls.SONAR

        # Coral layout: the monorepo connectors directory.
        if (base / "airbyte-integrations" / "connectors").is_dir():
            return cls.CORAL

        return None
Registry store type identifier.
    @classmethod
    def get_from_connector_name(cls, name: str) -> StoreType:
        """Infer the store type from a connector's technical name.

        Connectors whose name starts with `source-` or `destination-` belong
        to the **coral** registry. All other names belong to **sonar**.

        Args:
            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).

        Returns:
            The inferred `StoreType`.
        """
        if name.startswith("source-") or name.startswith("destination-"):
            return cls.CORAL
        return cls.SONAR
Infer the store type from a connector's technical name.
Connectors whose name starts with source- or destination- belong
to the coral registry. All other names belong to sonar.
Arguments:
- name: Connector technical name (e.g. `"source-github"` or `"stripe"`).
Returns:
The inferred `StoreType`.
    @classmethod
    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
        """Infer the store type from a repository working directory.

        Checks for well-known directory markers:

        * **sonar** -- `integrations/` alongside `connector-sdk/`
        * **coral** -- `airbyte-integrations/connectors/`

        Args:
            path: Directory to inspect. Defaults to `Path.cwd`.

        Returns:
            The inferred `StoreType`, or `None` if the directory does
            not match any known registry repository layout.
        """
        if path is None:
            path = Path.cwd()

        # Sonar markers
        if (path / "integrations").is_dir() and (path / "connector-sdk").is_dir():
            return cls.SONAR

        # Coral markers
        if (path / "airbyte-integrations" / "connectors").is_dir():
            return cls.CORAL

        return None
Infer the store type from a repository working directory.
Checks for well-known directory markers:
- sonar --
integrations/alongsideconnector-sdk/ - coral --
airbyte-integrations/connectors/
Arguments:
- path: Directory to inspect. Defaults to
Path.cwd.
Returns:
The inferred
StoreType, orNoneif the directory does not match any known registry repository layout.
class SupportLevel(StrEnum):
    """Connector support levels ordered by precedence."""

    ARCHIVED = "archived"
    COMMUNITY = "community"
    CERTIFIED = "certified"

    @property
    def precedence(self) -> int:
        """Numeric precedence for ordering comparisons.

        Higher values indicate a higher support commitment.
        """
        return _SUPPORT_LEVEL_PRECEDENCE[self]

    @classmethod
    def from_precedence(cls, precedence: int) -> SupportLevel:
        """Look up a `SupportLevel` by its numeric precedence value.

        Raises `ValueError` when the precedence is not recognised.
        """
        matches = [m for m in cls if _SUPPORT_LEVEL_PRECEDENCE[m] == precedence]
        if matches:
            return matches[0]
        valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
        raise ValueError(
            f"Unrecognized support-level precedence: {precedence!r}. "
            f"Expected one of: {valid}."
        ) from None

    @classmethod
    def parse(cls, value: str) -> SupportLevel:
        """Parse a string into a `SupportLevel`.

        Accepts a keyword (`archived`, `community`, `certified`)
        or a legacy integer string (`100`, `200`, `300`).

        Raises `ValueError` when the value is not recognised.
        """
        try:
            return cls(value)
        except ValueError:
            pass  # Not a known keyword; fall through to integer parsing.
        try:
            return cls.from_precedence(int(value))
        except (ValueError, KeyError):
            pass  # Neither an integer nor a known precedence value.
        valid_kw = ", ".join(f"`{m.value}`" for m in cls)
        valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
        raise ValueError(
            f"Unrecognized support level: {value!r}. "
            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
        ) from None
@dataclass
class UnpublishedConnector:
    """A connector whose current local version is not published on GCS."""

    # Connector technical name (e.g. "source-github").
    connector_name: str
    # The dockerImageTag read from the local metadata.yaml.
    local_version: str
@dataclass(frozen=True)
class ValidateOptions:
    """Options that influence which validators run and how."""

    # Path to the connector's documentation file (consumed by
    # `validate_docs_path_exists`); None leaves that input unset.
    docs_path: str | None = None

    # True for pre-release builds, which skip version-decrement checks.
    is_prerelease: bool = False
@dataclass
class ValidationResult:
    """Aggregate result from running all validators."""

    passed: bool = True
    errors: list[str] = field(default_factory=list)
    validators_run: int = 0

    def add_error(self, message: str) -> None:
        """Record a validation failure: store the message and mark as failed."""
        self.errors.append(message)
        self.passed = False
class VersionListResult(BaseModel):
    """Result of listing versions for a connector.

    Serializable summary returned by version-listing operations: identifies
    the connector and bucket that were queried plus the versions discovered.
    """

    connector_name: str = Field(description="The connector technical name")
    bucket_name: str = Field(description="The GCS bucket name")
    version_count: int = Field(description="Number of versions found")
    versions: list[str] = Field(description="List of version strings")
@dataclass
class YankResult:
    """Result of a yank or unyank operation."""

    connector_name: str
    version: str
    bucket_name: str
    action: str  # "yank" or "unyank"
    success: bool
    message: str
    dry_run: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary for JSON serialization."""
        # Field order matches the declaration order above.
        keys = (
            "connector_name",
            "version",
            "bucket_name",
            "action",
            "success",
            "message",
            "dry_run",
        )
        return {key: getattr(self, key) for key in keys}
def compile_registry(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
    with_secrets_mask: bool = False,
    with_legacy_migration: str | None = None,
    force: bool = False,
) -> CompileResult:
    """Compile the registry: sync latest/ dirs and write index files.

    Steps:
    1. Glob for all `metadata.yaml` to discover (connector, version) pairs.
    2. Glob for `version-yank.yml` to build the yanked set.
    3. Compute the latest GA semver per connector.
    4. Glob for `version=*` markers in `latest/` dirs for a fast check.
    5. Delete stale `latest/` dirs and recursively copy the versioned dir.
    5m. (Optional) Legacy migration: delete disabled registry entries.
    6. Write global registry JSONs and per-connector `versions.json`.
    6c. (Optional) Regenerate `specs_secrets_mask.yaml`.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only resync `latest/` directories for
            these connectors (steps 4-5). Index rebuilds (steps 6a-6c)
            always operate on the full set of connectors so that global
            registry files remain complete.
        dry_run: If True, report what would be done without writing.
        with_secrets_mask: If True, regenerate `specs_secrets_mask.yaml`.
        with_legacy_migration: If set, run the named migration step.
            Currently supported: `"v1"` — delete `{registry_type}.json`
            files for connectors whose `registryOverrides.{registry}.enabled`
            is `false`.
        force: If True, resync all connectors' latest/ directories even if the
            existing version marker matches the computed latest version. This
            is useful when metadata content changes without a version bump.

    Returns:
        A `CompileResult` describing what was done.
    """
    # Fail fast on an unknown migration name before touching GCS.
    if with_legacy_migration and with_legacy_migration not in LEGACY_MIGRATION_VERSIONS:
        raise ValueError(
            f"Unknown legacy migration version: {with_legacy_migration!r}. "
            f"Supported: {', '.join(LEGACY_MIGRATION_VERSIONS)}"
        )

    result = CompileResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # --- Step 1 + 2: Scan versions and yanks ---
    # Always scan ALL connectors so that index rebuilds (step 6) are complete.
    _log_progress("Step 1-2: Scanning versions and yank markers...")
    connector_versions, yanked = _scan_versions_and_yanks(
        fs,
        store=store,
        connector_name=None,
    )
    result.connectors_scanned = len(connector_versions)
    result.versions_found = sum(len(v) for v in connector_versions.values())
    result.yanked_versions = len(yanked)
    _log_progress(
        " Found %d connectors, %d versions, %d yanked",
        result.connectors_scanned,
        result.versions_found,
        result.yanked_versions,
    )

    # --- Step 2b: Scan release candidates ---
    # NOTE(review): unlike step 1-2, this scan honors the --connector-name
    # filter, so RC injection into the global registries (step 6a) only
    # covers the filtered connectors — confirm this asymmetry is intended.
    _log_progress("Step 2b: Scanning for active release candidates...")
    rc_versions = _scan_release_candidates(
        fs,
        store=store,
        connector_name=connector_name,
    )
    _log_progress(" Found %d active release candidates", len(rc_versions))

    # --- Step 3: Compute latest ---
    _log_progress("Step 3: Computing latest GA version per connector...")
    latest_versions = _compute_latest_versions(
        connector_versions=connector_versions,
        yanked=yanked,
        fs=fs,
        store=store,
    )
    _log_progress(" Computed latest for %d connectors", len(latest_versions))

    # --- Step 4: Check existing latest markers ---
    # When --connector-name is set, only check/sync those connectors (steps 4-5).
    # Index rebuilds in step 6 always use the full unfiltered data.
    if connector_name:
        connector_name_set = set(connector_name)
        sync_scope = {
            c: v for c, v in latest_versions.items() if c in connector_name_set
        }
        _log_progress(
            " --connector-name filter: syncing %d of %d connectors",
            len(sync_scope),
            len(latest_versions),
        )
    else:
        sync_scope = latest_versions

    _log_progress("Step 4: Checking existing latest/ markers...")
    sync_scope_names = list(sync_scope) if connector_name else None
    existing_markers = _scan_latest_markers(
        fs,
        store=store,
        connector_name=sync_scope_names,
    )
    _log_progress(" Found %d existing markers", len(existing_markers))

    # A connector is "stale" when its latest/ marker is missing or does not
    # match the freshly computed latest version (or --force is set).
    stale_connectors: list[str] = []
    for connector, expected_version in sync_scope.items():
        current_marker = existing_markers.get(connector)
        if not force and current_marker == expected_version:
            result.latest_already_current += 1
        else:
            stale_connectors.append(connector)

    _log_progress(
        " %d connectors need latest/ update, %d already current",
        len(stale_connectors),
        result.latest_already_current,
    )

    # --- Step 5: Resync stale latest/ dirs (parallel) ---
    if stale_connectors:
        _log_progress(
            "Step 5: Syncing %d stale latest/ directories (max_workers=%d)...",
            len(stale_connectors),
            _COMPILE_SYNC_MAX_WORKERS,
        )

        def _sync_one_connector(connector: str) -> None:
            """Sync a single connector's latest/ dir."""
            version = latest_versions[connector]
            _sync_latest_dir(
                fs,
                store=store,
                connector=connector,
                version=version,
                dry_run=dry_run,
            )
            if not dry_run:
                _apply_overrides_to_latest_entry(
                    fs,
                    store=store,
                    connector=connector,
                    version=version,
                )

        sorted_stale = sorted(stale_connectors)
        with ThreadPoolExecutor(max_workers=_COMPILE_SYNC_MAX_WORKERS) as pool:
            futures = {pool.submit(_sync_one_connector, c): c for c in sorted_stale}
            for i, future in enumerate(as_completed(futures), 1):
                connector = futures[future]
                try:
                    future.result()
                    result.latest_updated += 1
                except Exception as exc:
                    error_msg = f"Failed to sync latest/ for {connector}: {exc}"
                    logger.error(error_msg)
                    result.errors.append(error_msg)
                    # Delete the (possibly partial) latest/ dir so the next
                    # compile retries this connector from scratch.
                    try:
                        _delete_latest_dir(
                            fs,
                            store=store,
                            connector=connector,
                        )
                        logger.info(
                            "Cleaned up partial latest/ for %s after failure",
                            connector,
                        )
                    except Exception as cleanup_exc:
                        logger.warning(
                            "Could not clean up latest/ for %s: %s",
                            connector,
                            cleanup_exc,
                        )
                if i % 100 == 0:
                    _log_progress(" Synced %d / %d...", i, len(sorted_stale))
    else:
        _log_progress("Step 5: All latest/ directories are current, nothing to sync.")

    # --- Step 5m: Legacy migration (optional) ---
    if with_legacy_migration == "v1":
        _log_progress(
            "Step 5m: Legacy migration v1 — deleting disabled registry entries..."
        )
        migration_deleted = _cleanup_disabled_registry_entries(
            fs,
            store=store,
            connector_versions=connector_versions,
            dry_run=dry_run,
        )
        total_deleted = sum(len(v) for v in migration_deleted.values())
        if migration_deleted:
            for conn, paths in sorted(migration_deleted.items()):
                _log_progress(
                    " %s: %s %d files",
                    conn,
                    "would delete" if dry_run else "deleted",
                    len(paths),
                )
            _log_progress(
                " Migration v1: %s %d files across %d connectors",
                "would delete" if dry_run else "deleted",
                total_deleted,
                len(migration_deleted),
            )

    # --- Step 6a: Compile global registry JSONs ---
    _log_progress("Step 6a: Compiling global registry JSON files...")
    all_registry_entries: list[dict[str, Any]] = []  # collected for Step 6c
    entries_by_registry_type: dict[str, list[dict[str, Any]]] = {}
    for registry_type in ("cloud", "oss"):
        entries = _compile_global_registry(
            fs,
            store=store,
            latest_versions=latest_versions,
            registry_type=registry_type,
        )

        # Inject release candidate info into entries that have active RCs.
        # For each connector with a release_candidate/metadata.yaml, read the
        # RC version's {registry_type}.json and add it under
        # releases.releaseCandidates[version] — matching the legacy format.
        if rc_versions:
            rc_entries: dict[str, dict[str, Any]] = {}
            for connector, rc_ver in rc_versions.items():
                rc_entry = _read_rc_registry_entry(
                    fs,
                    store=store,
                    connector=connector,
                    rc_version=rc_ver,
                    registry_type=registry_type,
                )
                if rc_entry:
                    docker_repo = rc_entry.get(
                        "dockerRepository",
                        f"airbyte/{connector}",
                    )
                    rc_entries[docker_repo] = {
                        "version": rc_ver,
                        "entry": rc_entry,
                    }
            if rc_entries:
                entries = _apply_release_candidates_to_entries(entries, rc_entries)
                _log_progress(
                    " Injected %d release candidates into %s registry",
                    len(rc_entries),
                    registry_type,
                )

        all_registry_entries.extend(entries)
        entries_by_registry_type[registry_type] = entries
        registry_json = _build_global_registry_json(entries)
        entry_count = len(registry_json["sources"]) + len(registry_json["destinations"])

        if registry_type == "cloud":
            result.cloud_registry_entries = entry_count
        else:
            result.oss_registry_entries = entry_count

        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )
        else:
            content = json.dumps(registry_json, indent=2, sort_keys=True) + "\n"
            path_prefix = f"{store.prefix}/" if store.prefix else ""
            # Registry index files get a custom (short) TTL so consumers pick
            # up new compiles quickly.
            _write_gcs_blob_with_custom_ttl(
                bucket_name=store.bucket,
                blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/{registry_type}_registry.json",
                content=content,
                cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
            )
            _log_progress(
                " Wrote %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )

    # --- Step 6a.2: Compile composite registry JSON (superset) ---
    _log_progress("Step 6a.2: Compiling composite_registry.json (superset)...")
    composite_json = _build_composite_registry_json(
        cloud_entries=entries_by_registry_type.get("cloud", []),
        oss_entries=entries_by_registry_type.get("oss", []),
    )
    composite_entry_count = len(composite_json["sources"]) + len(
        composite_json["destinations"]
    )
    result.composite_registry_entries = composite_entry_count
    if dry_run:
        _log_progress(
            " [DRY RUN] Would write composite_registry.json (%d entries)",
            composite_entry_count,
        )
    else:
        composite_content = json.dumps(composite_json, indent=2, sort_keys=True) + "\n"
        path_prefix = f"{store.prefix}/" if store.prefix else ""
        _write_gcs_blob_with_custom_ttl(
            bucket_name=store.bucket,
            blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/composite_registry.json",
            content=composite_content,
            cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
        )
        _log_progress(
            " Wrote composite_registry.json (%d entries)",
            composite_entry_count,
        )

    # --- Step 6b: Per-connector version indexes (parallel) ---
    _log_progress(
        "Step 6b: Writing per-connector version indexes (max_workers=%d)...",
        _COMPILE_WRITE_MAX_WORKERS,
    )
    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"
    sorted_connectors = sorted(connector_versions)

    def _write_one_version_index(connector: str) -> None:
        """Build and write a single connector's versions.json."""
        versions = connector_versions[connector]
        latest_v = latest_versions.get(connector)
        rc_v = rc_versions.get(connector)
        index = _build_version_index(
            fs,
            store=store,
            connector=connector,
            versions=versions,
            yanked=yanked,
            latest_version=latest_v,
            rc_version=rc_v,
        )
        index_path = f"{base}/{connector}/versions.json"
        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s/versions.json (%d versions)",
                connector,
                len(versions),
            )
        else:
            content = json.dumps(index, indent=2, sort_keys=True) + "\n"
            with fs.open(index_path, "w") as f:
                f.write(content)

    with ThreadPoolExecutor(max_workers=_COMPILE_WRITE_MAX_WORKERS) as pool:
        futures = {
            pool.submit(_write_one_version_index, c): c for c in sorted_connectors
        }
        for i, future in enumerate(as_completed(futures), 1):
            connector = futures[future]
            try:
                future.result()
                result.version_indexes_written += 1
            except Exception as exc:
                error_msg = f"Failed to write versions.json for {connector}: {exc}"
                logger.error(error_msg)
                result.errors.append(error_msg)
            if i % 100 == 0:
                _log_progress(
                    " Wrote %d / %d version indexes...", i, len(sorted_connectors)
                )

    # --- Step 6c: Specs secrets mask (optional) ---
    if with_secrets_mask:
        _log_progress("Step 6c: Generating specs secrets mask...")
        # Reuse entries collected during Step 6a to avoid redundant GCS reads.
        secret_names = _extract_secret_property_names(all_registry_entries)
        sorted_names = sorted(secret_names)
        result.specs_secrets_mask_properties = len(sorted_names)
        mask_content = yaml.dump({"properties": sorted_names}, default_flow_style=False)
        mask_path = (
            f"{store.bucket_root}/{_REGISTRIES_PREFIX}/{_SPECS_SECRETS_MASK_FILENAME}"
        )

        _log_progress(
            " Found %d secret properties: %s",
            len(sorted_names),
            ", ".join(sorted_names),
        )

        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )
        else:
            with fs.open(mask_path, "w") as f:
                f.write(mask_content)
            _log_progress(
                " Wrote %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )

    _log_progress(result.summary())
    return result
def create_progressive_rollout_blob(
    repo_path: Path,
    connector_name: str,
    dry_run: bool = False,
    bucket_name: str | None = None,
) -> ConnectorPublishResult:
    """Create the `release_candidate/metadata.yaml` blob in GCS.

    Copies the versioned metadata (e.g. `1.2.3-rc.1/metadata.yaml` or
    `2.1.0/metadata.yaml` for GA progressive rollouts) to
    `release_candidate/metadata.yaml` so the platform knows a progressive
    rollout is active for this connector.

    The connector's `metadata.yaml` on disk must declare a version that
    is valid for progressive rollout (i.e. not a `-preview` build).
    The versioned blob must already exist in GCS (i.e. the version must
    have been published first).

    Requires `GCS_CREDENTIALS` environment variable to be set.

    Args:
        repo_path: Path to the repository checkout containing the connector.
        connector_name: Connector technical name to operate on.
        dry_run: If True, report the copy that would happen without writing.
        bucket_name: Target GCS bucket; required (no default).

    Returns:
        A `ConnectorPublishResult` describing the outcome.

    Raises:
        ValueError: If `bucket_name` is not provided.
    """
    # Fail fast: there is no sensible default bucket for this operation.
    if not bucket_name:
        raise ValueError("bucket_name is required for progressive rollout create.")

    # Version/repo are read from the connector's on-disk metadata.yaml.
    metadata = get_connector_metadata(repo_path, connector_name)
    version = metadata.docker_image_tag
    docker_repo = metadata.docker_repository

    # Preview builds are rejected before any GCS access happens.
    if not is_valid_for_progressive_rollout(version):
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-create",
            status="failure",
            docker_image=f"{docker_repo}:{version}",
            registry_updated=False,
            message=f"Version '{version}' is not valid for progressive rollout. "
            "Preview versions are not supported.",
        )

    gcp_connector_dir = f"{METADATA_FOLDER}/{docker_repo}"
    versioned_path = f"{gcp_connector_dir}/{version}/{METADATA_FILE_NAME}"
    rc_path = (
        f"{gcp_connector_dir}/{RELEASE_CANDIDATE_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )

    if dry_run:
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-create",
            status="dry-run",
            docker_image=f"{docker_repo}:{version}",
            registry_updated=False,
            message=f"Would copy {versioned_path} to {rc_path}",
        )

    storage_client = get_gcs_storage_client()
    gcs_bucket = storage_client.bucket(bucket_name)

    # Verify the versioned blob exists — the rollout marker must point at a
    # version that has actually been published.
    versioned_blob = gcs_bucket.blob(versioned_path)
    if not versioned_blob.exists():
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-create",
            status="failure",
            docker_image=f"{docker_repo}:{version}",
            registry_updated=False,
            message=f"Versioned metadata not found: {versioned_path}. "
            "The version must be published before creating the progressive rollout marker.",
        )

    # Copy the versioned metadata to the release_candidate/ path
    gcs_bucket.copy_blob(versioned_blob, gcs_bucket, rc_path)
    logger.info(
        "Created progressive rollout metadata blob: %s (copied from %s)",
        rc_path,
        versioned_path,
    )

    return ConnectorPublishResult(
        connector=metadata.name,
        version=version,
        action="progressive-rollout-create",
        status="success",
        docker_image=f"{docker_repo}:{version}",
        registry_updated=True,
        message=f"Created release_candidate/ blob for {metadata.name} at {rc_path} "
        f"(copied from {versioned_path}).",
    )
def delete_progressive_rollout_blob(
    repo_path: Path,
    connector_name: str,
    dry_run: bool = False,
    bucket_name: str | None = None,
) -> ConnectorPublishResult:
    """Delete the `release_candidate/` metadata blob from GCS.

    The only GCS mutation needed when a progressive rollout is finalized
    (for both the promote and the rollback paths). The versioned blob
    (e.g. `1.2.3-rc.1/metadata.yaml`) is intentionally left in place as
    an audit trail of what was actually deployed during the rollout.

    Requires `GCS_CREDENTIALS` environment variable to be set.
    """
    if not bucket_name:
        raise ValueError("bucket_name is required for progressive rollout cleanup.")

    meta = get_connector_metadata(repo_path, connector_name)
    version = meta.docker_image_tag
    docker_repo = meta.docker_repository
    image = f"{docker_repo}:{version}"

    # NOTE: No version-format gating here on purpose. In the promote
    # workflow the on-disk version is already bumped to GA (e.g. 1.2.3)
    # before cleanup runs; rc_path depends only on docker_repo plus the
    # fixed RELEASE_CANDIDATE_GCS_FOLDER_NAME constant, never on the
    # on-disk version.
    rc_path = (
        f"{METADATA_FOLDER}/{docker_repo}"
        f"/{RELEASE_CANDIDATE_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )

    if dry_run:
        return ConnectorPublishResult(
            connector=meta.name,
            version=version,
            action="progressive-rollout-cleanup",
            status="dry-run",
            docker_image=image,
            registry_updated=False,
            message=f"Would delete release_candidate/ blob for {meta.name} "
            f"(version {version}) at {rc_path}",
        )

    bucket = get_gcs_storage_client().bucket(bucket_name)
    rc_blob = bucket.blob(rc_path)

    if rc_blob.exists():
        rc_blob.delete()
        logger.info("Deleted progressive rollout metadata blob: %s", rc_path)
        return ConnectorPublishResult(
            connector=meta.name,
            version=version,
            action="progressive-rollout-cleanup",
            status="success",
            docker_image=image,
            registry_updated=True,
            message=f"Deleted release_candidate/ blob for {meta.name} at {rc_path}.",
        )

    # Idempotent no-op: the marker blob is already gone.
    logger.info(
        "Progressive rollout metadata already deleted (idempotent): %s", rc_path
    )
    return ConnectorPublishResult(
        connector=meta.name,
        version=version,
        action="progressive-rollout-cleanup",
        status="success",
        docker_image=image,
        registry_updated=False,
        message=f"Progressive rollout metadata already deleted (no-op): {rc_path}",
    )
90def find_unpublished_connectors( 91 repo_path: str | Path, 92 bucket_name: str, 93 connector_names: list[str] | None = None, 94) -> AuditResult: 95 """Find connectors whose local version is not published on GCS. 96 97 For each connector in the local checkout, reads `dockerImageTag` from 98 `metadata.yaml` and checks whether `metadata/<docker-repo>/<version>/metadata.yaml` 99 exists in the GCS bucket. Connectors that are archived, disabled on all 100 registries, or have RC versions are skipped. 101 102 Args: 103 repo_path: Path to the Airbyte monorepo checkout. 104 bucket_name: GCS bucket name to check against. 105 connector_names: Optional list of connector names to check. 106 If `None`, discovers all connectors in the repo. 107 108 Returns: 109 An `AuditResult` containing unpublished connectors and metadata. 110 """ 111 repo_path = Path(repo_path) 112 connectors_dir = repo_path / CONNECTOR_PATH_PREFIX 113 114 if not connectors_dir.exists(): 115 raise ValueError(f"Connectors directory not found: {connectors_dir}") 116 117 # Discover connector names if not provided 118 if connector_names is None: 119 connector_names = sorted( 120 d.name 121 for d in connectors_dir.iterdir() 122 if d.is_dir() and (d / METADATA_FILE_NAME).exists() 123 ) 124 125 result = AuditResult() 126 127 # Collect connectors and their versions first, then batch-check GCS 128 to_check: list[tuple[str, str]] = [] # (connector_name, version) 129 130 for name in connector_names: 131 metadata_path = connectors_dir / name / METADATA_FILE_NAME 132 metadata = _read_local_metadata(metadata_path) 133 if metadata is None: 134 result.errors.append(f"{name}: metadata.yaml not found or unreadable") 135 continue 136 137 if _is_archived(metadata): 138 result.skipped_archived.append(name) 139 continue 140 141 if _is_disabled_on_all_registries(metadata): 142 result.skipped_disabled.append(name) 143 continue 144 145 data = metadata.get("data", {}) 146 version = data.get("dockerImageTag") 147 if not version: 148 
result.errors.append(f"{name}: no dockerImageTag in metadata") 149 continue 150 151 if _is_rc_version(version): 152 result.skipped_rc.append(name) 153 continue 154 155 to_check.append((name, version)) 156 157 if not to_check: 158 result.checked_count = 0 159 return result 160 161 # Check GCS for each connector version 162 storage_client = get_gcs_storage_client() 163 bucket = storage_client.bucket(bucket_name) 164 165 for name, version in to_check: 166 result.checked_count += 1 167 blob_path = f"{METADATA_FOLDER}/airbyte/{name}/{version}/{METADATA_FILE_NAME}" 168 blob = bucket.blob(blob_path) 169 170 try: 171 exists = blob.exists() 172 except Exception as e: 173 result.errors.append(f"{name}: GCS check failed: {e}") 174 continue 175 176 if not exists: 177 logger.info( 178 "Unpublished: %s version %s (checked %s)", 179 name, 180 version, 181 blob_path, 182 ) 183 result.unpublished.append( 184 UnpublishedConnector(connector_name=name, local_version=version) 185 ) 186 187 logger.info( 188 "Audit complete: %d checked, %d unpublished, %d archived-skipped, %d disabled-skipped, %d rc-skipped", 189 result.checked_count, 190 len(result.unpublished), 191 len(result.skipped_archived), 192 len(result.skipped_disabled), 193 len(result.skipped_rc), 194 ) 195 196 return result
Find connectors whose local version is not published on GCS.
For each connector in the local checkout, reads dockerImageTag from
metadata.yaml and checks whether metadata/<docker-repo>/<version>/metadata.yaml
exists in the GCS bucket. Connectors that are archived, disabled on all
registries, or have RC versions are skipped.
Arguments:
- repo_path: Path to the Airbyte monorepo checkout.
- bucket_name: GCS bucket name to check against.
- connector_names: Optional list of connector names to check.
If `None`, discovers all connectors in the repo.
Returns:
An `AuditResult` containing unpublished connectors and metadata.
def generate_version_artifacts(
    metadata_file: Path,
    docker_image: str,
    output_dir: Path | None = None,
    repo_root: Path | None = None,
    dry_run: bool = False,
    with_validate: bool = True,
    with_dependency_dump: bool = True,
    with_sbom: bool = True,
) -> GenerateResult:
    """Generate all version artifacts for a connector release.

    Artifacts are enriched with git commit info, SBOM URL, and (when applicable)
    components SHA before writing. Validation is run after generation by default.

    Args:
        metadata_file: Path to the connector's `metadata.yaml`.
        docker_image: Docker image to run spec against (e.g. `airbyte/source-faker:6.2.38`).
        output_dir: Directory to write artifacts to. If `None`, a temp directory is created.
        repo_root: Root of the Airbyte repo checkout (for resolving `doc.md`).
            If `None`, inferred by walking up from `metadata_file`.
        dry_run: If `True`, report what would be generated without writing or running docker.
        with_validate: If `True` (default), run metadata validators after generation.
            Pass `False` (`--no-validate`) to skip.
        with_dependency_dump: If `True` (default), generate `dependencies.json`
            for Python connectors. Pass `False` (`--no-dependency-dump`) to skip.
        with_sbom: If `True` (default), generate `spdx.json` (SBOM) for
            connectors. Pass `False` (`--no-sbom`) to skip.

    Returns:
        A `GenerateResult` describing what was produced.

    Raises:
        FileNotFoundError: If `metadata_file` does not exist.
    """
    # --- Load metadata ---
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    raw_metadata: dict[str, Any] = yaml.safe_load(metadata_file.read_text())
    metadata_data: dict[str, Any] = raw_metadata.get("data", {})

    connector_name = metadata_data.get("dockerRepository", "unknown").replace(
        "airbyte/", ""
    )
    version = metadata_data.get("dockerImageTag", "unknown")

    # --- Resolve output directory ---
    # NOTE(review): the temp directory created here is intentionally not
    # cleaned up — the caller consumes the artifacts from it afterwards.
    if output_dir is None:
        output_dir = Path(
            tempfile.mkdtemp(prefix=f"connector-artifacts-{connector_name}-{version}-")
        )
    output_dir.mkdir(parents=True, exist_ok=True)

    result = GenerateResult(
        connector_name=connector_name,
        version=version,
        docker_image=docker_image,
        output_dir=str(output_dir),
        dry_run=dry_run,
    )

    if dry_run:
        # Report the full would-be artifact list without touching docker or disk.
        logger.info("[DRY RUN] Would generate artifacts to %s", output_dir)
        result.artifacts_written = [
            "metadata.yaml",
            "icon.svg",
            "doc.md",
            "cloud.json",
            "oss.json",
            "manifest.yaml (if present)",
            "components.zip (if components.py present)",
            "components.zip.sha256 (if components.py present)",
            f"version={version}",
        ]
        if with_sbom:
            result.artifacts_written.append(SBOM_FILE_NAME)
        if with_dependency_dump:
            result.artifacts_written.append("dependencies.json (if Python connector)")
        return result

    # --- Prepare metadata output ---
    metadata_out = output_dir / "metadata.yaml"
    result.artifacts_written.append("metadata.yaml")

    # --- Enrich metadata with git info *before* building registry entries so
    # that `generated.git` propagates into `cloud.json` / `oss.json`. ---
    raw_metadata = _enrich_metadata_git_info(raw_metadata, metadata_file)

    # --- Generate SBOM from the connector Docker image ---
    sbom_generated = False
    if not with_sbom:
        logger.info("SBOM generation disabled via --no-sbom.")
    else:
        try:
            sbom_path = generate_sbom(docker_image, output_dir)
        except RuntimeError as exc:
            # SBOM failures never abort the release — log and move on.
            logger.warning("SBOM generation failed (non-fatal): %s", exc)
        except (FileNotFoundError, subprocess.TimeoutExpired):
            logger.warning("Docker not available or SBOM generation timed out.")
        else:
            result.artifacts_written.append(SBOM_FILE_NAME)
            sbom_generated = True
            logger.info("Generated SBOM: %s", sbom_path)

    # --- Enrich metadata with SBOM URL ---
    raw_metadata = _enrich_metadata_sbom_url(
        raw_metadata, sbom_generated=sbom_generated
    )

    # --- Run docker spec for cloud and oss ---
    specs: dict[str, dict[str, Any]] = {}
    for mode in VALID_REGISTRIES:
        try:
            specs[mode] = _run_docker_spec(docker_image, mode)
            logger.info("Got %s spec from docker image %s", mode, docker_image)
        except RuntimeError as exc:
            error_msg = f"Failed to get {mode} spec: {exc}"
            logger.error(error_msg)
            result.errors.append(error_msg)

    # --- Generate dependencies.json for Python connectors ---
    # This must happen *before* building registry entries so that the
    # local dependencies data can be used for packageInfo without a GCS
    # round-trip.
    local_dependencies: dict[str, Any] | None = None
    if not with_dependency_dump:
        logger.info("Dependency generation disabled via --no-dependency-dump.")
    elif _is_python_connector(metadata_data):
        logger.info("Python connector detected — generating dependencies.json")
        local_dependencies = generate_python_dependencies_file(
            metadata_data=metadata_data,
            docker_image=docker_image,
            output_dir=output_dir,
        )
        if local_dependencies is not None:
            result.artifacts_written.append(CONNECTOR_DEPENDENCY_FILE_NAME)
    else:
        logger.info(
            "Non-Python connector (%s) — skipping dependencies.json generation.",
            connector_name,
        )

    # --- Check if registryOverrides enable each registry ---
    registry_overrides = metadata_data.get("registryOverrides", {})

    # --- Generate registry entries (cloud.json, oss.json) ---
    for registry_type in VALID_REGISTRIES:
        overrides = registry_overrides.get(registry_type, {})
        enabled = overrides.get("enabled", False)
        if not enabled:
            logger.info(
                "Registry type %s is not enabled for %s, skipping %s.json generation.",
                registry_type,
                connector_name,
                registry_type,
            )
            continue

        spec = specs.get(registry_type)
        if spec is None:
            # The docker spec step above failed for this registry; the failure
            # was already recorded, but we add a targeted error for the entry.
            error_msg = (
                f"Cannot generate {registry_type}.json: no spec available "
                f"(docker spec for {registry_type} failed or was not run)."
            )
            result.errors.append(error_msg)
            continue

        registry_entry = _build_registry_entry(
            metadata_data,
            registry_type,
            spec,
            local_dependencies=local_dependencies,
        )

        out_path = output_dir / f"{registry_type}.json"
        out_path.write_text(
            json.dumps(registry_entry, indent=2, sort_keys=True, default=_json_serial)
            + "\n"
        )
        result.artifacts_written.append(f"{registry_type}.json")
        logger.info("Wrote %s", out_path)

    # --- Copy icon.svg (sibling of metadata.yaml in the connector directory) ---
    icon_source = metadata_file.parent / "icon.svg"
    if icon_source.is_file():
        icon_out = output_dir / "icon.svg"
        shutil.copy2(icon_source, icon_out)
        result.artifacts_written.append("icon.svg")
        logger.info("Wrote %s", icon_out)
    else:
        logger.warning("No icon.svg found at %s.", icon_source)
        result.errors.append("Icon file is missing.")

    # --- Copy doc.md (derived from documentationUrl in metadata) ---
    if repo_root is None:
        # Infer repo root by walking up from metadata_file looking for .git
        # Note: .git can be a directory (normal clone) or a file (git worktree)
        # Resolve to absolute path first so the walk-up works with relative paths.
        candidate = metadata_file.resolve().parent
        while candidate != candidate.parent:
            git_indicator = candidate / ".git"
            if git_indicator.is_dir() or git_indicator.is_file():
                repo_root = candidate
                break
            candidate = candidate.parent

    if repo_root is not None:
        doc_source = _resolve_doc_path(metadata_data, repo_root)
        if doc_source is not None and doc_source.is_file():
            doc_out = output_dir / DOC_FILE_NAME
            shutil.copy2(doc_source, doc_out)
            result.artifacts_written.append(DOC_FILE_NAME)
            logger.info("Wrote %s (from %s)", doc_out, doc_source)
        else:
            error_msg = (
                f"Documentation file not found: {doc_source}. "
                f"Derived from documentationUrl in metadata."
            )
            logger.error(error_msg)
            result.errors.append(error_msg)
    else:
        error_msg = "Cannot resolve doc.md: repo root not found."
        logger.error(error_msg)
        result.errors.append(error_msg)

    # --- Copy manifest.yaml (from connector root, if present) ---
    connector_dir = metadata_file.parent
    manifest_source = connector_dir / MANIFEST_FILE_NAME
    components_sha256: str | None = None
    if manifest_source.is_file():
        manifest_out = output_dir / MANIFEST_FILE_NAME
        shutil.copy2(manifest_source, manifest_out)
        result.artifacts_written.append(MANIFEST_FILE_NAME)
        logger.info("Wrote %s", manifest_out)

        # --- Generate components.zip if components.py exists ---
        components_source = connector_dir / COMPONENTS_PY_FILE_NAME
        if components_source.is_file():
            zip_path, sha256_path = _create_components_zip(
                manifest_path=manifest_source,
                components_path=components_source,
                output_dir=output_dir,
            )
            result.artifacts_written.append(COMPONENTS_ZIP_FILE_NAME)
            result.artifacts_written.append(COMPONENTS_ZIP_SHA256_FILE_NAME)
            logger.info("Wrote %s and %s", zip_path, sha256_path)
            # Read back the SHA256 for metadata enrichment
            components_sha256 = sha256_path.read_text().strip()
    else:
        logger.info(
            "No manifest.yaml at %s — skipping manifest artifacts.", manifest_source
        )

    # --- Enrich metadata with components SHA (after zip creation) ---
    raw_metadata = _enrich_metadata_components_sha(raw_metadata, components_sha256)

    # --- Write final enriched metadata.yaml ---
    # Use sort_keys=True to match the legacy pipeline's alphabetical key ordering.
    # After Registry 2.0 launches we are free to change the key ordering.
    metadata_out.write_text(
        yaml.dump(raw_metadata, default_flow_style=False, sort_keys=True)
    )
    logger.info("Wrote enriched %s", metadata_out)

    # --- Write version marker file (version=<semver>) ---
    # This zero-byte file is used by the compile step as a fast-check marker.
    # Including it in the generated artifacts means `latest/` gets the marker
    # for free via a recursive copy, removing the need for a separate write.
    marker_file = output_dir / f"version={version}"
    marker_file.write_bytes(b"")
    result.artifacts_written.append(f"version={version}")
    logger.info("Wrote version marker %s", marker_file)

    # --- Validate metadata (after generation) ---
    if with_validate:
        logger.info("Running post-generation validation...")
        doc_path: str | None = None
        if repo_root is not None:
            resolved = _resolve_doc_path(metadata_data, repo_root)
            doc_path = str(resolved) if resolved else None
        validation = validate_metadata(
            metadata_data=metadata_data,
            opts=ValidateOptions(docs_path=doc_path),
        )
        if not validation.passed:
            for err in validation.errors:
                logger.error("Validation error: %s", err)
            result.validation_errors = validation.errors
        else:
            logger.info("Validation passed (%d validators).", validation.validators_run)

    return result
Generate all version artifacts for a connector release.
Artifacts are enriched with git commit info, SBOM URL, and (when applicable) components SHA before writing. Validation is run after generation by default.
Arguments:
- metadata_file: Path to the connector's `metadata.yaml`.
- docker_image: Docker image to run spec against (e.g. `airbyte/source-faker:6.2.38`).
- output_dir: Directory to write artifacts to. If `None`, a temp directory is created.
- repo_root: Root of the Airbyte repo checkout (for resolving `doc.md`). If `None`, inferred by walking up from `metadata_file`.
- dry_run: If `True`, report what would be generated without writing or running docker.
- with_validate: If `True` (default), run metadata validators after generation. Pass `False` (`--no-validate`) to skip.
- with_dependency_dump: If `True` (default), generate `dependencies.json` for Python connectors. Pass `False` (`--no-dependency-dump`) to skip.
- with_sbom: If `True` (default), generate `spdx.json` (SBOM) for connectors. Pass `False` (`--no-sbom`) to skip.
Returns:
A `GenerateResult` describing what was produced.
41def get_connector_metadata(repo_path: Path, connector_name: str) -> ConnectorMetadata: 42 """Read connector metadata from metadata.yaml. 43 44 Args: 45 repo_path: Path to the Airbyte monorepo. 46 connector_name: The connector technical name (e.g., 'source-github'). 47 48 Returns: 49 ConnectorMetadata object with the connector's metadata. 50 51 Raises: 52 FileNotFoundError: If the connector directory or metadata file doesn't exist. 53 """ 54 connector_dir = repo_path / CONNECTOR_PATH_PREFIX / connector_name 55 if not connector_dir.exists(): 56 raise FileNotFoundError(f"Connector directory not found: {connector_dir}") 57 58 metadata_file = connector_dir / METADATA_FILE_NAME 59 if not metadata_file.exists(): 60 raise FileNotFoundError(f"Metadata file not found: {metadata_file}") 61 62 with open(metadata_file) as f: 63 metadata = yaml.safe_load(f) 64 65 data = metadata.get("data", {}) 66 return ConnectorMetadata( 67 name=connector_name, 68 docker_repository=data.get("dockerRepository", f"airbyte/{connector_name}"), 69 docker_image_tag=data.get("dockerImageTag", "unknown"), 70 support_level=data.get("supportLevel"), 71 definition_id=data.get("definitionId"), 72 )
Read connector metadata from metadata.yaml.
Arguments:
- repo_path: Path to the Airbyte monorepo.
- connector_name: The connector technical name (e.g., 'source-github').
Returns:
ConnectorMetadata object with the connector's metadata.
Raises:
- FileNotFoundError: If the connector directory or metadata file doesn't exist.
278def get_gcs_publish_path( 279 connector_name: str, 280 artifact_type: str, 281 version: str = LATEST_GCS_FOLDER_NAME, 282) -> str: 283 """Compute the GCS path for a connector artifact for publishing. 284 285 All connectors use the airbyte/{connector_name} convention. 286 """ 287 artifact_files = { 288 "metadata": METADATA_FILE_NAME, 289 "spec": "spec.json", 290 "icon": "icon.svg", 291 "doc": "doc.md", 292 } 293 294 if artifact_type not in artifact_files: 295 raise ValueError( 296 f"Unknown artifact type: {artifact_type}. " 297 f"Valid types are: {', '.join(artifact_files.keys())}" 298 ) 299 300 file_name = artifact_files[artifact_type] 301 return f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file_name}"
Compute the GCS path for a connector artifact for publishing.
All connectors use the `airbyte/{connector_name}` convention.
202def get_registry(store: RegistryStore) -> Registry: 203 """Factory for obtaining the right store implementation.""" 204 205 if store.store_type == StoreType.CORAL: 206 from airbyte_ops_mcp.registry.coral_registry_store import CoralRegistry 207 208 return CoralRegistry(store) 209 210 if store.store_type == StoreType.SONAR: 211 from airbyte_ops_mcp.registry.sonar_registry_store import SonarRegistry 212 213 return SonarRegistry(store) 214 215 # defensive: StoreType is an Enum, but keep this for readability 216 raise ValueError(f"Unknown store type: {store.store_type}")
Factory for obtaining the right store implementation.
43def get_registry_entry( 44 connector_name: str, 45 bucket_name: str, 46 version: str = LATEST_GCS_FOLDER_NAME, 47) -> dict[str, Any]: 48 """Get a connector's registry entry from GCS. 49 50 Reads metadata for a connector from the registry stored in GCS. 51 52 Args: 53 connector_name: The connector name (e.g., "source-faker", "destination-postgres") 54 bucket_name: Name of the GCS bucket containing the registry 55 version: Version folder name (e.g., "latest", "1.2.3") 56 57 Returns: 58 dict: The connector's metadata as a dictionary 59 60 Raises: 61 ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure 62 FileNotFoundError: If the connector metadata is not found in the registry 63 yaml.YAMLError: If the metadata file contains invalid YAML syntax 64 """ 65 storage_client = get_gcs_storage_client() 66 bucket = storage_client.bucket(bucket_name) 67 68 # Construct the path to the metadata file 69 # Pattern: metadata/airbyte/{connector_name}/{version}/metadata.yaml 70 blob_path = ( 71 f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{METADATA_FILE_NAME}" 72 ) 73 blob = bucket.blob(blob_path) 74 75 logger.info(f"Reading registry entry for {connector_name} from {blob_path}") 76 77 # Read the file 78 content = safe_read_gcs_file(blob) 79 if content is None: 80 raise FileNotFoundError( 81 f"Connector metadata not found in registry: {connector_name}. " 82 f"Checked path: {blob_path}" 83 ) 84 85 # Parse YAML 86 try: 87 metadata = yaml.safe_load(content) 88 if metadata is None or not isinstance(metadata, dict): 89 raise ValueError(f"Metadata file {blob_path} has an invalid structure") 90 return metadata 91 except yaml.YAMLError as e: 92 logger.error( 93 "Failed to parse metadata for %s from %s: %s", 94 connector_name, 95 blob_path, 96 e, 97 ) 98 raise
Get a connector's registry entry from GCS.
Reads metadata for a connector from the registry stored in GCS.
Arguments:
- connector_name: The connector name (e.g., "source-faker", "destination-postgres")
- bucket_name: Name of the GCS bucket containing the registry
- version: Version folder name (e.g., "latest", "1.2.3")
Returns:
dict: The connector's metadata as a dictionary
Raises:
- ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure
- FileNotFoundError: If the connector metadata is not found in the registry
- yaml.YAMLError: If the metadata file contains invalid YAML syntax
101def get_registry_spec( 102 connector_name: str, 103 bucket_name: str, 104 version: str = LATEST_GCS_FOLDER_NAME, 105) -> dict[str, Any]: 106 """Get a connector's spec from GCS. 107 108 Reads the connector specification from the registry stored in GCS. 109 110 Args: 111 connector_name: The connector name (e.g., "source-faker", "destination-postgres") 112 bucket_name: Name of the GCS bucket containing the registry 113 version: Version folder name (e.g., "latest", "1.2.3") 114 115 Returns: 116 dict: The connector's spec as a dictionary 117 118 Raises: 119 ValueError: If GCS credentials are not configured, or if the spec is not a JSON object 120 FileNotFoundError: If the connector spec is not found in the registry 121 json.JSONDecodeError: If the spec file contains invalid JSON syntax 122 """ 123 storage_client = get_gcs_storage_client() 124 bucket = storage_client.bucket(bucket_name) 125 126 # Construct the path to the spec file 127 # Pattern: metadata/airbyte/{connector_name}/{version}/spec.json 128 blob_path = f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{SPEC_FILE_NAME}" 129 blob = bucket.blob(blob_path) 130 131 logger.info(f"Reading spec for {connector_name} from {blob_path}") 132 133 # Read the file 134 content = safe_read_gcs_file(blob) 135 if content is None: 136 raise FileNotFoundError( 137 f"Connector spec not found in registry: {connector_name}. " 138 f"Checked path: {blob_path}" 139 ) 140 141 # Parse JSON 142 try: 143 spec = json.loads(content) 144 if spec is None or not isinstance(spec, dict): 145 raise ValueError( 146 f"Spec file for {connector_name} at {blob_path} is not a JSON object" 147 ) 148 return spec 149 except json.JSONDecodeError as e: 150 logger.error( 151 "Failed to parse spec for %s from %s: %s", 152 connector_name, 153 blob_path, 154 e, 155 ) 156 raise
Get a connector's spec from GCS.
Reads the connector specification from the registry stored in GCS.
Arguments:
- connector_name: The connector name (e.g., "source-faker", "destination-postgres")
- bucket_name: Name of the GCS bucket containing the registry
- version: Version folder name (e.g., "latest", "1.2.3")
Returns:
dict: The connector's spec as a dictionary
Raises:
- ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
- FileNotFoundError: If the connector spec is not found in the registry
- json.JSONDecodeError: If the spec file contains invalid JSON syntax
75def is_valid_for_progressive_rollout(version: str) -> bool: 76 """Check if a version string is valid for progressive rollout. 77 78 Currently rejects only `-preview` versions. All other semver 79 versions (GA and RC alike) are accepted. This gate is intentionally 80 kept generic so it can be tightened or turned into a no-op in the 81 future. 82 83 Args: 84 version: The version string to check. 85 86 Returns: 87 True if the version may be used in a progressive rollout, 88 False if the version format is explicitly disallowed. 89 """ 90 return "-preview." not in version
Check if a version string is valid for progressive rollout.
Currently rejects only -preview versions. All other semver
versions (GA and RC alike) are accepted. This gate is intentionally
kept generic so it can be tightened or turned into a no-op in the
future.
Arguments:
- version: The version string to check.
Returns:
True if the version may be used in a progressive rollout, False if the version format is explicitly disallowed.
325def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]: 326 """List all versions of a connector in the registry. 327 328 Scans the GCS bucket to find all versions of a specific connector. 329 330 Args: 331 connector_name: The connector name (e.g., "source-faker") 332 bucket_name: Name of the GCS bucket containing the registry 333 334 Returns: 335 list[str]: Sorted list of version strings (excluding 'latest' and 'release_candidate') 336 337 Raises: 338 ValueError: If GCS credentials are not configured 339 """ 340 storage_client = get_gcs_storage_client() 341 bucket = storage_client.bucket(bucket_name) 342 343 # List all blobs matching the pattern: metadata/airbyte/{connector_name}/*/metadata.yaml 344 glob_pattern = f"{METADATA_FOLDER}/airbyte/{connector_name}/*/{METADATA_FILE_NAME}" 345 logger.info(f"Listing versions for {connector_name} with pattern: {glob_pattern}") 346 347 try: 348 blobs = bucket.list_blobs(match_glob=glob_pattern) 349 except Exception as e: 350 logger.error(f"Error listing blobs in bucket {bucket_name}: {e}") 351 raise 352 353 # Extract versions from blob paths 354 # Path format: metadata/airbyte/{connector-name}/{version}/metadata.yaml 355 versions: set[str] = set() 356 for blob in blobs: 357 path_parts = blob.name.split("/") 358 # Path should be: metadata / airbyte / connector-name / version / metadata.yaml 359 if len(path_parts) >= 5: 360 version = path_parts[3] 361 # Exclude special folders 362 if version not in ("latest", "release_candidate"): 363 versions.add(version) 364 365 return sorted(versions)
List all versions of a connector in the registry.
Scans the GCS bucket to find all versions of a specific connector.
Arguments:
- connector_name: The connector name (e.g., "source-faker")
- bucket_name: Name of the GCS bucket containing the registry
Returns:
list[str]: Sorted list of version strings (excluding 'latest' and 'release_candidate')
Raises:
- ValueError: If GCS credentials are not configured
159def list_registry_connectors(bucket_name: str) -> list[str]: 160 """List all connectors in the registry. 161 162 Scans the GCS bucket to find all connectors that have metadata files. 163 164 Args: 165 bucket_name: Name of the GCS bucket containing the registry 166 167 Returns: 168 list[str]: Sorted list of connector names 169 170 Raises: 171 ValueError: If GCS credentials are not configured 172 """ 173 storage_client = get_gcs_storage_client() 174 bucket = storage_client.bucket(bucket_name) 175 176 # List all blobs matching the pattern: metadata/airbyte/*/latest/metadata.yaml 177 glob_pattern = ( 178 f"{METADATA_FOLDER}/airbyte/*/{LATEST_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}" 179 ) 180 logger.info(f"Listing connectors with pattern: {glob_pattern}") 181 182 try: 183 blobs = bucket.list_blobs(match_glob=glob_pattern) 184 except Exception as e: 185 logger.error(f"Error listing blobs in bucket {bucket_name}: {e}") 186 raise 187 188 # Extract connector names from blob paths 189 # Path format: metadata/airbyte/{connector-name}/latest/metadata.yaml 190 connector_names: set[str] = set() 191 for blob in blobs: 192 path_parts = blob.name.split("/") 193 # Path should be: metadata / airbyte / connector-name / latest / metadata.yaml 194 if len(path_parts) >= 5: 195 connector_name = path_parts[2] 196 connector_names.add(connector_name) 197 198 return sorted(connector_names)
List all connectors in the registry.
Scans the GCS bucket to find all connectors that have metadata files.
Arguments:
- bucket_name: Name of the GCS bucket containing the registry
Returns:
list[str]: Sorted list of connector names
Raises:
- ValueError: If GCS credentials are not configured
def list_registry_connectors_filtered(
    bucket_name: str,
    *,
    support_level: SupportLevel | None = None,
    min_support_level: SupportLevel | None = None,
    connector_type: ConnectorType | None = None,
    language: ConnectorLanguage | None = None,
    prefix: str = "",
) -> list[str]:
    """List connectors from the compiled cloud registry index with filtering.

    When any filter is applied, reads the compiled `cloud_registry.json` index
    instead of globbing individual metadata blobs. This is significantly faster
    because the index is a single JSON file containing all connector entries.

    When no filters are applied, falls back to the existing glob-based search
    which captures all connectors (including OSS-only connectors not in the
    Cloud index).

    Args:
        bucket_name: Name of the GCS bucket containing the registry.
        support_level: Exact support level to match (e.g., `SupportLevel.CERTIFIED`).
        min_support_level: Minimum support level threshold. Returns connectors
            at or above this level.
        connector_type: Filter by connector type (`ConnectorType.SOURCE` or
            `ConnectorType.DESTINATION`).
        language: Filter by implementation language (e.g., `ConnectorLanguage.PYTHON`).
        prefix: Optional bucket prefix (e.g., `"aj-test100"`).

    Returns:
        Sorted list of connector technical names (e.g., `"source-github"`).

    Raises:
        ValueError: If `support_level` and `min_support_level` are both provided.
    """
    has_filters = any([support_level, min_support_level, connector_type, language])

    if not has_filters:
        # No filters: use the glob-based listing, which also covers OSS-only
        # connectors that the Cloud index would miss.
        return list_registry_connectors(bucket_name=bucket_name)

    if support_level and min_support_level:
        raise ValueError(
            "Cannot specify both `support_level` and `min_support_level`. "
            "Use `support_level` for an exact match or `min_support_level` for a threshold."
        )

    entries = _read_cloud_registry_index(bucket_name=bucket_name, prefix=prefix)

    # Apply support_level exact match
    # NOTE(review): comparing the raw index string to the enum assumes
    # SupportLevel is a str-valued enum — confirm against _enums.
    if support_level:
        entries = [e for e in entries if e.get("supportLevel") == support_level]

    # Apply min_support_level threshold; entries with missing or unknown
    # support levels are excluded rather than guessed at.
    if min_support_level:
        threshold = min_support_level.precedence
        known_levels = {m.value for m in SupportLevel}
        entries = [
            e
            for e in entries
            if e.get("supportLevel")
            and e["supportLevel"] in known_levels
            and SupportLevel(e["supportLevel"]).precedence >= threshold
        ]

    # Apply connector_type filter — the index marks type via which
    # definition-ID key is present on the entry.
    if connector_type == ConnectorType.SOURCE:
        entries = [e for e in entries if "sourceDefinitionId" in e]
    elif connector_type == ConnectorType.DESTINATION:
        entries = [e for e in entries if "destinationDefinitionId" in e]

    # Apply language filter
    if language:
        entries = [e for e in entries if e.get("language") == language]

    # Extract connector names from dockerRepository (e.g., "airbyte/source-github" -> "source-github")
    names: set[str] = set()
    for entry in entries:
        docker_repo = entry.get("dockerRepository", "")
        if "/" in docker_repo:
            names.add(docker_repo.split("/", 1)[1])
        elif docker_repo:
            names.add(docker_repo)

    return sorted(names)
List connectors from the compiled cloud registry index with filtering.
When any filter is applied, reads the compiled cloud_registry.json index
instead of globbing individual metadata blobs. This is significantly faster
because the index is a single JSON file containing all connector entries.
When no filters are applied, falls back to the existing glob-based search which captures all connectors (including OSS-only connectors not in the Cloud index).
Arguments:
- bucket_name: Name of the GCS bucket containing the registry.
- support_level: Exact support level to match (e.g., `SupportLevel.CERTIFIED`).
- min_support_level: Minimum support level threshold. Returns connectors at or above this level.
- connector_type: Filter by connector type (`ConnectorType.SOURCE` or `ConnectorType.DESTINATION`).
- language: Filter by implementation language (e.g., `ConnectorLanguage.PYTHON`).
- prefix: Optional bucket prefix (e.g., `"aj-test100"`).
Returns:
Sorted list of connector technical names (e.g., `"source-github"`).
Raises:
- ValueError: If `support_level` and `min_support_level` are both provided.
def publish_connector_metadata(
    connector_name: str,
    metadata: dict[str, Any],
    bucket_name: str,
    version: str,
    update_latest: bool = True,
    dry_run: bool = False,
) -> MetadataPublishResult:
    """Publish connector metadata to GCS.

    Uploads the metadata to the registry bucket at a versioned path, and optionally
    also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading
    unchanged files.

    Requires GCS_CREDENTIALS environment variable to be set.

    Args:
        connector_name: The connector technical name (e.g., 'source-github').
        metadata: Parsed metadata document; must be a dict with a 'data' key.
        bucket_name: Name of the target GCS bucket.
        version: Version string used for the versioned GCS path.
        update_latest: If True (default), also upload to the 'latest' folder.
        dry_run: If True, log what would be uploaded and return without writing.

    Returns:
        A MetadataPublishResult describing what was (or would be) uploaded.

    Raises:
        ValueError: If `metadata` is not a dict or lacks a 'data' field.
    """
    if not isinstance(metadata, dict):
        raise ValueError("Metadata must be a dictionary")

    if "data" not in metadata:
        raise ValueError("Metadata must contain 'data' field")

    # Construct GCS paths using airbyte/{connector_name} convention
    versioned_blob_path = get_gcs_publish_path(connector_name, "metadata", version)
    latest_blob_path = get_gcs_publish_path(
        connector_name, "metadata", LATEST_GCS_FOLDER_NAME
    )

    if dry_run:
        message = f"[DRY RUN] Would upload metadata to gs://{bucket_name}/{versioned_blob_path}"
        if update_latest:
            message += f" and gs://{bucket_name}/{latest_blob_path}"
        logger.info(message)
        return MetadataPublishResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            versioned_path=versioned_blob_path,
            latest_path=latest_blob_path if update_latest else None,
            versioned_uploaded=False,
            latest_uploaded=False,
            status="dry-run",
            message=message,
        )

    # Get GCS client and bucket
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # Write metadata to temp file; delete=False so the file survives the
    # `with` block for upload — it is removed in the finally below.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".yaml", delete=False
    ) as tmp_file:
        yaml.dump(metadata, tmp_file)
        tmp_path = Path(tmp_file.name)

    try:
        # Upload versioned file (skipped if the content hash is unchanged)
        versioned_uploaded, _ = upload_file_if_changed(
            local_file_path=tmp_path,
            bucket=bucket,
            blob_path=versioned_blob_path,
            disable_cache=True,
        )

        if versioned_uploaded:
            logger.info(
                f"Uploaded metadata for {connector_name} v{version} to {versioned_blob_path}"
            )
        else:
            logger.info(
                f"Versioned metadata for {connector_name} v{version} is already up to date"
            )

        # Optionally update latest pointer
        latest_uploaded = False
        if update_latest:
            latest_uploaded, _ = upload_file_if_changed(
                local_file_path=tmp_path,
                bucket=bucket,
                blob_path=latest_blob_path,
                disable_cache=True,
            )
            if latest_uploaded:
                logger.info(f"Updated latest pointer for {connector_name}")
            else:
                logger.info(
                    f"Latest pointer for {connector_name} is already up to date"
                )
    finally:
        # Clean up temp file even if upload fails
        tmp_path.unlink(missing_ok=True)

    # Determine status: "success" if anything was actually written,
    # otherwise report that the registry already matched.
    if versioned_uploaded or latest_uploaded:
        status = "success"
        message = f"Published metadata for {connector_name} v{version}"
        if versioned_uploaded:
            message += f" to {versioned_blob_path}"
        if latest_uploaded:
            message += " and updated latest"
    else:
        status = "already-up-to-date"
        message = f"Metadata for {connector_name} v{version} is already up to date"

    return MetadataPublishResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        versioned_path=versioned_blob_path,
        latest_path=latest_blob_path if update_latest else None,
        versioned_uploaded=versioned_uploaded,
        latest_uploaded=latest_uploaded,
        status=status,
        message=message,
    )
Publish connector metadata to GCS.
Uploads the metadata to the registry bucket at a versioned path, and optionally also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading unchanged files.
Requires GCS_CREDENTIALS environment variable to be set.
def publish_version_artifacts(
    connector_name: str,
    version: str,
    artifacts_dir: Path,
    store: RegistryStore,
    dry_run: bool = False,
    with_validate: bool = True,
) -> PublishArtifactsResult:
    """Publish locally generated artifacts to a GCS registry bucket.

    Uses `gcsfs.GCSFileSystem` to upload the local *artifacts_dir* to the
    versioned path inside the target GCS bucket.

    The target GCS path is:
        `gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/`

    Before uploading, this function validates that the `connector_name`
    (derived from the connector directory) matches the `dockerRepository`
    declared in `metadata.yaml`. A mismatch would cause the registry
    compile step to see duplicate definition-ID entries and fail.

    Args:
        connector_name: Connector name (e.g. `source-faker`).
        version: Version string (e.g. `6.2.38`).
        artifacts_dir: Local directory containing artifacts from `generate`.
        store: Parsed store target containing bucket, prefix, and stage info.
        dry_run: If `True`, report what would be uploaded without writing.
        with_validate: If `True` (default), validate metadata before uploading.
            Pass `False` (`--no-validate`) to skip.

    Returns:
        A `PublishArtifactsResult` describing what was published.

    Raises:
        FileNotFoundError: If *artifacts_dir* does not exist.
        ValueError: If the connector directory name does not match
            `dockerRepository` in the generated metadata.
    """
    if not artifacts_dir.is_dir():
        raise FileNotFoundError(f"Artifacts directory not found: {artifacts_dir}")

    # Fail fast if the connector directory name doesn't match dockerRepository.
    # A mismatch would publish artifacts under the wrong GCS path and corrupt
    # the registry (duplicate definition-IDs under different directory names).
    mismatch_error = _check_connector_name_matches_docker_repo(
        connector_name, artifacts_dir
    )
    if mismatch_error:
        raise ValueError(mismatch_error)

    # Build the GCS destination path
    bucket_name = store.bucket
    prefix = store.prefix
    blob_root = versioned_blob_root(
        connector_name=connector_name, version=version, store=store
    )
    versioned_dest = f"gcs://{bucket_name}/{blob_root}"

    target_label = f"{bucket_name}/{prefix}" if prefix else bucket_name
    result = PublishArtifactsResult(
        connector_name=connector_name,
        version=version,
        target=target_label,
        gcs_destination=versioned_dest,
        dry_run=dry_run,
    )

    # --- Pre-publish validation ---
    if with_validate:
        metadata_file = artifacts_dir / "metadata.yaml"
        if metadata_file.is_file():
            raw_metadata = yaml.safe_load(metadata_file.read_text())
            metadata_data = (raw_metadata or {}).get("data", {})
            validation = validate_metadata(metadata_data=metadata_data)
            if not validation.passed:
                # Validation failure aborts the publish; errors are surfaced
                # on the result object rather than raised.
                for err in validation.errors:
                    logger.error("Pre-publish validation error: %s", err)
                result.validation_errors = validation.errors
                return result
            logger.info(
                "Pre-publish validation passed (%d validators).",
                validation.validators_run,
            )
        else:
            logger.warning("No metadata.yaml in artifacts dir; skipping validation.")

    # Enumerate local files (recursive — includes nested subdirectories)
    local_files = sorted(f for f in artifacts_dir.rglob("*") if f.is_file())
    if not local_files:
        result.errors.append(f"No files found in {artifacts_dir}.")
        return result

    _log_progress(
        "Publishing %d artifacts for %s@%s → %s",
        len(local_files),
        connector_name,
        version,
        versioned_dest,
    )

    # Build references used by both dry-run and real upload paths
    deps_file = artifacts_dir / CONNECTOR_DEPENDENCY_FILE_NAME
    has_deps = deps_file.is_file()
    deps_gcs_key = dependencies_blob_path(
        connector_name=connector_name, version=version, store=store
    )

    sbom_file = artifacts_dir / SBOM_FILE_NAME
    has_sbom = sbom_file.is_file()

    if dry_run:
        for f in local_files:
            rel = f.relative_to(artifacts_dir)
            result.files_uploaded.append(str(rel))
            _log_progress(" [DRY RUN] would upload: %s", rel)
        # Report the dual-load of dependencies.json to connector_dependencies/
        if has_deps:
            result.files_uploaded.append(deps_gcs_key)
            _log_progress(
                " [DRY RUN] would also dual-load: %s → gs://%s/%s",
                CONNECTOR_DEPENDENCY_FILE_NAME,
                bucket_name,
                deps_gcs_key,
            )
        # Report the separate sbom/ upload
        if has_sbom:
            sbom_gcs_key = sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            )
            result.files_uploaded.append(sbom_gcs_key)
            _log_progress(
                " [DRY RUN] would also upload: %s → gs://%s/%s",
                SBOM_FILE_NAME,
                bucket_name,
                sbom_gcs_key,
            )
        return result

    # Authenticate
    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # Strip gcs:// prefix for gcsfs path
    dest_path = versioned_dest.replace("gcs://", "")

    # Upload all files to the versioned path
    _log_progress("Uploading to: %s", versioned_dest)
    for f in local_files:
        rel = f.relative_to(artifacts_dir)
        remote_path = f"{dest_path}/{rel}"
        fs.put(str(f), remote_path)
        result.files_uploaded.append(str(rel))
        _log_progress(" Uploaded: %s", rel)

    # Delete remote files that don't exist locally (sync semantics).
    # NOTE(review): `fs.ls` is a non-recursive listing while the upload pass
    # walks the tree with `rglob` — nested stale files fall back to basename
    # comparison below. Presumably artifact dirs are flat; confirm.
    try:
        remote_files = fs.ls(dest_path, detail=False)
        local_rel_paths = {str(f.relative_to(artifacts_dir)) for f in local_files}
        for remote_file in remote_files:
            # Skip the directory entry itself if it appears in the listing
            if remote_file == dest_path:
                continue
            # Derive the remote relative path, matching upload semantics
            if remote_file.startswith(dest_path + "/"):
                remote_rel = remote_file[len(dest_path) + 1 :]
            else:
                remote_rel = remote_file.split("/")[-1]
            if remote_rel not in local_rel_paths:
                fs.rm(remote_file)
                _log_progress(" Deleted stale remote file: %s", remote_rel)
    except FileNotFoundError:
        pass  # Destination doesn't exist yet, nothing to clean

    _log_progress("Uploaded %d files to %s", len(local_files), versioned_dest)

    # --- Dual-load dependencies.json to the connector_dependencies/ path ---
    if not has_deps:
        logger.debug(
            "No %s in artifacts dir — skipping dual-load.",
            CONNECTOR_DEPENDENCY_FILE_NAME,
        )
    else:
        deps_remote = f"{bucket_name}/{deps_gcs_key}"
        _log_progress(
            "Dual-loading %s to gs://%s",
            CONNECTOR_DEPENDENCY_FILE_NAME,
            deps_remote,
        )
        fs.put(str(deps_file), deps_remote)
        result.files_uploaded.append(deps_gcs_key)
        _log_progress(" Uploaded %s (dual-load)", CONNECTOR_DEPENDENCY_FILE_NAME)

    # --- Upload SBOM to the dedicated sbom/ path in GCS ---
    if not has_sbom:
        logger.debug(
            "No %s in artifacts dir — skipping SBOM dual-load.",
            SBOM_FILE_NAME,
        )
    else:
        # NOTE(review): dry_run is always False on this path (the dry-run
        # branch returned earlier), so this forwards False to upload_sbom.
        sbom_gcs_uri = upload_sbom(
            sbom_path=sbom_file,
            connector_name=connector_name,
            version=version,
            store=store,
            dry_run=dry_run,
        )
        result.files_uploaded.append(
            sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            ),
        )
        _log_progress("Uploaded SBOM to dedicated path: %s", sbom_gcs_uri)

    return result
Publish locally generated artifacts to a GCS registry bucket.
Uses gcsfs.GCSFileSystem to upload the local artifacts_dir to the
versioned path inside the target GCS bucket.
The target GCS path is:
gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/
Before uploading, this function validates that the connector_name
(derived from the connector directory) matches the dockerRepository
declared in metadata.yaml. A mismatch would cause the registry
compile step to see duplicate definition-ID entries and fail.
Arguments:
- connector_name: Connector name (e.g. `source-faker`).
- version: Version string (e.g. `6.2.38`).
- artifacts_dir: Local directory containing artifacts from `generate`.
- store: Parsed store target containing bucket, prefix, and stage info.
- dry_run: If `True`, report what would be uploaded without writing.
- with_validate: If `True` (default), validate metadata before uploading. Pass `False` (`--no-validate`) to skip.
Returns:
A `PublishArtifactsResult` describing what was published.
Raises:
- ValueError: If the connector directory name does not match `dockerRepository` in the generated metadata.
def purge_latest_dirs(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
) -> PurgeLatestResult:
    """Delete all `latest/` directories from the registry store.

    Discovers connector directories via glob, then deletes each
    `latest/` subdirectory in parallel using a thread pool.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only purge these connectors.
        dry_run: If True, report what would be done without deleting.

    Returns:
        A `PurgeLatestResult` describing what was done.
    """
    result = PurgeLatestResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"

    # Discover latest/ dirs by listing connector directories that contain
    # a `latest/` subdirectory.
    _log_progress("Discovering latest/ directories...")
    base_with_slash = f"{base}/"
    if connector_name:
        # Check each requested connector for a latest/ dir; `seen`
        # deduplicates repeated names in the input list.
        seen: set[str] = set()
        connectors_with_latest: list[str] = []
        for name in connector_name:
            if name in seen:
                continue
            latest_path = f"{base}/{name}/latest"
            if fs.exists(latest_path):
                connectors_with_latest.append(name)
            seen.add(name)
    else:
        # Glob for all connectors, then filter to those with latest/
        all_connector_dirs = fs.glob(f"{base}/*/latest")
        seen = set()
        connectors_with_latest = []
        for path in all_connector_dirs:
            # Strip the known base prefix and take the first component
            if not path.startswith(base_with_slash):
                logger.warning("Could not parse latest path: %s", path)
                continue
            relative = path[len(base_with_slash) :]
            connector = relative.split("/")[0]
            if connector and connector not in seen:
                connectors_with_latest.append(connector)
                seen.add(connector)

    result.connectors_found = len(connectors_with_latest)
    _log_progress(
        "Found %d connectors with latest/ directories",
        result.connectors_found,
    )

    if not connectors_with_latest:
        _log_progress("Nothing to purge.")
        _log_progress(result.summary())
        return result

    if dry_run:
        for connector in sorted(connectors_with_latest):
            _log_progress(" [DRY RUN] Would delete %s/latest/", connector)
        result.latest_dirs_deleted = len(connectors_with_latest)
        _log_progress(result.summary())
        return result

    # Delete latest/ dirs in parallel using the shared helper.
    def _delete_one(connector: str) -> str | None:
        """Delete a single connector's latest/ dir. Returns error string or None."""
        try:
            _delete_latest_dir(
                fs,
                store=store,
                connector=connector,
            )
            return None
        except Exception as exc:
            # Collect per-connector failures instead of aborting the batch.
            return f"Failed to delete latest/ for {connector}: {exc}"

    _log_progress(
        "Deleting %d latest/ directories (max_workers=%d)...",
        len(connectors_with_latest),
        _PURGE_LATEST_MAX_WORKERS,
    )

    with ThreadPoolExecutor(max_workers=_PURGE_LATEST_MAX_WORKERS) as pool:
        futures = {
            pool.submit(_delete_one, c): c for c in sorted(connectors_with_latest)
        }
        for i, future in enumerate(as_completed(futures), 1):
            connector = futures[future]
            error = future.result()
            if error:
                logger.error(error)
                result.errors.append(error)
            else:
                result.latest_dirs_deleted += 1
            # Periodic progress heartbeat for large purges.
            if i % 100 == 0:
                _log_progress(" Deleted %d / %d...", i, len(connectors_with_latest))

    _log_progress(result.summary())
    return result
Delete all latest/ directories from the registry store.
Discovers connector directories via glob, then deletes each
latest/ subdirectory in parallel using a thread pool.
Arguments:
- store: Registry store (bucket + optional prefix).
- connector_name: If provided, only purge these connectors.
- dry_run: If True, report what would be done without deleting.
Returns:
A `PurgeLatestResult` describing what was done.
def rebuild_registry(
    source_bucket: str,
    output_mode: OutputMode,
    output_path_root: str | None = None,
    gcs_bucket: str | None = None,
    s3_bucket: str | None = None,
    dry_run: bool = False,
    connector_name: list[str] | None = None,
) -> RebuildResult:
    """Rebuild the entire registry from a source GCS bucket to an output target.

    Reads all connector metadata blobs from the source GCS bucket and copies
    them to the output target using fsspec for unified filesystem access.

    The output targets are:
    - local: Write to a local directory tree.
    - gcs: Copy to a GCS bucket (must not be the prod bucket).
    - s3: Copy to an S3 bucket.

    Args:
        source_bucket: The GCS bucket to read from (typically prod).
        output_mode: Where to write: "local", "gcs", or "s3".
        output_path_root: Root path/prefix for output. For local mode, if None
            creates a temp directory. For GCS/S3, prepended to all blob paths.
        gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
        s3_bucket: Target S3 bucket name (required if output_mode="s3").
        dry_run: If True, report what would be done without writing.
        connector_name: If provided, only rebuild these connector names
            (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.

    Returns:
        RebuildResult with details of the operation.

    Raises:
        ValueError: If target is the prod bucket, or required bucket arg is missing.
    """
    # Guard against accidentally overwriting the production bucket.
    if output_mode == "gcs" and gcs_bucket:
        _validate_not_prod_bucket(gcs_bucket)

    # Resolve output root for local mode
    effective_output_root = output_path_root or ""
    if output_mode == "local":
        effective_output_root = _resolve_local_output_root(output_path_root)

    result = RebuildResult(
        source_bucket=source_bucket,
        output_mode=output_mode,
        output_root=effective_output_root,
        dry_run=dry_run,
    )

    # Create source filesystem (always GCS)
    source_fs, gcs_token = _make_source_fs()
    source_base = f"{source_bucket}/{METADATA_FOLDER}"

    # List files under metadata/ in the source bucket
    if connector_name:
        _log_progress(
            "Listing blobs for %d connectors under gs://%s/...",
            len(connector_name),
            source_base,
        )
        source_paths: list[str] = []
        for name in connector_name:
            connector_prefix = f"{source_base}/airbyte/{name}"
            found = source_fs.find(connector_prefix)
            source_paths.extend(found)
            _log_progress(" %s: %d blobs", name, len(found))
    else:
        _log_progress("Listing all blobs under gs://%s/...", source_base)
        source_paths = source_fs.find(source_base)

    if not source_paths:
        _log_progress("No blobs found under gs://%s/", source_base)
        return result

    total_blobs = len(source_paths)
    _log_progress("Found %d blobs to process", total_blobs)

    # Create output filesystem.
    # NOTE(review): this is built before the dry-run/native-copy branches, so
    # it is constructed (but unused) on the GCS-native path — confirm whether
    # construction has side effects worth avoiding.
    output_fs, output_base = _make_output_fs(
        output_mode=output_mode,
        output_root=effective_output_root,
        gcs_bucket=gcs_bucket,
        s3_bucket=s3_bucket,
        gcs_token=gcs_token,
    )

    # Collect connector names and compute relative paths for all blobs.
    bucket_prefix = f"{source_bucket}/"
    blob_relative_paths: list[str] = []
    connector_names: set[str] = set()

    for source_path in source_paths:
        relative_path = source_path
        if source_path.startswith(bucket_prefix):
            relative_path = source_path[len(bucket_prefix) :]
        blob_relative_paths.append(relative_path)

        # Path layout is metadata/airbyte/<connector>/... — the connector
        # name is the third component.
        parts = relative_path.split("/")
        if len(parts) >= 3:
            connector_names.add(parts[2])

    if dry_run:
        result.blobs_copied = total_blobs
        result.connectors_processed = len(connector_names)
        _log_progress(
            "[DRY RUN] Would copy %d blobs (%d connectors)",
            total_blobs,
            len(connector_names),
        )
        _log_progress(result.summary())
        return result

    # Use GCS-native server-side copy for GCS→GCS mirrors.
    if output_mode == "gcs" and gcs_bucket:
        result = _gcs_native_copy(
            source_bucket=source_bucket,
            dest_bucket_name=gcs_bucket,
            dest_prefix=effective_output_root,
            blob_relative_paths=blob_relative_paths,
            connector_names=connector_names,
            gcs_token=gcs_token,
            result=result,
            connector_name_filter=connector_name,
        )
        return result

    # Fallback: fsspec-based copy for local and S3 output modes.
    result = _fsspec_copy(
        source_fs=source_fs,
        source_paths=source_paths,
        output_fs=output_fs,
        output_base=output_base,
        output_mode=output_mode,
        source_bucket=source_bucket,
        blob_relative_paths=blob_relative_paths,
        connector_names=connector_names,
        result=result,
    )
    return result
Rebuild the entire registry from a source GCS bucket to an output target.
Reads all connector metadata blobs from the source GCS bucket and copies them to the output target using fsspec for unified filesystem access.
The output targets are:
- local: Write to a local directory tree.
- gcs: Copy to a GCS bucket (must not be the prod bucket).
- s3: Copy to an S3 bucket.
Arguments:
- source_bucket: The GCS bucket to read from (typically prod).
- output_mode: Where to write: "local", "gcs", or "s3".
- output_path_root: Root path/prefix for output. For local mode, if None creates a temp directory. For GCS/S3, prepended to all blob paths.
- gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
- s3_bucket: Target S3 bucket name (required if output_mode="s3").
- dry_run: If True, report what would be done without writing.
- connector_name: If provided, only rebuild these connector names (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.
Returns:
RebuildResult with details of the operation.
Raises:
- ValueError: If target is the prod bucket, or required bucket arg is missing.
def resolve_registry_store(
    store: str | None = None,
    connector_name: str | None = None,
    cwd: Path | None = None,
    default_env: str = "dev",
) -> RegistryStore:
    """Resolve a `RegistryStore` from CLI inputs.

    All applicable detection methods are evaluated. Explicit sources
    (`--store`, then the `AIRBYTE_REGISTRY_STORE` env var) take priority
    and are returned directly. When only auto-detected sources remain, they
    are compared and a `ValueError` is raised if they disagree.

    Priority (highest → lowest):

    1. **Explicit** `--store` argument (e.g. `"coral:dev"`).
    2. **Environment variable** -- `AIRBYTE_REGISTRY_STORE`.
    3. **Auto-detected** -- connector name and/or working directory.
       If both are present and disagree, a `ValueError` is raised.

    Args:
        store: Explicit store target string (e.g. `"coral:dev"`).
        connector_name: Optional connector name for auto-detection.
        cwd: Working directory for repo-based detection.
        default_env: Environment to use when auto-detecting (default `"dev"`).

    Returns:
        A fully resolved `RegistryStore`.

    Raises:
        ValueError: If no detection method succeeds, or if auto-detected
            methods produce conflicting store types.
    """
    # An explicit source wins outright; remember which one fired for logging.
    chosen: RegistryStore | None = None
    chosen_from: str | None = None

    if store is not None:
        chosen = RegistryStore.parse(store)
        chosen_from = "--store"
    else:
        env_value = os.environ.get(REGISTRY_STORE_ENV_VAR)
        if env_value:
            chosen = RegistryStore.parse(env_value)
            chosen_from = REGISTRY_STORE_ENV_VAR

    # Auto-detection always runs, keyed by detection method name.
    detected: dict[str, StoreType] = {}
    if connector_name is not None:
        detected["connector_name"] = StoreType.get_from_connector_name(
            connector_name,
        )
    from_dir = StoreType.detect_from_repo_dir(cwd)
    if from_dir is not None:
        detected["working_directory"] = from_dir

    # Explicit source present: return it without consulting auto-detection.
    if chosen is not None:
        logger.debug(
            "Using explicit store target from %s: %s:%s",
            chosen_from,
            chosen.store_type.value,
            chosen.env,
        )
        return chosen

    # No explicit source and nothing auto-detected: we cannot proceed.
    if not detected:
        raise ValueError(
            "Cannot determine registry store. "
            "Provide --store (e.g. 'coral:dev' or 'sonar:prod'), "
            f"set ${REGISTRY_STORE_ENV_VAR}, "
            "or run from a recognized repository directory."
        )

    # All auto-detected methods must agree on a single store type.
    unique_types = set(detected.values())
    if len(unique_types) > 1:
        detail = ", ".join(f"{src}={st.value}" for src, st in detected.items())
        raise ValueError(
            f"Conflicting store types detected: {detail}. "
            "Provide an explicit --store to resolve the ambiguity."
        )

    resolved = unique_types.pop()
    logger.info(
        "Auto-detected store type '%s' (sources: %s)",
        resolved.value,
        ", ".join(detected),
    )
    return RegistryStore(store_type=resolved, env=default_env)
Resolve a RegistryStore from CLI inputs.
All applicable detection methods are evaluated. Explicit sources
(--store, then the AIRBYTE_REGISTRY_STORE env var) take priority
and are returned directly. When only auto-detected sources remain, they
are compared and a ValueError is raised if they disagree.
Priority (highest → lowest):
1. Explicit `--store` argument (e.g. `"coral:dev"`).
2. Environment variable -- `AIRBYTE_REGISTRY_STORE`.
3. Auto-detected -- connector name and/or working directory. If both are present and disagree, a `ValueError` is raised.
Arguments:
- store: Explicit store target string (e.g. `"coral:dev"`).
- connector_name: Optional connector name for auto-detection.
- cwd: Working directory for repo-based detection.
- default_env: Environment to use when auto-detecting (default `"dev"`).
Returns:
A fully resolved `RegistryStore`.
Raises:
- ValueError: If no detection method succeeds, or if auto-detected methods produce conflicting store types.
93def strip_rc_suffix(version: str) -> str: 94 """Strip the release candidate suffix from a version string. 95 96 Args: 97 version: The version string (e.g., '1.2.3-rc.1'). 98 99 Returns: 100 The base version without RC suffix (e.g., '1.2.3'). 101 Returns the original version if no RC suffix is present. 102 """ 103 if "-rc." in version: 104 return version.split("-rc.")[0] 105 return version
Strip the release candidate suffix from a version string.
Arguments:
- version: The version string (e.g., '1.2.3-rc.1').
Returns:
The base version without RC suffix (e.g., '1.2.3'). Returns the original version if no RC suffix is present.
157def unyank_connector_version( 158 connector_name: str, 159 version: str, 160 bucket_name: str, 161 dry_run: bool = False, 162) -> YankResult: 163 """Remove the yank marker from a connector version. 164 165 Deletes the version-yank.yml file at: 166 metadata/airbyte/{connector_name}/{version}/version-yank.yml 167 168 Args: 169 connector_name: The connector name (e.g., "source-faker"). 170 version: The version to unyank (e.g., "1.2.3"). 171 bucket_name: The GCS bucket name. 172 dry_run: If True, report what would be done without deleting. 173 174 Returns: 175 YankResult with details of the operation. 176 """ 177 yank_path = _get_yank_blob_path(connector_name, version) 178 179 storage_client = get_gcs_storage_client() 180 bucket = storage_client.bucket(bucket_name) 181 182 # Check if yank marker exists 183 yank_blob = bucket.blob(yank_path) 184 if not yank_blob.exists(): 185 return YankResult( 186 connector_name=connector_name, 187 version=version, 188 bucket_name=bucket_name, 189 action="unyank", 190 success=False, 191 message=f"Version {version} of {connector_name} is not yanked.", 192 dry_run=dry_run, 193 ) 194 195 if dry_run: 196 return YankResult( 197 connector_name=connector_name, 198 version=version, 199 bucket_name=bucket_name, 200 action="unyank", 201 success=True, 202 message=f"[DRY RUN] Would unyank {connector_name} {version}.", 203 dry_run=True, 204 ) 205 206 # Delete the yank marker 207 yank_blob.delete() 208 209 logger.info("Unyanked %s version %s in %s", connector_name, version, bucket_name) 210 211 return YankResult( 212 connector_name=connector_name, 213 version=version, 214 bucket_name=bucket_name, 215 action="unyank", 216 success=True, 217 message=f"Successfully unyanked {connector_name} {version}.", 218 )
Remove the yank marker from a connector version.
Deletes the version-yank.yml file at: metadata/airbyte/{connector_name}/{version}/version-yank.yml
Arguments:
- connector_name: The connector name (e.g., "source-faker").
- version: The version to unyank (e.g., "1.2.3").
- bucket_name: The GCS bucket name.
- dry_run: If True, report what would be done without deleting.
Returns:
YankResult with details of the operation.
270def validate_metadata( 271 metadata_data: dict[str, Any], 272 opts: ValidateOptions | None = None, 273) -> ValidationResult: 274 """Run all pre-publish validators against raw `metadata.data`. 275 276 Args: 277 metadata_data: The `data` section of a parsed `metadata.yaml`. 278 opts: Options influencing validation behaviour. 279 280 Returns: 281 A `ValidationResult` with aggregate pass/fail and error list. 282 """ 283 if opts is None: 284 opts = ValidateOptions() 285 286 result = ValidationResult() 287 288 for validator in PRE_PUBLISH_VALIDATORS: 289 result.validators_run += 1 290 logger.info("Running validator: %s", validator.__name__) 291 passed, error = validator(metadata_data, opts) 292 if not passed and error: 293 logger.error("Validation failed: %s", error) 294 result.add_error(error) 295 296 return result
Run all pre-publish validators against raw metadata.data.
Arguments:
- metadata_data: The `data` section of a parsed `metadata.yaml`.
- opts: Options influencing validation behaviour.
Returns:
A `ValidationResult` with aggregate pass/fail and error list.
def yank_connector_version(
    connector_name: str,
    version: str,
    bucket_name: str,
    reason: str = "",
    dry_run: bool = False,
) -> YankResult:
    """Mark a connector version as yanked by writing a version-yank.yml marker.

    The marker file is placed at:
        metadata/airbyte/{connector_name}/{version}/version-yank.yml

    Args:
        connector_name: The connector name (e.g., "source-faker").
        version: The version to yank (e.g., "1.2.3").
        bucket_name: The GCS bucket name.
        reason: Optional reason for yanking the version.
        dry_run: If True, report what would be done without writing.

    Returns:
        YankResult with details of the operation. A version that does not
        exist, or is already yanked, is reported via ``success=False``
        rather than raised.

    Raises:
        NOTE(review): an earlier docstring claimed a ValueError when the
        bucket is the production bucket and no override is set, but no such
        check is visible in this function — confirm whether a caller-side
        guard enforces it.
    """
    yank_path = _get_yank_blob_path(connector_name, version)
    metadata_path = _get_metadata_blob_path(connector_name, version)

    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # Verify the version exists
    metadata_blob = bucket.blob(metadata_path)
    if not metadata_blob.exists():
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="yank",
            success=False,
            message=f"Version {version} not found for {connector_name} in {bucket_name}.",
            dry_run=dry_run,
        )

    # Check if already yanked
    yank_blob = bucket.blob(yank_path)
    if yank_blob.exists():
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="yank",
            success=False,
            message=f"Version {version} of {connector_name} is already yanked.",
            dry_run=dry_run,
        )

    if dry_run:
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="yank",
            success=True,
            message=f"[DRY RUN] Would yank {connector_name} {version}.",
            dry_run=True,
        )

    # Write the yank marker file (timestamp is timezone-aware UTC)
    yank_content: dict[str, Any] = {
        "yanked": True,
        "yanked_at": datetime.now(tz=timezone.utc).isoformat(),
    }
    if reason:
        yank_content["reason"] = reason

    yank_yaml = yaml.dump(yank_content, default_flow_style=False)
    yank_blob.upload_from_string(yank_yaml, content_type="application/x-yaml")

    logger.info("Yanked %s version %s in %s", connector_name, version, bucket_name)

    return YankResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        action="yank",
        success=True,
        message=f"Successfully yanked {connector_name} {version}.",
    )
Mark a connector version as yanked by writing a version-yank.yml marker.
The marker file is placed at:
metadata/airbyte/{connector_name}/{version}/version-yank.yml
Arguments:
- connector_name: The connector name (e.g., "source-faker").
- version: The version to yank (e.g., "1.2.3").
- bucket_name: The GCS bucket name.
- reason: Optional reason for yanking the version.
- dry_run: If True, report what would be done without writing.
Returns:
YankResult with details of the operation.
Raises:
- ValueError: If the bucket is the production bucket and no override is set. (Note: a version that does not exist, or is already yanked, is reported via a failed `YankResult` with `success=False` rather than raised.)