airbyte_ops_mcp.registry
Registry operations for Airbyte connectors.
This package provides functionality for:
- Reading connector metadata from the GCS registry
- Listing connectors and versions in the registry
- Publishing connector metadata to GCS
- Promoting and rolling back release candidates
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Registry operations for Airbyte connectors.

This package provides functionality for:
- Reading connector metadata from the GCS registry
- Listing connectors and versions in the registry
- Publishing connector metadata to GCS
- Promoting and rolling back release candidates
- Generating, validating, and publishing per-version registry artifacts
- Compiling the registry and purging `latest/` directories
- Yanking and un-yanking connector versions
- Auditing connectors for versions that have not been published
- Rebuilding (mirroring) registry contents into another output location

Most of these operations are also exposed as methods on the `Registry` base
class, which is bound to a `RegistryStore` target written as
`<store_type>:<env>[/<prefix>]` (for example `coral:dev` or `sonar:prod`).
"""

from __future__ import annotations

from airbyte_ops_mcp.registry._constants import (
    DEFAULT_METADATA_SERVICE_BUCKET_NAME,
    DEV_METADATA_SERVICE_BUCKET_NAME,
    LATEST_GCS_FOLDER_NAME,
    METADATA_FILE_NAME,
    METADATA_FOLDER,
    PROD_METADATA_SERVICE_BUCKET_NAME,
    RELEASE_CANDIDATE_GCS_FOLDER_NAME,
    SONAR_DEV_BUCKET_NAME,
    SONAR_PROD_BUCKET_NAME,
)
from airbyte_ops_mcp.registry._enums import (
    ConnectorLanguage,
    ConnectorType,
    SupportLevel,
)
from airbyte_ops_mcp.registry.audit import (
    AuditResult,
    UnpublishedConnector,
    find_unpublished_connectors,
)
from airbyte_ops_mcp.registry.compile import (
    CompileResult,
    PurgeLatestResult,
    compile_registry,
    purge_latest_dirs,
)
from airbyte_ops_mcp.registry.generate import (
    GenerateResult,
    generate_version_artifacts,
)
from airbyte_ops_mcp.registry.models import (
    ConnectorListResult,
    ConnectorMetadata,
    ConnectorPublishResult,
    MetadataPublishResult,
    RegistryEntryResult,
    VersionListResult,
)
from airbyte_ops_mcp.registry.operations import (
    get_registry_entry,
    get_registry_spec,
    list_connector_versions,
    list_registry_connectors,
    list_registry_connectors_filtered,
)
from airbyte_ops_mcp.registry.publish import (
    CONNECTOR_PATH_PREFIX,
    create_progressive_rollout_blob,
    delete_progressive_rollout_blob,
    get_connector_metadata,
    get_gcs_publish_path,
    is_valid_for_progressive_rollout,
    publish_connector_metadata,
    strip_rc_suffix,
)
from airbyte_ops_mcp.registry.publish_artifacts import (
    PublishArtifactsResult,
    publish_version_artifacts,
)
from airbyte_ops_mcp.registry.rebuild import (
    OutputMode,
    RebuildResult,
    rebuild_registry,
)
from airbyte_ops_mcp.registry.registry_store_base import (
    Registry,
    get_registry,
)
from airbyte_ops_mcp.registry.store import (
    REGISTRY_STORE_ENV_VAR,
    RegistryStore,
    StoreType,
    resolve_registry_store,
)
from airbyte_ops_mcp.registry.validate import (
    ValidateOptions,
    ValidationResult,
    validate_metadata,
)
from airbyte_ops_mcp.registry.yank import (
    YANK_FILE_NAME,
    YankResult,
    unyank_connector_version,
    yank_connector_version,
)

__all__ = [
    "CONNECTOR_PATH_PREFIX",
    "DEFAULT_METADATA_SERVICE_BUCKET_NAME",
    "DEV_METADATA_SERVICE_BUCKET_NAME",
    "LATEST_GCS_FOLDER_NAME",
    "METADATA_FILE_NAME",
    "METADATA_FOLDER",
    "PROD_METADATA_SERVICE_BUCKET_NAME",
    "REGISTRY_STORE_ENV_VAR",
    "RELEASE_CANDIDATE_GCS_FOLDER_NAME",
    "SONAR_DEV_BUCKET_NAME",
    "SONAR_PROD_BUCKET_NAME",
    "YANK_FILE_NAME",
    "AuditResult",
    "CompileResult",
    "ConnectorLanguage",
    "ConnectorListResult",
    "ConnectorMetadata",
    "ConnectorPublishResult",
    "ConnectorType",
    "GenerateResult",
    "MetadataPublishResult",
    "OutputMode",
    "PublishArtifactsResult",
    "PurgeLatestResult",
    "RebuildResult",
    "Registry",
    "RegistryEntryResult",
    "RegistryStore",
    "StoreType",
    "SupportLevel",
    "UnpublishedConnector",
    "ValidateOptions",
    "ValidationResult",
    "VersionListResult",
    "YankResult",
    "compile_registry",
    "create_progressive_rollout_blob",
    "delete_progressive_rollout_blob",
    "find_unpublished_connectors",
    "generate_version_artifacts",
    "get_connector_metadata",
    "get_gcs_publish_path",
    "get_registry",
    "get_registry_entry",
    "get_registry_spec",
    "is_valid_for_progressive_rollout",
    "list_connector_versions",
    "list_registry_connectors",
    "list_registry_connectors_filtered",
    "publish_connector_metadata",
    "publish_version_artifacts",
    "purge_latest_dirs",
    "rebuild_registry",
    "resolve_registry_store",
    "strip_rc_suffix",
    "unyank_connector_version",
    "validate_metadata",
    "yank_connector_version",
]
@dataclass
class AuditResult:
    """Outcome of auditing connectors for versions that were never published.

    Every field has a default, so an empty instance can be created up front and
    filled in while the audit runs.
    """

    # Connectors found to have unpublished versions.
    unpublished: list[UnpublishedConnector] = field(default_factory=list)
    # How many connectors were examined.
    checked_count: int = 0
    # Entries skipped per category (presumably connector names -- confirm in audit.py).
    skipped_archived: list[str] = field(default_factory=list)
    skipped_rc: list[str] = field(default_factory=list)
    skipped_disabled: list[str] = field(default_factory=list)
    # Error messages collected during the audit.
    errors: list[str] = field(default_factory=list)
Result of auditing which connectors have unpublished versions.
@dataclass
class CompileResult:
    """Counters and diagnostics collected during a registry compile run."""

    # What was compiled (presumably a store target string such as "coral:dev").
    target: str
    connectors_scanned: int = 0
    versions_found: int = 0
    yanked_versions: int = 0
    # `latest` pointers that were rewritten vs. ones that were already current.
    latest_updated: int = 0
    latest_already_current: int = 0
    # Entries written per registry flavour.
    cloud_registry_entries: int = 0
    oss_registry_entries: int = 0
    composite_registry_entries: int = 0
    # Metrics enrichment bookkeeping. `metrics_error` is not counted in `errors`
    # and therefore does not affect `status`.
    metrics_connector_count: int = 0
    metrics_registry_entries: int = 0
    metrics_source: str | None = None
    metrics_error: str | None = None
    version_indexes_written: int = 0
    specs_secrets_mask_properties: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Short label: ``dry-run`` (wins over errors), ``completed-with-errors`` or ``success``."""
        if not self.dry_run:
            return "completed-with-errors" if self.errors else "success"
        return "dry-run"

    def summary(self) -> str:
        """One-line human-readable report of all counters."""
        pieces = [
            f"[{self.status}] Scanned {self.connectors_scanned} connectors, ",
            f"{self.versions_found} versions ({self.yanked_versions} yanked). ",
            f"Latest updated: {self.latest_updated}, ",
            f"already current: {self.latest_already_current}. ",
            f"Registry entries: cloud={self.cloud_registry_entries}, ",
            f"oss={self.oss_registry_entries}, ",
            f"composite={self.composite_registry_entries}. ",
            f"Metrics loaded for {self.metrics_connector_count} connectors, ",
            f"injected into {self.metrics_registry_entries} registry entries. ",
            f"Version indexes: {self.version_indexes_written}. ",
            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. ",
            f"Errors: {len(self.errors)}.",
        ]
        return "".join(pieces)
Result of a registry compile operation.
def summary(self) -> str:
    """Build a one-line report of every compile counter plus the error count."""
    scan = (
        f"[{self.status}] Scanned {self.connectors_scanned} connectors, "
        f"{self.versions_found} versions ({self.yanked_versions} yanked). "
    )
    latest = (
        f"Latest updated: {self.latest_updated}, "
        f"already current: {self.latest_already_current}. "
    )
    entries = (
        f"Registry entries: cloud={self.cloud_registry_entries}, "
        f"oss={self.oss_registry_entries}, "
        f"composite={self.composite_registry_entries}. "
    )
    metrics = (
        f"Metrics loaded for {self.metrics_connector_count} connectors, "
        f"injected into {self.metrics_registry_entries} registry entries. "
    )
    tail = (
        f"Version indexes: {self.version_indexes_written}. "
        f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. "
        f"Errors: {len(self.errors)}."
    )
    return scan + latest + entries + metrics + tail
88class ConnectorLanguage(StrEnum): 89 """Connector implementation languages.""" 90 91 PYTHON = "python" 92 JAVA = "java" 93 LOW_CODE = "low-code" 94 MANIFEST_ONLY = "manifest-only" 95 96 @classmethod 97 def parse(cls, value: str) -> ConnectorLanguage: 98 """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch.""" 99 try: 100 return cls(value) 101 except ValueError: 102 valid = ", ".join(f"`{m.value}`" for m in cls) 103 raise ValueError( 104 f"Unrecognized language: {value!r}. Expected one of: {valid}." 105 ) from None
Connector implementation languages.
96 @classmethod 97 def parse(cls, value: str) -> ConnectorLanguage: 98 """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch.""" 99 try: 100 return cls(value) 101 except ValueError: 102 valid = ", ".join(f"`{m.value}`" for m in cls) 103 raise ValueError( 104 f"Unrecognized language: {value!r}. Expected one of: {valid}." 105 ) from None
Parse a string into a ConnectorLanguage, raising ValueError on mismatch.
class ConnectorListResult(BaseModel):
    """Connector names found when listing a registry bucket."""

    bucket_name: str = Field(description="The GCS bucket name")
    connector_count: int = Field(description="Number of connectors found")
    connectors: list[str] = Field(description="List of connector names")
Result of listing connectors in the registry.
class ConnectorMetadata(BaseModel):
    """Core connector metadata taken from a connector's metadata.yaml.

    Holds a small, essential subset of the fields found in a connector's
    metadata.yaml file in the Airbyte monorepo.
    """

    name: str = Field(description="The connector technical name")
    docker_repository: str = Field(description="The Docker repository")
    docker_image_tag: str = Field(description="The Docker image tag/version")
    # Optional fields default to None when absent.
    support_level: str | None = Field(
        default=None, description="The support level (certified, community, etc.)"
    )
    definition_id: str | None = Field(
        default=None, description="The connector definition ID"
    )
Connector metadata from metadata.yaml.
This model represents the essential metadata about a connector read from its metadata.yaml file in the Airbyte monorepo.
class ConnectorPublishResult(BaseModel):
    """Result of a connector publish operation.

    Describes the outcome of a progressive-rollout operation for a single
    connector version: either creating the rollout
    (``progressive-rollout-create``) or cleaning it up
    (``progressive-rollout-cleanup``).
    """

    connector: str = Field(description="The connector technical name")
    version: str = Field(description="The connector version")
    action: Literal["progressive-rollout-create", "progressive-rollout-cleanup"] = (
        Field(description="The action performed")
    )
    status: Literal["success", "failure", "dry-run"] = Field(
        description="The status of the operation"
    )
    docker_image: str | None = Field(
        default=None, description="The Docker image name if applicable"
    )
    registry_updated: bool = Field(
        default=False, description="Whether the registry was updated"
    )
    message: str | None = Field(default=None, description="Additional status message")

    def __str__(self) -> str:
        """Render as ``[<status>] <connector>:<version> - <action>``."""
        # The status value is used verbatim; the former "dry-run" special case
        # mapped "dry-run" to itself and had no effect.
        return f"[{self.status}] {self.connector}:{self.version} - {self.action}"
Result of a connector publish operation.
This model provides detailed information about the outcome of a connector publish operation (creating or cleaning up a progressive rollout).
70class ConnectorType(StrEnum): 71 """Connector type: source or destination.""" 72 73 SOURCE = "source" 74 DESTINATION = "destination" 75 76 @classmethod 77 def parse(cls, value: str) -> ConnectorType: 78 """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch.""" 79 try: 80 return cls(value) 81 except ValueError: 82 valid = ", ".join(f"`{m.value}`" for m in cls) 83 raise ValueError( 84 f"Unrecognized connector type: {value!r}. Expected one of: {valid}." 85 ) from None
Connector type: source or destination.
76 @classmethod 77 def parse(cls, value: str) -> ConnectorType: 78 """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch.""" 79 try: 80 return cls(value) 81 except ValueError: 82 valid = ", ".join(f"`{m.value}`" for m in cls) 83 raise ValueError( 84 f"Unrecognized connector type: {value!r}. Expected one of: {valid}." 85 ) from None
Parse a string into a ConnectorType, raising ValueError on mismatch.
@dataclass
class GenerateResult:
    """Result of a local artifact generation run."""

    connector_name: str
    version: str
    docker_image: str
    # Local directory the artifacts were (or would be) written to.
    output_dir: str
    artifacts_written: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    validation_errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def success(self) -> bool:
        """True when neither generation errors nor validation errors were recorded."""
        return not self.errors and not self.validation_errors

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict snapshot of this result, including ``success``.

        The list fields are copied, so callers that mutate the returned dict
        (e.g. append to ``errors``) cannot silently alter this result object.
        """
        return {
            "connector_name": self.connector_name,
            "version": self.version,
            "docker_image": self.docker_image,
            "output_dir": self.output_dir,
            "artifacts_written": list(self.artifacts_written),
            "errors": list(self.errors),
            "validation_errors": list(self.validation_errors),
            "dry_run": self.dry_run,
            "success": self.success,
        }
Result of a local artifact generation run.
def to_dict(self) -> dict[str, Any]:
    """Return a plain-dict snapshot of a generation result, including ``success``.

    The list fields are copied, so callers that mutate the returned dict
    (e.g. append to ``errors``) cannot silently alter the result object.
    """
    return {
        "connector_name": self.connector_name,
        "version": self.version,
        "docker_image": self.docker_image,
        "output_dir": self.output_dir,
        "artifacts_written": list(self.artifacts_written),
        "errors": list(self.errors),
        "validation_errors": list(self.validation_errors),
        "dry_run": self.dry_run,
        "success": self.success,
    }
class MetadataPublishResult(BaseModel):
    """Outcome of publishing connector metadata to a GCS registry bucket.

    Records which paths were targeted, which uploads actually happened, and
    a status plus message describing the result.
    """

    connector_name: str = Field(description="The connector technical name")
    version: str = Field(description="The version that was published")
    bucket_name: str = Field(description="The GCS bucket name")
    versioned_path: str = Field(description="The versioned GCS path")
    latest_path: str | None = Field(
        default=None, description="The latest GCS path if updated"
    )
    versioned_uploaded: bool = Field(
        default=False, description="Whether the versioned metadata was uploaded"
    )
    latest_uploaded: bool = Field(
        default=False, description="Whether the latest metadata was uploaded"
    )
    status: Literal["success", "dry-run", "already-up-to-date"] = Field(
        description="The status of the operation"
    )
    message: str = Field(description="Status message describing the outcome")

    def __str__(self) -> str:
        """Render as ``[<status>] <connector>:<version> -> <versioned path>``."""
        label = f"{self.connector_name}:{self.version}"
        return f"[{self.status}] {label} -> {self.versioned_path}"
Result of a metadata publish operation to GCS.
This model provides detailed information about the outcome of publishing connector metadata to the registry.
@dataclass
class PublishArtifactsResult:
    """Outcome of uploading one connector version's artifacts."""

    connector_name: str
    version: str
    # Store target the artifacts were published to.
    target: str
    # Destination location of the upload (presumably a gs:// URI -- confirm).
    gcs_destination: str
    files_uploaded: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    validation_errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def success(self) -> bool:
        """True when no upload errors and no validation errors were recorded."""
        return not (self.errors or self.validation_errors)

    @property
    def status(self) -> str:
        """``dry-run`` (wins over errors), ``completed-with-errors`` or ``success``."""
        if not self.dry_run:
            has_problems = bool(self.errors or self.validation_errors)
            return "completed-with-errors" if has_problems else "success"
        return "dry-run"
