airbyte_ops_mcp.registry
Registry operations for Airbyte connectors.
This package provides functionality for reading, listing, publishing, compiling, and validating connector registry artifacts.
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Registry operations for Airbyte connectors. 3 4This package provides functionality for reading, listing, publishing, compiling, 5and validating connector registry artifacts. 6""" 7 8from __future__ import annotations 9 10from airbyte_ops_mcp.registry._constants import ( 11 DEFAULT_METADATA_SERVICE_BUCKET_NAME, 12 DEV_METADATA_SERVICE_BUCKET_NAME, 13 LATEST_GCS_FOLDER_NAME, 14 METADATA_FILE_NAME, 15 METADATA_FOLDER, 16 PROD_METADATA_SERVICE_BUCKET_NAME, 17 RELEASE_CANDIDATE_GCS_FOLDER_NAME, 18 SONAR_DEV_BUCKET_NAME, 19 SONAR_PROD_BUCKET_NAME, 20) 21from airbyte_ops_mcp.registry._enums import ( 22 ConnectorLanguage, 23 ConnectorType, 24 SupportLevel, 25) 26from airbyte_ops_mcp.registry.audit import ( 27 AuditResult, 28 UnpublishedConnector, 29 find_unpublished_connectors, 30) 31from airbyte_ops_mcp.registry.compile import ( 32 CompileResult, 33 PurgeLatestResult, 34 compile_registry, 35 purge_latest_dirs, 36) 37from airbyte_ops_mcp.registry.generate import ( 38 GenerateResult, 39 generate_version_artifacts, 40) 41from airbyte_ops_mcp.registry.models import ( 42 ConnectorListResult, 43 ConnectorMetadata, 44 MetadataPublishResult, 45 RegistryEntryResult, 46 VersionListResult, 47) 48from airbyte_ops_mcp.registry.operations import ( 49 get_registry_entry, 50 get_registry_spec, 51 list_connector_versions, 52 list_registry_connectors, 53 list_registry_connectors_filtered, 54) 55from airbyte_ops_mcp.registry.publish import ( 56 CONNECTOR_PATH_PREFIX, 57 get_connector_metadata, 58 get_gcs_publish_path, 59 publish_connector_metadata, 60) 61from airbyte_ops_mcp.registry.publish_artifacts import ( 62 PublishArtifactsResult, 63 publish_version_artifacts, 64) 65from airbyte_ops_mcp.registry.rebuild import ( 66 OutputMode, 67 RebuildResult, 68 rebuild_registry, 69) 70from airbyte_ops_mcp.registry.registry_store_base import ( 71 Registry, 72 get_registry, 73) 74from airbyte_ops_mcp.registry.store import ( 75 
REGISTRY_STORE_ENV_VAR, 76 RegistryStore, 77 StoreType, 78 resolve_registry_store, 79) 80from airbyte_ops_mcp.registry.validate import ( 81 ValidateOptions, 82 ValidationResult, 83 validate_metadata, 84) 85from airbyte_ops_mcp.registry.yank import ( 86 YANK_FILE_NAME, 87 YankResult, 88 unyank_connector_version, 89 yank_connector_version, 90) 91 92__all__ = [ 93 "CONNECTOR_PATH_PREFIX", 94 "DEFAULT_METADATA_SERVICE_BUCKET_NAME", 95 "DEV_METADATA_SERVICE_BUCKET_NAME", 96 "LATEST_GCS_FOLDER_NAME", 97 "METADATA_FILE_NAME", 98 "METADATA_FOLDER", 99 "PROD_METADATA_SERVICE_BUCKET_NAME", 100 "REGISTRY_STORE_ENV_VAR", 101 "RELEASE_CANDIDATE_GCS_FOLDER_NAME", 102 "SONAR_DEV_BUCKET_NAME", 103 "SONAR_PROD_BUCKET_NAME", 104 "YANK_FILE_NAME", 105 "AuditResult", 106 "CompileResult", 107 "ConnectorLanguage", 108 "ConnectorListResult", 109 "ConnectorMetadata", 110 "ConnectorType", 111 "GenerateResult", 112 "MetadataPublishResult", 113 "OutputMode", 114 "PublishArtifactsResult", 115 "PurgeLatestResult", 116 "RebuildResult", 117 "Registry", 118 "RegistryEntryResult", 119 "RegistryStore", 120 "StoreType", 121 "SupportLevel", 122 "UnpublishedConnector", 123 "ValidateOptions", 124 "ValidationResult", 125 "VersionListResult", 126 "YankResult", 127 "compile_registry", 128 "find_unpublished_connectors", 129 "generate_version_artifacts", 130 "get_connector_metadata", 131 "get_gcs_publish_path", 132 "get_registry", 133 "get_registry_entry", 134 "get_registry_spec", 135 "list_connector_versions", 136 "list_registry_connectors", 137 "list_registry_connectors_filtered", 138 "publish_connector_metadata", 139 "publish_version_artifacts", 140 "purge_latest_dirs", 141 "rebuild_registry", 142 "resolve_registry_store", 143 "unyank_connector_version", 144 "validate_metadata", 145 "yank_connector_version", 146]
@dataclass
class AuditResult:
    """Result of auditing which connectors have unpublished versions.

    Every list field is created through ``default_factory``, so each instance
    gets its own empty lists rather than sharing one mutable default.
    """

    # Connectors found to have an unpublished version.
    unpublished: list[UnpublishedConnector] = field(default_factory=list)
    # Counter that starts at 0; presumably the number of connectors examined
    # (the code that increments it is not visible here -- confirm).
    checked_count: int = 0
    # NOTE(review): the three ``skipped_*`` lists presumably hold connector
    # names skipped for being archived / release candidates / disabled --
    # confirm against the audit routine that fills them.
    skipped_archived: list[str] = field(default_factory=list)
    skipped_rc: list[str] = field(default_factory=list)
    skipped_disabled: list[str] = field(default_factory=list)
    # Error messages collected during the audit.
    errors: list[str] = field(default_factory=list)
Result of auditing which connectors have unpublished versions.
@dataclass
class CompileResult:
    """Result of a registry compile operation.

    Holds the counters gathered during a compile run together with the
    collected error messages. Only ``target`` is required; every counter
    starts at zero and ``errors`` starts as a fresh empty list.
    """

    target: str
    connectors_scanned: int = 0
    versions_found: int = 0
    yanked_versions: int = 0
    latest_updated: int = 0
    latest_already_current: int = 0
    cloud_registry_entries: int = 0
    oss_registry_entries: int = 0
    composite_registry_entries: int = 0
    metrics_connector_count: int = 0
    metrics_registry_entries: int = 0
    metrics_source: str | None = None
    metrics_error: str | None = None
    version_indexes_written: int = 0
    specs_secrets_mask_properties: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Return ``"dry-run"``, ``"completed-with-errors"`` or ``"success"``.

        A dry run always reports ``"dry-run"``, even when errors were
        recorded.
        """
        if self.dry_run:
            return "dry-run"
        return "completed-with-errors" if self.errors else "success"

    def summary(self) -> str:
        """Return a one-line, human-readable report of all counters."""
        fragments = [
            f"[{self.status}] Scanned {self.connectors_scanned} connectors, ",
            f"{self.versions_found} versions ({self.yanked_versions} yanked). ",
            f"Latest updated: {self.latest_updated}, ",
            f"already current: {self.latest_already_current}. ",
            f"Registry entries: cloud={self.cloud_registry_entries}, ",
            f"oss={self.oss_registry_entries}, ",
            f"composite={self.composite_registry_entries}. ",
            f"Metrics loaded for {self.metrics_connector_count} connectors, ",
            f"injected into {self.metrics_registry_entries} registry entries. ",
            f"Version indexes: {self.version_indexes_written}. ",
            f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. ",
            f"Errors: {len(self.errors)}.",
        ]
        return "".join(fragments)
Result of a registry compile operation.
def summary(self) -> str:
    """Return a one-line, human-readable report of all compile counters."""
    fragments = [
        f"[{self.status}] Scanned {self.connectors_scanned} connectors, ",
        f"{self.versions_found} versions ({self.yanked_versions} yanked). ",
        f"Latest updated: {self.latest_updated}, ",
        f"already current: {self.latest_already_current}. ",
        f"Registry entries: cloud={self.cloud_registry_entries}, ",
        f"oss={self.oss_registry_entries}, ",
        f"composite={self.composite_registry_entries}. ",
        f"Metrics loaded for {self.metrics_connector_count} connectors, ",
        f"injected into {self.metrics_registry_entries} registry entries. ",
        f"Version indexes: {self.version_indexes_written}. ",
        f"Specs secrets mask: {self.specs_secrets_mask_properties} properties. ",
        f"Errors: {len(self.errors)}.",
    ]
    return "".join(fragments)
88class ConnectorLanguage(StrEnum): 89 """Connector implementation languages.""" 90 91 PYTHON = "python" 92 JAVA = "java" 93 LOW_CODE = "low-code" 94 MANIFEST_ONLY = "manifest-only" 95 96 @classmethod 97 def parse(cls, value: str) -> ConnectorLanguage: 98 """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch.""" 99 try: 100 return cls(value) 101 except ValueError: 102 valid = ", ".join(f"`{m.value}`" for m in cls) 103 raise ValueError( 104 f"Unrecognized language: {value!r}. Expected one of: {valid}." 105 ) from None
Connector implementation languages.
96 @classmethod 97 def parse(cls, value: str) -> ConnectorLanguage: 98 """Parse a string into a `ConnectorLanguage`, raising `ValueError` on mismatch.""" 99 try: 100 return cls(value) 101 except ValueError: 102 valid = ", ".join(f"`{m.value}`" for m in cls) 103 raise ValueError( 104 f"Unrecognized language: {value!r}. Expected one of: {valid}." 105 ) from None
Parse a string into a ConnectorLanguage, raising ValueError on mismatch.
class ConnectorListResult(BaseModel):
    """Result of listing connectors in the registry.

    All three fields are required (no defaults are declared).
    """

    # Bucket the listing was taken from.
    bucket_name: str = Field(description="The GCS bucket name")
    # NOTE(review): presumably equals len(connectors); nothing in this model
    # enforces that -- confirm at the construction site.
    connector_count: int = Field(description="Number of connectors found")
    connectors: list[str] = Field(description="List of connector names")
Result of listing connectors in the registry.
class ConnectorMetadata(BaseModel):
    """Connector metadata from metadata.yaml.

    This model represents the essential metadata about a connector
    read from its metadata.yaml file in the Airbyte monorepo.

    `name`, `docker_repository` and `docker_image_tag` are required;
    `support_level` and `definition_id` default to `None`.
    """

    name: str = Field(description="The connector technical name")
    docker_repository: str = Field(description="The Docker repository")
    docker_image_tag: str = Field(description="The Docker image tag/version")
    # Kept as a plain string here; this model does not validate the value.
    support_level: str | None = Field(
        default=None, description="The support level (certified, community, etc.)"
    )
    definition_id: str | None = Field(
        default=None, description="The connector definition ID"
    )
Connector metadata from metadata.yaml.
This model represents the essential metadata about a connector read from its metadata.yaml file in the Airbyte monorepo.
70class ConnectorType(StrEnum): 71 """Connector type: source or destination.""" 72 73 SOURCE = "source" 74 DESTINATION = "destination" 75 76 @classmethod 77 def parse(cls, value: str) -> ConnectorType: 78 """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch.""" 79 try: 80 return cls(value) 81 except ValueError: 82 valid = ", ".join(f"`{m.value}`" for m in cls) 83 raise ValueError( 84 f"Unrecognized connector type: {value!r}. Expected one of: {valid}." 85 ) from None
Connector type: source or destination.
76 @classmethod 77 def parse(cls, value: str) -> ConnectorType: 78 """Parse a string into a `ConnectorType`, raising `ValueError` on mismatch.""" 79 try: 80 return cls(value) 81 except ValueError: 82 valid = ", ".join(f"`{m.value}`" for m in cls) 83 raise ValueError( 84 f"Unrecognized connector type: {value!r}. Expected one of: {valid}." 85 ) from None
Parse a string into a ConnectorType, raising ValueError on mismatch.
@dataclass
class GenerateResult:
    """Result of a local artifact generation run.

    The four string fields are required. The three list fields default to
    fresh empty lists and `dry_run` defaults to `False`.
    """

    connector_name: str
    version: str
    docker_image: str
    output_dir: str
    artifacts_written: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    validation_errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def success(self) -> bool:
        """Return `True` when neither `errors` nor `validation_errors` has entries."""
        return not (self.errors or self.validation_errors)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict snapshot of this result.

        The dict contains every field plus a computed ``"success"`` key.
        List values are copies, so mutating the returned dict cannot alter
        this instance (and therefore cannot flip `success` behind its back).
        """
        return {
            "connector_name": self.connector_name,
            "version": self.version,
            "docker_image": self.docker_image,
            "output_dir": self.output_dir,
            # Copy the lists: handing out the live lists would let callers
            # mutate this result through the dict.
            "artifacts_written": list(self.artifacts_written),
            "errors": list(self.errors),
            "validation_errors": list(self.validation_errors),
            "dry_run": self.dry_run,
            "success": self.success,
        }
Result of a local artifact generation run.
def to_dict(self) -> dict[str, Any]:
    """Return a plain-dict snapshot of this result.

    The dict contains every field plus a computed ``"success"`` key. List
    values are copies, so mutating the returned dict cannot alter this
    instance.
    """
    return {
        "connector_name": self.connector_name,
        "version": self.version,
        "docker_image": self.docker_image,
        "output_dir": self.output_dir,
        # Copy the lists: handing out the live lists would let callers
        # mutate this result through the dict.
        "artifacts_written": list(self.artifacts_written),
        "errors": list(self.errors),
        "validation_errors": list(self.validation_errors),
        "dry_run": self.dry_run,
        "success": self.success,
    }
class MetadataPublishResult(BaseModel):
    """Result of a metadata publish operation to GCS.

    This model provides detailed information about the outcome of
    publishing connector metadata to the registry.

    `latest_path` defaults to `None` and both `*_uploaded` flags default to
    `False`; all other fields are required.
    """

    connector_name: str = Field(description="The connector technical name")
    version: str = Field(description="The version that was published")
    bucket_name: str = Field(description="The GCS bucket name")
    versioned_path: str = Field(description="The versioned GCS path")
    latest_path: str | None = Field(
        default=None, description="The latest GCS path if updated"
    )
    versioned_uploaded: bool = Field(
        default=False, description="Whether the versioned metadata was uploaded"
    )
    latest_uploaded: bool = Field(
        default=False, description="Whether the latest metadata was uploaded"
    )
    # Restricted to exactly these three literal values.
    status: Literal["success", "dry-run", "already-up-to-date"] = Field(
        description="The status of the operation"
    )
    message: str = Field(description="Status message describing the outcome")

    def __str__(self) -> str:
        """Return ``[<status>] <connector_name>:<version> -> <versioned_path>``."""
        return f"[{self.status}] {self.connector_name}:{self.version} -> {self.versioned_path}"
Result of a metadata publish operation to GCS.
This model provides detailed information about the outcome of publishing connector metadata to the registry.
@dataclass
class PublishArtifactsResult:
    """Result of a version-artifacts publish operation.

    The four string fields are required. The list fields default to fresh
    empty lists and `dry_run` defaults to `False`.
    """

    connector_name: str
    version: str
    target: str
    gcs_destination: str
    files_uploaded: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    validation_errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def success(self) -> bool:
        """Return `True` when both `errors` and `validation_errors` are empty."""
        if len(self.errors) != 0:
            return False
        return len(self.validation_errors) == 0

    @property
    def status(self) -> str:
        """Return ``"dry-run"``, ``"completed-with-errors"`` or ``"success"``.

        A dry run always reports ``"dry-run"``, even when errors were
        recorded.
        """
        if self.dry_run:
            return "dry-run"
        has_problems = self.errors or self.validation_errors
        return "completed-with-errors" if has_problems else "success"
Result of a version-artifacts publish operation.
@dataclass
class PurgeLatestResult:
    """Result of a purge-latest operation.

    Only `target` is required; the counters start at zero and `errors`
    starts as a fresh empty list.
    """

    target: str
    connectors_found: int = 0
    latest_dirs_deleted: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Return ``"dry-run"``, ``"completed-with-errors"`` or ``"success"``.

        A dry run always reports ``"dry-run"``, even when errors were
        recorded.
        """
        if self.dry_run:
            return "dry-run"
        return "completed-with-errors" if self.errors else "success"

    def summary(self) -> str:
        """Return a one-line, human-readable report of the purge."""
        fragments = [
            f"[{self.status}] Found {self.connectors_found} connectors, ",
            f"deleted {self.latest_dirs_deleted} latest/ directories. ",
            f"Errors: {len(self.errors)}.",
        ]
        return "".join(fragments)
