airbyte_cdk.sources.declarative.manifest_declarative_source
```python
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
import logging
import pkgutil
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set

import orjson
import yaml
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate
from packaging.version import InvalidVersion, Version

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.connector_builder.models import (
    LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.manifest_migrations.migration_handler import (
    ManifestMigrationHandler,
)
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
    ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def _get_declarative_component_schema() -> Dict[str, Any]:
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        )


class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    def __init__(
        self,
        source_config: ConnectionDefinition,
        *,
        config: Mapping[str, Any] | None = None,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        migrate_manifest: Optional[bool] = False,
        normalize_manifest: Optional[bool] = False,
        config_path: Optional[str] = None,
    ) -> None:
        """
        Args:
            config: The provided config dict.
            source_config: The manifest of low-code components that describe the source connector.
            debug: True if debug mode is enabled.
            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
            config_path: Optional path to the config file.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")
        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )
        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        self._config = self._migrate_and_transform_config(config_path, config) or {}

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        This method modifies the input manifest in place, resolving references using the
        ManifestReferenceResolver to ensure all references within the manifest are properly handled.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            None
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format.
        - references have been resolved
        - the commonly used definitions are extracted to the `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            self.message_repository.emit_message(
                create_connector_config_control_message(mutable_config)
            )
            # We have no mechanism for consuming the queue, so we print the messages to stdout
            for message in self.message_repository.consume_queue():
                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.

        The original manifest is returned if any error occurs during migration.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    @property
    def message_repository(self) -> MessageRepository:
        return self._message_repository

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            config=self._config,
            with_dynamic_stream_name=True,
        )

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        return self._constructor.get_model_deprecations()

    @property
    def connection_checker(self) -> ConnectionChecker:
        check = self._source_config["check"]
        if "type" not in check:
            check["type"] = "CheckStream"
        check_stream = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if isinstance(check_stream, ConnectionChecker):
            return check_stream
        else:
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
            )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        if self._spec_component:
            self._spec_component.validate_config(config)

        self._emit_manifest_debug_message(
            extra_args={
                "source_name": self.name,
                "parsed_config": json.dumps(self._source_config),
            }
        )

        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, config)

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]

        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        parent_streams = set()

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                else:
                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                else:
                    stream_config["retriever"]["requester"]["use_cache"] = True

        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        self._configure_logger_level(logger)
        self._emit_manifest_debug_message(
            extra_args={
                "source_name": self.name,
                "parsed_config": json.dumps(self._source_config),
            }
        )

        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        self._configure_logger_level(logger)
        return super().check(logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        self._configure_logger_level(logger)
        yield from super().read(logger, config, catalog, state)

    def _configure_logger_level(self, logger: logging.Logger) -> None:
        """
        Set the log level to logging.DEBUG if debug mode is enabled
        """
        if self._debug:
            logger.setLevel(logging.DEBUG)

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

        cdk_version_str = metadata.version("airbyte_cdk")
        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
        manifest_version_str = self._source_config.get("version")
        if manifest_version_str is None:
            raise RuntimeError(
                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
            )
        manifest_version = self._parse_version(manifest_version_str, "manifest")

        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
            # Skipping version compatibility check on unreleased dev branch
            pass
        elif (cdk_version.major, cdk_version.minor) < (
            manifest_version.major,
            manifest_version.minor,
        ):
            raise ValidationError(
                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
                f"manifest may contain features that are not in the current CDK version."
            )
        elif (manifest_version.major, manifest_version.minor) < (0, 29):
            raise ValidationError(
                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
                f"{cdk_version!s} which contains these breaking changes."
            )

    @staticmethod
    def _parse_version(
        version: str,
        version_type: str,
    ) -> Version:
        """Takes a semantic version represented as a string and splits it into a tuple.

        The fourth part (prerelease) is not returned in the tuple.

        Returns:
            Version: the parsed version object
        """
        try:
            parsed_version = Version(version)
        except InvalidVersion as ex:
            raise ValidationError(
                f"The {version_type} version '{version}' is not a valid version format."
            ) from ex
        else:
            # No exception
            return parsed_version

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
        for s in stream_configs:
            if "type" not in s:
                s["type"] = "DeclarativeStream"
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        config: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            components_resolver = self._constructor.create_component(
                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                components_resolver_config,
                config,
            )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=True
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        self.logger.debug("declarative source created from manifest", extra=extra_args)
```
```python
class ManifestDeclarativeSource(DeclarativeSource)
```
Declarative source defined by a manifest of low-code components that define source connector behavior
```python
def __init__(
    self,
    source_config: ConnectionDefinition,
    *,
    config: Mapping[str, Any] | None = None,
    debug: bool = False,
    emit_connector_builder_messages: bool = False,
    component_factory: Optional[ModelToComponentFactory] = None,
    migrate_manifest: Optional[bool] = False,
    normalize_manifest: Optional[bool] = False,
    config_path: Optional[str] = None,
) -> None
```
Arguments:
- config: The provided config dict.
- source_config: The manifest of low-code components that describe the source connector.
- debug: True if debug mode is enabled.
- emit_connector_builder_messages: True if messages should be emitted to the connector builder.
- component_factory: Optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
- migrate_manifest: Optional flag to indicate if the manifest should be migrated.
- normalize_manifest: Optional flag to indicate if the manifest should be normalized.
- config_path: Optional path to the config file.
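As a rough usage sketch (not taken from the CDK documentation): the manifest below is illustrative and heavily abbreviated, the PokeAPI endpoint and the `pokemon` stream name are placeholders, and a real manifest must validate against `declarative_component_schema.yaml`.

```python
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource

# Hypothetical, abbreviated manifest for illustration only.
manifest = {
    "version": "6.0.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["pokemon"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "pokemon",
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://pokeapi.co/api/v2",
                    "path": "/pokemon",
                    "http_method": "GET",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": ["results"]},
                },
            },
            "schema_loader": {
                "type": "InlineSchemaLoader",
                "schema": {"type": "object", "properties": {}},
            },
        }
    ],
}

# The constructor resolves references, validates the manifest against the
# declarative component schema, and applies optional migration/normalization.
source = ManifestDeclarativeSource(source_config=manifest, config={}, debug=False)
```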
```python
@property
def resolved_manifest(self) -> Mapping[str, Any]
```
Returns the resolved manifest configuration for the source.
This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.
Returns:
Mapping[str, Any]: The resolved source configuration manifest.
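For instance, continuing the sketch above, the resolved manifest can be inspected after reference resolution and type/parameter propagation (the key access shown is illustrative and depends on the manifest):

```python
resolved = source.resolved_manifest
print(resolved["version"])
print([stream["name"] for stream in resolved.get("streams", [])])
```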
```python
def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]
```
Returns a list of deprecation warnings for the source.
```python
@property
def connection_checker(self) -> ConnectionChecker
```
Returns the ConnectionChecker to use for the check operation.
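A hedged sketch of how the checker is typically exercised, assuming the `ConnectionChecker.check_connection(source, logger, config)` interface and reusing the `source` from the earlier sketch:

```python
import logging

# Built from the manifest's "check" block; the type defaults to CheckStream.
checker = source.connection_checker
succeeded, error = checker.check_connection(source, logging.getLogger("airbyte"), {})
if not succeeded:
    print(f"Connection check failed: {error}")
```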
```python
def streams(self, config: Mapping[str, Any]) -> List[Stream]
```
Parameters
- config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns
A list of the streams in this source connector.
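A minimal sketch, reusing the `source` and empty config from the earlier example:

```python
streams = source.streams(config={})
for stream in streams:
    # Each entry is a Stream built from a DeclarativeStream or StateDelegatingStream config.
    print(stream.name)
```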
```python
def spec(self, logger: logging.Logger) -> ConnectorSpecification
```
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
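For example (a sketch; the spec contents depend entirely on the manifest's `spec` block):

```python
import logging

connector_spec = source.spec(logging.getLogger("airbyte"))
# connectionSpecification holds the JSON schema describing the user-facing config.
print(connector_spec.connectionSpecification)
```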
```python
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus
```
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
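A short sketch of invoking the check operation directly, reusing the earlier `source`:

```python
import logging

status = source.check(logging.getLogger("airbyte"), config={})
print(status.status)   # Status.SUCCEEDED or Status.FAILED
print(status.message)  # populated on failure
```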
```python
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]
```
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
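A sketch of a full-refresh read, assuming the illustrative `pokemon` stream from the constructor example; the catalog construction below uses the protocol models shipped with the CDK:

```python
import logging

from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

configured_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="pokemon",  # must match a stream name defined in the manifest
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh],
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
    ]
)

for message in source.read(logging.getLogger("airbyte"), config={}, catalog=configured_catalog, state=None):
    if message.type == Type.RECORD:
        print(message.record.stream, message.record.data)
```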