Result of a version-artifacts publish operation.
@dataclass
class PurgeLatestResult:
    """Outcome of deleting ``latest/`` directories from a registry target."""

    target: str
    connectors_found: int = 0
    latest_dirs_deleted: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """``dry-run`` (wins over errors), ``completed-with-errors`` or ``success``."""
        if not self.dry_run:
            return "completed-with-errors" if self.errors else "success"
        return "dry-run"

    def summary(self) -> str:
        """One-line human-readable report of the purge."""
        found = f"Found {self.connectors_found} connectors"
        deleted = f"deleted {self.latest_dirs_deleted} latest/ directories"
        return f"[{self.status}] {found}, {deleted}. Errors: {len(self.errors)}."
Result of a purge-latest operation.
@dataclass
class RebuildResult:
    """Outcome of rebuilding (copying) registry contents from a source bucket."""

    source_bucket: str
    output_mode: OutputMode
    output_root: str
    connectors_processed: int = 0
    blobs_copied: int = 0
    blobs_skipped: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """``dry-run`` (wins over errors), ``completed-with-errors`` or ``success``."""
        if not self.dry_run:
            return "completed-with-errors" if self.errors else "success"
        return "dry-run"

    def summary(self) -> str:
        """One-line human-readable report of the rebuild."""
        counts = (
            f"Rebuilt {self.connectors_processed} connectors, "
            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
            f"{len(self.errors)} errors."
        )
        return f"[{self.status}] {counts} Output: {self.output_root}"
Result of a registry rebuild operation.
@property
def status(self) -> str:
    """Return the status of the rebuild operation.

    ``dry-run`` takes priority; otherwise ``completed-with-errors`` when any
    error was recorded, else ``success``.
    """
    if not self.dry_run:
        return "completed-with-errors" if self.errors else "success"
    return "dry-run"
Return the status of the rebuild operation.
def summary(self) -> str:
    """Return a one-line, human-readable summary of the rebuild."""
    counts = (
        f"Rebuilt {self.connectors_processed} connectors, "
        f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, "
        f"{len(self.errors)} errors."
    )
    return f"[{self.status}] {counts} Output: {self.output_root}"
Return a human-readable summary.
class Registry(ABC):
    """A configured connector registry store.

    A Registry is bound to a specific `airbyte_ops_mcp.registry.store.RegistryStore`
    (store type + env + optional prefix), and provides methods used by the CLI to
    read/write registry contents.

    Only `list_connectors` is abstract. Every other operation has a base
    implementation that raises `NotImplementedError`, so a subclass overrides
    just the operations its store type supports.
    """

    def __init__(self, store: RegistryStore) -> None:
        self.store = store

    @property
    def store_type(self) -> StoreType:
        """Store type of the bound `RegistryStore`."""
        return self.store.store_type

    @property
    def bucket_name(self) -> str:
        """Bucket name resolved by the bound `RegistryStore`."""
        return self.store.bucket

    @property
    def prefix(self) -> str:
        """Path prefix of the bound `RegistryStore`."""
        return self.store.prefix

    def _require_no_prefix(self, op_name: str) -> None:
        """Raise `NotImplementedError` when bound to a prefixed store target."""
        if not self.prefix:
            return
        raise NotImplementedError(
            f"Operation '{op_name}' does not yet support store prefixes (got prefix='{self.prefix}')."
        )

    # ---------------------------------------------------------------------
    # Read operations
    # ---------------------------------------------------------------------

    @abstractmethod
    def list_connectors(
        self,
        *,
        support_level: SupportLevel | None = None,
        min_support_level: SupportLevel | None = None,
        connector_type: ConnectorType | None = None,
        language: ConnectorLanguage | None = None,
    ) -> list[str]:
        """List connector names, optionally filtered. Must be overridden."""
        message = _op_not_implemented_message(self.store_type, "list_connectors")
        raise NotImplementedError(message)

    def list_connector_versions(self, connector_name: str) -> list[str]:
        """List versions of ``connector_name``. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "list_connector_versions")
        raise NotImplementedError(message)

    def get_connector_metadata(
        self, connector_name: str, version: str = "latest"
    ) -> dict[str, Any]:
        """Fetch metadata for one connector version. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "get_connector_metadata")
        raise NotImplementedError(message)

    # ---------------------------------------------------------------------
    # Write / mutate operations
    # ---------------------------------------------------------------------

    def progressive_rollout_create(
        self,
        repo_path: Path,
        connector_name: str,
        dry_run: bool = False,
    ) -> ConnectorPublishResult:
        """Start a progressive rollout. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "progressive_rollout_create")
        raise NotImplementedError(message)

    def progressive_rollout_cleanup(
        self,
        repo_path: Path,
        connector_name: str,
        dry_run: bool = False,
    ) -> ConnectorPublishResult:
        """Clean up a progressive rollout. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "progressive_rollout_cleanup")
        raise NotImplementedError(message)

    def yank(
        self,
        connector_name: str,
        version: str,
        reason: str = "",
        dry_run: bool = False,
    ) -> YankResult:
        """Yank a connector version. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "yank")
        raise NotImplementedError(message)

    def unyank(
        self,
        connector_name: str,
        version: str,
        dry_run: bool = False,
    ) -> YankResult:
        """Undo a yank of a connector version. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "unyank")
        raise NotImplementedError(message)

    def publish_version_artifacts(
        self,
        connector_name: str,
        version: str,
        artifacts_dir: Path,
        dry_run: bool = False,
        with_validate: bool = True,
    ) -> PublishArtifactsResult:
        """Publish a version's artifacts from ``artifacts_dir``. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "publish_version_artifacts")
        raise NotImplementedError(message)

    def delete_dev_latest(
        self,
        connector_name: list[str] | None = None,
        dry_run: bool = False,
    ) -> PurgeLatestResult:
        """Delete ``latest/`` directories. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "delete_dev_latest")
        raise NotImplementedError(message)

    def compile(
        self,
        connector_name: list[str] | None = None,
        dry_run: bool = False,
        with_secrets_mask: bool = False,
        with_legacy_migration: str | None = None,
        with_metrics: bool = True,
        force: bool = False,
    ) -> CompileResult:
        """Compile the registry. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "compile")
        raise NotImplementedError(message)

    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
        """Check marketing stubs under ``repo_root``. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "marketing_stubs_check")
        raise NotImplementedError(message)

    def marketing_stubs_sync(
        self,
        repo_root: Path,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """Sync marketing stubs under ``repo_root``. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "marketing_stubs_sync")
        raise NotImplementedError(message)

    def mirror(
        self,
        output_mode: OutputMode,
        output_path_root: str | None = None,
        gcs_bucket: str | None = None,
        s3_bucket: str | None = None,
        dry_run: bool = False,
        connector_name: list[str] | None = None,
    ) -> RebuildResult:
        """Mirror registry contents to another location. Not implemented in the base class."""
        message = _op_not_implemented_message(self.store_type, "mirror")
        raise NotImplementedError(message)
A configured connector registry store.
A Registry is bound to a specific airbyte_ops_mcp.registry.store.RegistryStore
(store type + env + optional prefix), and provides methods used by the CLI to
read/write registry contents.
@abstractmethod
def list_connectors(
    self,
    *,
    support_level: SupportLevel | None = None,
    min_support_level: SupportLevel | None = None,
    connector_type: ConnectorType | None = None,
    language: ConnectorLanguage | None = None,
) -> list[str]:
    """List connector names in the registry, optionally filtered.

    Abstract: concrete registries must override this. The base body only
    raises `NotImplementedError`. Filter semantics (e.g. whether
    ``min_support_level`` is a lower bound on precedence) are defined by the
    subclass -- confirm there.
    """
    message = _op_not_implemented_message(self.store_type, "list_connectors")
    raise NotImplementedError(message)
def publish_version_artifacts(
    self,
    connector_name: str,
    version: str,
    artifacts_dir: Path,
    dry_run: bool = False,
    with_validate: bool = True,
) -> PublishArtifactsResult:
    """Publish one version's artifacts from ``artifacts_dir``.

    The base implementation always raises `NotImplementedError`; store-specific
    subclasses provide the real behavior.
    """
    message = _op_not_implemented_message(self.store_type, "publish_version_artifacts")
    raise NotImplementedError(message)
def compile(
    self,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
    with_secrets_mask: bool = False,
    with_legacy_migration: str | None = None,
    with_metrics: bool = True,
    force: bool = False,
) -> CompileResult:
    """Compile the registry for this store.

    The base implementation always raises `NotImplementedError`; store-specific
    subclasses provide the real behavior.
    """
    message = _op_not_implemented_message(self.store_type, "compile")
    raise NotImplementedError(message)
def mirror(
    self,
    output_mode: OutputMode,
    output_path_root: str | None = None,
    gcs_bucket: str | None = None,
    s3_bucket: str | None = None,
    dry_run: bool = False,
    connector_name: list[str] | None = None,
) -> RebuildResult:
    """Mirror the registry contents to another output location.

    The base implementation always raises `NotImplementedError`; store-specific
    subclasses provide the real behavior.
    """
    message = _op_not_implemented_message(self.store_type, "mirror")
    raise NotImplementedError(message)
class RegistryEntryResult(BaseModel):
    """A registry entry read from GCS, together with where it was read from.

    Wraps the raw metadata dictionary with the connector name, version, bucket,
    and object path that identify it.
    """

    connector_name: str = Field(description="The connector technical name")
    version: str = Field(description="The version that was read")
    bucket_name: str = Field(description="The GCS bucket name")
    gcs_path: str = Field(description="The GCS path that was read")
    metadata: dict = Field(description="The raw metadata dictionary")
Result of reading a registry entry from GCS.
This model wraps the raw metadata dictionary with additional context.
@dataclass(frozen=True, kw_only=True)
class RegistryStore:
    """Parsed store target (type + environment + optional prefix).

    Examples::

        RegistryStore.parse("coral:dev")
        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
        RegistryStore.parse("coral:dev/aj-test")
        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
        RegistryStore.parse("sonar:prod")
        # -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
    """

    store_type: StoreType
    # Environment key; must be present in BUCKET_MAP[store_type] for `bucket` to resolve.
    env: str
    # Optional path prefix inside the bucket ("" means none). `parse` strips
    # leading/trailing slashes from it.
    prefix: str = ""

    # -- Derived helpers -----------------------------------------------------

    @property
    def bucket(self) -> str:
        """Resolve the concrete bucket name for this target.

        Raises:
            ValueError: If the store type or environment has no entry in
                ``BUCKET_MAP``.
        """
        env_map = BUCKET_MAP.get(self.store_type)
        if env_map is None:
            raise ValueError(f"Unknown store type: {self.store_type!r}")
        bucket_name = env_map.get(self.env)
        if bucket_name is None:
            raise ValueError(
                f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
                f"Expected one of: {', '.join(sorted(env_map))}."
            )
        return bucket_name

    @property
    def bucket_root(self) -> str:
        """Bucket name with optional prefix appended (`bucket/prefix`)."""
        if self.prefix:
            return f"{self.bucket}/{self.prefix}"
        return self.bucket

    # -- Parsing -------------------------------------------------------------

    @classmethod
    def parse(cls, target: str) -> RegistryStore:
        """Parse a store target string of the form ``<store_type>:<env>[/<prefix>]``.

        Accepted formats include:

        - ``"coral:dev"``
        - ``"coral:prod"``
        - ``"coral:dev/aj-test100"``
        - ``"sonar:prod"``

        The store type is matched case-insensitively; the environment is
        matched exactly. Surrounding whitespace is ignored, and leading or
        trailing slashes are stripped from the prefix.

        Raises:
            ValueError: If the string cannot be parsed or references an
                unknown store type / environment.
        """
        # Values often come from env vars or CLI args; stray whitespace would
        # otherwise make "coral:dev " fail, or leak into the prefix ("x ").
        target = target.strip()
        if ":" not in target:
            raise ValueError(
                f"Invalid store target '{target}'. "
                "Expected format: '<store_type>:<env>[/<prefix>]' "
                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
            )

        store_part, env_part = target.split(":", 1)

        # Validate store type
        store_part_lower = store_part.lower()
        valid_types = {t.value: t for t in StoreType}
        if store_part_lower not in valid_types:
            raise ValueError(
                f"Unknown store type '{store_part}'. "
                f"Expected one of: {', '.join(sorted(valid_types))}."
            )
        store_type = valid_types[store_part_lower]

        # Split env and prefix
        env_key, _, prefix = env_part.partition("/")
        prefix = prefix.strip("/")

        # Validate env
        env_map = BUCKET_MAP.get(store_type, {})
        if env_key not in env_map:
            raise ValueError(
                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
                f"Expected one of: {', '.join(sorted(env_map))}."
            )

        return cls(store_type=store_type, env=env_key, prefix=prefix)
Parsed store target (type + environment + optional prefix).
Examples::
RegistryStore.parse("coral:dev")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
RegistryStore.parse("coral:dev/aj-test")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
RegistryStore.parse("sonar:prod")
# -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
@property
def bucket(self) -> str:
    """Resolve the concrete bucket name for this target.

    Raises:
        ValueError: If ``BUCKET_MAP`` has no entry for the store type, or none
            for the environment under that store type.
    """
    env_map = BUCKET_MAP.get(self.store_type)
    if env_map is None:
        raise ValueError(f"Unknown store type: {self.store_type!r}")
    resolved = env_map.get(self.env)
    if resolved is not None:
        return resolved
    known_envs = ", ".join(sorted(env_map))
    raise ValueError(
        f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
        f"Expected one of: {known_envs}."
    )
Resolve the concrete bucket name for this target.
@property
def bucket_root(self) -> str:
    """Bucket name with optional prefix appended (`bucket/prefix`)."""
    return f"{self.bucket}/{self.prefix}" if self.prefix else self.bucket
Bucket name with optional prefix appended (bucket/prefix).
@classmethod
def parse(cls, target: str) -> RegistryStore:
    """Parse a store target string of the form ``<store_type>:<env>[/<prefix>]``.

    Accepted formats include:

    - ``"coral:dev"``
    - ``"coral:prod"``
    - ``"coral:dev/aj-test100"``
    - ``"sonar:prod"``

    The store type is matched case-insensitively; the environment is matched
    exactly. Surrounding whitespace is ignored, and leading or trailing slashes
    are stripped from the prefix.

    Raises:
        ValueError: If the string cannot be parsed or references an
            unknown store type / environment.
    """
    # Values often come from env vars or CLI args; stray whitespace would
    # otherwise make "coral:dev " fail, or leak into the prefix ("x ").
    target = target.strip()
    if ":" not in target:
        raise ValueError(
            f"Invalid store target '{target}'. "
            "Expected format: '<store_type>:<env>[/<prefix>]' "
            "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
        )

    store_part, env_part = target.split(":", 1)

    # Validate store type
    store_part_lower = store_part.lower()
    valid_types = {t.value: t for t in StoreType}
    if store_part_lower not in valid_types:
        raise ValueError(
            f"Unknown store type '{store_part}'. "
            f"Expected one of: {', '.join(sorted(valid_types))}."
        )
    store_type = valid_types[store_part_lower]

    # Split env and prefix
    env_key, _, prefix = env_part.partition("/")
    prefix = prefix.strip("/")

    # Validate env
    env_map = BUCKET_MAP.get(store_type, {})
    if env_key not in env_map:
        raise ValueError(
            f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
            f"Expected one of: {', '.join(sorted(env_map))}."
        )

    return cls(store_type=store_type, env=env_key, prefix=prefix)
Parse a store target string.
Accepted formats:
- "coral:dev"
- "coral:prod"
- "coral:dev/aj-test100"
- "sonar:prod"
Raises:
- ValueError: If the string cannot be parsed or references an unknown store type / environment.
class StoreType(str, Enum):
    """Identifier for a registry store family."""

    SONAR = "sonar"
    CORAL = "coral"

    # -- Auto-detection class methods ----------------------------------------

    @classmethod
    def get_from_connector_name(cls, name: str) -> StoreType:
        """Guess the store type from a connector's technical name.

        Names beginning with `source-` or `destination-` map to **coral**;
        anything else maps to **sonar**.

        Args:
            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).

        Returns:
            The inferred `StoreType`.
        """
        is_coral_name = name.startswith(("source-", "destination-"))
        return cls.CORAL if is_coral_name else cls.SONAR

    @classmethod
    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
        """Guess the store type from the layout of a repository directory.

        Directory markers, checked in this order:

        * **sonar** -- `integrations/` together with `connector-sdk/`
        * **coral** -- `airbyte-integrations/connectors/`

        Args:
            path: Directory to inspect. Defaults to `Path.cwd()`.

        Returns:
            The inferred `StoreType`, or `None` when neither layout matches.
        """
        root = Path.cwd() if path is None else path

        looks_like_sonar = (root / "integrations").is_dir() and (root / "connector-sdk").is_dir()
        if looks_like_sonar:
            return cls.SONAR

        looks_like_coral = (root / "airbyte-integrations" / "connectors").is_dir()
        return cls.CORAL if looks_like_coral else None
Registry store type identifier.
@classmethod
def get_from_connector_name(cls, name: str) -> StoreType:
    """Guess the store type from a connector's technical name.

    Names beginning with `source-` or `destination-` map to **coral**; anything
    else maps to **sonar**.

    Args:
        name: Connector technical name (e.g. `"source-github"` or `"stripe"`).

    Returns:
        The inferred `StoreType`.
    """
    is_coral_name = name.startswith(("source-", "destination-"))
    return cls.CORAL if is_coral_name else cls.SONAR
Infer the store type from a connector's technical name.
Connectors whose name starts with source- or destination- belong
to the coral registry. All other names belong to sonar.
Arguments:
- name: Connector technical name (e.g. "source-github" or "stripe").
Returns:
The inferred StoreType.
@classmethod
def detect_from_repo_dir(cls, path: Path | str | None = None) -> StoreType | None:
    """Infer the store type from a repository working directory.

    Checks for well-known directory markers (sonar is checked first):

    * **sonar** -- `integrations/` alongside `connector-sdk/`
    * **coral** -- `airbyte-integrations/connectors/`

    Args:
        path: Directory to inspect, as a `Path` or a string.
            Defaults to `Path.cwd()`.

    Returns:
        The inferred `StoreType`, or `None` if the directory does
        not match any known registry repository layout.
    """
    root = Path.cwd() if path is None else Path(path)

    # Sonar markers: both directories must be present.
    if (root / "integrations").is_dir() and (root / "connector-sdk").is_dir():
        return cls.SONAR

    # Coral markers
    if (root / "airbyte-integrations" / "connectors").is_dir():
        return cls.CORAL

    return None
Infer the store type from a repository working directory.
Checks for well-known directory markers:
- sonar -- integrations/ alongside connector-sdk/
- coral -- airbyte-integrations/connectors/
Arguments:
- path: Directory to inspect. Defaults to Path.cwd().
Returns:
The inferred StoreType, or None if the directory does not match any known registry repository layout.
class SupportLevel(StrEnum):
    """Connector support levels ordered by precedence."""

    ARCHIVED = "archived"
    COMMUNITY = "community"
    CERTIFIED = "certified"

    @property
    def precedence(self) -> int:
        """Numeric precedence for ordering comparisons.

        Higher values indicate higher support commitment.
        """
        return _SUPPORT_LEVEL_PRECEDENCE[self]

    @classmethod
    def from_precedence(cls, precedence: int) -> SupportLevel:
        """Look up a `SupportLevel` by its numeric precedence value.

        Raises `ValueError` when the precedence is not recognised.
        """
        for member in cls:
            if _SUPPORT_LEVEL_PRECEDENCE[member] == precedence:
                return member
        valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
        raise ValueError(
            f"Unrecognized support-level precedence: {precedence!r}. "
            f"Expected one of: {valid}."
        ) from None

    @classmethod
    def parse(cls, value: str) -> SupportLevel:
        """Parse a string into a `SupportLevel`.

        Accepts a keyword (`archived`, `community`, `certified`)
        or a legacy integer string (`100`, `200`, `300`).

        Raises `ValueError` when the value is not recognised, including
        for non-string input such as `None`.
        """
        try:
            return cls(value)
        except ValueError:
            pass
        # Fallback: try interpreting as an integer precedence value.
        try:
            return cls.from_precedence(int(value))
        except (TypeError, ValueError, KeyError):
            # TypeError: `int()` rejects inputs like None; report it as the
            # same ValueError as any other unrecognised value.
            pass
        valid_kw = ", ".join(f"`{m.value}`" for m in cls)
        valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
        raise ValueError(
            f"Unrecognized support level: {value!r}. "
            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
        ) from None
Connector support levels ordered by precedence.
@property
def precedence(self) -> int:
    """Numeric precedence for ordering comparisons.

    Higher values indicate higher support commitment.
    """
    return _SUPPORT_LEVEL_PRECEDENCE[self]
Numeric precedence for ordering comparisons.
Higher values indicate higher support commitment.
@classmethod
def from_precedence(cls, precedence: int) -> SupportLevel:
    """Look up a `SupportLevel` by its numeric precedence value.

    Raises `ValueError` when the precedence is not recognised.
    """
    for member in cls:
        if _SUPPORT_LEVEL_PRECEDENCE[member] == precedence:
            return member
    valid = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
    raise ValueError(
        f"Unrecognized support-level precedence: {precedence!r}. "
        f"Expected one of: {valid}."
    ) from None
Look up a SupportLevel by its numeric precedence value.
Raises ValueError when the precedence is not recognised.
@classmethod
def parse(cls, value: str) -> SupportLevel:
    """Parse a string into a `SupportLevel`.

    Accepts a keyword (`archived`, `community`, `certified`)
    or a legacy integer string (`100`, `200`, `300`).

    Raises `ValueError` when the value is not recognised, including
    for non-string input such as `None`.
    """
    try:
        return cls(value)
    except ValueError:
        pass
    # Fallback: try interpreting as an integer precedence value.
    try:
        return cls.from_precedence(int(value))
    except (TypeError, ValueError, KeyError):
        # TypeError: `int()` rejects inputs like None; report it as the
        # same ValueError as any other unrecognised value.
        pass
    valid_kw = ", ".join(f"`{m.value}`" for m in cls)
    valid_int = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
    raise ValueError(
        f"Unrecognized support level: {value!r}. "
        f"Expected keyword ({valid_kw}) or integer ({valid_int})."
    ) from None
Parse a string into a SupportLevel.
Accepts a keyword (archived, community, certified)
or a legacy integer string (100, 200, 300).
Raises ValueError when the value is not recognised.
@dataclass
class UnpublishedConnector:
    """A connector whose current local version is not published on GCS."""

    # Connector directory / technical name (e.g. "source-github").
    connector_name: str
    # `dockerImageTag` read from the connector's local metadata.yaml.
    local_version: str
A connector whose current local version is not published on GCS.
@dataclass(frozen=True)
class ValidateOptions:
    """Options that influence which validators run and how."""

    docs_path: str | None = None
    """Path to the connector's documentation file (for `validate_docs_path_exists`)."""

    is_prerelease: bool = False
    """Whether this is a pre-release build (skips version-decrement checks)."""