Result of a purge-latest operation.
@dataclass
class RebuildResult:
    """Result of a registry rebuild operation.

    `source_bucket`, `output_mode` and `output_root` are required; the
    counters start at zero and `errors` starts as a fresh empty list.
    """

    source_bucket: str
    output_mode: OutputMode
    output_root: str
    connectors_processed: int = 0
    blobs_copied: int = 0
    blobs_skipped: int = 0
    errors: list[str] = field(default_factory=list)
    dry_run: bool = False

    @property
    def status(self) -> str:
        """Return ``"dry-run"``, ``"completed-with-errors"`` or ``"success"``.

        A dry run always reports ``"dry-run"``, even when errors were
        recorded.
        """
        if self.dry_run:
            return "dry-run"
        return "completed-with-errors" if self.errors else "success"

    def summary(self) -> str:
        """Return a one-line, human-readable report of the rebuild."""
        fragments = [
            f"[{self.status}] Rebuilt {self.connectors_processed} connectors, ",
            f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, ",
            f"{len(self.errors)} errors. Output: {self.output_root}",
        ]
        return "".join(fragments)
Result of a registry rebuild operation.
@property
def status(self) -> str:
    """Return ``"dry-run"``, ``"completed-with-errors"`` or ``"success"``.

    A dry run always reports ``"dry-run"``, even when errors were recorded.
    """
    if self.dry_run:
        return "dry-run"
    return "completed-with-errors" if self.errors else "success"
Return the status of the rebuild operation.
def summary(self) -> str:
    """Return a one-line, human-readable report of the rebuild."""
    fragments = [
        f"[{self.status}] Rebuilt {self.connectors_processed} connectors, ",
        f"{self.blobs_copied} blobs copied, {self.blobs_skipped} skipped, ",
        f"{len(self.errors)} errors. Output: {self.output_root}",
    ]
    return "".join(fragments)
Return a human-readable summary.
class Registry(ABC):
    """A configured connector registry store.

    A Registry is bound to a specific `airbyte_ops_mcp.registry.store.RegistryStore`
    (store type + env + optional prefix), and provides methods used by the CLI to
    read/write registry contents.

    `list_connectors` is abstract. Every other operation defined here is a
    default that raises `NotImplementedError` with a message built from the
    store type and the operation name.
    """

    def __init__(self, store: RegistryStore) -> None:
        """Bind this registry to `store`."""
        self.store = store

    @property
    def store_type(self) -> StoreType:
        """Store type of the bound store."""
        return self.store.store_type

    @property
    def bucket_name(self) -> str:
        """Bucket of the bound store."""
        return self.store.bucket

    @property
    def prefix(self) -> str:
        """Prefix of the bound store."""
        return self.store.prefix

    def _require_no_prefix(self, op_name: str) -> None:
        """Reject `op_name` when the bound store carries a prefix.

        Raises:
            NotImplementedError: If `self.prefix` is non-empty.
        """
        if not self.prefix:
            return
        raise NotImplementedError(
            f"Operation '{op_name}' does not yet support store prefixes (got prefix='{self.prefix}')."
        )

    # ---------------------------------------------------------------------
    # Read operations
    # ---------------------------------------------------------------------

    @abstractmethod
    def list_connectors(
        self,
        *,
        support_level: SupportLevel | None = None,
        min_support_level: SupportLevel | None = None,
        connector_type: ConnectorType | None = None,
        language: ConnectorLanguage | None = None,
    ) -> list[str]:
        """List connector names; all filters are keyword-only and optional.

        Abstract: subclasses must override. The base body raises
        `NotImplementedError`.
        """
        message = _op_not_implemented_message(self.store_type, "list_connectors")
        raise NotImplementedError(message)

    def list_connector_versions(self, connector_name: str) -> list[str]:
        """Default: raise `NotImplementedError` for this store type."""
        message = _op_not_implemented_message(
            self.store_type, "list_connector_versions"
        )
        raise NotImplementedError(message)

    def get_connector_metadata(
        self, connector_name: str, version: str = "latest"
    ) -> dict[str, Any]:
        """Default: raise `NotImplementedError` for this store type."""
        message = _op_not_implemented_message(
            self.store_type, "get_connector_metadata"
        )
        raise NotImplementedError(message)

    # ---------------------------------------------------------------------
    # Write / mutate operations
    # ---------------------------------------------------------------------

    def yank(
        self,
        connector_name: str,
        version: str,
        reason: str = "",
        approval_url: str = "",
        dry_run: bool = False,
    ) -> YankResult:
        """Default: raise `NotImplementedError` for this store type."""
        message = _op_not_implemented_message(self.store_type, "yank")
        raise NotImplementedError(message)

    def unyank(
        self,
        connector_name: str,
        version: str,
        dry_run: bool = False,
    ) -> YankResult:
        """Default: raise `NotImplementedError` for this store type."""
        message = _op_not_implemented_message(self.store_type, "unyank")
        raise NotImplementedError(message)

    def finalize_progressive_rollout_marker(
        self,
        connector_name: str,
        outcome: Literal["promoted", "aborted"],
        version: str | None = None,
        dry_run: bool = False,
    ) -> ProgressiveRolloutMarkerResult:
        """Default: raise `NotImplementedError` for this store type."""
        message = _op_not_implemented_message(
            self.store_type,
            "finalize_progressive_rollout_marker",
        )
        raise NotImplementedError(message)

    def publish_version_artifacts(
        self,
        connector_name: str,
        version: str,
        artifacts_dir: Path,
        dry_run: bool = False,
        with_validate: bool = True,
    ) -> PublishArtifactsResult:
        """Default: raise `NotImplementedError` for this store type."""
        message = _op_not_implemented_message(
            self.store_type, "publish_version_artifacts"
        )
        raise NotImplementedError(message)

    def delete_dev_latest(
        self,
        connector_name: list[str] | None = None,
        dry_run: bool = False,
    ) -> PurgeLatestResult:
        """Default: raise `NotImplementedError` for this store type.

        Note that `connector_name` accepts a list despite its singular name.
        """
        message = _op_not_implemented_message(self.store_type, "delete_dev_latest")
        raise NotImplementedError(message)

    def compile(
        self,
        connector_name: list[str] | None = None,
        dry_run: bool = False,
        with_secrets_mask: bool = False,
        with_legacy_migration: str | None = None,
        with_metrics: bool = True,
        force: bool = False,
    ) -> CompileResult:
        """Default: raise `NotImplementedError` for this store type.

        Note that `connector_name` accepts a list despite its singular name.
        """
        message = _op_not_implemented_message(self.store_type, "compile")
        raise NotImplementedError(message)

    def marketing_stubs_check(self, repo_root: Path) -> dict[str, Any]:
        """Default: raise `NotImplementedError` for this store type."""
        message = _op_not_implemented_message(
            self.store_type, "marketing_stubs_check"
        )
        raise NotImplementedError(message)

    def marketing_stubs_sync(
        self,
        repo_root: Path,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """Default: raise `NotImplementedError` for this store type."""
        message = _op_not_implemented_message(
            self.store_type, "marketing_stubs_sync"
        )
        raise NotImplementedError(message)

    def mirror(
        self,
        output_mode: OutputMode,
        output_path_root: str | None = None,
        gcs_bucket: str | None = None,
        s3_bucket: str | None = None,
        dry_run: bool = False,
        connector_name: list[str] | None = None,
    ) -> RebuildResult:
        """Default: raise `NotImplementedError` for this store type.

        Note that `connector_name` accepts a list despite its singular name.
        """
        message = _op_not_implemented_message(self.store_type, "mirror")
        raise NotImplementedError(message)
A configured connector registry store.
A Registry is bound to a specific airbyte_ops_mcp.registry.store.RegistryStore
(store type + env + optional prefix), and provides methods used by the CLI to
read/write registry contents.
@abstractmethod
def list_connectors(
    self,
    *,
    support_level: SupportLevel | None = None,
    min_support_level: SupportLevel | None = None,
    connector_type: ConnectorType | None = None,
    language: ConnectorLanguage | None = None,
) -> list[str]:
    """List connector names; all filters are keyword-only and optional.

    Abstract: subclasses must override. The base body raises
    `NotImplementedError`.
    """
    message = _op_not_implemented_message(self.store_type, "list_connectors")
    raise NotImplementedError(message)
def finalize_progressive_rollout_marker(
    self,
    connector_name: str,
    outcome: Literal["promoted", "aborted"],
    version: str | None = None,
    dry_run: bool = False,
) -> ProgressiveRolloutMarkerResult:
    """Default: raise `NotImplementedError` for this store type."""
    message = _op_not_implemented_message(
        self.store_type,
        "finalize_progressive_rollout_marker",
    )
    raise NotImplementedError(message)
def publish_version_artifacts(
    self,
    connector_name: str,
    version: str,
    artifacts_dir: Path,
    dry_run: bool = False,
    with_validate: bool = True,
) -> PublishArtifactsResult:
    """Default: raise `NotImplementedError` for this store type."""
    message = _op_not_implemented_message(
        self.store_type, "publish_version_artifacts"
    )
    raise NotImplementedError(message)
def compile(
    self,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
    with_secrets_mask: bool = False,
    with_legacy_migration: str | None = None,
    with_metrics: bool = True,
    force: bool = False,
) -> CompileResult:
    """Default: raise `NotImplementedError` for this store type.

    Note that `connector_name` accepts a list despite its singular name.
    """
    message = _op_not_implemented_message(self.store_type, "compile")
    raise NotImplementedError(message)
def mirror(
    self,
    output_mode: OutputMode,
    output_path_root: str | None = None,
    gcs_bucket: str | None = None,
    s3_bucket: str | None = None,
    dry_run: bool = False,
    connector_name: list[str] | None = None,
) -> RebuildResult:
    """Default: raise `NotImplementedError` for this store type.

    Note that `connector_name` accepts a list despite its singular name.
    """
    message = _op_not_implemented_message(self.store_type, "mirror")
    raise NotImplementedError(message)
class RegistryEntryResult(BaseModel):
    """Result of reading a registry entry from GCS.

    This model wraps the raw metadata dictionary with additional context.
    All fields are required (no defaults are declared).
    """

    connector_name: str = Field(description="The connector technical name")
    version: str = Field(description="The version that was read")
    bucket_name: str = Field(description="The GCS bucket name")
    gcs_path: str = Field(description="The GCS path that was read")
    # Untyped dict: this model places no constraints on its keys or values.
    metadata: dict = Field(description="The raw metadata dictionary")
Result of reading a registry entry from GCS.
This model wraps the raw metadata dictionary with additional context.
@dataclass(frozen=True, kw_only=True)
class RegistryStore:
    """Parsed store target (type + environment + optional prefix).

    Instances are immutable and must be built with keyword arguments.

    Examples::

        RegistryStore.parse("coral:dev")
        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
        RegistryStore.parse("coral:dev/aj-test")
        # -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
        RegistryStore.parse("sonar:prod")
        # -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
    """

    store_type: StoreType
    env: str
    prefix: str = ""

    # -- Derived helpers -----------------------------------------------------

    @property
    def bucket(self) -> str:
        """Look up the bucket name for this store type and environment.

        Raises:
            ValueError: If `BUCKET_MAP` has no entry for the store type, or
                no entry for the environment under that store type.
        """
        env_map = BUCKET_MAP.get(self.store_type)
        if env_map is None:
            raise ValueError(f"Unknown store type: {self.store_type!r}")
        resolved = env_map.get(self.env)
        if resolved is not None:
            return resolved
        raise ValueError(
            f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
            f"Expected one of: {', '.join(sorted(env_map))}."
        )

    @property
    def bucket_root(self) -> str:
        """Return `bucket/prefix`, or just the bucket when no prefix is set."""
        if not self.prefix:
            return self.bucket
        return f"{self.bucket}/{self.prefix}"

    # -- Parsing -------------------------------------------------------------

    @classmethod
    def parse(cls, target: str) -> RegistryStore:
        """Parse a store target string of the form `<store_type>:<env>[/<prefix>]`.

        Accepted formats:

            "coral:dev"
            "coral:prod"
            "coral:dev/aj-test100"
            "sonar:prod"

        The store type is matched case-insensitively; the environment is
        matched as written. Leading and trailing slashes are stripped from
        the prefix.

        Raises:
            ValueError: If the string cannot be parsed or references an
                unknown store type / environment.
        """
        store_part, separator, env_part = target.partition(":")
        if not separator:
            raise ValueError(
                f"Invalid store target '{target}'. "
                "Expected format: '<store_type>:<env>[/<prefix>]' "
                "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
            )

        # Validate store type
        valid_types = {member.value: member for member in StoreType}
        store_type = valid_types.get(store_part.lower())
        if store_type is None:
            raise ValueError(
                f"Unknown store type '{store_part}'. "
                f"Expected one of: {', '.join(sorted(valid_types))}."
            )

        # Split env and prefix
        env_key, _, raw_prefix = env_part.partition("/")

        # Validate env
        env_map = BUCKET_MAP.get(store_type, {})
        if env_key not in env_map:
            raise ValueError(
                f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
                f"Expected one of: {', '.join(sorted(env_map))}."
            )

        return cls(store_type=store_type, env=env_key, prefix=raw_prefix.strip("/"))
Parsed store target (type + environment + optional prefix).
Examples::
RegistryStore.parse("coral:dev")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="")
RegistryStore.parse("coral:dev/aj-test")
# -> RegistryStore(store_type=StoreType.CORAL, env="dev", prefix="aj-test")
RegistryStore.parse("sonar:prod")
# -> RegistryStore(store_type=StoreType.SONAR, env="prod", prefix="")
@property
def bucket(self) -> str:
    """Look up the bucket name for this store type and environment.

    Raises:
        ValueError: If `BUCKET_MAP` has no entry for the store type, or no
            entry for the environment under that store type.
    """
    env_map = BUCKET_MAP.get(self.store_type)
    if env_map is None:
        raise ValueError(f"Unknown store type: {self.store_type!r}")
    resolved = env_map.get(self.env)
    if resolved is not None:
        return resolved
    raise ValueError(
        f"Unknown environment '{self.env}' for store type '{self.store_type.value}'. "
        f"Expected one of: {', '.join(sorted(env_map))}."
    )
Resolve the concrete bucket name for this target.
@property
def bucket_root(self) -> str:
    """Return `bucket/prefix`, or just the bucket when no prefix is set."""
    if not self.prefix:
        return self.bucket
    return f"{self.bucket}/{self.prefix}"
Bucket name with optional prefix appended (bucket/prefix).
@classmethod
def parse(cls, target: str) -> RegistryStore:
    """Parse a store target string of the form `<store_type>:<env>[/<prefix>]`.

    Accepted formats:

        "coral:dev"
        "coral:prod"
        "coral:dev/aj-test100"
        "sonar:prod"

    The store type is matched case-insensitively; the environment is matched
    as written. Leading and trailing slashes are stripped from the prefix.

    Raises:
        ValueError: If the string cannot be parsed or references an
            unknown store type / environment.
    """
    store_part, separator, env_part = target.partition(":")
    if not separator:
        raise ValueError(
            f"Invalid store target '{target}'. "
            "Expected format: '<store_type>:<env>[/<prefix>]' "
            "(e.g. 'coral:dev', 'sonar:prod', 'coral:dev/my-test')."
        )

    # Validate store type
    valid_types = {member.value: member for member in StoreType}
    store_type = valid_types.get(store_part.lower())
    if store_type is None:
        raise ValueError(
            f"Unknown store type '{store_part}'. "
            f"Expected one of: {', '.join(sorted(valid_types))}."
        )

    # Split env and prefix
    env_key, _, raw_prefix = env_part.partition("/")

    # Validate env
    env_map = BUCKET_MAP.get(store_type, {})
    if env_key not in env_map:
        raise ValueError(
            f"Unknown environment '{env_key}' for store type '{store_type.value}'. "
            f"Expected one of: {', '.join(sorted(env_map))}."
        )

    return cls(store_type=store_type, env=env_key, prefix=raw_prefix.strip("/"))