Options that influence which validators run and how.
@dataclass
class ValidationResult:
    """Aggregate result from running all validators."""

    # False as soon as any error has been recorded via `add_error`.
    passed: bool = True
    # Human-readable error messages, in the order they were recorded.
    errors: list[str] = field(default_factory=list)
    # Number of validators executed.
    validators_run: int = 0

    def add_error(self, message: str) -> None:
        """Record a validation error and mark the overall result as failed.

        Args:
            message: Human-readable description of the failure.
        """
        self.passed = False
        self.errors.append(message)
Aggregate result from running all validators.
class VersionListResult(BaseModel):
    """Result of listing versions for a connector."""

    connector_name: str = Field(description="The connector technical name")
    bucket_name: str = Field(description="The GCS bucket name")
    version_count: int = Field(description="Number of versions found")
    versions: list[str] = Field(description="List of version strings")
Result of listing versions for a connector.
@dataclass
class YankResult:
    """Result of a yank or unyank operation."""

    connector_name: str
    version: str
    bucket_name: str
    action: str  # "yank" or "unyank"
    success: bool
    message: str
    dry_run: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary for JSON serialization.

        Keys are the dataclass field names in declaration order. Deriving
        them from the fields keeps the output in sync when fields change.
        """
        from dataclasses import asdict

        return asdict(self)
Result of a yank or unyank operation.
def to_dict(self) -> dict[str, Any]:
    """Convert to a dictionary for JSON serialization.

    Keys are the dataclass field names in declaration order. Deriving
    them from the fields keeps the output in sync when fields change.
    """
    from dataclasses import asdict

    return asdict(self)
Convert to a dictionary for JSON serialization.
def compile_registry(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
    with_secrets_mask: bool = False,
    with_legacy_migration: str | None = None,
    with_metrics: bool = True,
    force: bool = False,
) -> CompileResult:
    """Compile the registry: sync latest/ dirs and write index files.

    Steps:
    1. Glob for all `metadata.yaml` to discover (connector, version) pairs.
    2. Glob for `version-yank.yml` to build the yanked set.
    2b. Scan for active release candidates.
    3. Compute the latest GA semver per connector.
    4. Glob for `version=*` markers in `latest/` dirs for a fast check.
    5. Delete stale `latest/` dirs and recursively copy the versioned dir.
    5m. (Optional) Legacy migration: delete disabled registry entries.
    5n. (Optional, coral only) Read latest connector metrics.
    6. Write global registry JSONs and per-connector `versions.json`.
    6c. (Optional) Regenerate `specs_secrets_mask.yaml`.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only resync `latest/` directories for
            these connectors (steps 4-5). Scans feeding the indexes (steps
            1-2b) and index rebuilds (steps 6a-6c) always operate on the full
            set of connectors so that global registry files remain complete.
        dry_run: If True, report what would be done without writing. This
            also skips the cleanup of `latest/` dirs after a failed sync.
        with_secrets_mask: If True, regenerate `specs_secrets_mask.yaml`.
        with_legacy_migration: If set, run the named migration step.
            Currently supported: `"v1"` — delete `{registry_type}.json`
            files for connectors whose `registryOverrides.{registry}.enabled`
            is `false`.
        with_metrics: If True, inject latest connector metrics from the
            analytics JSONL export into `generated.metrics`.
        force: If True, resync all connectors' latest/ directories even if the
            existing version marker matches the computed latest version. This
            is useful when metadata content changes without a version bump.

    Returns:
        A `CompileResult` describing what was done. Per-connector failures
        are collected in `result.errors` rather than raised.

    Raises:
        ValueError: If `with_legacy_migration` names an unsupported version.
    """
    if with_legacy_migration and with_legacy_migration not in LEGACY_MIGRATION_VERSIONS:
        raise ValueError(
            f"Unknown legacy migration version: {with_legacy_migration!r}. "
            f"Supported: {', '.join(LEGACY_MIGRATION_VERSIONS)}"
        )

    result = CompileResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # --- Step 1 + 2: Scan versions and yanks ---
    # Always scan ALL connectors so that index rebuilds (step 6) are complete.
    _log_progress("Step 1-2: Scanning versions and yank markers...")
    connector_versions, yanked = _scan_versions_and_yanks(
        fs,
        store=store,
        connector_name=None,
    )
    result.connectors_scanned = len(connector_versions)
    result.versions_found = sum(len(v) for v in connector_versions.values())
    result.yanked_versions = len(yanked)
    _log_progress(
        " Found %d connectors, %d versions, %d yanked",
        result.connectors_scanned,
        result.versions_found,
        result.yanked_versions,
    )

    # --- Step 2b: Scan release candidates ---
    # Like steps 1-2, always scan ALL connectors: `rc_versions` feeds the
    # global registry JSONs (step 6a) and every connector's versions.json
    # (step 6b). Filtering by `connector_name` here would silently drop
    # release-candidate info for every other connector on a filtered run.
    _log_progress("Step 2b: Scanning for active release candidates...")
    rc_versions = _scan_release_candidates(
        fs,
        store=store,
        connector_name=None,
    )
    _log_progress(" Found %d active release candidates", len(rc_versions))

    # --- Step 3: Compute latest ---
    _log_progress("Step 3: Computing latest GA version per connector...")
    latest_versions = _compute_latest_versions(
        connector_versions=connector_versions,
        yanked=yanked,
        fs=fs,
        store=store,
    )
    _log_progress(" Computed latest for %d connectors", len(latest_versions))

    # --- Step 4: Check existing latest markers ---
    # When --connector-name is set, only check/sync those connectors (steps 4-5).
    # Index rebuilds in step 6 always use the full unfiltered data.
    if connector_name:
        connector_name_set = set(connector_name)
        sync_scope = {
            c: v for c, v in latest_versions.items() if c in connector_name_set
        }
        _log_progress(
            " --connector-name filter: syncing %d of %d connectors",
            len(sync_scope),
            len(latest_versions),
        )
    else:
        sync_scope = latest_versions

    _log_progress("Step 4: Checking existing latest/ markers...")
    sync_scope_names = list(sync_scope) if connector_name else None
    existing_markers = _scan_latest_markers(
        fs,
        store=store,
        connector_name=sync_scope_names,
    )
    _log_progress(" Found %d existing markers", len(existing_markers))

    stale_connectors: list[str] = []
    for connector, expected_version in sync_scope.items():
        current_marker = existing_markers.get(connector)
        if not force and current_marker == expected_version:
            result.latest_already_current += 1
        else:
            stale_connectors.append(connector)

    _log_progress(
        " %d connectors need latest/ update, %d already current",
        len(stale_connectors),
        result.latest_already_current,
    )

    # --- Step 5: Resync stale latest/ dirs (parallel) ---
    if stale_connectors:
        _log_progress(
            "Step 5: Syncing %d stale latest/ directories (max_workers=%d)...",
            len(stale_connectors),
            _COMPILE_SYNC_MAX_WORKERS,
        )

        def _sync_one_connector(connector: str) -> None:
            """Sync a single connector's latest/ dir."""
            version = latest_versions[connector]
            _sync_latest_dir(
                fs,
                store=store,
                connector=connector,
                version=version,
                dry_run=dry_run,
            )
            if not dry_run:
                _apply_overrides_to_latest_entry(
                    fs,
                    store=store,
                    connector=connector,
                    version=version,
                )

        sorted_stale = sorted(stale_connectors)
        with ThreadPoolExecutor(max_workers=_COMPILE_SYNC_MAX_WORKERS) as pool:
            futures = {pool.submit(_sync_one_connector, c): c for c in sorted_stale}
            for i, future in enumerate(as_completed(futures), 1):
                connector = futures[future]
                try:
                    future.result()
                    result.latest_updated += 1
                except Exception as exc:
                    error_msg = f"Failed to sync latest/ for {connector}: {exc}"
                    logger.error(error_msg)
                    result.errors.append(error_msg)
                    # Delete the (possibly partial) latest/ dir so the next
                    # compile retries this connector from scratch. A dry run
                    # must never mutate the bucket, so skip cleanup there.
                    if not dry_run:
                        try:
                            _delete_latest_dir(
                                fs,
                                store=store,
                                connector=connector,
                            )
                            logger.info(
                                "Cleaned up partial latest/ for %s after failure",
                                connector,
                            )
                        except Exception as cleanup_exc:
                            logger.warning(
                                "Could not clean up latest/ for %s: %s",
                                connector,
                                cleanup_exc,
                            )
                if i % 100 == 0:
                    _log_progress(" Synced %d / %d...", i, len(sorted_stale))
    else:
        _log_progress("Step 5: All latest/ directories are current, nothing to sync.")

    # --- Step 5m: Legacy migration (optional) ---
    if with_legacy_migration == "v1":
        _log_progress(
            "Step 5m: Legacy migration v1 — deleting disabled registry entries..."
        )
        migration_deleted = _cleanup_disabled_registry_entries(
            fs,
            store=store,
            connector_versions=connector_versions,
            dry_run=dry_run,
        )
        total_deleted = sum(len(v) for v in migration_deleted.values())
        if migration_deleted:
            for conn, paths in sorted(migration_deleted.items()):
                _log_progress(
                    " %s: %s %d files",
                    conn,
                    "would delete" if dry_run else "deleted",
                    len(paths),
                )
        _log_progress(
            " Migration v1: %s %d files across %d connectors",
            "would delete" if dry_run else "deleted",
            total_deleted,
            len(migration_deleted),
        )

    # --- Step 5n: Read latest connector metrics (optional) ---
    metrics_bundle = None
    if with_metrics and store.store_type == StoreType.CORAL:
        _log_progress("Step 5n: Reading latest connector metrics JSONL...")
        try:
            metrics_bundle = read_latest_connector_metrics()
            result.metrics_source = metrics_bundle.blob_path
            result.metrics_connector_count = metrics_bundle.connector_count
            if metrics_bundle.blob_path:
                _log_progress(
                    " Loaded metrics for %d connectors from gs://%s",
                    metrics_bundle.connector_count,
                    metrics_bundle.blob_path,
                )
            else:
                _log_progress(" No connector metrics JSONL file found.")
        except Exception as exc:
            # Metrics are best-effort: record the failure and keep compiling.
            error_msg = f"Failed to read connector metrics JSONL: {exc}"
            logger.warning(error_msg)
            result.metrics_error = error_msg
            _log_progress(" %s", error_msg)
    elif with_metrics:
        _log_progress("Step 5n: Skipping connector metrics for non-coral registry.")
    else:
        _log_progress("Step 5n: Connector metrics injection disabled.")

    # --- Step 6a: Compile global registry JSONs ---
    _log_progress("Step 6a: Compiling global registry JSON files...")
    all_registry_entries: list[dict[str, Any]] = []  # collected for Step 6c
    entries_by_registry_type: dict[str, list[dict[str, Any]]] = {}
    for registry_type in VALID_REGISTRIES:
        entries = _compile_global_registry(
            fs,
            store=store,
            latest_versions=latest_versions,
            registry_type=registry_type,
        )

        # Inject release candidate info into entries that have active RCs.
        # For each connector with a release_candidate/metadata.yaml, read the
        # RC version's {registry_type}.json and add it under
        # releases.releaseCandidates[version] — matching the legacy format.
        if rc_versions:
            rc_entries: dict[str, dict[str, Any]] = {}
            for connector, rc_ver in rc_versions.items():
                rc_entry = _read_rc_registry_entry(
                    fs,
                    store=store,
                    connector=connector,
                    rc_version=rc_ver,
                    registry_type=registry_type,
                )
                if rc_entry:
                    docker_repo = rc_entry.get(
                        "dockerRepository",
                        f"airbyte/{connector}",
                    )
                    rc_entries[docker_repo] = {
                        "version": rc_ver,
                        "entry": rc_entry,
                    }
            if rc_entries:
                entries = _apply_release_candidates_to_entries(entries, rc_entries)
                _log_progress(
                    " Injected %d release candidates into %s registry",
                    len(rc_entries),
                    registry_type,
                )

        if metrics_bundle is not None:
            injected = apply_metrics_to_registry_entries(entries, metrics_bundle)
            result.metrics_registry_entries += injected
            _log_progress(
                " Injected metrics into %d %s registry entries",
                injected,
                registry_type,
            )

        all_registry_entries.extend(entries)
        entries_by_registry_type[registry_type] = entries
        registry_json = _build_global_registry_json(entries)
        entry_count = len(registry_json["sources"]) + len(registry_json["destinations"])

        if registry_type == "cloud":
            result.cloud_registry_entries = entry_count
        else:
            result.oss_registry_entries = entry_count

        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )
        else:
            content = json.dumps(registry_json, indent=2, sort_keys=True) + "\n"
            path_prefix = f"{store.prefix}/" if store.prefix else ""
            _write_gcs_blob_with_custom_ttl(
                bucket_name=store.bucket,
                blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/{registry_type}_registry.json",
                content=content,
                cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
            )
            _log_progress(
                " Wrote %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )

    # --- Step 6a.2: Compile composite registry JSON (superset) ---
    _log_progress("Step 6a.2: Compiling composite_registry.json (superset)...")
    composite_json = _build_composite_registry_json(
        cloud_entries=entries_by_registry_type.get("cloud", []),
        oss_entries=entries_by_registry_type.get("oss", []),
    )
    composite_entry_count = len(composite_json["sources"]) + len(
        composite_json["destinations"]
    )
    result.composite_registry_entries = composite_entry_count
    if dry_run:
        _log_progress(
            " [DRY RUN] Would write composite_registry.json (%d entries)",
            composite_entry_count,
        )
    else:
        composite_content = json.dumps(composite_json, indent=2, sort_keys=True) + "\n"
        path_prefix = f"{store.prefix}/" if store.prefix else ""
        _write_gcs_blob_with_custom_ttl(
            bucket_name=store.bucket,
            blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/composite_registry.json",
            content=composite_content,
            cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
        )
        _log_progress(
            " Wrote composite_registry.json (%d entries)",
            composite_entry_count,
        )

    # --- Step 6b: Per-connector version indexes (parallel) ---
    _log_progress(
        "Step 6b: Writing per-connector version indexes (max_workers=%d)...",
        _COMPILE_WRITE_MAX_WORKERS,
    )
    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"
    sorted_connectors = sorted(connector_versions)

    def _write_one_version_index(connector: str) -> None:
        """Build and write a single connector's versions.json."""
        versions = connector_versions[connector]
        latest_v = latest_versions.get(connector)
        rc_v = rc_versions.get(connector)
        index = _build_version_index(
            fs,
            store=store,
            connector=connector,
            versions=versions,
            yanked=yanked,
            latest_version=latest_v,
            rc_version=rc_v,
        )
        index_path = f"{base}/{connector}/versions.json"
        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s/versions.json (%d versions)",
                connector,
                len(versions),
            )
        else:
            content = json.dumps(index, indent=2, sort_keys=True) + "\n"
            with fs.open(index_path, "w") as f:
                f.write(content)

    with ThreadPoolExecutor(max_workers=_COMPILE_WRITE_MAX_WORKERS) as pool:
        futures = {
            pool.submit(_write_one_version_index, c): c for c in sorted_connectors
        }
        for i, future in enumerate(as_completed(futures), 1):
            connector = futures[future]
            try:
                future.result()
                result.version_indexes_written += 1
            except Exception as exc:
                error_msg = f"Failed to write versions.json for {connector}: {exc}"
                logger.error(error_msg)
                result.errors.append(error_msg)
            if i % 100 == 0:
                _log_progress(
                    " Wrote %d / %d version indexes...", i, len(sorted_connectors)
                )

    # --- Step 6c: Specs secrets mask (optional) ---
    if with_secrets_mask:
        _log_progress("Step 6c: Generating specs secrets mask...")
        # Reuse entries collected during Step 6a to avoid redundant GCS reads.
        secret_names = _extract_secret_property_names(all_registry_entries)
        sorted_names = sorted(secret_names)
        result.specs_secrets_mask_properties = len(sorted_names)
        mask_content = yaml.dump({"properties": sorted_names}, default_flow_style=False)
        mask_path = (
            f"{store.bucket_root}/{_REGISTRIES_PREFIX}/{_SPECS_SECRETS_MASK_FILENAME}"
        )

        _log_progress(
            " Found %d secret properties: %s",
            len(sorted_names),
            ", ".join(sorted_names),
        )

        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )
        else:
            with fs.open(mask_path, "w") as f:
                f.write(mask_content)
            _log_progress(
                " Wrote %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )

    _log_progress(result.summary())
    return result
Compile the registry: sync latest/ dirs and write index files.
Steps:
1. Glob for all metadata.yaml to discover (connector, version) pairs.
2. Glob for version-yank.yml to build the yanked set.
3. Compute the latest GA semver per connector.
4. Glob for version=* markers in latest/ dirs for a fast check.
5. Delete stale latest/ dirs and recursively copy the versioned dir.
5m. (Optional) Legacy migration: delete disabled registry entries.
6. Write global registry JSONs and per-connector versions.json.
6c. (Optional) Regenerate specs_secrets_mask.yaml.
Arguments:
- store: Registry store (bucket + optional prefix).
- connector_name: If provided, only resync latest/ directories for these connectors (steps 4-5). Index rebuilds (steps 6a-6c) always operate on the full set of connectors so that global registry files remain complete.
- dry_run: If True, report what would be done without writing.
- with_secrets_mask: If True, regenerate specs_secrets_mask.yaml.
- with_legacy_migration: If set, run the named migration step. Currently supported: "v1" — delete {registry_type}.json files for connectors whose registryOverrides.{registry}.enabled is false.
- with_metrics: If True, inject latest connector metrics from the analytics JSONL export into generated.metrics.
- force: If True, resync all connectors' latest/ directories even if the existing version marker matches the computed latest version. This is useful when metadata content changes without a version bump.
Returns:
A CompileResult describing what was done.
def create_progressive_rollout_blob(
    repo_path: Path,
    connector_name: str,
    dry_run: bool = False,
    bucket_name: str | None = None,
) -> ConnectorPublishResult:
    """Create the `release_candidate/metadata.yaml` blob in GCS.

    Copies the versioned metadata (e.g. `1.2.3-rc.1/metadata.yaml` or
    `2.1.0/metadata.yaml` for GA progressive rollouts) to
    `release_candidate/metadata.yaml` so the platform knows a progressive
    rollout is active for this connector.

    The connector's `metadata.yaml` on disk must declare a version that
    is valid for progressive rollout (i.e. not a `-preview` build).
    The versioned blob must already exist in GCS (i.e. the version must
    have been published first).

    Requires `GCS_CREDENTIALS` environment variable to be set.

    Args:
        repo_path: Path to the repository checkout containing the connector.
        connector_name: Connector technical name.
        dry_run: If True, describe the copy without touching GCS.
        bucket_name: Target GCS bucket. Required despite the `None` default.

    Returns:
        A `ConnectorPublishResult` whose `status` is `"success"`,
        `"failure"` (invalid version or missing versioned blob), or
        `"dry-run"`.

    Raises:
        ValueError: If `bucket_name` is empty or not provided.
    """
    if not bucket_name:
        raise ValueError("bucket_name is required for progressive rollout create.")

    metadata = get_connector_metadata(repo_path, connector_name)
    version = metadata.docker_image_tag
    docker_repo = metadata.docker_repository

    if not is_valid_for_progressive_rollout(version):
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-create",
            status="failure",
            docker_image=f"{docker_repo}:{version}",
            registry_updated=False,
            message=f"Version '{version}' is not valid for progressive rollout. "
            "Preview versions are not supported.",
        )

    gcp_connector_dir = f"{METADATA_FOLDER}/{docker_repo}"
    versioned_path = f"{gcp_connector_dir}/{version}/{METADATA_FILE_NAME}"
    rc_path = (
        f"{gcp_connector_dir}/{RELEASE_CANDIDATE_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )

    if dry_run:
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-create",
            status="dry-run",
            docker_image=f"{docker_repo}:{version}",
            registry_updated=False,
            message=f"Would copy {versioned_path} to {rc_path}",
        )

    storage_client = get_gcs_storage_client()
    gcs_bucket = storage_client.bucket(bucket_name)

    # Verify the versioned blob exists
    versioned_blob = gcs_bucket.blob(versioned_path)
    if not versioned_blob.exists():
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-create",
            status="failure",
            docker_image=f"{docker_repo}:{version}",
            registry_updated=False,
            message=f"Versioned metadata not found: {versioned_path}. "
            "The version must be published before creating the progressive rollout marker.",
        )

    # Copy the versioned metadata to the release_candidate/ path
    gcs_bucket.copy_blob(versioned_blob, gcs_bucket, rc_path)
    logger.info(
        "Created progressive rollout metadata blob: %s (copied from %s)",
        rc_path,
        versioned_path,
    )

    return ConnectorPublishResult(
        connector=metadata.name,
        version=version,
        action="progressive-rollout-create",
        status="success",
        docker_image=f"{docker_repo}:{version}",
        registry_updated=True,
        message=f"Created release_candidate/ blob for {metadata.name} at {rc_path} "
        f"(copied from {versioned_path}).",
    )
Create the release_candidate/metadata.yaml blob in GCS.
Copies the versioned metadata (e.g. 1.2.3-rc.1/metadata.yaml or
2.1.0/metadata.yaml for GA progressive rollouts) to
release_candidate/metadata.yaml so the platform knows a progressive
rollout is active for this connector.
The connector's metadata.yaml on disk must declare a version that
is valid for progressive rollout (i.e. not a -preview build).
The versioned blob must already exist in GCS (i.e. the version must
have been published first).
Requires GCS_CREDENTIALS environment variable to be set.
def delete_progressive_rollout_blob(
    repo_path: Path,
    connector_name: str,
    dry_run: bool = False,
    bucket_name: str | None = None,
) -> ConnectorPublishResult:
    """Delete the `release_candidate/` metadata blob from GCS.

    This is the only GCS operation needed when finalizing a progressive
    rollout (both promote and rollback). The versioned blob (e.g.
    `1.2.3-rc.1/metadata.yaml`) is intentionally preserved as an
    audit trail of what was actually deployed during the rollout.

    The operation is idempotent: a missing blob is reported as success
    with `registry_updated=False`.

    Requires `GCS_CREDENTIALS` environment variable to be set.

    Args:
        repo_path: Path to the repository checkout containing the connector.
        connector_name: Connector technical name.
        dry_run: If True, describe the deletion without touching GCS.
        bucket_name: Target GCS bucket. Required despite the `None` default.

    Returns:
        A `ConnectorPublishResult` with `status` `"success"` or `"dry-run"`.

    Raises:
        ValueError: If `bucket_name` is empty or not provided.
    """
    if not bucket_name:
        raise ValueError("bucket_name is required for progressive rollout cleanup.")

    metadata = get_connector_metadata(repo_path, connector_name)
    version = metadata.docker_image_tag
    docker_repo = metadata.docker_repository

    # NOTE: We intentionally do NOT gate on version format here.
    # In the promote workflow the on-disk version will already be bumped
    # to GA (e.g. 1.2.3) before cleanup runs. The rc_path is derived
    # solely from docker_repo + the fixed RELEASE_CANDIDATE_GCS_FOLDER_NAME
    # constant, so the on-disk version is irrelevant.

    gcp_connector_dir = f"{METADATA_FOLDER}/{docker_repo}"
    rc_path = (
        f"{gcp_connector_dir}/{RELEASE_CANDIDATE_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )

    if dry_run:
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-cleanup",
            status="dry-run",
            docker_image=f"{docker_repo}:{version}",
            registry_updated=False,
            message=f"Would delete release_candidate/ blob for {metadata.name} "
            f"(version {version}) at {rc_path}",
        )

    storage_client = get_gcs_storage_client()
    gcs_bucket = storage_client.bucket(bucket_name)
    rc_blob = gcs_bucket.blob(rc_path)

    if not rc_blob.exists():
        logger.info(
            "Progressive rollout metadata already deleted (idempotent): %s", rc_path
        )
        return ConnectorPublishResult(
            connector=metadata.name,
            version=version,
            action="progressive-rollout-cleanup",
            status="success",
            docker_image=f"{docker_repo}:{version}",
            registry_updated=False,
            message=f"Progressive rollout metadata already deleted (no-op): {rc_path}",
        )

    rc_blob.delete()
    logger.info("Deleted progressive rollout metadata blob: %s", rc_path)

    return ConnectorPublishResult(
        connector=metadata.name,
        version=version,
        action="progressive-rollout-cleanup",
        status="success",
        docker_image=f"{docker_repo}:{version}",
        registry_updated=True,
        message=f"Deleted release_candidate/ blob for {metadata.name} at {rc_path}.",
    )
Delete the release_candidate/ metadata blob from GCS.
This is the only GCS operation needed when finalizing a progressive
rollout (both promote and rollback). The versioned blob (e.g.
1.2.3-rc.1/metadata.yaml) is intentionally preserved as an
audit trail of what was actually deployed during the rollout.
Requires GCS_CREDENTIALS environment variable to be set.
def find_unpublished_connectors(
    repo_path: str | Path,
    bucket_name: str,
    connector_names: list[str] | None = None,
) -> AuditResult:
    """Find connectors whose local version is not published on GCS.

    For each connector in the local checkout, reads `dockerImageTag` from
    `metadata.yaml` and checks whether `metadata/<docker-repo>/<version>/metadata.yaml`
    exists in the GCS bucket. `<docker-repo>` is the connector's
    `dockerRepository`, falling back to `airbyte/<connector-name>` when unset.
    Connectors that are archived, disabled on all registries, or have RC
    versions are skipped.

    Args:
        repo_path: Path to the Airbyte monorepo checkout.
        bucket_name: GCS bucket name to check against.
        connector_names: Optional list of connector names to check.
            If `None`, discovers all connectors in the repo.

    Returns:
        An `AuditResult` containing unpublished connectors and metadata.
        Per-connector read or GCS failures are recorded in `errors`.

    Raises:
        ValueError: If the connectors directory does not exist under `repo_path`.
    """
    repo_path = Path(repo_path)
    connectors_dir = repo_path / CONNECTOR_PATH_PREFIX

    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    # Discover connector names if not provided
    if connector_names is None:
        connector_names = sorted(
            d.name
            for d in connectors_dir.iterdir()
            if d.is_dir() and (d / METADATA_FILE_NAME).exists()
        )

    result = AuditResult()

    # Collect connectors and their versions first, then check GCS.
    to_check: list[tuple[str, str, str]] = []  # (connector_name, version, docker_repo)

    for name in connector_names:
        metadata_path = connectors_dir / name / METADATA_FILE_NAME
        metadata = _read_local_metadata(metadata_path)
        if metadata is None:
            result.errors.append(f"{name}: metadata.yaml not found or unreadable")
            continue

        if _is_archived(metadata):
            result.skipped_archived.append(name)
            continue

        if _is_disabled_on_all_registries(metadata):
            result.skipped_disabled.append(name)
            continue

        data = metadata.get("data", {})
        version = data.get("dockerImageTag")
        if not version:
            result.errors.append(f"{name}: no dockerImageTag in metadata")
            continue

        if _is_rc_version(version):
            result.skipped_rc.append(name)
            continue

        # Published blobs live under the docker repository (as used by the
        # publish path), which is not always `airbyte/<connector-name>`.
        docker_repo = data.get("dockerRepository") or f"airbyte/{name}"
        to_check.append((name, version, docker_repo))

    if not to_check:
        result.checked_count = 0
        return result

    # Check GCS for each connector version
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    for name, version, docker_repo in to_check:
        result.checked_count += 1
        blob_path = f"{METADATA_FOLDER}/{docker_repo}/{version}/{METADATA_FILE_NAME}"
        blob = bucket.blob(blob_path)

        try:
            exists = blob.exists()
        except Exception as e:
            result.errors.append(f"{name}: GCS check failed: {e}")
            continue

        if not exists:
            logger.info(
                "Unpublished: %s version %s (checked %s)",
                name,
                version,
                blob_path,
            )
            result.unpublished.append(
                UnpublishedConnector(connector_name=name, local_version=version)
            )

    logger.info(
        "Audit complete: %d checked, %d unpublished, %d archived-skipped, %d disabled-skipped, %d rc-skipped",
        result.checked_count,
        len(result.unpublished),
        len(result.skipped_archived),
        len(result.skipped_disabled),
        len(result.skipped_rc),
    )

    return result