Parse a store target string.
Accepted formats:
- "coral:dev"
- "coral:prod"
- "coral:dev/aj-test100"
- "sonar:prod"
Raises:
- ValueError: If the string cannot be parsed or references an unknown store type / environment.
class StoreType(str, Enum):
    """Registry store type identifier."""

    SONAR = "sonar"
    CORAL = "coral"

    # -- Auto-detection class methods ----------------------------------------

    @classmethod
    def get_from_connector_name(cls, name: str) -> StoreType:
        """Infer the store type from a connector's technical name.

        Names beginning with `source-` or `destination-` map to **coral**;
        every other name maps to **sonar**.

        Args:
            name: Connector technical name (e.g. `"source-github"` or `"stripe"`).

        Returns:
            The inferred `StoreType`.
        """
        is_coral_name = name.startswith(("source-", "destination-"))
        return cls.CORAL if is_coral_name else cls.SONAR

    @classmethod
    def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
        """Infer the store type from a repository working directory.

        Checks for well-known directory markers, sonar first:

        * **sonar** -- `integrations/` alongside `connector-sdk/`
        * **coral** -- `airbyte-integrations/connectors/`

        Args:
            path: Directory to inspect. Defaults to `Path.cwd()`.

        Returns:
            The inferred `StoreType`, or `None` if the directory does
            not match any known registry repository layout.
        """
        root = Path.cwd() if path is None else path

        # Sonar markers: both directories must be present.
        if (root / "integrations").is_dir() and (root / "connector-sdk").is_dir():
            return cls.SONAR

        # Coral markers
        coral_marker = root / "airbyte-integrations" / "connectors"
        return cls.CORAL if coral_marker.is_dir() else None
Registry store type identifier.
@classmethod
def get_from_connector_name(cls, name: str) -> StoreType:
    """Infer the store type from a connector's technical name.

    Names beginning with `source-` or `destination-` map to **coral**;
    every other name maps to **sonar**.

    Args:
        name: Connector technical name (e.g. `"source-github"` or `"stripe"`).

    Returns:
        The inferred `StoreType`.
    """
    is_coral_name = name.startswith(("source-", "destination-"))
    return cls.CORAL if is_coral_name else cls.SONAR
Infer the store type from a connector's technical name.
Connectors whose name starts with source- or destination- belong
to the coral registry. All other names belong to sonar.
Arguments:
- name: Connector technical name (e.g. "source-github" or "stripe").
Returns:
The inferred StoreType.
@classmethod
def detect_from_repo_dir(cls, path: Path | None = None) -> StoreType | None:
    """Infer the store type from a repository working directory.

    Checks for well-known directory markers, sonar first:

    * **sonar** -- `integrations/` alongside `connector-sdk/`
    * **coral** -- `airbyte-integrations/connectors/`

    Args:
        path: Directory to inspect. Defaults to `Path.cwd()`.

    Returns:
        The inferred `StoreType`, or `None` if the directory does
        not match any known registry repository layout.
    """
    root = Path.cwd() if path is None else path

    # Sonar markers: both directories must be present.
    if (root / "integrations").is_dir() and (root / "connector-sdk").is_dir():
        return cls.SONAR

    # Coral markers
    coral_marker = root / "airbyte-integrations" / "connectors"
    return cls.CORAL if coral_marker.is_dir() else None
Infer the store type from a repository working directory.
Checks for well-known directory markers:
- sonar -- integrations/ alongside connector-sdk/
- coral -- airbyte-integrations/connectors/
Arguments:
- path: Directory to inspect. Defaults to Path.cwd().
Returns:
The inferred StoreType, or None if the directory does not match any known registry repository layout.
class SupportLevel(StrEnum):
    """Connector support levels ordered by precedence."""

    ARCHIVED = "archived"
    COMMUNITY = "community"
    CERTIFIED = "certified"

    @property
    def precedence(self) -> int:
        """Numeric precedence of this level, read from `_SUPPORT_LEVEL_PRECEDENCE`.

        Higher values indicate higher support commitment.
        """
        rank = _SUPPORT_LEVEL_PRECEDENCE[self]
        return rank

    @classmethod
    def from_precedence(cls, precedence: int) -> SupportLevel:
        """Return the member whose numeric precedence equals `precedence`.

        Members are checked in definition order and the first match wins.

        Raises:
            ValueError: If no member has that precedence. The message lists
                the recognised precedence values.
        """
        match = next(
            (m for m in cls if _SUPPORT_LEVEL_PRECEDENCE[m] == precedence),
            None,
        )
        if match is not None:
            return match
        valid = ", ".join([f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls])
        raise ValueError(
            f"Unrecognized support-level precedence: {precedence!r}. "
            f"Expected one of: {valid}."
        ) from None

    @classmethod
    def parse(cls, value: str) -> SupportLevel:
        """Parse a string into a `SupportLevel`.

        Accepts a keyword (`archived`, `community`, `certified`)
        or a legacy integer string (`100`, `200`, `300`).

        Raises:
            ValueError: If `value` is neither a known keyword nor a known
                integer precedence.
        """
        # First attempt: the keyword form.
        try:
            return cls(value)
        except ValueError:
            pass
        # Second attempt: interpret the text as an integer precedence. Both
        # a non-numeric string and an unknown number land in the handler.
        try:
            as_int = int(value)
            return cls.from_precedence(as_int)
        except (ValueError, KeyError):
            pass
        valid_kw = ", ".join([f"`{m.value}`" for m in cls])
        valid_int = ", ".join([f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls])
        raise ValueError(
            f"Unrecognized support level: {value!r}. "
            f"Expected keyword ({valid_kw}) or integer ({valid_int})."
        ) from None
Connector support levels ordered by precedence.
@property
def precedence(self) -> int:
    """Return this level's numeric precedence.

    A larger number means a stronger support commitment.
    """
    rank = _SUPPORT_LEVEL_PRECEDENCE[self]
    return rank
Numeric precedence for ordering comparisons.
Higher values indicate higher support commitment.
@classmethod
def from_precedence(cls, precedence: int) -> SupportLevel:
    """Return the member whose numeric precedence equals `precedence`.

    Raises `ValueError` when no member carries that precedence.
    """
    found = next(
        (level for level in cls if _SUPPORT_LEVEL_PRECEDENCE[level] == precedence),
        None,
    )
    if found is not None:
        return found
    known = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
    raise ValueError(
        f"Unrecognized support-level precedence: {precedence!r}. "
        f"Expected one of: {known}."
    ) from None
Look up a SupportLevel by its numeric precedence value.
Raises ValueError when the precedence is not recognised.
@classmethod
def parse(cls, value: str) -> SupportLevel:
    """Convert user-supplied text into a `SupportLevel`.

    Two spellings are accepted: the keyword form (`archived`,
    `community`, `certified`) and the legacy integer-string form
    (`100`, `200`, `300`).

    Raises `ValueError` when neither interpretation matches.
    """
    # First attempt: the value is already one of the enum keywords.
    try:
        return cls(value)
    except ValueError:
        pass
    # Second attempt: the value is a numeric precedence written as text.
    try:
        as_number = int(value)
        return cls.from_precedence(as_number)
    except (ValueError, KeyError):
        pass
    keywords = ", ".join(f"`{m.value}`" for m in cls)
    numbers = ", ".join(f"`{_SUPPORT_LEVEL_PRECEDENCE[m]}`" for m in cls)
    raise ValueError(
        f"Unrecognized support level: {value!r}. "
        f"Expected keyword ({keywords}) or integer ({numbers})."
    ) from None
Parse a string into a SupportLevel.
Accepts a keyword (archived, community, certified)
or a legacy integer string (100, 200, 300).
Raises ValueError when the value is not recognised.
@dataclass
class UnpublishedConnector:
    """Record of a connector whose checked-out version has no published copy on GCS."""

    # Connector name as passed to (or discovered by) the audit.
    connector_name: str
    # Version string read from the local checkout.
    local_version: str
A connector whose current local version is not published on GCS.
@dataclass(frozen=True)
class ValidateOptions:
    """Immutable settings that steer which validators run and how they behave."""

    # Path to the connector's documentation file (for `validate_docs_path_exists`).
    docs_path: str | None = None

    # Whether this is a pre-release build (skips version-decrement checks).
    is_prerelease: bool = False
Options that influence which validators run and how.
@dataclass
class ValidationResult:
    """Combined outcome of a validation run across all validators."""

    # Stays True until the first error is recorded.
    passed: bool = True
    # Error messages recorded via `add_error`, in insertion order.
    errors: list[str] = field(default_factory=list)
    # Count of validators run; not modified by anything inside this class.
    validators_run: int = 0

    def add_error(self, message: str) -> None:
        """Record `message` and mark the overall result as failed."""
        self.errors.append(message)
        self.passed = False
Aggregate result from running all validators.
class VersionListResult(BaseModel):
    """Response model returned when listing the versions of one connector."""

    # Which connector and bucket the listing refers to.
    connector_name: str = Field(
        description="The connector technical name",
    )
    bucket_name: str = Field(
        description="The GCS bucket name",
    )

    # The listing itself, plus its size.
    version_count: int = Field(
        description="Number of versions found",
    )
    versions: list[str] = Field(
        description="List of version strings",
    )