Find connectors whose local version is not published on GCS.
For each connector in the local checkout, reads dockerImageTag from
metadata.yaml and checks whether metadata/airbyte/<connector-name>/<version>/metadata.yaml (where <connector-name> is the connector's directory name)
exists in the GCS bucket. Connectors that are archived, disabled on all
registries, or have RC versions are skipped.
Arguments:
- repo_path: Path to the Airbyte monorepo checkout.
- bucket_name: GCS bucket name to check against.
- connector_names: Optional list of connector names to check.
If None, discovers all connectors in the repo.
Returns:
An AuditResult containing unpublished connectors and metadata.
def generate_version_artifacts(
    metadata_file: Path,
    docker_image: str,
    output_dir: Path | None = None,
    repo_root: Path | None = None,
    dry_run: bool = False,
    with_validate: bool = True,
    with_dependency_dump: bool = True,
    with_sbom: bool = True,
) -> GenerateResult:
    """Generate all version artifacts for a connector release.

    Artifacts are enriched with git commit info, SBOM URL, and (when applicable)
    components SHA before writing. Validation is run after generation by default.

    Most per-artifact problems are reported rather than raised:
    - A failed docker spec, a missing `icon.svg`, or an unresolvable doc file
      is appended to `result.errors`.
    - SBOM failures are only logged as warnings.
    - Validation failures are stored in `result.validation_errors`.
    This lets a single run surface every problem it finds.

    Args:
        metadata_file: Path to the connector's `metadata.yaml`.
        docker_image: Docker image to run spec against (e.g. `airbyte/source-faker:6.2.38`).
        output_dir: Directory to write artifacts to. If `None`, a temp directory is created.
            The directory is created even when `dry_run` is `True`.
        repo_root: Root of the Airbyte repo checkout (for resolving `doc.md`).
            If `None`, inferred by walking up from `metadata_file` to the first
            directory containing a `.git` directory or file.
        dry_run: If `True`, report what would be generated without writing
            artifact files or running docker.
        with_validate: If `True` (default), run metadata validators after generation.
            Pass `False` (`--no-validate`) to skip.
        with_dependency_dump: If `True` (default), generate `dependencies.json`
            for Python connectors. Pass `False` (`--no-dependency-dump`) to skip.
        with_sbom: If `True` (default), generate `spdx.json` (SBOM) for
            connectors. Pass `False` (`--no-sbom`) to skip.

    Returns:
        A `GenerateResult` describing what was produced.

    Raises:
        FileNotFoundError: If `metadata_file` does not exist. Exceptions from
            the helpers called here are not caught unless a comment below says so.
    """
    # --- Load metadata ---
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    # NOTE(review): an empty metadata file makes `safe_load` return None, and the
    # `.get` below then raises AttributeError.
    raw_metadata: dict[str, Any] = yaml.safe_load(metadata_file.read_text())
    metadata_data: dict[str, Any] = raw_metadata.get("data", {})

    # `.replace` strips every occurrence of "airbyte/", not only a leading prefix.
    connector_name = metadata_data.get("dockerRepository", "unknown").replace(
        "airbyte/", ""
    )
    version = metadata_data.get("dockerImageTag", "unknown")

    # --- Resolve output directory ---
    # This runs before the dry-run early return, so a directory is created
    # on disk in dry-run mode too.
    if output_dir is None:
        output_dir = Path(
            tempfile.mkdtemp(prefix=f"connector-artifacts-{connector_name}-{version}-")
        )
    output_dir.mkdir(parents=True, exist_ok=True)

    result = GenerateResult(
        connector_name=connector_name,
        version=version,
        docker_image=docker_image,
        output_dir=str(output_dir),
        dry_run=dry_run,
    )

    if dry_run:
        # Report the planned artifact list only; nothing below this point runs.
        logger.info("[DRY RUN] Would generate artifacts to %s", output_dir)
        result.artifacts_written = [
            "metadata.yaml",
            "icon.svg",
            "doc.md",
            "cloud.json",
            "oss.json",
            "manifest.yaml (if present)",
            "components.zip (if components.py present)",
            "components.zip.sha256 (if components.py present)",
            f"version={version}",
        ]
        if with_sbom:
            result.artifacts_written.append(SBOM_FILE_NAME)
        if with_dependency_dump:
            result.artifacts_written.append("dependencies.json (if Python connector)")
        return result

    # --- Prepare metadata output ---
    # Recorded now; the file itself is written near the end, once every
    # enrichment step has run.
    metadata_out = output_dir / "metadata.yaml"
    result.artifacts_written.append("metadata.yaml")

    # --- Enrich metadata with git info *before* building registry entries so
    # that `generated.git` propagates into `cloud.json` / `oss.json`. ---
    # NOTE(review): the registry entries and validation below use
    # `metadata_data`, which was bound from the *un-enriched* `raw_metadata`
    # above. Enrichment only reaches them if the `_enrich_metadata_*` helpers
    # mutate the nested `data` dict in place. Confirm this against the helpers.
    raw_metadata = _enrich_metadata_git_info(raw_metadata, metadata_file)

    # --- Generate SBOM from the connector Docker image ---
    # SBOM failures are non-fatal: they are logged and `sbom_generated` stays False.
    sbom_generated = False
    if not with_sbom:
        logger.info("SBOM generation disabled via --no-sbom.")
    else:
        try:
            sbom_path = generate_sbom(docker_image, output_dir)
        except RuntimeError as exc:
            logger.warning("SBOM generation failed (non-fatal): %s", exc)
        except (FileNotFoundError, subprocess.TimeoutExpired):
            logger.warning("Docker not available or SBOM generation timed out.")
        else:
            result.artifacts_written.append(SBOM_FILE_NAME)
            sbom_generated = True
            logger.info("Generated SBOM: %s", sbom_path)

    # --- Enrich metadata with SBOM URL ---
    raw_metadata = _enrich_metadata_sbom_url(
        raw_metadata, sbom_generated=sbom_generated
    )

    # --- Run docker spec for cloud and oss ---
    # Only RuntimeError is recorded as a per-mode error; any other exception
    # from `_run_docker_spec` propagates.
    specs: dict[str, dict[str, Any]] = {}
    for mode in VALID_REGISTRIES:
        try:
            specs[mode] = _run_docker_spec(docker_image, mode)
            logger.info("Got %s spec from docker image %s", mode, docker_image)
        except RuntimeError as exc:
            error_msg = f"Failed to get {mode} spec: {exc}"
            logger.error(error_msg)
            result.errors.append(error_msg)

    # --- Generate dependencies.json for Python connectors ---
    # This must happen *before* building registry entries so that the
    # local dependencies data can be used for packageInfo without a GCS
    # round-trip.
    local_dependencies: dict[str, Any] | None = None
    if not with_dependency_dump:
        logger.info("Dependency generation disabled via --no-dependency-dump.")
    elif _is_python_connector(metadata_data):
        logger.info("Python connector detected — generating dependencies.json")
        local_dependencies = generate_python_dependencies_file(
            metadata_data=metadata_data,
            docker_image=docker_image,
            output_dir=output_dir,
        )
        # A None return means no dependencies file was produced.
        if local_dependencies is not None:
            result.artifacts_written.append(CONNECTOR_DEPENDENCY_FILE_NAME)
    else:
        logger.info(
            "Non-Python connector (%s) — skipping dependencies.json generation.",
            connector_name,
        )

    # --- Generate registry entries (cloud.json, oss.json) ---
    for registry_type in VALID_REGISTRIES:
        if not is_registry_enabled(metadata_data, registry_type):
            logger.info(
                "Registry type %s is not enabled for %s, skipping %s.json generation.",
                registry_type,
                connector_name,
                registry_type,
            )
            continue

        # An enabled registry without a spec (docker spec failed above) is an error.
        spec = specs.get(registry_type)
        if spec is None:
            error_msg = (
                f"Cannot generate {registry_type}.json: no spec available "
                f"(docker spec for {registry_type} failed or was not run)."
            )
            result.errors.append(error_msg)
            continue

        registry_entry = _build_registry_entry(
            metadata_data,
            registry_type,
            spec,
            local_dependencies=local_dependencies,
        )

        # sort_keys + trailing newline give a deterministic on-disk format.
        out_path = output_dir / f"{registry_type}.json"
        out_path.write_text(
            json.dumps(registry_entry, indent=2, sort_keys=True, default=_json_serial)
            + "\n"
        )
        result.artifacts_written.append(f"{registry_type}.json")
        logger.info("Wrote %s", out_path)

    # --- Copy icon.svg (sibling of metadata.yaml in the connector directory) ---
    icon_source = metadata_file.parent / "icon.svg"
    if icon_source.is_file():
        icon_out = output_dir / "icon.svg"
        shutil.copy2(icon_source, icon_out)
        result.artifacts_written.append("icon.svg")
        logger.info("Wrote %s", icon_out)
    else:
        logger.warning("No icon.svg found at %s.", icon_source)
        result.errors.append("Icon file is missing.")

    # --- Copy doc.md (derived from documentationUrl in metadata) ---
    if repo_root is None:
        # Infer repo root by walking up from metadata_file looking for .git
        # Note: .git can be a directory (normal clone) or a file (git worktree)
        # Resolve to absolute path first so the walk-up works with relative paths.
        # The loop stops at the filesystem root (where parent == self).
        candidate = metadata_file.resolve().parent
        while candidate != candidate.parent:
            git_indicator = candidate / ".git"
            if git_indicator.is_dir() or git_indicator.is_file():
                repo_root = candidate
                break
            candidate = candidate.parent

    if repo_root is not None:
        doc_source = _resolve_doc_path(metadata_data, repo_root)
        if doc_source is not None and doc_source.is_file():
            doc_out = output_dir / DOC_FILE_NAME
            shutil.copy2(doc_source, doc_out)
            result.artifacts_written.append(DOC_FILE_NAME)
            logger.info("Wrote %s (from %s)", doc_out, doc_source)
        else:
            # `doc_source` may be None here, in which case the message shows "None".
            error_msg = (
                f"Documentation file not found: {doc_source}. "
                f"Derived from documentationUrl in metadata."
            )
            logger.error(error_msg)
            result.errors.append(error_msg)
    else:
        error_msg = "Cannot resolve doc.md: repo root not found."
        logger.error(error_msg)
        result.errors.append(error_msg)

    # --- Copy manifest.yaml (from connector root, if present) ---
    connector_dir = metadata_file.parent
    manifest_source = connector_dir / MANIFEST_FILE_NAME
    components_sha256: str | None = None
    if manifest_source.is_file():
        manifest_out = output_dir / MANIFEST_FILE_NAME
        shutil.copy2(manifest_source, manifest_out)
        result.artifacts_written.append(MANIFEST_FILE_NAME)
        logger.info("Wrote %s", manifest_out)

        # --- Generate components.zip if components.py exists ---
        # Only considered when a manifest exists; the zip bundles both files.
        components_source = connector_dir / COMPONENTS_PY_FILE_NAME
        if components_source.is_file():
            zip_path, sha256_path = _create_components_zip(
                manifest_path=manifest_source,
                components_path=components_source,
                output_dir=output_dir,
            )
            result.artifacts_written.append(COMPONENTS_ZIP_FILE_NAME)
            result.artifacts_written.append(COMPONENTS_ZIP_SHA256_FILE_NAME)
            logger.info("Wrote %s and %s", zip_path, sha256_path)
            # Read back the SHA256 for metadata enrichment
            components_sha256 = sha256_path.read_text().strip()
    else:
        logger.info(
            "No manifest.yaml at %s — skipping manifest artifacts.", manifest_source
        )

    # --- Enrich metadata with components SHA (after zip creation) ---
    # `components_sha256` is None when no components.zip was produced.
    raw_metadata = _enrich_metadata_components_sha(raw_metadata, components_sha256)

    # --- Write final enriched metadata.yaml ---
    # Use sort_keys=True to match the legacy pipeline's alphabetical key ordering.
    # After Registry 2.0 launches we are free to change the key ordering.
    metadata_out.write_text(
        yaml.dump(raw_metadata, default_flow_style=False, sort_keys=True)
    )
    logger.info("Wrote enriched %s", metadata_out)

    # --- Write version marker file (version=<semver>) ---
    # This zero-byte file is used by the compile step as a fast-check marker.
    # Including it in the generated artifacts means `latest/` gets the marker
    # for free via a recursive copy, removing the need for a separate write.
    marker_file = output_dir / f"version={version}"
    marker_file.write_bytes(b"")
    result.artifacts_written.append(f"version={version}")
    logger.info("Wrote version marker %s", marker_file)

    # --- Validate metadata (after generation) ---
    # Validation runs against `metadata_data` (the `data` section loaded from
    # the source file), not against the written enriched file.
    if with_validate:
        logger.info("Running post-generation validation...")
        doc_path: str | None = None
        if repo_root is not None:
            resolved = _resolve_doc_path(metadata_data, repo_root)
            doc_path = str(resolved) if resolved else None
        validation = validate_metadata(
            metadata_data=metadata_data,
            opts=ValidateOptions(docs_path=doc_path),
        )
        if not validation.passed:
            for err in validation.errors:
                logger.error("Validation error: %s", err)
            result.validation_errors = validation.errors
        else:
            logger.info("Validation passed (%d validators).", validation.validators_run)

    return result
Generate all version artifacts for a connector release.
Artifacts are enriched with git commit info, SBOM URL, and (when applicable) components SHA before writing. Validation is run after generation by default.
Arguments:
- metadata_file: Path to the connector's metadata.yaml.
- docker_image: Docker image to run spec against (e.g. airbyte/source-faker:6.2.38).
- output_dir: Directory to write artifacts to. If None, a temp directory is created.
- repo_root: Root of the Airbyte repo checkout (for resolving doc.md). If None, inferred by walking up from metadata_file.
- dry_run: If True, report what would be generated without writing or running docker.
- with_validate: If True (default), run metadata validators after generation. Pass False (--no-validate) to skip.
- with_dependency_dump: If True (default), generate dependencies.json for Python connectors. Pass False (--no-dependency-dump) to skip.
- with_sbom: If True (default), generate spdx.json (SBOM) for connectors. Pass False (--no-sbom) to skip.
Returns:
A GenerateResult describing what was produced.
def get_connector_metadata(repo_path: Path, connector_name: str) -> ConnectorMetadata:
    """Read connector metadata from the connector's local metadata.yaml.

    Args:
        repo_path: Path to the Airbyte monorepo.
        connector_name: The connector technical name (e.g., 'source-github').

    Returns:
        ConnectorMetadata object with the connector's metadata. Fields missing
        from the `data` section fall back to defaults:
        - `dockerRepository` defaults to `airbyte/<connector_name>`.
        - `dockerImageTag` defaults to `"unknown"`.
        - `supportLevel` and `definitionId` default to `None`.

    Raises:
        FileNotFoundError: If the connector directory or metadata file doesn't exist.
        ValueError: If the metadata file is empty or its top level is not a mapping.
    """
    connector_dir = repo_path / CONNECTOR_PATH_PREFIX / connector_name
    if not connector_dir.exists():
        raise FileNotFoundError(f"Connector directory not found: {connector_dir}")

    metadata_file = connector_dir / METADATA_FILE_NAME
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    # Decode explicitly so parsing does not depend on the host's locale encoding.
    with open(metadata_file, encoding="utf-8") as f:
        metadata = yaml.safe_load(f)

    # `safe_load` returns None for an empty document (and a scalar/list for a
    # malformed one); fail with a clear message instead of an AttributeError.
    if not isinstance(metadata, dict):
        raise ValueError(f"Metadata file {metadata_file} has an invalid structure")

    # A bare `data:` key parses as None; treat it like a missing section.
    data = metadata.get("data") or {}
    return ConnectorMetadata(
        name=connector_name,
        docker_repository=data.get("dockerRepository", f"airbyte/{connector_name}"),
        docker_image_tag=data.get("dockerImageTag", "unknown"),
        support_level=data.get("supportLevel"),
        definition_id=data.get("definitionId"),
    )
Read connector metadata from metadata.yaml.
Arguments:
- repo_path: Path to the Airbyte monorepo.
- connector_name: The connector technical name (e.g., 'source-github').
Returns:
ConnectorMetadata object with the connector's metadata.
Raises:
- FileNotFoundError: If the connector directory or metadata file doesn't exist.
def get_gcs_publish_path(
    connector_name: str,
    artifact_type: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> str:
    """Build the GCS object path of a connector artifact for publishing.

    Every connector lives under the `airbyte/{connector_name}` convention, so the
    result has the shape
    `{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file_name}`.

    Args:
        connector_name: Connector technical name (e.g. `source-faker`).
        artifact_type: One of `metadata`, `spec`, `icon`, or `doc`.
        version: Version folder name; defaults to `LATEST_GCS_FOLDER_NAME`.

    Returns:
        The bucket-relative object path for the artifact.

    Raises:
        ValueError: If `artifact_type` is not one of the known artifact types.
    """
    file_name_by_type = {
        "metadata": METADATA_FILE_NAME,
        "spec": "spec.json",
        "icon": "icon.svg",
        "doc": "doc.md",
    }

    if artifact_type in file_name_by_type:
        file_name = file_name_by_type[artifact_type]
        return f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file_name}"

    valid_types = ", ".join(file_name_by_type.keys())
    raise ValueError(
        f"Unknown artifact type: {artifact_type}. "
        f"Valid types are: {valid_types}"
    )
Compute the GCS path for a connector artifact for publishing.
All connectors use the airbyte/{connector_name} convention.
def get_registry(store: RegistryStore) -> Registry:
    """Return the `Registry` implementation matching `store.store_type`.

    Each backend module is imported inside its own branch, so only the
    selected implementation's module is loaded.

    Args:
        store: Parsed store target. Its `store_type` selects the backend, and
            the store itself is passed to that backend's constructor.

    Returns:
        A `CoralRegistry` for `StoreType.CORAL` or a `SonarRegistry` for
        `StoreType.SONAR`.

    Raises:
        ValueError: If `store.store_type` is neither CORAL nor SONAR.
    """
    if store.store_type == StoreType.CORAL:
        from airbyte_ops_mcp.registry.coral_registry_store import CoralRegistry

        registry_cls = CoralRegistry
    elif store.store_type == StoreType.SONAR:
        from airbyte_ops_mcp.registry.sonar_registry_store import SonarRegistry

        registry_cls = SonarRegistry
    else:
        # Defensive: StoreType is an Enum, so this is only reachable if a new
        # member is added without a matching backend branch here.
        raise ValueError(f"Unknown store type: {store.store_type}")

    return registry_cls(store)
Factory for obtaining the right store implementation.
def get_registry_entry(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch and parse a connector's metadata.yaml from the GCS registry.

    The object read is
    `{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{METADATA_FILE_NAME}`.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The connector's metadata as a dictionary

    Raises:
        ValueError: If GCS credentials are not configured, or if the metadata
            does not parse to a mapping
        FileNotFoundError: If the connector metadata is not found in the registry
        yaml.YAMLError: If the metadata file contains invalid YAML syntax
            (logged before being re-raised)
    """
    bucket = get_gcs_storage_client().bucket(bucket_name)

    # Pattern: metadata/airbyte/{connector_name}/{version}/metadata.yaml
    blob_path = (
        f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{METADATA_FILE_NAME}"
    )
    logger.info(f"Reading registry entry for {connector_name} from {blob_path}")

    content = safe_read_gcs_file(bucket.blob(blob_path))
    if content is None:
        raise FileNotFoundError(
            f"Connector metadata not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    try:
        metadata = yaml.safe_load(content)
    except yaml.YAMLError as e:
        logger.error(
            "Failed to parse metadata for %s from %s: %s",
            connector_name,
            blob_path,
            e,
        )
        raise

    # An empty document parses to None, which is also rejected here.
    if not isinstance(metadata, dict):
        raise ValueError(f"Metadata file {blob_path} has an invalid structure")
    return metadata
Get a connector's registry entry from GCS.
Reads metadata for a connector from the registry stored in GCS.
Arguments:
- connector_name: The connector name (e.g., "source-faker", "destination-postgres")
- bucket_name: Name of the GCS bucket containing the registry
- version: Version folder name (e.g., "latest", "1.2.3")
Returns:
dict: The connector's metadata as a dictionary
Raises:
- ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure
- FileNotFoundError: If the connector metadata is not found in the registry
- yaml.YAMLError: If the metadata file contains invalid YAML syntax
def get_registry_spec(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch and parse a connector's spec JSON from the GCS registry.

    The object read is
    `{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{SPEC_FILE_NAME}`.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The connector's spec as a dictionary

    Raises:
        ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
        FileNotFoundError: If the connector spec is not found in the registry
        json.JSONDecodeError: If the spec file contains invalid JSON syntax
            (logged before being re-raised)
    """
    bucket = get_gcs_storage_client().bucket(bucket_name)

    # Pattern: metadata/airbyte/{connector_name}/{version}/spec.json
    blob_path = f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{SPEC_FILE_NAME}"
    logger.info(f"Reading spec for {connector_name} from {blob_path}")

    content = safe_read_gcs_file(bucket.blob(blob_path))
    if content is None:
        raise FileNotFoundError(
            f"Connector spec not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    try:
        spec = json.loads(content)
    except json.JSONDecodeError as e:
        logger.error(
            "Failed to parse spec for %s from %s: %s",
            connector_name,
            blob_path,
            e,
        )
        raise

    # `null`, arrays, and scalars are all rejected: only a JSON object is a valid spec.
    if not isinstance(spec, dict):
        raise ValueError(
            f"Spec file for {connector_name} at {blob_path} is not a JSON object"
        )
    return spec
Get a connector's spec from GCS.
Reads the connector specification from the registry stored in GCS.
Arguments:
- connector_name: The connector name (e.g., "source-faker", "destination-postgres")
- bucket_name: Name of the GCS bucket containing the registry
- version: Version folder name (e.g., "latest", "1.2.3")
Returns:
dict: The connector's spec as a dictionary
Raises:
- ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
- FileNotFoundError: If the connector spec is not found in the registry
- json.JSONDecodeError: If the spec file contains invalid JSON syntax
def is_valid_for_progressive_rollout(version: str) -> bool:
    """Tell whether `version` may be used in a progressive rollout.

    The only versions rejected today are preview builds, i.e. version strings
    containing the substring `-preview.` (e.g. `1.2.3-preview.abc1234`). Every
    other version, GA or RC, is accepted. A bare `-preview` suffix without a
    trailing dot is NOT rejected. The gate is intentionally generic so it can
    be tightened, or turned into a no-op, later.

    Args:
        version: The version string to check.

    Returns:
        False if the version format is explicitly disallowed, True otherwise.
    """
    preview_marker = "-preview."
    is_preview_build = preview_marker in version
    return not is_preview_build
Check if a version string is valid for progressive rollout.
Currently rejects only preview versions (those containing -preview.). All other semver
versions (GA and RC alike) are accepted. This gate is intentionally
kept generic so it can be tightened or turned into a no-op in the
future.
Arguments:
- version: The version string to check.
Returns:
True if the version may be used in a progressive rollout, False if the version format is explicitly disallowed.
def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]:
    """List all versions of a connector in the registry.

    Scans the GCS bucket for objects matching
    `{METADATA_FOLDER}/airbyte/{connector_name}/*/{METADATA_FILE_NAME}` and
    returns the distinct version folder names.

    Args:
        connector_name: The connector name (e.g., "source-faker")
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Version strings excluding 'latest' and 'release_candidate'.
        They are sorted as plain strings (lexicographically), not by semver,
        so e.g. "1.10.0" sorts before "1.9.0".

    Raises:
        ValueError: If GCS credentials are not configured

    Errors raised by the GCS client while listing are logged and re-raised.
    """
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # List all blobs matching the pattern: metadata/airbyte/{connector_name}/*/metadata.yaml
    glob_pattern = f"{METADATA_FOLDER}/airbyte/{connector_name}/*/{METADATA_FILE_NAME}"
    logger.info(f"Listing versions for {connector_name} with pattern: {glob_pattern}")

    # `list_blobs` returns a lazy iterator. The listing requests, and any errors
    # they raise, happen during iteration rather than on the call itself, so
    # the names are collected inside the `try` for the error log to fire.
    try:
        blob_names = [blob.name for blob in bucket.list_blobs(match_glob=glob_pattern)]
    except Exception as e:
        logger.error(f"Error listing blobs in bucket {bucket_name}: {e}")
        raise

    # Extract versions from blob paths
    # Path format: metadata/airbyte/{connector-name}/{version}/metadata.yaml
    versions: set[str] = set()
    for blob_name in blob_names:
        path_parts = blob_name.split("/")
        # Path should be: metadata / airbyte / connector-name / version / metadata.yaml
        if len(path_parts) >= 5:
            version = path_parts[3]
            # Exclude special folders
            if version not in ("latest", "release_candidate"):
                versions.add(version)

    return sorted(versions)
List all versions of a connector in the registry.
Scans the GCS bucket to find all versions of a specific connector.
Arguments:
- connector_name: The connector name (e.g., "source-faker")
- bucket_name: Name of the GCS bucket containing the registry
Returns:
list[str]: Sorted list of version strings (excluding 'latest' and 'release_candidate')
Raises:
- ValueError: If GCS credentials are not configured
def list_registry_connectors(bucket_name: str) -> list[str]:
    """List all connectors in the registry.

    Scans the GCS bucket for objects matching
    `{METADATA_FOLDER}/airbyte/*/{LATEST_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}`,
    i.e. every connector with a metadata file in its latest folder.

    Args:
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Sorted list of connector names

    Raises:
        ValueError: If GCS credentials are not configured

    Errors raised by the GCS client while listing are logged and re-raised.
    """
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # List all blobs matching the pattern: metadata/airbyte/*/latest/metadata.yaml
    glob_pattern = (
        f"{METADATA_FOLDER}/airbyte/*/{LATEST_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )
    logger.info(f"Listing connectors with pattern: {glob_pattern}")

    # `list_blobs` returns a lazy iterator. The listing requests, and any errors
    # they raise, happen during iteration rather than on the call itself, so
    # the names are collected inside the `try` for the error log to fire.
    try:
        blob_names = [blob.name for blob in bucket.list_blobs(match_glob=glob_pattern)]
    except Exception as e:
        logger.error(f"Error listing blobs in bucket {bucket_name}: {e}")
        raise

    # Extract connector names from blob paths
    # Path format: metadata/airbyte/{connector-name}/latest/metadata.yaml
    connector_names: set[str] = set()
    for blob_name in blob_names:
        path_parts = blob_name.split("/")
        # Path should be: metadata / airbyte / connector-name / latest / metadata.yaml
        if len(path_parts) >= 5:
            connector_names.add(path_parts[2])

    return sorted(connector_names)
List all connectors in the registry.
Scans the GCS bucket to find all connectors that have metadata files.
Arguments:
- bucket_name: Name of the GCS bucket containing the registry
Returns:
list[str]: Sorted list of connector names
Raises:
- ValueError: If GCS credentials are not configured
def list_registry_connectors_filtered(
    bucket_name: str,
    *,
    support_level: SupportLevel | None = None,
    min_support_level: SupportLevel | None = None,
    connector_type: ConnectorType | None = None,
    language: ConnectorLanguage | None = None,
    prefix: str = "",
) -> list[str]:
    """List connectors from the compiled cloud registry index, with filtering.

    When at least one filter is given, this reads the compiled
    `cloud_registry.json` index instead of globbing individual metadata blobs.
    That is much faster, because the index is one JSON file holding every
    connector entry.

    Without filters it delegates to the glob-based `list_registry_connectors`,
    which also finds OSS-only connectors absent from the Cloud index. On that
    path `prefix` is not used.

    Args:
        bucket_name: Name of the GCS bucket containing the registry.
        support_level: Exact support level to match (e.g., `SupportLevel.CERTIFIED`).
        min_support_level: Minimum support level threshold. Keeps connectors
            at or above this level. Entries whose support level is missing or
            unknown are dropped.
        connector_type: Filter by connector type (`ConnectorType.SOURCE` or
            `ConnectorType.DESTINATION`).
        language: Filter by implementation language (e.g., `ConnectorLanguage.PYTHON`).
        prefix: Optional bucket prefix (e.g., `"aj-test100"`), used only when
            reading the index.

    Returns:
        Sorted list of connector technical names (e.g., `"source-github"`).

    Raises:
        ValueError: If `support_level` and `min_support_level` are both provided.
    """
    if not any((support_level, min_support_level, connector_type, language)):
        return list_registry_connectors(bucket_name=bucket_name)

    if support_level and min_support_level:
        raise ValueError(
            "Cannot specify both `support_level` and `min_support_level`. "
            "Use `support_level` for an exact match or `min_support_level` for a threshold."
        )

    index_entries = _read_cloud_registry_index(bucket_name=bucket_name, prefix=prefix)

    recognized_levels = {level.value for level in SupportLevel}
    min_precedence = min_support_level.precedence if min_support_level else None

    def _meets_min_level(entry: dict) -> bool:
        """Return True when `entry` has a known support level at or above the threshold."""
        raw_level = entry.get("supportLevel")
        return bool(
            raw_level
            and entry["supportLevel"] in recognized_levels
            and SupportLevel(entry["supportLevel"]).precedence >= min_precedence
        )

    def _is_selected(entry: dict) -> bool:
        """Return True when `entry` satisfies every requested filter."""
        if support_level and entry.get("supportLevel") != support_level:
            return False
        if min_support_level and not _meets_min_level(entry):
            return False
        if connector_type == ConnectorType.SOURCE:
            if "sourceDefinitionId" not in entry:
                return False
        elif connector_type == ConnectorType.DESTINATION:
            if "destinationDefinitionId" not in entry:
                return False
        if language and entry.get("language") != language:
            return False
        return True

    # Technical names come from dockerRepository, e.g. "airbyte/source-github" -> "source-github".
    technical_names: set[str] = set()
    for entry in filter(_is_selected, index_entries):
        repository = entry.get("dockerRepository", "")
        if "/" in repository:
            technical_names.add(repository.split("/", 1)[1])
        elif repository:
            technical_names.add(repository)

    return sorted(technical_names)
List connectors from the compiled cloud registry index with filtering.
When any filter is applied, reads the compiled cloud_registry.json index
instead of globbing individual metadata blobs. This is significantly faster
because the index is a single JSON file containing all connector entries.
When no filters are applied, falls back to the existing glob-based search which captures all connectors (including OSS-only connectors not in the Cloud index).
Arguments:
- bucket_name: Name of the GCS bucket containing the registry.
- support_level: Exact support level to match (e.g., SupportLevel.CERTIFIED).
- min_support_level: Minimum support level threshold. Returns connectors at or above this level.
- connector_type: Filter by connector type (ConnectorType.SOURCE or ConnectorType.DESTINATION).
- language: Filter by implementation language (e.g., ConnectorLanguage.PYTHON).
- prefix: Optional bucket prefix (e.g., "aj-test100").
Returns:
Sorted list of connector technical names (e.g., "source-github").
Raises:
- ValueError: If support_level and min_support_level are both provided.
def publish_connector_metadata(
    connector_name: str,
    metadata: dict[str, Any],
    bucket_name: str,
    version: str,
    update_latest: bool = True,
    dry_run: bool = False,
) -> MetadataPublishResult:
    """Publish connector metadata to GCS.

    Uploads the metadata to the registry bucket at a versioned path, and optionally
    also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading
    unchanged files.

    Requires GCS_CREDENTIALS environment variable to be set.

    Args:
        connector_name: Connector technical name (e.g. `source-faker`), used as
            the `airbyte/<connector_name>` path segment.
        metadata: Full metadata document. Must be a dict with a `data` key.
        bucket_name: Target GCS bucket.
        version: Version folder to publish into.
        update_latest: If True (default), also upload to the latest folder.
        dry_run: If True, compute the target paths and return without
            contacting GCS.

    Returns:
        A `MetadataPublishResult` whose `status` is `"dry-run"`, `"success"`,
        or `"already-up-to-date"`.

    Raises:
        ValueError: If `metadata` is not a dict or lacks a `data` field.
    """
    if not isinstance(metadata, dict):
        raise ValueError("Metadata must be a dictionary")

    if "data" not in metadata:
        raise ValueError("Metadata must contain 'data' field")

    # Construct GCS paths using airbyte/{connector_name} convention
    versioned_blob_path = get_gcs_publish_path(connector_name, "metadata", version)
    latest_blob_path = get_gcs_publish_path(
        connector_name, "metadata", LATEST_GCS_FOLDER_NAME
    )

    if dry_run:
        message = f"[DRY RUN] Would upload metadata to gs://{bucket_name}/{versioned_blob_path}"
        if update_latest:
            message += f" and gs://{bucket_name}/{latest_blob_path}"
        logger.info(message)
        return MetadataPublishResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            versioned_path=versioned_blob_path,
            latest_path=latest_blob_path if update_latest else None,
            versioned_uploaded=False,
            latest_uploaded=False,
            status="dry-run",
            message=message,
        )

    # Get GCS client and bucket
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # The upload helper takes a local file path, so serialize to a named temp
    # file created with delete=False and closed before uploading. The path is
    # captured *before* writing so the `finally` also removes the file when
    # serialization itself fails (previously it leaked in that case).
    tmp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False)
    tmp_path = Path(tmp_file.name)
    try:
        with tmp_file:
            yaml.dump(metadata, tmp_file)

        # Upload versioned file
        versioned_uploaded, _ = upload_file_if_changed(
            local_file_path=tmp_path,
            bucket=bucket,
            blob_path=versioned_blob_path,
            disable_cache=True,
        )

        if versioned_uploaded:
            logger.info(
                f"Uploaded metadata for {connector_name} v{version} to {versioned_blob_path}"
            )
        else:
            logger.info(
                f"Versioned metadata for {connector_name} v{version} is already up to date"
            )

        # Optionally update latest pointer
        latest_uploaded = False
        if update_latest:
            latest_uploaded, _ = upload_file_if_changed(
                local_file_path=tmp_path,
                bucket=bucket,
                blob_path=latest_blob_path,
                disable_cache=True,
            )
            if latest_uploaded:
                logger.info(f"Updated latest pointer for {connector_name}")
            else:
                logger.info(
                    f"Latest pointer for {connector_name} is already up to date"
                )
    finally:
        # Clean up temp file even if serialization or upload fails
        tmp_path.unlink(missing_ok=True)

    # Determine status
    if versioned_uploaded or latest_uploaded:
        status = "success"
        message = f"Published metadata for {connector_name} v{version}"
        if versioned_uploaded:
            message += f" to {versioned_blob_path}"
        if latest_uploaded:
            message += " and updated latest"
    else:
        status = "already-up-to-date"
        message = f"Metadata for {connector_name} v{version} is already up to date"

    return MetadataPublishResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        versioned_path=versioned_blob_path,
        latest_path=latest_blob_path if update_latest else None,
        versioned_uploaded=versioned_uploaded,
        latest_uploaded=latest_uploaded,
        status=status,
        message=message,
    )
Publish connector metadata to GCS.
Uploads the metadata to the registry bucket at a versioned path, and optionally also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading unchanged files.
Requires GCS_CREDENTIALS environment variable to be set.
def _prune_stale_versioned_files(
    fs: gcsfs.GCSFileSystem,
    dest_path: str,
    local_rel_paths: set[str],
) -> None:
    """Delete remote files under `dest_path` that were not part of this upload.

    Lists `dest_path` recursively (`fs.find`) so that nested files are compared
    by their full POSIX relative path, exactly as they were uploaded. Entries
    that do not live under `dest_path/` are never deleted.
    """
    try:
        remote_files = fs.find(dest_path)
    except FileNotFoundError:
        return  # Destination doesn't exist yet, nothing to clean

    dest_prefix = dest_path.rstrip("/") + "/"
    for remote_file in remote_files:
        # Skip the destination entry itself or anything outside it.
        if not remote_file.startswith(dest_prefix):
            continue
        remote_rel = remote_file[len(dest_prefix) :]
        if remote_rel in local_rel_paths:
            continue
        try:
            fs.rm(remote_file)
        except FileNotFoundError:
            continue  # Already gone; nothing left to delete.
        _log_progress(" Deleted stale remote file: %s", remote_rel)


def publish_version_artifacts(
    connector_name: str,
    version: str,
    artifacts_dir: Path,
    store: RegistryStore,
    dry_run: bool = False,
    with_validate: bool = True,
) -> PublishArtifactsResult:
    """Publish locally generated artifacts to a GCS registry bucket.

    Uses `gcsfs.GCSFileSystem` to upload the local *artifacts_dir* to the
    versioned path inside the target GCS bucket.

    The target GCS path is:
    `gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/`

    After uploading, any remote file under that versioned path that has no
    counterpart in *artifacts_dir* (compared recursively, by relative path) is
    deleted, so the remote directory mirrors the local one.

    Before uploading, this function validates that the `connector_name`
    (derived from the connector directory) matches the `dockerRepository`
    declared in `metadata.yaml`. A mismatch would cause the registry
    compile step to see duplicate definition-ID entries and fail.

    Args:
        connector_name: Connector name (e.g. `source-faker`).
        version: Version string (e.g. `6.2.38`).
        artifacts_dir: Local directory containing artifacts from `generate`.
        store: Parsed store target containing bucket, prefix, and stage info.
        dry_run: If `True`, report what would be uploaded without writing.
        with_validate: If `True` (default), validate metadata before uploading.
            Pass `False` (`--no-validate`) to skip.

    Returns:
        A `PublishArtifactsResult` describing what was published. Validation
        failures and an empty *artifacts_dir* are reported on the result
        (`validation_errors` / `errors`) rather than raised.

    Raises:
        FileNotFoundError: If *artifacts_dir* is not an existing directory.
        ValueError: If the connector directory name does not match
            `dockerRepository` in the generated metadata.
    """
    if not artifacts_dir.is_dir():
        raise FileNotFoundError(f"Artifacts directory not found: {artifacts_dir}")

    # Fail fast if the connector directory name doesn't match dockerRepository.
    # A mismatch would publish artifacts under the wrong GCS path and corrupt
    # the registry (duplicate definition-IDs under different directory names).
    mismatch_error = _check_connector_name_matches_docker_repo(
        connector_name, artifacts_dir
    )
    if mismatch_error:
        raise ValueError(mismatch_error)

    # Build the GCS destination path
    bucket_name = store.bucket
    prefix = store.prefix
    blob_root = versioned_blob_root(
        connector_name=connector_name, version=version, store=store
    )
    versioned_dest = f"gcs://{bucket_name}/{blob_root}"

    target_label = f"{bucket_name}/{prefix}" if prefix else bucket_name
    result = PublishArtifactsResult(
        connector_name=connector_name,
        version=version,
        target=target_label,
        gcs_destination=versioned_dest,
        dry_run=dry_run,
    )

    # --- Pre-publish validation ---
    if with_validate:
        metadata_file = artifacts_dir / "metadata.yaml"
        if metadata_file.is_file():
            raw_metadata = yaml.safe_load(metadata_file.read_text())
            metadata_data = (raw_metadata or {}).get("data", {})
            validation = validate_metadata(metadata_data=metadata_data)
            if not validation.passed:
                for err in validation.errors:
                    logger.error("Pre-publish validation error: %s", err)
                result.validation_errors = validation.errors
                return result
            logger.info(
                "Pre-publish validation passed (%d validators).",
                validation.validators_run,
            )
        else:
            logger.warning("No metadata.yaml in artifacts dir; skipping validation.")

    # Enumerate local files
    local_files = sorted(f for f in artifacts_dir.rglob("*") if f.is_file())
    if not local_files:
        result.errors.append(f"No files found in {artifacts_dir}.")
        return result

    # POSIX-style relative paths: used both as GCS key suffixes and for the
    # stale-file comparison, so they must use "/" regardless of host OS.
    local_rel_paths = [f.relative_to(artifacts_dir).as_posix() for f in local_files]

    _log_progress(
        "Publishing %d artifacts for %s@%s → %s",
        len(local_files),
        connector_name,
        version,
        versioned_dest,
    )

    # Build references used by both dry-run and real upload paths
    deps_file = artifacts_dir / CONNECTOR_DEPENDENCY_FILE_NAME
    has_deps = deps_file.is_file()
    deps_gcs_key = dependencies_blob_path(
        connector_name=connector_name, version=version, store=store
    )

    sbom_file = artifacts_dir / SBOM_FILE_NAME
    has_sbom = sbom_file.is_file()

    if dry_run:
        for rel in local_rel_paths:
            result.files_uploaded.append(rel)
            _log_progress(" [DRY RUN] would upload: %s", rel)
        # Report the dual-load of dependencies.json to connector_dependencies/
        if has_deps:
            result.files_uploaded.append(deps_gcs_key)
            _log_progress(
                " [DRY RUN] would also dual-load: %s → gs://%s/%s",
                CONNECTOR_DEPENDENCY_FILE_NAME,
                bucket_name,
                deps_gcs_key,
            )
        # Report the separate sbom/ upload
        if has_sbom:
            sbom_gcs_key = sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            )
            result.files_uploaded.append(sbom_gcs_key)
            _log_progress(
                " [DRY RUN] would also upload: %s → gs://%s/%s",
                SBOM_FILE_NAME,
                bucket_name,
                sbom_gcs_key,
            )
        return result

    # Authenticate
    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # Strip gcs:// prefix for gcsfs path
    dest_path = versioned_dest.replace("gcs://", "")

    # Upload all files to the versioned path
    _log_progress("Uploading to: %s", versioned_dest)
    for local_file, rel in zip(local_files, local_rel_paths):
        fs.put(str(local_file), f"{dest_path}/{rel}")
        result.files_uploaded.append(rel)
        _log_progress(" Uploaded: %s", rel)

    # Delete remote files that don't exist locally (sync semantics)
    _prune_stale_versioned_files(fs, dest_path, set(local_rel_paths))

    _log_progress("Uploaded %d files to %s", len(local_files), versioned_dest)

    # --- Dual-load dependencies.json to the connector_dependencies/ path ---
    if not has_deps:
        logger.debug(
            "No %s in artifacts dir — skipping dual-load.",
            CONNECTOR_DEPENDENCY_FILE_NAME,
        )
    else:
        deps_remote = f"{bucket_name}/{deps_gcs_key}"
        _log_progress(
            "Dual-loading %s to gs://%s",
            CONNECTOR_DEPENDENCY_FILE_NAME,
            deps_remote,
        )
        fs.put(str(deps_file), deps_remote)
        result.files_uploaded.append(deps_gcs_key)
        _log_progress(" Uploaded %s (dual-load)", CONNECTOR_DEPENDENCY_FILE_NAME)

    # --- Upload SBOM to the dedicated sbom/ path in GCS ---
    if not has_sbom:
        logger.debug(
            "No %s in artifacts dir — skipping SBOM dual-load.",
            SBOM_FILE_NAME,
        )
    else:
        sbom_gcs_uri = upload_sbom(
            sbom_path=sbom_file,
            connector_name=connector_name,
            version=version,
            store=store,
            dry_run=dry_run,
        )
        result.files_uploaded.append(
            sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            ),
        )
        _log_progress("Uploaded SBOM to dedicated path: %s", sbom_gcs_uri)

    return result