Result of listing versions for a connector.
@dataclass
class YankResult:
    """Outcome record for a single yank or unyank request."""

    connector_name: str
    version: str
    bucket_name: str
    action: str  # "yank" or "unyank"
    success: bool
    message: str
    dry_run: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict suitable for JSON serialization."""
        return dict(
            connector_name=self.connector_name,
            version=self.version,
            bucket_name=self.bucket_name,
            action=self.action,
            success=self.success,
            message=self.message,
            dry_run=self.dry_run,
        )
Result of a yank or unyank operation.
def to_dict(self) -> dict[str, Any]:
    """Return the fields as a plain dict suitable for JSON serialization."""
    return dict(
        connector_name=self.connector_name,
        version=self.version,
        bucket_name=self.bucket_name,
        action=self.action,
        success=self.success,
        message=self.message,
        dry_run=self.dry_run,
    )
Convert to a dictionary for JSON serialization.
def compile_registry(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
    with_secrets_mask: bool = False,
    with_legacy_migration: str | None = None,
    with_metrics: bool = True,
    force: bool = False,
) -> CompileResult:
    """Compile the registry: sync latest/ dirs and write index files.

    Steps:
    1. Glob for all `metadata.yaml` to discover (connector, version) pairs.
    2. Glob for active marker files.
    3. Compute the latest GA semver per connector.
    4. Compute active release candidates from versioned markers.
    5. Glob for `version=*` markers in `latest/` dirs for a fast check.
    6. Delete stale `latest/` dirs and recursively copy the versioned dir.
    7. Synthesize missing registry entries from pinned latest overrides.
    8. (Optional) Legacy migration: delete disabled registry entries.
    9. (Optional) Read latest connector metrics.
    10. Write global registry JSONs.
    11. Write composite registry JSON.
    12. Write per-connector `versions.json`.
    13. (Optional) Regenerate `specs_secrets_mask.yaml`.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only resync `latest/` directories for
            these connectors (steps 5-7). Index rebuilds (steps 10-13)
            always operate on the full set of connectors so that global
            registry files remain complete.
        dry_run: If True, report what would be done without writing. This
            includes the failure path of step 6: a failed sync never
            triggers the `latest/` cleanup delete during a dry run.
        with_secrets_mask: If True, regenerate `specs_secrets_mask.yaml`.
        with_legacy_migration: If set, run the named migration step.
            Currently supported: `"v1"` — delete `{registry_type}.json`
            files for connectors whose `registryOverrides.{registry}.enabled`
            is `false`.
        with_metrics: If True, inject latest connector metrics from the
            analytics JSONL export into `generated.metrics`. Metrics are
            only read when `store.store_type` is `StoreType.CORAL`.
        force: If True, resync all connectors' latest/ directories even if the
            existing version marker matches the computed latest version. This
            is useful when metadata content changes without a version bump.

    Returns:
        A `CompileResult` describing what was done. Per-connector failures
        in steps 6, 7 and 12 are collected in `result.errors` rather than
        raised.

    Raises:
        ValueError: If `with_legacy_migration` is set to a value that is not
            in `LEGACY_MIGRATION_VERSIONS`.
    """
    if with_legacy_migration and with_legacy_migration not in LEGACY_MIGRATION_VERSIONS:
        raise ValueError(
            f"Unknown legacy migration version: {with_legacy_migration!r}. "
            f"Supported: {', '.join(LEGACY_MIGRATION_VERSIONS)}"
        )

    result = CompileResult(target=store.bucket_root, dry_run=dry_run)

    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # --- Steps 1 and 2: Scan versions and active markers ---
    # Always scan ALL connectors so that index rebuilds are complete.
    _log_progress("Step 1-2: Scanning versions and active markers...")
    connector_versions, yanked, progressive_rollouts = _scan_versions_and_markers(
        fs,
        store=store,
        connector_name=None,
    )
    result.connectors_scanned = len(connector_versions)
    result.versions_found = sum(len(v) for v in connector_versions.values())
    result.yanked_versions = len(yanked)
    _log_progress(
        " Found %d connectors, %d versions, %d yanked",
        result.connectors_scanned,
        result.versions_found,
        result.yanked_versions,
    )
    _log_progress(" Found %d progressive rollout markers", len(progressive_rollouts))

    # --- Step 3: Compute latest ---
    _log_progress("Step 3: Computing latest GA version per connector...")
    latest_versions = _compute_latest_versions(
        connector_versions=connector_versions,
        yanked=yanked,
        progressive_rollouts=progressive_rollouts,
    )
    _log_progress(" Computed latest for %d connectors", len(latest_versions))

    # --- Step 4: Compute release candidates ---
    _log_progress("Step 4: Computing active release candidates...")
    rc_versions = _compute_release_candidates(
        connector_versions=connector_versions,
        yanked=yanked,
        progressive_rollouts=progressive_rollouts,
    )
    _log_progress(" Computed %d active release candidates", len(rc_versions))

    # --- Step 5: Check existing latest markers ---
    # When --connector-name is set, only check/sync those connectors (steps 5-7).
    # Index rebuilds always use the full unfiltered data.
    if connector_name:
        connector_name_set = set(connector_name)
        sync_scope = {
            c: v for c, v in latest_versions.items() if c in connector_name_set
        }
        _log_progress(
            " --connector-name filter: syncing %d of %d connectors",
            len(sync_scope),
            len(latest_versions),
        )
    else:
        sync_scope = latest_versions

    _log_progress("Step 5: Checking existing latest/ markers...")
    sync_scope_names = list(sync_scope) if connector_name else None
    existing_markers = _scan_latest_markers(
        fs,
        store=store,
        connector_name=sync_scope_names,
    )
    _log_progress(" Found %d existing markers", len(existing_markers))

    stale_connectors: list[str] = []
    pinned_override_synthesis_connectors: list[str] = []
    for connector, expected_version in sync_scope.items():
        current_marker = existing_markers.get(connector)
        if force or current_marker != expected_version:
            stale_connectors.append(connector)
            continue

        # Marker is current, but the latest/ entry may still need to be
        # synthesized from pinned overrides (handled in Step 7).
        if _requires_pinned_override_synthesis(
            fs,
            store=store,
            connector=connector,
            version=expected_version,
        ):
            pinned_override_synthesis_connectors.append(connector)
            continue

        result.latest_already_current += 1

    _log_progress(
        " %d connectors need latest/ update, %d already current",
        len(stale_connectors),
        result.latest_already_current,
    )

    # --- Step 6: Resync stale latest/ dirs (parallel) ---
    if stale_connectors:
        _log_progress(
            "Step 6: Syncing %d stale latest/ directories (max_workers=%d)...",
            len(stale_connectors),
            _COMPILE_SYNC_MAX_WORKERS,
        )

        def _sync_one_connector(connector: str) -> None:
            """Sync a single connector's latest/ dir."""
            version = latest_versions[connector]
            _sync_latest_dir(
                fs,
                store=store,
                connector=connector,
                version=version,
                dry_run=dry_run,
            )
            if not dry_run:
                _apply_overrides_to_latest_entry(
                    fs,
                    store=store,
                    connector=connector,
                    version=version,
                )

        sorted_stale = sorted(stale_connectors)
        with ThreadPoolExecutor(max_workers=_COMPILE_SYNC_MAX_WORKERS) as pool:
            futures = {pool.submit(_sync_one_connector, c): c for c in sorted_stale}
            for i, future in enumerate(as_completed(futures), 1):
                connector = futures[future]
                try:
                    future.result()
                    result.latest_updated += 1
                except Exception as exc:
                    error_msg = f"Failed to sync latest/ for {connector}: {exc}"
                    logger.error(error_msg)
                    result.errors.append(error_msg)
                    # Delete the (possibly partial) latest/ dir so the next
                    # compile retries this connector from scratch. A dry run
                    # must never mutate the bucket, so the cleanup delete is
                    # skipped there: it would remove the live latest/ dir.
                    if not dry_run:
                        try:
                            _delete_latest_dir(
                                fs,
                                store=store,
                                connector=connector,
                            )
                            logger.info(
                                "Cleaned up partial latest/ for %s after failure",
                                connector,
                            )
                        except Exception as cleanup_exc:
                            logger.warning(
                                "Could not clean up latest/ for %s: %s",
                                connector,
                                cleanup_exc,
                            )
                if i % 100 == 0:
                    _log_progress(" Synced %d / %d...", i, len(sorted_stale))
    else:
        _log_progress("Step 6: All latest/ directories are current, nothing to sync.")

    # --- Step 7: Synthesize missing latest entries from pinned overrides ---
    if pinned_override_synthesis_connectors:
        _log_progress(
            "Step 7: Synthesizing %d latest/ registry entries from pinned overrides...",
            len(pinned_override_synthesis_connectors),
        )
        for connector in sorted(pinned_override_synthesis_connectors):
            if dry_run:
                _log_progress(
                    " [DRY RUN] Would synthesize pinned latest entries for %s",
                    connector,
                )
                result.latest_updated += 1
                continue
            try:
                _apply_overrides_to_latest_entry(
                    fs,
                    store=store,
                    connector=connector,
                    version=sync_scope[connector],
                )
                result.latest_updated += 1
            except Exception as exc:
                error_msg = (
                    f"Failed to synthesize pinned latest entries for {connector}: {exc}"
                )
                logger.error(error_msg)
                result.errors.append(error_msg)

    # --- Step 8: Legacy migration (optional) ---
    if with_legacy_migration == "v1":
        _log_progress(
            "Step 8: Legacy migration v1 — deleting disabled registry entries..."
        )
        migration_deleted = _cleanup_disabled_registry_entries(
            fs,
            store=store,
            connector_versions=connector_versions,
            dry_run=dry_run,
        )
        total_deleted = sum(len(v) for v in migration_deleted.values())
        if migration_deleted:
            for conn, paths in sorted(migration_deleted.items()):
                _log_progress(
                    " %s: %s %d files",
                    conn,
                    "would delete" if dry_run else "deleted",
                    len(paths),
                )
        _log_progress(
            " Migration v1: %s %d files across %d connectors",
            "would delete" if dry_run else "deleted",
            total_deleted,
            len(migration_deleted),
        )

    # --- Step 9: Read latest connector metrics (optional) ---
    metrics_bundle = None
    if with_metrics and store.store_type == StoreType.CORAL:
        _log_progress("Step 9: Reading latest connector metrics JSONL...")
        try:
            metrics_bundle = read_latest_connector_metrics()
            result.metrics_source = metrics_bundle.blob_path
            result.metrics_connector_count = metrics_bundle.connector_count
            if metrics_bundle.blob_path:
                _log_progress(
                    " Loaded metrics for %d connectors from gs://%s",
                    metrics_bundle.connector_count,
                    metrics_bundle.blob_path,
                )
            else:
                _log_progress(" No connector metrics JSONL file found.")
        except Exception as exc:
            # Metrics are best-effort: record the failure and keep compiling.
            error_msg = f"Failed to read connector metrics JSONL: {exc}"
            logger.warning(error_msg)
            result.metrics_error = error_msg
            _log_progress(" %s", error_msg)
    elif with_metrics:
        _log_progress("Step 9: Skipping connector metrics for non-coral registry.")
    else:
        _log_progress("Step 9: Connector metrics injection disabled.")

    # --- Step 10: Compile global registry JSONs ---
    _log_progress("Step 10: Compiling global registry JSON files...")
    all_registry_entries: list[dict[str, Any]] = []  # collected for Step 13
    entries_by_registry_type: dict[str, list[dict[str, Any]]] = {}
    for registry_type in VALID_REGISTRIES:
        entries = _compile_global_registry(
            fs,
            store=store,
            latest_versions=latest_versions,
            registry_type=registry_type,
        )

        # Inject release candidate info into entries that have active RCs.
        if rc_versions:
            rc_entries: dict[str, dict[str, Any]] = {}
            for connector, rc_ver in rc_versions.items():
                rc_entry = _read_rc_registry_entry(
                    fs,
                    store=store,
                    connector=connector,
                    rc_version=rc_ver,
                    registry_type=registry_type,
                )
                if rc_entry:
                    docker_repo = rc_entry.get(
                        "dockerRepository",
                        f"airbyte/{connector}",
                    )
                    rc_entries[docker_repo] = {
                        "version": rc_ver,
                        "entry": rc_entry,
                    }
            if rc_entries:
                entries = _apply_release_candidates_to_entries(entries, rc_entries)
                _log_progress(
                    " Injected %d release candidates into %s registry",
                    len(rc_entries),
                    registry_type,
                )

        if metrics_bundle is not None:
            injected = apply_metrics_to_registry_entries(entries, metrics_bundle)
            result.metrics_registry_entries += injected
            _log_progress(
                " Injected metrics into %d %s registry entries",
                injected,
                registry_type,
            )

        all_registry_entries.extend(entries)
        entries_by_registry_type[registry_type] = entries
        registry_json = _build_global_registry_json(entries)
        entry_count = len(registry_json["sources"]) + len(registry_json["destinations"])

        if registry_type == "cloud":
            result.cloud_registry_entries = entry_count
        else:
            result.oss_registry_entries = entry_count

        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )
        else:
            content = json.dumps(registry_json, indent=2, sort_keys=True) + "\n"
            path_prefix = f"{store.prefix}/" if store.prefix else ""
            _write_gcs_blob_with_custom_ttl(
                bucket_name=store.bucket,
                blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/{registry_type}_registry.json",
                content=content,
                cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
            )
            _log_progress(
                " Wrote %s_registry.json (%d entries)",
                registry_type,
                entry_count,
            )

    # --- Step 11: Compile composite registry JSON (superset) ---
    _log_progress("Step 11: Compiling composite_registry.json (superset)...")
    composite_json = _build_composite_registry_json(
        cloud_entries=entries_by_registry_type.get("cloud", []),
        oss_entries=entries_by_registry_type.get("oss", []),
    )
    composite_entry_count = len(composite_json["sources"]) + len(
        composite_json["destinations"]
    )
    result.composite_registry_entries = composite_entry_count
    if dry_run:
        _log_progress(
            " [DRY RUN] Would write composite_registry.json (%d entries)",
            composite_entry_count,
        )
    else:
        composite_content = json.dumps(composite_json, indent=2, sort_keys=True) + "\n"
        path_prefix = f"{store.prefix}/" if store.prefix else ""
        _write_gcs_blob_with_custom_ttl(
            bucket_name=store.bucket,
            blob_path=f"{path_prefix}{_REGISTRIES_PREFIX}/composite_registry.json",
            content=composite_content,
            cache_control=_REGISTRY_INDEX_CACHE_CONTROL,
        )
        _log_progress(
            " Wrote composite_registry.json (%d entries)",
            composite_entry_count,
        )

    # --- Step 12: Per-connector version indexes (parallel) ---
    _log_progress(
        "Step 12: Writing per-connector version indexes (max_workers=%d)...",
        _COMPILE_WRITE_MAX_WORKERS,
    )
    base = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"
    sorted_connectors = sorted(connector_versions)

    def _write_one_version_index(connector: str) -> None:
        """Build and write a single connector's versions.json."""
        versions = connector_versions[connector]
        latest_v = latest_versions.get(connector)
        rc_v = rc_versions.get(connector)
        index = _build_version_index(
            fs,
            store=store,
            connector=connector,
            versions=versions,
            yanked=yanked,
            latest_version=latest_v,
            rc_version=rc_v,
        )
        index_path = f"{base}/{connector}/versions.json"
        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s/versions.json (%d versions)",
                connector,
                len(versions),
            )
        else:
            content = json.dumps(index, indent=2, sort_keys=True) + "\n"
            with fs.open(index_path, "w") as f:
                f.write(content)

    with ThreadPoolExecutor(max_workers=_COMPILE_WRITE_MAX_WORKERS) as pool:
        futures = {
            pool.submit(_write_one_version_index, c): c for c in sorted_connectors
        }
        for i, future in enumerate(as_completed(futures), 1):
            connector = futures[future]
            try:
                future.result()
                result.version_indexes_written += 1
            except Exception as exc:
                error_msg = f"Failed to write versions.json for {connector}: {exc}"
                logger.error(error_msg)
                result.errors.append(error_msg)
            if i % 100 == 0:
                _log_progress(
                    " Wrote %d / %d version indexes...", i, len(sorted_connectors)
                )

    # --- Step 13: Specs secrets mask (optional) ---
    if with_secrets_mask:
        _log_progress("Step 13: Generating specs secrets mask...")
        # Reuse entries collected during Step 10 to avoid redundant GCS reads.
        secret_names = _extract_secret_property_names(all_registry_entries)
        sorted_names = sorted(secret_names)
        result.specs_secrets_mask_properties = len(sorted_names)
        mask_content = yaml.dump({"properties": sorted_names}, default_flow_style=False)
        mask_path = (
            f"{store.bucket_root}/{_REGISTRIES_PREFIX}/{_SPECS_SECRETS_MASK_FILENAME}"
        )

        _log_progress(
            " Found %d secret properties: %s",
            len(sorted_names),
            ", ".join(sorted_names),
        )

        if dry_run:
            _log_progress(
                " [DRY RUN] Would write %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )
        else:
            with fs.open(mask_path, "w") as f:
                f.write(mask_content)
            _log_progress(
                " Wrote %s",
                _SPECS_SECRETS_MASK_FILENAME,
            )

    _log_progress(result.summary())
    return result
Compile the registry: sync latest/ dirs and write index files.
Steps:
1. Glob for all `metadata.yaml` to discover (connector, version) pairs.
2. Glob for active marker files.
3. Compute the latest GA semver per connector.
4. Compute active release candidates from versioned markers.
5. Glob for `version=*` markers in `latest/` dirs for a fast check.
6. Delete stale `latest/` dirs and recursively copy the versioned dir.
7. Synthesize missing registry entries from pinned latest overrides.
8. (Optional) Legacy migration: delete disabled registry entries.
9. (Optional) Read latest connector metrics.
10. Write global registry JSONs.
11. Write composite registry JSON.
12. Write per-connector `versions.json`.
13. (Optional) Regenerate `specs_secrets_mask.yaml`.
Arguments:
- store: Registry store (bucket + optional prefix).
- connector_name: If provided, only resync `latest/` directories for these connectors (steps 5-6). Index rebuilds (steps 9-12) always operate on the full set of connectors so that global registry files remain complete.
- dry_run: If True, report what would be done without writing.
- with_secrets_mask: If True, regenerate `specs_secrets_mask.yaml`.
- with_legacy_migration: If set, run the named migration step. Currently supported: `"v1"` — delete `{registry_type}.json` files for connectors whose `registryOverrides.{registry}.enabled` is `false`.
- with_metrics: If True, inject latest connector metrics from the analytics JSONL export into `generated.metrics`.
- force: If True, resync all connectors' latest/ directories even if the existing version marker matches the computed latest version. This is useful when metadata content changes without a version bump.
Returns:
A `CompileResult` describing what was done.
def find_unpublished_connectors(
    repo_path: str | Path,
    bucket_name: str,
    connector_names: list[str] | None = None,
) -> AuditResult:
    """Audit a local checkout for connector versions missing from GCS.

    Every candidate connector has its `metadata.yaml` read locally. Archived
    connectors, connectors disabled on all registries, and connectors whose
    `dockerImageTag` is an RC version are recorded as skipped. For the rest,
    the blob `<METADATA_FOLDER>/airbyte/<name>/<version>/<METADATA_FILE_NAME>`
    is probed in the bucket; a missing blob marks the connector as
    unpublished.

    Args:
        repo_path: Path to the Airbyte monorepo checkout.
        bucket_name: GCS bucket name to check against.
        connector_names: Optional list of connector names to check.
            If `None`, every directory under the connectors folder that
            contains a metadata file is checked, in sorted order.

    Returns:
        An `AuditResult` holding the unpublished connectors, the skipped
        groups, the number of GCS checks attempted, and per-connector errors.

    Raises:
        ValueError: If the connectors directory does not exist in the checkout.
    """
    connectors_dir = Path(repo_path) / CONNECTOR_PATH_PREFIX
    if not connectors_dir.exists():
        raise ValueError(f"Connectors directory not found: {connectors_dir}")

    # No explicit selection: fall back to every connector found on disk.
    if connector_names is None:
        discovered = [
            entry.name
            for entry in connectors_dir.iterdir()
            if entry.is_dir() and (entry / METADATA_FILE_NAME).exists()
        ]
        discovered.sort()
        connector_names = discovered

    result = AuditResult()

    # Phase 1: local filtering. Only (name, version) pairs that survive every
    # skip rule are queued for the remote existence check.
    candidates: list[tuple[str, str]] = []
    for connector in connector_names:
        metadata = _read_local_metadata(connectors_dir / connector / METADATA_FILE_NAME)
        if metadata is None:
            result.errors.append(f"{connector}: metadata.yaml not found or unreadable")
        elif _is_archived(metadata):
            result.skipped_archived.append(connector)
        elif _is_disabled_on_all_registries(metadata):
            result.skipped_disabled.append(connector)
        else:
            version = metadata.get("data", {}).get("dockerImageTag")
            if not version:
                result.errors.append(f"{connector}: no dockerImageTag in metadata")
            elif _is_rc_version(version):
                result.skipped_rc.append(connector)
            else:
                candidates.append((connector, version))

    # Nothing to look up remotely, so no storage client is created.
    if not candidates:
        result.checked_count = 0
        return result

    # Phase 2: one existence probe per surviving connector version.
    bucket = get_gcs_storage_client().bucket(bucket_name)
    for connector, version in candidates:
        result.checked_count += 1
        blob_path = f"{METADATA_FOLDER}/airbyte/{connector}/{version}/{METADATA_FILE_NAME}"
        blob = bucket.blob(blob_path)

        try:
            published = blob.exists()
        except Exception as e:
            # A failed probe is recorded and the audit moves on.
            result.errors.append(f"{connector}: GCS check failed: {e}")
            continue

        if published:
            continue

        logger.info(
            "Unpublished: %s version %s (checked %s)",
            connector,
            version,
            blob_path,
        )
        result.unpublished.append(
            UnpublishedConnector(connector_name=connector, local_version=version)
        )

    logger.info(
        "Audit complete: %d checked, %d unpublished, %d archived-skipped, %d disabled-skipped, %d rc-skipped",
        result.checked_count,
        len(result.unpublished),
        len(result.skipped_archived),
        len(result.skipped_disabled),
        len(result.skipped_rc),
    )

    return result
Find connectors whose local version is not published on GCS.
For each connector in the local checkout, reads dockerImageTag from
metadata.yaml and checks whether metadata/<docker-repo>/<version>/metadata.yaml
exists in the GCS bucket. Connectors that are archived, disabled on all
registries, or have RC versions are skipped.
Arguments:
- repo_path: Path to the Airbyte monorepo checkout.
- bucket_name: GCS bucket name to check against.
- connector_names: Optional list of connector names to check.
If
None, discovers all connectors in the repo.
Returns:
An `AuditResult` containing unpublished connectors and metadata.
644def generate_version_artifacts( 645 metadata_file: Path, 646 docker_image: str, 647 output_dir: Path | None = None, 648 repo_root: Path | None = None, 649 dry_run: bool = False, 650 with_validate: bool = True, 651 with_dependency_dump: bool = True, 652 with_sbom: bool = True, 653) -> GenerateResult: 654 """Generate all version artifacts for a connector release. 655 656 Artifacts are enriched with git commit info, SBOM URL, and (when applicable) 657 components SHA before writing. Validation is run after generation by default. 658 659 Args: 660 metadata_file: Path to the connector's `metadata.yaml`. 661 docker_image: Docker image to run spec against (e.g. `airbyte/source-faker:6.2.38`). 662 output_dir: Directory to write artifacts to. If `None`, a temp directory is created. 663 repo_root: Root of the Airbyte repo checkout (for resolving `doc.md`). 664 If `None`, inferred by walking up from `metadata_file`. 665 dry_run: If `True`, report what would be generated without writing or running docker. 666 with_validate: If `True` (default), run metadata validators after generation. 667 Pass `False` (`--no-validate`) to skip. 668 with_dependency_dump: If `True` (default), generate `dependencies.json` 669 for Python connectors. Pass `False` (`--no-dependency-dump`) to skip. 670 with_sbom: If `True` (default), generate `spdx.json` (SBOM) for 671 connectors. Pass `False` (`--no-sbom`) to skip. 672 673 Returns: 674 A `GenerateResult` describing what was produced. 
675 """ 676 # --- Load metadata --- 677 if not metadata_file.exists(): 678 raise FileNotFoundError(f"Metadata file not found: {metadata_file}") 679 680 raw_metadata: dict[str, Any] = yaml.safe_load(metadata_file.read_text()) 681 metadata_data: dict[str, Any] = raw_metadata.get("data", {}) 682 683 connector_name = metadata_data.get("dockerRepository", "unknown").replace( 684 "airbyte/", "" 685 ) 686 version = metadata_data.get("dockerImageTag", "unknown") 687 688 # --- Resolve output directory --- 689 if output_dir is None: 690 output_dir = Path( 691 tempfile.mkdtemp(prefix=f"connector-artifacts-{connector_name}-{version}-") 692 ) 693 output_dir.mkdir(parents=True, exist_ok=True) 694 695 result = GenerateResult( 696 connector_name=connector_name, 697 version=version, 698 docker_image=docker_image, 699 output_dir=str(output_dir), 700 dry_run=dry_run, 701 ) 702 703 if dry_run: 704 logger.info("[DRY RUN] Would generate artifacts to %s", output_dir) 705 result.artifacts_written = [ 706 "metadata.yaml", 707 "icon.svg", 708 "doc.md", 709 "cloud.json", 710 "oss.json", 711 "manifest.yaml (if present)", 712 "components.zip (if components.py present)", 713 "components.zip.sha256 (if components.py present)", 714 f"version={version}", 715 ] 716 if with_sbom: 717 result.artifacts_written.append(SBOM_FILE_NAME) 718 if with_dependency_dump: 719 result.artifacts_written.append("dependencies.json (if Python connector)") 720 return result 721 722 # --- Prepare metadata output --- 723 metadata_out = output_dir / "metadata.yaml" 724 result.artifacts_written.append("metadata.yaml") 725 726 # --- Enrich metadata with git info *before* building registry entries so 727 # that `generated.git` propagates into `cloud.json` / `oss.json`. 
--- 728 raw_metadata = _enrich_metadata_git_info(raw_metadata, metadata_file) 729 730 # --- Generate SBOM from the connector Docker image --- 731 sbom_generated = False 732 if not with_sbom: 733 logger.info("SBOM generation disabled via --no-sbom.") 734 else: 735 try: 736 sbom_path = generate_sbom(docker_image, output_dir) 737 except RuntimeError as exc: 738 logger.warning("SBOM generation failed (non-fatal): %s", exc) 739 except (FileNotFoundError, subprocess.TimeoutExpired): 740 logger.warning("Docker not available or SBOM generation timed out.") 741 else: 742 result.artifacts_written.append(SBOM_FILE_NAME) 743 sbom_generated = True 744 logger.info("Generated SBOM: %s", sbom_path) 745 746 # --- Enrich metadata with SBOM URL --- 747 raw_metadata = _enrich_metadata_sbom_url( 748 raw_metadata, sbom_generated=sbom_generated 749 ) 750 751 # --- Run docker spec for cloud and oss --- 752 specs: dict[str, dict[str, Any]] = {} 753 for mode in VALID_REGISTRIES: 754 try: 755 specs[mode] = _run_docker_spec(docker_image, mode) 756 logger.info("Got %s spec from docker image %s", mode, docker_image) 757 except RuntimeError as exc: 758 error_msg = f"Failed to get {mode} spec: {exc}" 759 logger.error(error_msg) 760 result.errors.append(error_msg) 761 762 # --- Generate dependencies.json for Python connectors --- 763 # This must happen *before* building registry entries so that the 764 # local dependencies data can be used for packageInfo without a GCS 765 # round-trip. 
766 local_dependencies: dict[str, Any] | None = None 767 if not with_dependency_dump: 768 logger.info("Dependency generation disabled via --no-dependency-dump.") 769 elif _is_python_connector(metadata_data): 770 logger.info("Python connector detected — generating dependencies.json") 771 local_dependencies = generate_python_dependencies_file( 772 metadata_data=metadata_data, 773 docker_image=docker_image, 774 output_dir=output_dir, 775 ) 776 if local_dependencies is not None: 777 result.artifacts_written.append(CONNECTOR_DEPENDENCY_FILE_NAME) 778 else: 779 logger.info( 780 "Non-Python connector (%s) — skipping dependencies.json generation.", 781 connector_name, 782 ) 783 784 # --- Generate registry entries (cloud.json, oss.json) --- 785 for registry_type in VALID_REGISTRIES: 786 if not is_registry_enabled(metadata_data, registry_type): 787 logger.info( 788 "Registry type %s is not enabled for %s, skipping %s.json generation.", 789 registry_type, 790 connector_name, 791 registry_type, 792 ) 793 continue 794 795 spec = specs.get(registry_type) 796 if spec is None: 797 error_msg = ( 798 f"Cannot generate {registry_type}.json: no spec available " 799 f"(docker spec for {registry_type} failed or was not run)." 
800 ) 801 result.errors.append(error_msg) 802 continue 803 804 registry_entry = _build_registry_entry( 805 metadata_data, 806 registry_type, 807 spec, 808 local_dependencies=local_dependencies, 809 ) 810 811 out_path = output_dir / f"{registry_type}.json" 812 out_path.write_text( 813 json.dumps(registry_entry, indent=2, sort_keys=True, default=_json_serial) 814 + "\n" 815 ) 816 result.artifacts_written.append(f"{registry_type}.json") 817 logger.info("Wrote %s", out_path) 818 819 # --- Copy icon.svg (sibling of metadata.yaml in the connector directory) --- 820 icon_source = metadata_file.parent / "icon.svg" 821 if icon_source.is_file(): 822 icon_out = output_dir / "icon.svg" 823 shutil.copy2(icon_source, icon_out) 824 result.artifacts_written.append("icon.svg") 825 logger.info("Wrote %s", icon_out) 826 else: 827 logger.warning("No icon.svg found at %s.", icon_source) 828 result.errors.append("Icon file is missing.") 829 830 # --- Copy doc.md (derived from documentationUrl in metadata) --- 831 if repo_root is None: 832 # Infer repo root by walking up from metadata_file looking for .git 833 # Note: .git can be a directory (normal clone) or a file (git worktree) 834 # Resolve to absolute path first so the walk-up works with relative paths. 835 candidate = metadata_file.resolve().parent 836 while candidate != candidate.parent: 837 git_indicator = candidate / ".git" 838 if git_indicator.is_dir() or git_indicator.is_file(): 839 repo_root = candidate 840 break 841 candidate = candidate.parent 842 843 if repo_root is not None: 844 doc_source = _resolve_doc_path(metadata_data, repo_root) 845 if doc_source is not None and doc_source.is_file(): 846 doc_out = output_dir / DOC_FILE_NAME 847 shutil.copy2(doc_source, doc_out) 848 result.artifacts_written.append(DOC_FILE_NAME) 849 logger.info("Wrote %s (from %s)", doc_out, doc_source) 850 else: 851 error_msg = ( 852 f"Documentation file not found: {doc_source}. " 853 f"Derived from documentationUrl in metadata." 
854 ) 855 logger.error(error_msg) 856 result.errors.append(error_msg) 857 else: 858 error_msg = "Cannot resolve doc.md: repo root not found." 859 logger.error(error_msg) 860 result.errors.append(error_msg) 861 862 # --- Copy manifest.yaml (from connector root, if present) --- 863 connector_dir = metadata_file.parent 864 manifest_source = connector_dir / MANIFEST_FILE_NAME 865 components_sha256: str | None = None 866 if manifest_source.is_file(): 867 manifest_out = output_dir / MANIFEST_FILE_NAME 868 shutil.copy2(manifest_source, manifest_out) 869 result.artifacts_written.append(MANIFEST_FILE_NAME) 870 logger.info("Wrote %s", manifest_out) 871 872 # --- Generate components.zip if components.py exists --- 873 components_source = connector_dir / COMPONENTS_PY_FILE_NAME 874 if components_source.is_file(): 875 zip_path, sha256_path = _create_components_zip( 876 manifest_path=manifest_source, 877 components_path=components_source, 878 output_dir=output_dir, 879 ) 880 result.artifacts_written.append(COMPONENTS_ZIP_FILE_NAME) 881 result.artifacts_written.append(COMPONENTS_ZIP_SHA256_FILE_NAME) 882 logger.info("Wrote %s and %s", zip_path, sha256_path) 883 # Read back the SHA256 for metadata enrichment 884 components_sha256 = sha256_path.read_text().strip() 885 else: 886 logger.info( 887 "No manifest.yaml at %s — skipping manifest artifacts.", manifest_source 888 ) 889 890 # --- Enrich metadata with components SHA (after zip creation) --- 891 raw_metadata = _enrich_metadata_components_sha(raw_metadata, components_sha256) 892 893 # --- Write final enriched metadata.yaml --- 894 # Use sort_keys=True to match the legacy pipeline's alphabetical key ordering. 895 # After Registry 2.0 launches we are free to change the key ordering. 
896 metadata_out.write_text( 897 yaml.dump(raw_metadata, default_flow_style=False, sort_keys=True) 898 ) 899 logger.info("Wrote enriched %s", metadata_out) 900 901 # --- Write version marker file (version=<semver>) --- 902 # This zero-byte file is used by the compile step as a fast-check marker. 903 # Including it in the generated artifacts means `latest/` gets the marker 904 # for free via a recursive copy, removing the need for a separate write. 905 marker_file = output_dir / f"version={version}" 906 marker_file.write_bytes(b"") 907 result.artifacts_written.append(f"version={version}") 908 logger.info("Wrote version marker %s", marker_file) 909 910 # --- Validate metadata (after generation) --- 911 if with_validate: 912 logger.info("Running post-generation validation...") 913 doc_path: str | None = None 914 if repo_root is not None: 915 resolved = _resolve_doc_path(metadata_data, repo_root) 916 doc_path = str(resolved) if resolved else None 917 validation = validate_metadata( 918 metadata_data=metadata_data, 919 opts=ValidateOptions(docs_path=doc_path), 920 ) 921 if not validation.passed: 922 for err in validation.errors: 923 logger.error("Validation error: %s", err) 924 result.validation_errors = validation.errors 925 else: 926 logger.info("Validation passed (%d validators).", validation.validators_run) 927 928 return result
Generate all version artifacts for a connector release.
Artifacts are enriched with git commit info, SBOM URL, and (when applicable) components SHA before writing. Validation is run after generation by default.
Arguments:
- metadata_file: Path to the connector's metadata.yaml.
- docker_image: Docker image to run spec against (e.g. airbyte/source-faker:6.2.38).
- output_dir: Directory to write artifacts to. If None, a temp directory is created.
- repo_root: Root of the Airbyte repo checkout (for resolving doc.md). If None, inferred by walking up from metadata_file.
- dry_run: If True, report what would be generated without writing or running docker.
- with_validate: If True (default), run metadata validators after generation. Pass False (--no-validate) to skip.
- with_dependency_dump: If True (default), generate dependencies.json for Python connectors. Pass False (--no-dependency-dump) to skip.
- with_sbom: If True (default), generate spdx.json (SBOM) for connectors. Pass False (--no-sbom) to skip.
Returns:
A GenerateResult describing what was produced.
def get_connector_metadata(repo_path: Path, connector_name: str) -> ConnectorMetadata:
    """Read connector metadata from metadata.yaml.

    Args:
        repo_path: Path to the Airbyte monorepo.
        connector_name: The connector technical name (e.g., 'source-github').

    Returns:
        ConnectorMetadata object with the connector's metadata. Fields missing
        from the `data` section fall back to defaults: the docker repository
        defaults to `airbyte/{connector_name}` and the image tag to `"unknown"`.

    Raises:
        FileNotFoundError: If the connector directory or metadata file doesn't exist.
        ValueError: If the metadata file is empty or is not a YAML mapping, or
            if its `data` section is present but is not a mapping.
    """
    connector_dir = repo_path / CONNECTOR_PATH_PREFIX / connector_name
    if not connector_dir.exists():
        raise FileNotFoundError(f"Connector directory not found: {connector_dir}")

    metadata_file = connector_dir / METADATA_FILE_NAME
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")

    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's locale encoding.
    with open(metadata_file, encoding="utf-8") as f:
        metadata = yaml.safe_load(f)

    # `yaml.safe_load` yields None for an empty document and a scalar/list for
    # non-mapping content; fail with a clear message instead of an
    # AttributeError on `.get` below.
    if not isinstance(metadata, dict):
        raise ValueError(f"Metadata file {metadata_file} has an invalid structure")

    data = metadata.get("data", {})
    if not isinstance(data, dict):
        raise ValueError(
            f"Metadata file {metadata_file} has an invalid 'data' section"
        )

    return ConnectorMetadata(
        name=connector_name,
        docker_repository=data.get("dockerRepository", f"airbyte/{connector_name}"),
        docker_image_tag=data.get("dockerImageTag", "unknown"),
        support_level=data.get("supportLevel"),
        definition_id=data.get("definitionId"),
    )