Publish locally generated artifacts to a GCS registry bucket.
Uses gcsfs.GCSFileSystem to upload the local artifacts_dir to the
versioned path inside the target GCS bucket.
The target GCS path is:
gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/
Before uploading, this function validates that the connector_name
(derived from the connector directory) matches the dockerRepository
declared in metadata.yaml. A mismatch would cause the registry
compile step to see duplicate definition-ID entries and fail.
Arguments:
- connector_name: Connector name (e.g. `source-faker`).
- version: Version string (e.g. `6.2.38`).
- artifacts_dir: Local directory containing artifacts from `generate`.
- store: Parsed store target containing bucket, prefix, and stage info.
- dry_run: If `True`, report what would be uploaded without writing.
- with_validate: If `True` (default), validate metadata before uploading. Pass `False` (`--no-validate`) to skip.
Returns:
A `PublishArtifactsResult` describing what was published.
Raises:
- FileNotFoundError: If `artifacts_dir` is not an existing directory.
- ValueError: If the connector directory name does not match `dockerRepository` in the generated metadata.
def purge_latest_dirs(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
) -> PurgeLatestResult:
    """Delete all `latest/` directories from the registry store.

    Finds every connector under `<bucket_root>/<METADATA_FOLDER>/airbyte/` that
    has a `latest/` entry. When *connector_name* is given, each listed name is
    probed. Otherwise a single glob over all connectors is used. The matching
    `latest/` directories are then removed concurrently on a thread pool via
    `_delete_latest_dir`.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only purge these connectors.
        dry_run: If True, report what would be done without deleting.

    Returns:
        A `PurgeLatestResult` describing what was done. Per-connector delete
        failures are collected in its `errors` list instead of being raised.
    """
    result = PurgeLatestResult(target=store.bucket_root, dry_run=dry_run)

    fs = gcsfs.GCSFileSystem(token=get_gcs_credentials_token())

    airbyte_root = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"
    root_prefix = f"{airbyte_root}/"

    _log_progress("Discovering latest/ directories...")
    # Insertion-ordered "set" of connector names that have a latest/ entry.
    found: dict[str, None] = {}
    if connector_name:
        # Probe each requested (deduplicated) connector directly.
        for name in dict.fromkeys(connector_name):
            if fs.exists(f"{airbyte_root}/{name}/latest"):
                found[name] = None
    else:
        # Glob every connector's latest/ and recover the connector segment.
        for matched in fs.glob(f"{airbyte_root}/*/latest"):
            if not matched.startswith(root_prefix):
                logger.warning("Could not parse latest path: %s", matched)
                continue
            name = matched[len(root_prefix) :].split("/")[0]
            if name:
                found.setdefault(name, None)

    total = len(found)
    result.connectors_found = total
    _log_progress(
        "Found %d connectors with latest/ directories",
        result.connectors_found,
    )

    if total == 0:
        _log_progress("Nothing to purge.")
        _log_progress(result.summary())
        return result

    ordered = sorted(found)

    if dry_run:
        for name in ordered:
            _log_progress(" [DRY RUN] Would delete %s/latest/", name)
        result.latest_dirs_deleted = total
        _log_progress(result.summary())
        return result

    def _try_delete(name: str) -> str | None:
        """Delete one connector's latest/ dir; return an error message or None."""
        try:
            _delete_latest_dir(
                fs,
                store=store,
                connector=name,
            )
        except Exception as exc:
            return f"Failed to delete latest/ for {name}: {exc}"
        return None

    _log_progress(
        "Deleting %d latest/ directories (max_workers=%d)...",
        total,
        _PURGE_LATEST_MAX_WORKERS,
    )

    with ThreadPoolExecutor(max_workers=_PURGE_LATEST_MAX_WORKERS) as pool:
        pending = [pool.submit(_try_delete, name) for name in ordered]
        for done_count, future in enumerate(as_completed(pending), start=1):
            failure = future.result()
            if failure is None:
                result.latest_dirs_deleted += 1
            else:
                logger.error(failure)
                result.errors.append(failure)
            # Periodic progress line; counts completed attempts, not successes.
            if done_count % 100 == 0:
                _log_progress(" Deleted %d / %d...", done_count, total)

    _log_progress(result.summary())
    return result
Delete all latest/ directories from the registry store.
Discovers connector directories via glob, then deletes each
latest/ subdirectory in parallel using a thread pool.
Arguments:
- store: Registry store (bucket + optional prefix).
- connector_name: If provided, only purge these connectors.
- dry_run: If True, report what would be done without deleting.
Returns:
A `PurgeLatestResult` describing what was done.
def rebuild_registry(
    source_bucket: str,
    output_mode: OutputMode,
    output_path_root: str | None = None,
    gcs_bucket: str | None = None,
    s3_bucket: str | None = None,
    dry_run: bool = False,
    connector_name: list[str] | None = None,
) -> RebuildResult:
    """Rebuild the entire registry from a source GCS bucket to an output target.

    Every blob under `<source_bucket>/<METADATA_FOLDER>/` is listed. The listing
    is restricted to `.../airbyte/<name>` for each *connector_name* when a
    filter is given. The blobs are then copied to the chosen target:

    - local: Write to a local directory tree.
    - gcs: Copy to a GCS bucket (must not be the prod bucket), using a GCS
      server-side copy.
    - s3: Copy to an S3 bucket (fsspec-based copy, as for local).

    Args:
        source_bucket: The GCS bucket to read from (typically prod).
        output_mode: Where to write: "local", "gcs", or "s3".
        output_path_root: Root path/prefix for output. For local mode, if None
            creates a temp directory. For GCS/S3, prepended to all blob paths.
        gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
        s3_bucket: Target S3 bucket name (required if output_mode="s3").
        dry_run: If True, report what would be done without writing.
        connector_name: If provided, only rebuild these connector names
            (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.

    Returns:
        RebuildResult with details of the operation.

    Raises:
        ValueError: If target is the prod bucket, or required bucket arg is missing.
    """
    # Refuse to ever write into the production bucket.
    if output_mode == "gcs" and gcs_bucket:
        _validate_not_prod_bucket(gcs_bucket)

    if output_mode == "local":
        effective_output_root = _resolve_local_output_root(output_path_root)
    else:
        effective_output_root = output_path_root or ""

    result = RebuildResult(
        source_bucket=source_bucket,
        output_mode=output_mode,
        output_root=effective_output_root,
        dry_run=dry_run,
    )

    # The source side is always GCS; the token is reused for the output side.
    source_fs, gcs_token = _make_source_fs()
    source_base = f"{source_bucket}/{METADATA_FOLDER}"

    if not connector_name:
        _log_progress("Listing all blobs under gs://%s/...", source_base)
        source_paths = source_fs.find(source_base)
    else:
        _log_progress(
            "Listing blobs for %d connectors under gs://%s/...",
            len(connector_name),
            source_base,
        )
        source_paths = []
        for name in connector_name:
            matches = source_fs.find(f"{source_base}/airbyte/{name}")
            source_paths.extend(matches)
            _log_progress(" %s: %d blobs", name, len(matches))

    if not source_paths:
        _log_progress("No blobs found under gs://%s/", source_base)
        return result

    total_blobs = len(source_paths)
    _log_progress("Found %d blobs to process", total_blobs)

    output_fs, output_base = _make_output_fs(
        output_mode=output_mode,
        output_root=effective_output_root,
        gcs_bucket=gcs_bucket,
        s3_bucket=s3_bucket,
        gcs_token=gcs_token,
    )

    # Paths relative to the bucket root, e.g. "metadata/airbyte/<connector>/...".
    bucket_prefix = f"{source_bucket}/"
    blob_relative_paths = [
        path[len(bucket_prefix) :] if path.startswith(bucket_prefix) else path
        for path in source_paths
    ]
    # The third path segment is the connector name.
    connector_names = {
        segments[2]
        for segments in (rel.split("/") for rel in blob_relative_paths)
        if len(segments) >= 3
    }

    if dry_run:
        result.blobs_copied = total_blobs
        result.connectors_processed = len(connector_names)
        _log_progress(
            "[DRY RUN] Would copy %d blobs (%d connectors)",
            total_blobs,
            len(connector_names),
        )
        _log_progress(result.summary())
        return result

    if output_mode == "gcs" and gcs_bucket:
        # GCS→GCS mirrors use a server-side copy.
        return _gcs_native_copy(
            source_bucket=source_bucket,
            dest_bucket_name=gcs_bucket,
            dest_prefix=effective_output_root,
            blob_relative_paths=blob_relative_paths,
            connector_names=connector_names,
            gcs_token=gcs_token,
            result=result,
            connector_name_filter=connector_name,
        )

    # Local and S3 outputs go through the generic fsspec copy.
    return _fsspec_copy(
        source_fs=source_fs,
        source_paths=source_paths,
        output_fs=output_fs,
        output_base=output_base,
        output_mode=output_mode,
        source_bucket=source_bucket,
        blob_relative_paths=blob_relative_paths,
        connector_names=connector_names,
        result=result,
    )
Rebuild the entire registry from a source GCS bucket to an output target.
Reads all connector metadata blobs from the source GCS bucket and copies them to the output target using fsspec for unified filesystem access.
The output targets are:
- local: Write to a local directory tree.
- gcs: Copy to a GCS bucket (must not be the prod bucket).
- s3: Copy to an S3 bucket.
Arguments:
- source_bucket: The GCS bucket to read from (typically prod).
- output_mode: Where to write: "local", "gcs", or "s3".
- output_path_root: Root path/prefix for output. For local mode, if None creates a temp directory. For GCS/S3, prepended to all blob paths.
- gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
- s3_bucket: Target S3 bucket name (required if output_mode="s3").
- dry_run: If True, report what would be done without writing.
- connector_name: If provided, only rebuild these connector names (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.
Returns:
RebuildResult with details of the operation.
Raises:
- ValueError: If target is the prod bucket, or required bucket arg is missing.
def resolve_registry_store(
    store: str | None = None,
    connector_name: str | None = None,
    cwd: Path | None = None,
    default_env: str = "dev",
) -> RegistryStore:
    """Resolve a `RegistryStore` from CLI inputs.

    Explicit sources (`--store`, then the `AIRBYTE_REGISTRY_STORE` env var)
    take priority and are returned directly; auto-detection is not run at all
    in that case. Otherwise every applicable auto-detection method is
    evaluated, and a `ValueError` is raised if they disagree.

    Priority (highest → lowest):

    1. **Explicit** `--store` argument (e.g. `"coral:dev"`).
    2. **Environment variable** -- `AIRBYTE_REGISTRY_STORE`.
    3. **Auto-detected** -- connector name and/or working directory.
       If both are present and disagree, a `ValueError` is raised.

    Args:
        store: Explicit store target string (e.g. `"coral:dev"`).
        connector_name: Optional connector name for auto-detection.
        cwd: Working directory for repo-based detection.
        default_env: Environment to use when auto-detecting (default `"dev"`).

    Returns:
        A fully resolved `RegistryStore`.

    Raises:
        ValueError: If no detection method succeeds, or if auto-detected
            methods produce conflicting store types.
    """
    # -- Explicit sources (take priority — no conflict checking needed) -----
    explicit_target: RegistryStore | None = None
    explicit_source: str | None = None

    if store is not None:
        explicit_target = RegistryStore.parse(store)
        explicit_source = "--store"
    else:
        env_store = os.environ.get(REGISTRY_STORE_ENV_VAR)
        if env_store:
            explicit_target = RegistryStore.parse(env_store)
            explicit_source = REGISTRY_STORE_ENV_VAR

    if explicit_target is not None:
        logger.debug(
            "Using explicit store target from %s: %s:%s",
            explicit_source,
            explicit_target.store_type.value,
            explicit_target.env,
        )
        return explicit_target

    # -- No explicit source: collect auto-detections ------------------------
    # Evaluated only here, so anything auto-detection does (repo-dir probing,
    # or an error from `StoreType.get_from_connector_name` if it rejects a
    # name) can no longer interfere with an explicit target.
    auto_detections: dict[str, StoreType] = {}

    if connector_name is not None:
        auto_detections["connector_name"] = StoreType.get_from_connector_name(
            connector_name,
        )

    dir_type = StoreType.detect_from_repo_dir(cwd)
    if dir_type is not None:
        auto_detections["working_directory"] = dir_type

    if not auto_detections:
        raise ValueError(
            "Cannot determine registry store. "
            "Provide --store (e.g. 'coral:dev' or 'sonar:prod'), "
            f"set ${REGISTRY_STORE_ENV_VAR}, "
            "or run from a recognized repository directory."
        )

    distinct = set(auto_detections.values())

    if len(distinct) > 1:
        detail = ", ".join(f"{src}={st.value}" for src, st in auto_detections.items())
        raise ValueError(
            f"Conflicting store types detected: {detail}. "
            "Provide an explicit --store to resolve the ambiguity."
        )

    resolved_type = distinct.pop()
    logger.info(
        "Auto-detected store type '%s' (sources: %s)",
        resolved_type.value,
        ", ".join(auto_detections),
    )
    return RegistryStore(store_type=resolved_type, env=default_env)
Resolve a RegistryStore from CLI inputs.
All applicable detection methods are evaluated. Explicit sources
(--store, then the AIRBYTE_REGISTRY_STORE env var) take priority
and are returned directly. When only auto-detected sources remain, they
are compared and a ValueError is raised if they disagree.
Priority (highest → lowest):
1. Explicit `--store` argument (e.g. `"coral:dev"`).
2. Environment variable — `AIRBYTE_REGISTRY_STORE`.
3. Auto-detected — connector name and/or working directory. If both are present and disagree, a `ValueError` is raised.
Arguments:
- store: Explicit store target string (e.g. `"coral:dev"`).
- connector_name: Optional connector name for auto-detection.
- cwd: Working directory for repo-based detection.
- default_env: Environment to use when auto-detecting (default `"dev"`).
Returns:
A fully resolved `RegistryStore`.
Raises:
- ValueError: If no detection method succeeds, or if auto-detected methods produce conflicting store types.
def strip_rc_suffix(version: str) -> str:
    """Return *version* with any release-candidate suffix removed.

    Everything from the first `-rc.` marker onward is dropped. A version
    without that marker comes back unchanged.

    Args:
        version: The version string (e.g., '1.2.3-rc.1').

    Returns:
        The base version without RC suffix (e.g., '1.2.3'), or the original
        version if no RC suffix is present.
    """
    # `partition` yields (version, "", "") when the marker is absent, so the
    # head is the answer in both cases.
    base, _marker, _rc_tail = version.partition("-rc.")
    return base
Strip the release candidate suffix from a version string.
Arguments:
- version: The version string (e.g., '1.2.3-rc.1').
Returns:
The base version without RC suffix (e.g., '1.2.3'). Returns the original version if no RC suffix is present.
def unyank_connector_version(
    connector_name: str,
    version: str,
    bucket_name: str,
    dry_run: bool = False,
) -> YankResult:
    """Remove the yank marker from a connector version.

    The marker blob (`version-yank.yml`, located via `_get_yank_blob_path`,
    i.e. metadata/airbyte/{connector_name}/{version}/version-yank.yml) is
    deleted when present.

    Args:
        connector_name: The connector name (e.g., "source-faker").
        version: The version to unyank (e.g., "1.2.3").
        bucket_name: The GCS bucket name.
        dry_run: If True, report what would be done without deleting.

    Returns:
        YankResult with details of the operation. `success` is False when the
        version has no yank marker.
    """
    marker_path = _get_yank_blob_path(connector_name, version)
    marker_blob = get_gcs_storage_client().bucket(bucket_name).blob(marker_path)

    def _outcome(success: bool, message: str, **extra) -> YankResult:
        # All results share the identifying fields; `extra` carries dry_run
        # only where the original call sites set it explicitly.
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="unyank",
            success=success,
            message=message,
            **extra,
        )

    # Nothing to undo if the version was never yanked.
    if not marker_blob.exists():
        return _outcome(
            False,
            f"Version {version} of {connector_name} is not yanked.",
            dry_run=dry_run,
        )

    if dry_run:
        return _outcome(
            True,
            f"[DRY RUN] Would unyank {connector_name} {version}.",
            dry_run=True,
        )

    marker_blob.delete()
    logger.info("Unyanked %s version %s in %s", connector_name, version, bucket_name)
    return _outcome(True, f"Successfully unyanked {connector_name} {version}.")
Remove the yank marker from a connector version.
Deletes the version-yank.yml file at: metadata/airbyte/{connector_name}/{version}/version-yank.yml
Arguments:
- connector_name: The connector name (e.g., "source-faker").
- version: The version to unyank (e.g., "1.2.3").
- bucket_name: The GCS bucket name.
- dry_run: If True, report what would be done without deleting.
Returns:
YankResult with details of the operation.
def validate_metadata(
    metadata_data: dict[str, Any],
    opts: ValidateOptions | None = None,
) -> ValidationResult:
    """Run all pre-publish validators against raw `metadata.data`.

    Every validator in `PRE_PUBLISH_VALIDATORS` is run (no short-circuit), and
    each failure is recorded on the returned result. A validator that reports
    failure without an error message still counts as a failure: a generic
    message naming the validator is recorded in its place.

    Args:
        metadata_data: The `data` section of a parsed `metadata.yaml`.
        opts: Options influencing validation behaviour. Defaults to
            `ValidateOptions()`.

    Returns:
        A `ValidationResult` with aggregate pass/fail and error list.
    """
    if opts is None:
        opts = ValidateOptions()

    result = ValidationResult()

    for validator in PRE_PUBLISH_VALIDATORS:
        result.validators_run += 1
        logger.info("Running validator: %s", validator.__name__)
        passed, error = validator(metadata_data, opts)
        if passed:
            continue
        if not error:
            # Previously a failure with an empty/None message was ignored,
            # letting invalid metadata pass; record a fallback message instead.
            error = f"Validator {validator.__name__} failed without an error message."
        logger.error("Validation failed: %s", error)
        result.add_error(error)

    return result
Run all pre-publish validators against raw `metadata.data`.
Arguments:
- metadata_data: The `data` section of a parsed `metadata.yaml`.
- opts: Options influencing validation behaviour.
Returns:
A `ValidationResult` with aggregate pass/fail and error list.
def yank_connector_version(
    connector_name: str,
    version: str,
    bucket_name: str,
    reason: str = "",
    dry_run: bool = False,
) -> YankResult:
    """Mark a connector version as yanked by writing a version-yank.yml marker.

    The marker file is placed at:
        metadata/airbyte/{connector_name}/{version}/version-yank.yml

    The marker is a YAML document with `yanked: true`, a UTC ISO-8601
    `yanked_at` timestamp, and `reason` when a non-empty reason is given.

    Args:
        connector_name: The connector name (e.g., "source-faker").
        version: The version to yank (e.g., "1.2.3").
        bucket_name: The GCS bucket name.
        reason: Optional reason for yanking the version.
        dry_run: If True, report what would be done without writing.

    Returns:
        YankResult with details of the operation. Expected failures are
        reported here rather than raised: `success` is False when the
        version's metadata blob does not exist, or when the version is
        already yanked.
    """
    # NOTE(review): an earlier docstring promised a ValueError for the
    # production bucket and for missing versions. Neither check raises here;
    # a missing version returns success=False. Confirm whether callers are
    # expected to enforce a prod-bucket guard.
    yank_path = _get_yank_blob_path(connector_name, version)
    metadata_path = _get_metadata_blob_path(connector_name, version)

    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # Verify the version exists
    metadata_blob = bucket.blob(metadata_path)
    if not metadata_blob.exists():
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="yank",
            success=False,
            message=f"Version {version} not found for {connector_name} in {bucket_name}.",
            dry_run=dry_run,
        )

    # Check if already yanked
    yank_blob = bucket.blob(yank_path)
    if yank_blob.exists():
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="yank",
            success=False,
            message=f"Version {version} of {connector_name} is already yanked.",
            dry_run=dry_run,
        )

    if dry_run:
        return YankResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            action="yank",
            success=True,
            message=f"[DRY RUN] Would yank {connector_name} {version}.",
            dry_run=True,
        )

    # Write the yank marker file
    yank_content: dict[str, Any] = {
        "yanked": True,
        "yanked_at": datetime.now(tz=timezone.utc).isoformat(),
    }
    if reason:
        yank_content["reason"] = reason

    yank_yaml = yaml.dump(yank_content, default_flow_style=False)
    yank_blob.upload_from_string(yank_yaml, content_type="application/x-yaml")

    logger.info("Yanked %s version %s in %s", connector_name, version, bucket_name)

    return YankResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        action="yank",
        success=True,
        message=f"Successfully yanked {connector_name} {version}.",
    )
Mark a connector version as yanked by writing a version-yank.yml marker.
The marker file is placed at:
metadata/airbyte/{connector_name}/{version}/version-yank.yml
Arguments:
- connector_name: The connector name (e.g., "source-faker").
- version: The version to yank (e.g., "1.2.3").
- bucket_name: The GCS bucket name.
- reason: Optional reason for yanking the version.
- dry_run: If True, report what would be done without writing.
Returns:
YankResult with details of the operation.
Raises:
- ValueError: If the bucket is the production bucket and no override is set, or if the version does not exist.