Read connector metadata from metadata.yaml.
Arguments:
- repo_path: Path to the Airbyte monorepo.
- connector_name: The connector technical name (e.g., 'source-github').
Returns:
ConnectorMetadata object with the connector's metadata.
Raises:
- FileNotFoundError: If the connector directory or metadata file doesn't exist.
def get_gcs_publish_path(
    connector_name: str,
    artifact_type: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> str:
    """Build the GCS object path under which a connector artifact is published.

    Every connector is published under the airbyte/{connector_name} convention.

    Args:
        connector_name: The connector technical name (e.g., 'source-github').
        artifact_type: One of 'metadata', 'spec', 'icon' or 'doc'.
        version: Version folder name; defaults to the "latest" folder constant.

    Returns:
        The object path `{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file}`.

    Raises:
        ValueError: If `artifact_type` is not one of the known artifact types.
    """
    # Maps each supported artifact type to the file name stored in GCS.
    artifact_files = {
        "metadata": METADATA_FILE_NAME,
        "spec": "spec.json",
        "icon": "icon.svg",
        "doc": "doc.md",
    }

    file_name = artifact_files.get(artifact_type)
    if file_name is None:
        raise ValueError(
            f"Unknown artifact type: {artifact_type}. "
            f"Valid types are: {', '.join(artifact_files.keys())}"
        )

    return f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{file_name}"
Compute the GCS path for a connector artifact for publishing.
All connectors use the airbyte/{connector_name} convention.
def get_registry(store: RegistryStore) -> Registry:
    """Return the `Registry` implementation that matches `store.store_type`.

    The concrete registry modules are imported lazily, only for the branch
    that is actually selected.

    Raises:
        ValueError: If the store type is neither CORAL nor SONAR.
    """
    kind = store.store_type

    if kind == StoreType.CORAL:
        from airbyte_ops_mcp.registry.coral_registry_store import (
            CoralRegistry as registry_cls,
        )
    elif kind == StoreType.SONAR:
        from airbyte_ops_mcp.registry.sonar_registry_store import (
            SonarRegistry as registry_cls,
        )
    else:
        # Defensive: StoreType is an Enum, so this is not expected to happen.
        raise ValueError(f"Unknown store type: {store.store_type}")

    return registry_cls(store)
Factory for obtaining the right store implementation.
def get_registry_entry(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch and parse a connector's metadata file from the GCS registry.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The parsed metadata document

    Raises:
        ValueError: If GCS credentials are not configured, or if the metadata
            does not parse to a mapping
        FileNotFoundError: If the connector metadata is not found in the registry
        yaml.YAMLError: If the metadata file contains invalid YAML syntax
    """
    bucket = get_gcs_storage_client().bucket(bucket_name)

    # Object layout: metadata/airbyte/{connector_name}/{version}/metadata.yaml
    blob_path = (
        f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{METADATA_FILE_NAME}"
    )
    logger.info(f"Reading registry entry for {connector_name} from {blob_path}")

    content = safe_read_gcs_file(bucket.blob(blob_path))
    if content is None:
        raise FileNotFoundError(
            f"Connector metadata not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    try:
        metadata = yaml.safe_load(content)
    except yaml.YAMLError as exc:
        # Log with context, then let the original parse error propagate.
        logger.error(
            "Failed to parse metadata for %s from %s: %s",
            connector_name,
            blob_path,
            exc,
        )
        raise

    # An empty document (None) or any non-mapping top level is rejected.
    if not isinstance(metadata, dict):
        raise ValueError(f"Metadata file {blob_path} has an invalid structure")
    return metadata
Get a connector's registry entry from GCS.
Reads metadata for a connector from the registry stored in GCS.
Arguments:
- connector_name: The connector name (e.g., "source-faker", "destination-postgres")
- bucket_name: Name of the GCS bucket containing the registry
- version: Version folder name (e.g., "latest", "1.2.3")
Returns:
dict: The connector's metadata as a dictionary
Raises:
- ValueError: If GCS credentials are not configured, or if the metadata has an invalid structure
- FileNotFoundError: If the connector metadata is not found in the registry
- yaml.YAMLError: If the metadata file contains invalid YAML syntax
def get_registry_spec(
    connector_name: str,
    bucket_name: str,
    version: str = LATEST_GCS_FOLDER_NAME,
) -> dict[str, Any]:
    """Fetch and parse a connector's spec file from the GCS registry.

    Args:
        connector_name: The connector name (e.g., "source-faker", "destination-postgres")
        bucket_name: Name of the GCS bucket containing the registry
        version: Version folder name (e.g., "latest", "1.2.3")

    Returns:
        dict: The parsed connector spec

    Raises:
        ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
        FileNotFoundError: If the connector spec is not found in the registry
        json.JSONDecodeError: If the spec file contains invalid JSON syntax
    """
    bucket = get_gcs_storage_client().bucket(bucket_name)

    # Object layout: metadata/airbyte/{connector_name}/{version}/spec.json
    blob_path = f"{METADATA_FOLDER}/airbyte/{connector_name}/{version}/{SPEC_FILE_NAME}"
    logger.info(f"Reading spec for {connector_name} from {blob_path}")

    content = safe_read_gcs_file(bucket.blob(blob_path))
    if content is None:
        raise FileNotFoundError(
            f"Connector spec not found in registry: {connector_name}. "
            f"Checked path: {blob_path}"
        )

    try:
        spec = json.loads(content)
    except json.JSONDecodeError as exc:
        # Log with context, then let the original parse error propagate.
        logger.error(
            "Failed to parse spec for %s from %s: %s",
            connector_name,
            blob_path,
            exc,
        )
        raise

    # JSON `null`, arrays and scalars are all rejected: only objects are valid.
    if not isinstance(spec, dict):
        raise ValueError(
            f"Spec file for {connector_name} at {blob_path} is not a JSON object"
        )
    return spec
Get a connector's spec from GCS.
Reads the connector specification from the registry stored in GCS.
Arguments:
- connector_name: The connector name (e.g., "source-faker", "destination-postgres")
- bucket_name: Name of the GCS bucket containing the registry
- version: Version folder name (e.g., "latest", "1.2.3")
Returns:
dict: The connector's spec as a dictionary
Raises:
- ValueError: If GCS credentials are not configured, or if the spec is not a JSON object
- FileNotFoundError: If the connector spec is not found in the registry
- json.JSONDecodeError: If the spec file contains invalid JSON syntax
def list_connector_versions(connector_name: str, bucket_name: str) -> list[str]:
    """Return every published version folder of a connector in the registry.

    Version names are taken from the blob paths matching
    `{METADATA_FOLDER}/airbyte/{connector_name}/*/{METADATA_FILE_NAME}`.

    Args:
        connector_name: The connector name (e.g., "source-faker")
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Version strings in plain string sort order, excluding the
        'latest' and 'release_candidate' folders

    Raises:
        ValueError: If GCS credentials are not configured
    """
    bucket = get_gcs_storage_client().bucket(bucket_name)

    glob_pattern = f"{METADATA_FOLDER}/airbyte/{connector_name}/*/{METADATA_FILE_NAME}"
    logger.info(f"Listing versions for {connector_name} with pattern: {glob_pattern}")

    try:
        blobs = bucket.list_blobs(match_glob=glob_pattern)
    except Exception as e:
        logger.error(f"Error listing blobs in bucket {bucket_name}: {e}")
        raise

    # Expected path: metadata / airbyte / connector-name / version / metadata.yaml,
    # so the version is the fourth component. Shorter paths are ignored.
    special_folders = ("latest", "release_candidate")
    split_paths = (blob.name.split("/") for blob in blobs)
    versions = {
        parts[3]
        for parts in split_paths
        if len(parts) >= 5 and parts[3] not in special_folders
    }
    return sorted(versions)
List all versions of a connector in the registry.
Scans the GCS bucket to find all versions of a specific connector.
Arguments:
- connector_name: The connector name (e.g., "source-faker")
- bucket_name: Name of the GCS bucket containing the registry
Returns:
list[str]: Sorted list of version strings (excluding 'latest' and 'release_candidate')
Raises:
- ValueError: If GCS credentials are not configured
def list_registry_connectors(bucket_name: str) -> list[str]:
    """Return the names of all connectors that have a `latest` metadata file.

    Connector names are taken from the blob paths matching
    `{METADATA_FOLDER}/airbyte/*/{LATEST_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}`.

    Args:
        bucket_name: Name of the GCS bucket containing the registry

    Returns:
        list[str]: Sorted list of connector names

    Raises:
        ValueError: If GCS credentials are not configured
    """
    bucket = get_gcs_storage_client().bucket(bucket_name)

    glob_pattern = (
        f"{METADATA_FOLDER}/airbyte/*/{LATEST_GCS_FOLDER_NAME}/{METADATA_FILE_NAME}"
    )
    logger.info(f"Listing connectors with pattern: {glob_pattern}")

    try:
        blobs = bucket.list_blobs(match_glob=glob_pattern)
    except Exception as e:
        logger.error(f"Error listing blobs in bucket {bucket_name}: {e}")
        raise

    # Expected path: metadata / airbyte / connector-name / latest / metadata.yaml,
    # so the connector name is the third component. Shorter paths are ignored.
    split_paths = (blob.name.split("/") for blob in blobs)
    return sorted({parts[2] for parts in split_paths if len(parts) >= 5})
List all connectors in the registry.
Scans the GCS bucket to find all connectors that have metadata files.
Arguments:
- bucket_name: Name of the GCS bucket containing the registry
Returns:
list[str]: Sorted list of connector names
Raises:
- ValueError: If GCS credentials are not configured
def list_registry_connectors_filtered(
    bucket_name: str,
    *,
    support_level: SupportLevel | None = None,
    min_support_level: SupportLevel | None = None,
    connector_type: ConnectorType | None = None,
    language: ConnectorLanguage | None = None,
    prefix: str = "",
) -> list[str]:
    """List connector names, optionally filtered via the compiled cloud index.

    With no filter set, this delegates to `list_registry_connectors`, the
    glob-based listing, which also captures OSS-only connectors that are not
    in the Cloud index. Note that `prefix` is not forwarded on that path.

    With at least one filter set, the entries of the compiled
    `cloud_registry.json` index are read once and filtered in memory, which
    avoids globbing individual metadata blobs.

    Args:
        bucket_name: Name of the GCS bucket containing the registry.
        support_level: Exact support level to match (e.g., `SupportLevel.CERTIFIED`).
        min_support_level: Minimum support level threshold. Keeps connectors
            whose level has a precedence at or above this one; entries with a
            missing or unknown level are dropped.
        connector_type: Filter by connector type (`ConnectorType.SOURCE` or
            `ConnectorType.DESTINATION`).
        language: Filter by implementation language (e.g., `ConnectorLanguage.PYTHON`).
        prefix: Optional bucket prefix (e.g., `"aj-test100"`), used when
            reading the cloud registry index.

    Returns:
        Sorted list of connector technical names (e.g., `"source-github"`).

    Raises:
        ValueError: If `support_level` and `min_support_level` are both provided.
    """
    if not (support_level or min_support_level or connector_type or language):
        return list_registry_connectors(bucket_name=bucket_name)

    if support_level and min_support_level:
        raise ValueError(
            "Cannot specify both `support_level` and `min_support_level`. "
            "Use `support_level` for an exact match or `min_support_level` for a threshold."
        )

    entries = _read_cloud_registry_index(bucket_name=bucket_name, prefix=prefix)

    # Exact support-level match.
    if support_level:
        entries = [
            entry for entry in entries if entry.get("supportLevel") == support_level
        ]

    # Support-level threshold, compared by enum precedence.
    if min_support_level:
        threshold = min_support_level.precedence
        known_levels = {member.value for member in SupportLevel}

        def _meets_threshold(entry):
            level = entry.get("supportLevel")
            return (
                level
                and entry["supportLevel"] in known_levels
                and SupportLevel(entry["supportLevel"]).precedence >= threshold
            )

        entries = [entry for entry in entries if _meets_threshold(entry)]

    # Sources and destinations are told apart by which definition-ID key the
    # entry carries.
    definition_key = None
    if connector_type == ConnectorType.SOURCE:
        definition_key = "sourceDefinitionId"
    elif connector_type == ConnectorType.DESTINATION:
        definition_key = "destinationDefinitionId"
    if definition_key is not None:
        entries = [entry for entry in entries if definition_key in entry]

    if language:
        entries = [entry for entry in entries if entry.get("language") == language]

    # Derive the technical name from dockerRepository
    # (e.g., "airbyte/source-github" -> "source-github").
    names: set[str] = set()
    for entry in entries:
        docker_repo = entry.get("dockerRepository", "")
        if "/" in docker_repo:
            names.add(docker_repo.partition("/")[2])
        elif docker_repo:
            names.add(docker_repo)

    return sorted(names)
List connectors from the compiled cloud registry index with filtering.
When any filter is applied, reads the compiled cloud_registry.json index
instead of globbing individual metadata blobs. This is significantly faster
because the index is a single JSON file containing all connector entries.
When no filters are applied, falls back to the existing glob-based search which captures all connectors (including OSS-only connectors not in the Cloud index).
Arguments:
- bucket_name: Name of the GCS bucket containing the registry.
- support_level: Exact support level to match (e.g., SupportLevel.CERTIFIED).
- min_support_level: Minimum support level threshold. Returns connectors at or above this level.
- connector_type: Filter by connector type (ConnectorType.SOURCE or ConnectorType.DESTINATION).
- language: Filter by implementation language (e.g., ConnectorLanguage.PYTHON).
- prefix: Optional bucket prefix (e.g., "aj-test100").
Returns:
Sorted list of connector technical names (e.g., "source-github").
Raises:
- ValueError: If support_level and min_support_level are both provided.
def publish_connector_metadata(
    connector_name: str,
    metadata: dict[str, Any],
    bucket_name: str,
    version: str,
    update_latest: bool = True,
    dry_run: bool = False,
) -> MetadataPublishResult:
    """Publish connector metadata to GCS.

    Uploads the metadata to the registry bucket at a versioned path, and optionally
    also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading
    unchanged files.

    Requires GCS_CREDENTIALS environment variable to be set.

    Args:
        connector_name: The connector technical name (e.g., 'source-github').
        metadata: Metadata document to publish; must be a dict with a 'data' key.
        bucket_name: Name of the target GCS bucket.
        version: Version folder to publish under.
        update_latest: If True (default), also upload to the 'latest' path.
        dry_run: If True, only log and report what would be uploaded; no GCS
            client is created and nothing is written.

    Returns:
        A MetadataPublishResult whose status is "dry-run", "success" or
        "already-up-to-date".

    Raises:
        ValueError: If `metadata` is not a dict or has no 'data' field.
    """
    if not isinstance(metadata, dict):
        raise ValueError("Metadata must be a dictionary")

    if "data" not in metadata:
        raise ValueError("Metadata must contain 'data' field")

    # Construct GCS paths using airbyte/{connector_name} convention
    versioned_blob_path = get_gcs_publish_path(connector_name, "metadata", version)
    latest_blob_path = get_gcs_publish_path(
        connector_name, "metadata", LATEST_GCS_FOLDER_NAME
    )

    if dry_run:
        message = f"[DRY RUN] Would upload metadata to gs://{bucket_name}/{versioned_blob_path}"
        if update_latest:
            message += f" and gs://{bucket_name}/{latest_blob_path}"
        logger.info(message)
        return MetadataPublishResult(
            connector_name=connector_name,
            version=version,
            bucket_name=bucket_name,
            versioned_path=versioned_blob_path,
            latest_path=latest_blob_path if update_latest else None,
            versioned_uploaded=False,
            latest_uploaded=False,
            status="dry-run",
            message=message,
        )

    # Get GCS client and bucket
    storage_client = get_gcs_storage_client()
    bucket = storage_client.bucket(bucket_name)

    # The temp file is created with delete=False so it can be re-opened by the
    # uploader after the handle is closed. Creation happens *inside* the try so
    # the file is removed even when serialising the metadata fails.
    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".yaml", delete=False
        ) as tmp_file:
            tmp_path = Path(tmp_file.name)
            yaml.dump(metadata, tmp_file)

        # Upload versioned file
        versioned_uploaded, _ = upload_file_if_changed(
            local_file_path=tmp_path,
            bucket=bucket,
            blob_path=versioned_blob_path,
            disable_cache=True,
        )

        if versioned_uploaded:
            logger.info(
                f"Uploaded metadata for {connector_name} v{version} to {versioned_blob_path}"
            )
        else:
            logger.info(
                f"Versioned metadata for {connector_name} v{version} is already up to date"
            )

        # Optionally update latest pointer
        latest_uploaded = False
        if update_latest:
            latest_uploaded, _ = upload_file_if_changed(
                local_file_path=tmp_path,
                bucket=bucket,
                blob_path=latest_blob_path,
                disable_cache=True,
            )
            if latest_uploaded:
                logger.info(f"Updated latest pointer for {connector_name}")
            else:
                logger.info(
                    f"Latest pointer for {connector_name} is already up to date"
                )
    finally:
        # Clean up temp file even if serialisation or upload fails
        if tmp_path is not None:
            tmp_path.unlink(missing_ok=True)

    # Determine status
    if versioned_uploaded or latest_uploaded:
        status = "success"
        message = f"Published metadata for {connector_name} v{version}"
        if versioned_uploaded:
            message += f" to {versioned_blob_path}"
        if latest_uploaded:
            message += " and updated latest"
    else:
        status = "already-up-to-date"
        message = f"Metadata for {connector_name} v{version} is already up to date"

    return MetadataPublishResult(
        connector_name=connector_name,
        version=version,
        bucket_name=bucket_name,
        versioned_path=versioned_blob_path,
        latest_path=latest_blob_path if update_latest else None,
        versioned_uploaded=versioned_uploaded,
        latest_uploaded=latest_uploaded,
        status=status,
        message=message,
    )
Publish connector metadata to GCS.
Uploads the metadata to the registry bucket at a versioned path, and optionally also updates the 'latest' pointer. Uses MD5 hash comparison to avoid re-uploading unchanged files.
Requires GCS_CREDENTIALS environment variable to be set.
def publish_version_artifacts(
    connector_name: str,
    version: str,
    artifacts_dir: Path,
    store: RegistryStore,
    dry_run: bool = False,
    with_validate: bool = True,
) -> PublishArtifactsResult:
    """Publish locally generated artifacts to a GCS registry bucket.

    Uses `gcsfs.GCSFileSystem` to upload the local *artifacts_dir* to the
    versioned path inside the target GCS bucket.

    The target GCS path is:
    `gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/`

    Before uploading, this function validates that the `connector_name`
    (derived from the connector directory) matches the `dockerRepository`
    declared in `metadata.yaml`. A mismatch would cause the registry
    compile step to see duplicate definition-ID entries and fail.

    Steps, in order:

    1. Check the artifacts directory and the connector-name/dockerRepository match.
    2. Optionally validate `metadata.yaml`; on failure, return early with
       `validation_errors` set and nothing uploaded.
    3. Enumerate local files; if there are none, return early with an entry
       in `result.errors`.
    4. In dry-run mode, record what would be uploaded and return.
    5. Otherwise upload every file, remove remote files that no longer exist
       locally (registry state marker files are kept), write the progressive
       rollout marker when enabled, then dual-load `dependencies.json` and
       upload the SBOM to their dedicated paths when those files are present.

    Args:
        connector_name: Connector name (e.g. `source-faker`).
        version: Version string (e.g. `6.2.38`).
        artifacts_dir: Local directory containing artifacts from `generate`.
        store: Parsed store target containing bucket, prefix, and stage info.
        dry_run: If `True`, report what would be uploaded without writing.
        with_validate: If `True` (default), validate metadata before uploading.
            Pass `False` (`--no-validate`) to skip.

    Returns:
        A `PublishArtifactsResult` describing what was published.

    Raises:
        FileNotFoundError: If `artifacts_dir` is not an existing directory.
        ValueError: If the connector directory name does not match
            `dockerRepository` in the generated metadata.
    """
    if not artifacts_dir.is_dir():
        raise FileNotFoundError(f"Artifacts directory not found: {artifacts_dir}")

    # Fail fast if the connector directory name doesn't match dockerRepository.
    # A mismatch would publish artifacts under the wrong GCS path and corrupt
    # the registry (duplicate definition-IDs under different directory names).
    mismatch_error = _check_connector_name_matches_docker_repo(
        connector_name, artifacts_dir
    )
    if mismatch_error:
        raise ValueError(mismatch_error)

    # Build the GCS destination path
    bucket_name = store.bucket
    prefix = store.prefix
    blob_root = versioned_blob_root(
        connector_name=connector_name, version=version, store=store
    )
    versioned_dest = f"gcs://{bucket_name}/{blob_root}"

    # Human-readable target for the result: "<bucket>/<prefix>" or just "<bucket>".
    target_label = f"{bucket_name}/{prefix}" if prefix else bucket_name
    should_publish_rollout_marker = _metadata_enables_progressive_rollout(artifacts_dir)
    result = PublishArtifactsResult(
        connector_name=connector_name,
        version=version,
        target=target_label,
        gcs_destination=versioned_dest,
        dry_run=dry_run,
    )

    # --- Pre-publish validation ---
    if with_validate:
        metadata_file = artifacts_dir / "metadata.yaml"
        if metadata_file.is_file():
            raw_metadata = yaml.safe_load(metadata_file.read_text())
            # `raw_metadata or {}` guards against an empty YAML document (None).
            metadata_data = (raw_metadata or {}).get("data", {})
            validation = validate_metadata(metadata_data=metadata_data)
            if not validation.passed:
                for err in validation.errors:
                    logger.error("Pre-publish validation error: %s", err)
                # Validation failures abort the publish before anything is uploaded.
                result.validation_errors = validation.errors
                return result
            logger.info(
                "Pre-publish validation passed (%d validators).",
                validation.validators_run,
            )
        else:
            logger.warning("No metadata.yaml in artifacts dir; skipping validation.")

    # Enumerate local files
    local_files = sorted(f for f in artifacts_dir.rglob("*") if f.is_file())
    if not local_files:
        result.errors.append(f"No files found in {artifacts_dir}.")
        return result

    _log_progress(
        "Publishing %d artifacts for %s@%s → %s",
        len(local_files),
        connector_name,
        version,
        versioned_dest,
    )

    # Build references used by both dry-run and real upload paths
    deps_file = artifacts_dir / CONNECTOR_DEPENDENCY_FILE_NAME
    has_deps = deps_file.is_file()
    deps_gcs_key = dependencies_blob_path(
        connector_name=connector_name, version=version, store=store
    )

    sbom_file = artifacts_dir / SBOM_FILE_NAME
    has_sbom = sbom_file.is_file()
    rollout_marker_path = f"{blob_root}/{PROGRESSIVE_ROLLOUT_MARKER_FILE}"

    if dry_run:
        # Mirror the real upload's bookkeeping so the result lists the same
        # entries, without touching GCS or requesting credentials.
        for f in local_files:
            rel = f.relative_to(artifacts_dir)
            result.files_uploaded.append(str(rel))
            _log_progress(" [DRY RUN] would upload: %s", rel)
        # Report the dual-load of dependencies.json to connector_dependencies/
        if has_deps:
            result.files_uploaded.append(deps_gcs_key)
            _log_progress(
                " [DRY RUN] would also dual-load: %s → gs://%s/%s",
                CONNECTOR_DEPENDENCY_FILE_NAME,
                bucket_name,
                deps_gcs_key,
            )
        # Report the separate sbom/ upload
        if has_sbom:
            sbom_gcs_key = sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            )
            result.files_uploaded.append(sbom_gcs_key)
            _log_progress(
                " [DRY RUN] would also upload: %s → gs://%s/%s",
                SBOM_FILE_NAME,
                bucket_name,
                sbom_gcs_key,
            )
        if should_publish_rollout_marker:
            result.files_uploaded.append(PROGRESSIVE_ROLLOUT_MARKER_FILE)
            _log_progress(
                " [DRY RUN] would write active marker: gs://%s/%s",
                bucket_name,
                rollout_marker_path,
            )
        return result

    # Authenticate
    token = get_gcs_credentials_token()
    fs = gcsfs.GCSFileSystem(token=token)

    # Strip gcs:// prefix for gcsfs path
    dest_path = versioned_dest.replace("gcs://", "")

    # Upload all files to the versioned path
    _log_progress("Uploading to: %s", versioned_dest)
    for f in local_files:
        rel = f.relative_to(artifacts_dir)
        remote_path = f"{dest_path}/{rel}"
        fs.put(str(f), remote_path)
        result.files_uploaded.append(str(rel))
        _log_progress(" Uploaded: %s", rel)

    # Delete remote files that don't exist locally (sync semantics)
    # NOTE(review): `local_files` comes from a recursive `rglob`, while the
    # remote side is a single `fs.ls` of `dest_path` — presumably that lists
    # only direct children; confirm nested artifacts are handled as intended.
    try:
        remote_files = fs.ls(dest_path, detail=False)
        local_rel_paths = {str(f.relative_to(artifacts_dir)) for f in local_files}
        for remote_file in remote_files:
            # Skip the directory entry itself if it appears in the listing
            if remote_file == dest_path:
                continue
            # Derive the remote relative path, matching upload semantics
            if remote_file.startswith(dest_path + "/"):
                remote_rel = remote_file[len(dest_path) + 1 :]
            else:
                remote_rel = remote_file.split("/")[-1]
            # Registry state marker files are never treated as stale, even
            # though they are not part of the local artifacts.
            if is_registry_state_marker_file(Path(remote_rel).name):
                continue
            if remote_rel not in local_rel_paths:
                fs.rm(remote_file)
                _log_progress(" Deleted stale remote file: %s", remote_rel)
    except FileNotFoundError:
        pass  # Destination doesn't exist yet, nothing to clean

    _log_progress("Uploaded %d files to %s", len(local_files), versioned_dest)

    if should_publish_rollout_marker:
        # Written after the stale-file sync above, which skips marker files.
        marker_remote = f"{bucket_name}/{rollout_marker_path}"
        with fs.open(marker_remote, "w") as marker_file:
            marker_file.write(_progressive_rollout_marker_content())
        result.files_uploaded.append(PROGRESSIVE_ROLLOUT_MARKER_FILE)
        _log_progress("Wrote active marker: gs://%s", marker_remote)

    # --- Dual-load dependencies.json to the connector_dependencies/ path ---
    if not has_deps:
        logger.debug(
            "No %s in artifacts dir — skipping dual-load.",
            CONNECTOR_DEPENDENCY_FILE_NAME,
        )
    else:
        deps_remote = f"{bucket_name}/{deps_gcs_key}"
        _log_progress(
            "Dual-loading %s to gs://%s",
            CONNECTOR_DEPENDENCY_FILE_NAME,
            deps_remote,
        )
        fs.put(str(deps_file), deps_remote)
        result.files_uploaded.append(deps_gcs_key)
        _log_progress(" Uploaded %s (dual-load)", CONNECTOR_DEPENDENCY_FILE_NAME)

    # --- Upload SBOM to the dedicated sbom/ path in GCS ---
    if not has_sbom:
        logger.debug(
            "No %s in artifacts dir — skipping SBOM dual-load.",
            SBOM_FILE_NAME,
        )
    else:
        # `dry_run` is always falsy here: the dry-run branch above has
        # already returned.
        sbom_gcs_uri = upload_sbom(
            sbom_path=sbom_file,
            connector_name=connector_name,
            version=version,
            store=store,
            dry_run=dry_run,
        )
        result.files_uploaded.append(
            sbom_blob_path(
                connector_name=connector_name,
                version=version,
                store=store,
            ),
        )
        _log_progress("Uploaded SBOM to dedicated path: %s", sbom_gcs_uri)

    return result
Publish locally generated artifacts to a GCS registry bucket.
Uses gcsfs.GCSFileSystem to upload the local artifacts_dir to the
versioned path inside the target GCS bucket.
The target GCS path is:
gs://<bucket>/[<prefix>/]metadata/airbyte/<connector>/<version>/
Before uploading, this function validates that the connector_name
(derived from the connector directory) matches the dockerRepository
declared in metadata.yaml. A mismatch would cause the registry
compile step to see duplicate definition-ID entries and fail.
Arguments:
- connector_name: Connector name (e.g. source-faker).
- version: Version string (e.g. 6.2.38).
- artifacts_dir: Local directory containing artifacts from generate.
- store: Parsed store target containing bucket, prefix, and stage info.
- dry_run: If True, report what would be uploaded without writing.
- with_validate: If True (default), validate metadata before uploading. Pass False (--no-validate) to skip.
Returns:
A PublishArtifactsResult describing what was published.
Raises:
- ValueError: If the connector directory name does not match dockerRepository in the generated metadata.
def purge_latest_dirs(
    *,
    store: RegistryStore,
    connector_name: list[str] | None = None,
    dry_run: bool = False,
) -> PurgeLatestResult:
    """Remove connectors' `latest/` directories from the registry store.

    The connectors to purge are discovered first (either by probing the
    requested names or by globbing the whole store), then each `latest/`
    directory is deleted on a thread pool.

    Args:
        store: Registry store (bucket + optional prefix).
        connector_name: If provided, only these connectors are considered.
        dry_run: If True, only report what would be deleted.

    Returns:
        A `PurgeLatestResult` with the number of connectors found, the number
        of `latest/` directories deleted, and any per-connector errors.
    """
    outcome = PurgeLatestResult(target=store.bucket_root, dry_run=dry_run)

    gcs = gcsfs.GCSFileSystem(token=get_gcs_credentials_token())

    airbyte_root = f"{store.bucket_root}/{METADATA_FOLDER}/airbyte"
    root_prefix = f"{airbyte_root}/"

    _log_progress("Discovering latest/ directories...")

    # `targets` keeps discovery order; `already_added` guards against duplicates.
    already_added: set[str] = set()
    targets: list[str] = []

    if connector_name:
        # Probe each requested connector for a latest/ directory.
        for requested in connector_name:
            if requested in already_added:
                continue
            if gcs.exists(f"{airbyte_root}/{requested}/latest"):
                targets.append(requested)
                already_added.add(requested)
    else:
        # Glob every connector that has a latest/ directory and recover the
        # connector name as the first path component below the base prefix.
        for latest_dir in gcs.glob(f"{airbyte_root}/*/latest"):
            if not latest_dir.startswith(root_prefix):
                logger.warning("Could not parse latest path: %s", latest_dir)
                continue
            candidate = latest_dir[len(root_prefix) :].split("/")[0]
            if not candidate or candidate in already_added:
                continue
            targets.append(candidate)
            already_added.add(candidate)

    total = len(targets)
    outcome.connectors_found = total
    _log_progress(
        "Found %d connectors with latest/ directories",
        outcome.connectors_found,
    )

    if total == 0:
        _log_progress("Nothing to purge.")
        _log_progress(outcome.summary())
        return outcome

    if dry_run:
        for name in sorted(targets):
            _log_progress(" [DRY RUN] Would delete %s/latest/", name)
        outcome.latest_dirs_deleted = total
        _log_progress(outcome.summary())
        return outcome

    def _try_delete(name: str) -> str | None:
        """Delete one connector's latest/ dir; return an error string on failure."""
        try:
            _delete_latest_dir(gcs, store=store, connector=name)
        except Exception as exc:
            # Failures are reported per connector so one bad delete does not
            # abort the rest of the batch.
            return f"Failed to delete latest/ for {name}: {exc}"
        return None

    _log_progress(
        "Deleting %d latest/ directories (max_workers=%d)...",
        total,
        _PURGE_LATEST_MAX_WORKERS,
    )

    with ThreadPoolExecutor(max_workers=_PURGE_LATEST_MAX_WORKERS) as pool:
        pending = [pool.submit(_try_delete, name) for name in sorted(targets)]
        for done_count, finished in enumerate(as_completed(pending), 1):
            failure = finished.result()
            if failure:
                logger.error(failure)
                outcome.errors.append(failure)
            else:
                outcome.latest_dirs_deleted += 1
            if done_count % 100 == 0:
                _log_progress(" Deleted %d / %d...", done_count, total)

    _log_progress(outcome.summary())
    return outcome
Delete all latest/ directories from the registry store.
Discovers connector directories via glob, then deletes each
latest/ subdirectory in parallel using a thread pool.
Arguments:
- store: Registry store (bucket + optional prefix).
- connector_name: If provided, only purge these connectors.
- dry_run: If True, report what would be done without deleting.
Returns:
A PurgeLatestResult describing what was done.
def rebuild_registry(
    source_bucket: str,
    output_mode: OutputMode,
    output_path_root: str | None = None,
    gcs_bucket: str | None = None,
    s3_bucket: str | None = None,
    dry_run: bool = False,
    connector_name: list[str] | None = None,
) -> RebuildResult:
    """Rebuild the entire registry from a source GCS bucket to an output target.

    Lists the blobs under the metadata folder of the source GCS bucket and
    copies them to the output target.

    The output targets are:
    - local: Write to a local directory tree.
    - gcs: Copy to a GCS bucket (must not be the prod bucket).
    - s3: Copy to an S3 bucket.

    Args:
        source_bucket: The GCS bucket to read from (typically prod).
        output_mode: Where to write: "local", "gcs", or "s3".
        output_path_root: Root path/prefix for output. For local mode the
            value is resolved via `_resolve_local_output_root`. For GCS/S3 it
            is used as the destination prefix.
        gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
        s3_bucket: Target S3 bucket name (required if output_mode="s3").
        dry_run: If True, report what would be done without writing.
        connector_name: If provided, only rebuild these connector names
            (e.g. ["source-faker", "destination-bigquery"]). Duplicate names
            are ignored so each connector is listed and copied once.
            If None or empty, rebuilds all.

    Returns:
        RebuildResult with details of the operation. When no blobs are found
        the freshly initialised result is returned unchanged.

    Raises:
        ValueError: If target is the prod bucket, or a required bucket arg is
            missing. NOTE(review): these checks live in helpers
            (`_validate_not_prod_bucket`, presumably `_make_output_fs`) that
            are not visible here; confirm.
    """
    if output_mode == "gcs" and gcs_bucket:
        _validate_not_prod_bucket(gcs_bucket)

    # Resolve output root for local mode
    effective_output_root = output_path_root or ""
    if output_mode == "local":
        effective_output_root = _resolve_local_output_root(output_path_root)

    result = RebuildResult(
        source_bucket=source_bucket,
        output_mode=output_mode,
        output_root=effective_output_root,
        dry_run=dry_run,
    )

    # Create source filesystem (always GCS)
    source_fs, gcs_token = _make_source_fs()
    source_base = f"{source_bucket}/{METADATA_FOLDER}"

    # De-duplicate the requested connectors while keeping the caller's order.
    # Without this, a repeated name is listed again and every one of its
    # blobs is copied (and counted) twice.
    connector_filter = (
        list(dict.fromkeys(connector_name)) if connector_name else connector_name
    )

    # List files under metadata/ in the source bucket
    if connector_filter:
        _log_progress(
            "Listing blobs for %d connectors under gs://%s/...",
            len(connector_filter),
            source_base,
        )
        source_paths: list[str] = []
        for name in connector_filter:
            connector_prefix = f"{source_base}/airbyte/{name}"
            found = source_fs.find(connector_prefix)
            source_paths.extend(found)
            _log_progress(" %s: %d blobs", name, len(found))
    else:
        _log_progress("Listing all blobs under gs://%s/...", source_base)
        source_paths = source_fs.find(source_base)

    if not source_paths:
        _log_progress("No blobs found under gs://%s/", source_base)
        return result

    total_blobs = len(source_paths)
    _log_progress("Found %d blobs to process", total_blobs)

    # Create output filesystem. This intentionally happens before the dry-run
    # short-circuit, exactly as before, so any argument validation done by the
    # helper still runs in dry-run mode.
    output_fs, output_base = _make_output_fs(
        output_mode=output_mode,
        output_root=effective_output_root,
        gcs_bucket=gcs_bucket,
        s3_bucket=s3_bucket,
        gcs_token=gcs_token,
    )

    # Collect connector names and compute bucket-relative paths for all blobs.
    bucket_prefix = f"{source_bucket}/"
    blob_relative_paths: list[str] = []
    connector_names: set[str] = set()

    for source_path in source_paths:
        relative_path = source_path
        if source_path.startswith(bucket_prefix):
            relative_path = source_path[len(bucket_prefix) :]
        blob_relative_paths.append(relative_path)

        # The third path component is taken as the connector name
        # (paths are listed under "<METADATA_FOLDER>/airbyte/<connector>/...").
        parts = relative_path.split("/")
        if len(parts) >= 3:
            connector_names.add(parts[2])

    if dry_run:
        result.blobs_copied = total_blobs
        result.connectors_processed = len(connector_names)
        _log_progress(
            "[DRY RUN] Would copy %d blobs (%d connectors)",
            total_blobs,
            len(connector_names),
        )
        _log_progress(result.summary())
        return result

    # Use GCS-native server-side copy for GCS→GCS mirrors.
    if output_mode == "gcs" and gcs_bucket:
        return _gcs_native_copy(
            source_bucket=source_bucket,
            dest_bucket_name=gcs_bucket,
            dest_prefix=effective_output_root,
            blob_relative_paths=blob_relative_paths,
            connector_names=connector_names,
            gcs_token=gcs_token,
            result=result,
            connector_name_filter=connector_filter,
        )

    # Fallback: fsspec-based copy for local and S3 output modes.
    return _fsspec_copy(
        source_fs=source_fs,
        source_paths=source_paths,
        output_fs=output_fs,
        output_base=output_base,
        output_mode=output_mode,
        source_bucket=source_bucket,
        blob_relative_paths=blob_relative_paths,
        connector_names=connector_names,
        result=result,
    )
Rebuild the entire registry from a source GCS bucket to an output target.
Reads all connector metadata blobs from the source GCS bucket and copies them to the output target using fsspec for unified filesystem access.
The output targets are:
- local: Write to a local directory tree.
- gcs: Copy to a GCS bucket (must not be the prod bucket).
- s3: Copy to an S3 bucket.
Arguments:
- source_bucket: The GCS bucket to read from (typically prod).
- output_mode: Where to write: "local", "gcs", or "s3".
- output_path_root: Root path/prefix for output. For local mode, if None creates a temp directory. For GCS/S3, prepended to all blob paths.
- gcs_bucket: Target GCS bucket name (required if output_mode="gcs").
- s3_bucket: Target S3 bucket name (required if output_mode="s3").
- dry_run: If True, report what would be done without writing.
- connector_name: If provided, only rebuild these connector names (e.g. ["source-faker", "destination-bigquery"]). If None, rebuilds all.
Returns:
RebuildResult with details of the operation.
Raises:
- ValueError: If target is the prod bucket, or required bucket arg is missing.
def resolve_registry_store(
    store: str | None = None,
    connector_name: str | None = None,
    cwd: Path | None = None,
    default_env: str = "dev",
) -> RegistryStore:
    """Work out which `RegistryStore` a CLI invocation should target.

    Sources are consulted in this order of precedence:

    1. The explicit `--store` argument (e.g. `"coral:dev"`).
    2. The `AIRBYTE_REGISTRY_STORE` environment variable.
    3. Auto-detection from the connector name and/or working directory.

    An explicit source (1 or 2) is returned as-is. Auto-detection is still
    evaluated in that case, but its result is not used. With only
    auto-detected sources, all of them must agree on the store type.

    Args:
        store: Explicit store target string (e.g. `"coral:dev"`).
        connector_name: Optional connector name for auto-detection.
        cwd: Working directory for repo-based detection.
        default_env: Environment paired with an auto-detected store type
            (default `"dev"`).

    Returns:
        A fully resolved `RegistryStore`.

    Raises:
        ValueError: If no source yields a store, or if the auto-detected
            sources disagree on the store type.
    """
    # -- Explicit sources: --store first, then the environment variable ----
    chosen: RegistryStore | None = None
    chosen_origin: str | None = None
    if store is not None:
        chosen = RegistryStore.parse(store)
        chosen_origin = "--store"

    env_value = os.environ.get(REGISTRY_STORE_ENV_VAR)
    if chosen is None and env_value:
        chosen = RegistryStore.parse(env_value)
        chosen_origin = REGISTRY_STORE_ENV_VAR

    # -- Auto-detected sources, keyed by where the hint came from ----------
    detected: dict[str, StoreType] = {}
    if connector_name is not None:
        detected["connector_name"] = StoreType.get_from_connector_name(
            connector_name,
        )

    repo_store_type = StoreType.detect_from_repo_dir(cwd)
    if repo_store_type is not None:
        detected["working_directory"] = repo_store_type

    # -- An explicit source wins outright -----------------------------------
    if chosen is not None:
        logger.debug(
            "Using explicit store target from %s: %s:%s",
            chosen_origin,
            chosen.store_type.value,
            chosen.env,
        )
        return chosen

    # -- Otherwise the auto-detections must exist and agree -----------------
    if not detected:
        raise ValueError(
            "Cannot determine registry store. "
            "Provide --store (e.g. 'coral:dev' or 'sonar:prod'), "
            f"set ${REGISTRY_STORE_ENV_VAR}, "
            "or run from a recognized repository directory."
        )

    unique_types = set(detected.values())
    if len(unique_types) > 1:
        breakdown = ", ".join(
            f"{origin}={kind.value}" for origin, kind in detected.items()
        )
        raise ValueError(
            f"Conflicting store types detected: {breakdown}. "
            "Provide an explicit --store to resolve the ambiguity."
        )

    # Exactly one distinct store type remains at this point.
    (agreed_type,) = unique_types
    logger.info(
        "Auto-detected store type '%s' (sources: %s)",
        agreed_type.value,
        ", ".join(detected),
    )
    return RegistryStore(store_type=agreed_type, env=default_env)
Resolve a RegistryStore from CLI inputs.
All applicable detection methods are evaluated. Explicit sources
(--store, then the AIRBYTE_REGISTRY_STORE env var) take priority
and are returned directly. When only auto-detected sources remain, they
are compared and a ValueError is raised if they disagree.
Priority (highest → lowest):
1. Explicit --store argument (e.g. "coral:dev").
2. Environment variable -- AIRBYTE_REGISTRY_STORE.
3. Auto-detected -- connector name and/or working directory. If both are present and disagree, a ValueError is raised.
Arguments:
- store: Explicit store target string (e.g. "coral:dev").
- connector_name: Optional connector name for auto-detection.
- cwd: Working directory for repo-based detection.
- default_env: Environment to use when auto-detecting (default "dev").
Returns:
A fully resolved RegistryStore.
Raises:
- ValueError: If no detection method succeeds, or if auto-detected methods produce conflicting store types.
def unyank_connector_version(
    connector_name: str,
    version: str,
    bucket_name: str,
    dry_run: bool = False,
) -> YankResult:
    """Turn a version's active yank marker into an "unyanked" audit marker.

    The marker blob at the path given by `_get_yank_blob_path` is copied to
    the same path with `YANK_FILE_NAME` replaced by the name returned from
    `unyanked_marker_file()`, and the original marker is then deleted.

    Args:
        connector_name: The connector name (e.g., "source-faker").
        version: The version to unyank (e.g., "1.2.3").
        bucket_name: The GCS bucket name.
        dry_run: If True, report what would be done without writing.

    Returns:
        YankResult describing the outcome. `success` is False when the
        version has no active yank marker.
    """
    marker_path = _get_yank_blob_path(connector_name, version)

    gcs_bucket = get_gcs_storage_client().bucket(bucket_name)
    active_marker = gcs_bucket.blob(marker_path)

    # Fields shared by every result this function can produce.
    shared_fields = {
        "connector_name": connector_name,
        "version": version,
        "bucket_name": bucket_name,
        "action": "unyank",
    }

    # Nothing to do when the version is not currently yanked.
    if not active_marker.exists():
        return YankResult(
            **shared_fields,
            success=False,
            message=f"Version {version} of {connector_name} is not yanked.",
            dry_run=dry_run,
        )

    if dry_run:
        return YankResult(
            **shared_fields,
            success=True,
            message=f"[DRY RUN] Would unyank {connector_name} {version}.",
            dry_run=True,
        )

    # "Rename" = copy to the audit-marker name, then delete the active marker.
    audit_marker_name = unyanked_marker_file()
    audit_path = _get_yank_blob_path(connector_name, version).replace(
        YANK_FILE_NAME,
        audit_marker_name,
    )
    gcs_bucket.copy_blob(active_marker, gcs_bucket, new_name=audit_path)
    active_marker.delete()

    logger.info("Unyanked %s version %s in %s", connector_name, version, bucket_name)

    return YankResult(
        **shared_fields,
        success=True,
        message=f"Successfully unyanked {connector_name} {version}.",
    )
Rename the active yank marker to an unyanked audit marker.
Moves the active version-yank.yml marker at: metadata/airbyte/{connector_name}/{version}/version-yank.yml to: metadata/airbyte/{connector_name}/{version}/version-unyanked-yyyymmdd.yml
Arguments:
- connector_name: The connector name (e.g., "source-faker").
- version: The version to unyank (e.g., "1.2.3").
- bucket_name: The GCS bucket name.
- dry_run: If True, report what would be done without writing.
Returns:
YankResult with details of the operation.
def validate_metadata(
    metadata_data: dict[str, Any],
    opts: ValidateOptions | None = None,
) -> ValidationResult:
    """Run all pre-publish validators against raw `metadata.data`.

    Every validator in `PRE_PUBLISH_VALIDATORS` is called with the metadata
    and the options, and is expected to return a `(passed, error)` pair.
    Each failing validator contributes exactly one error to the result, even
    if it did not supply a message.

    Args:
        metadata_data: The `data` section of a parsed `metadata.yaml`.
        opts: Options influencing validation behaviour. Defaults to a fresh
            `ValidateOptions()` when omitted.

    Returns:
        A `ValidationResult` with aggregate pass/fail and error list.
    """
    if opts is None:
        opts = ValidateOptions()

    result = ValidationResult()

    for validator in PRE_PUBLISH_VALIDATORS:
        result.validators_run += 1
        logger.info("Running validator: %s", validator.__name__)
        passed, error = validator(metadata_data, opts)
        if passed:
            continue
        # A failure with an empty/None message used to be dropped silently,
        # which let the aggregate result report success. Always record it.
        message = error or (
            f"Validator {validator.__name__} failed without an error message."
        )
        logger.error("Validation failed: %s", message)
        result.add_error(message)

    return result
Run all pre-publish validators against raw metadata.data.
Arguments:
- metadata_data: The data section of a parsed metadata.yaml.
- opts: Options influencing validation behaviour.
Returns:
A ValidationResult with aggregate pass/fail and error list.
def yank_connector_version(
    connector_name: str,
    version: str,
    bucket_name: str,
    reason: str = "",
    approval_url: str = "",
    dry_run: bool = False,
) -> YankResult:
    """Yank a connector version by writing a version-yank.yml marker blob.

    The marker is written to the blob path returned by `_get_yank_blob_path`:
        metadata/airbyte/{connector_name}/{version}/version-yank.yml

    Args:
        connector_name: The connector name (e.g., "source-faker").
        version: The version to yank (e.g., "1.2.3").
        bucket_name: The GCS bucket name.
        reason: Optional reason for yanking the version.
        approval_url: Optional approval evidence URL to record in the marker.
        dry_run: If True, report what would be done without writing.

    Returns:
        YankResult with details of the operation. A missing version or an
        already-yanked version is reported as `success=False` rather than
        raised.

    NOTE(review): the earlier docstring listed `ValueError` for a production
    bucket or a missing version; no such raise is visible in this function.
    Confirm whether a helper enforces it.
    """
    marker_path = _get_yank_blob_path(connector_name, version)
    version_metadata_path = _get_metadata_blob_path(connector_name, version)

    gcs_bucket = get_gcs_storage_client().bucket(bucket_name)

    # Fields shared by every result this function can produce.
    shared_fields = {
        "connector_name": connector_name,
        "version": version,
        "bucket_name": bucket_name,
        "action": "yank",
    }

    # The version must already be published (its metadata blob exists).
    if not gcs_bucket.blob(version_metadata_path).exists():
        return YankResult(
            **shared_fields,
            success=False,
            message=f"Version {version} not found for {connector_name} in {bucket_name}.",
            dry_run=dry_run,
        )

    # Refuse to yank twice.
    marker_blob = gcs_bucket.blob(marker_path)
    if marker_blob.exists():
        return YankResult(
            **shared_fields,
            success=False,
            message=f"Version {version} of {connector_name} is already yanked.",
            dry_run=dry_run,
        )

    if dry_run:
        return YankResult(
            **shared_fields,
            success=True,
            message=f"[DRY RUN] Would yank {connector_name} {version}.",
            dry_run=True,
        )

    # Build the marker payload; optional fields are only included when set.
    marker_payload: dict[str, Any] = {
        "yanked": True,
        "yanked_at": datetime.now(tz=timezone.utc).isoformat(),
    }
    optional_fields = (("reason", reason), ("approval_url", approval_url))
    for field_name, field_value in optional_fields:
        if field_value:
            marker_payload[field_name] = field_value

    marker_blob.upload_from_string(
        yaml.dump(marker_payload, default_flow_style=False),
        content_type="application/x-yaml",
    )

    logger.info("Yanked %s version %s in %s", connector_name, version, bucket_name)

    return YankResult(
        **shared_fields,
        success=True,
        message=f"Successfully yanked {connector_name} {version}.",
    )
Mark a connector version as yanked by writing a version-yank.yml marker.
The marker file is placed at:
metadata/airbyte/{connector_name}/{version}/version-yank.yml
Arguments:
- connector_name: The connector name (e.g., "source-faker").
- version: The version to yank (e.g., "1.2.3").
- bucket_name: The GCS bucket name.
- reason: Optional reason for yanking the version.
- approval_url: Optional approval evidence URL to record in the marker.
- dry_run: If True, report what would be done without writing.
Returns:
YankResult with details of the operation.
Raises:
- ValueError: If the bucket is the production bucket and no override is set, or if the version does not exist.