airbyte_cdk.sources.declarative.manifest_declarative_source
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
import logging
import pkgutil
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import orjson
import yaml
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate
from packaging.version import InvalidVersion, Version

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.connector_builder.models import (
    LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.manifest_migrations.migration_handler import (
    ManifestMigrationHandler,
)
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConditionalStreams as ConditionalStreamsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
    ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def _get_declarative_component_schema() -> Dict[str, Any]:
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        )
class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    def __init__(
        self,
        source_config: ConnectionDefinition,
        *,
        config: Mapping[str, Any] | None = None,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        migrate_manifest: Optional[bool] = False,
        normalize_manifest: Optional[bool] = False,
        config_path: Optional[str] = None,
    ) -> None:
        """
        Args:
            config: The provided config dict.
            source_config: The manifest of low-code components that describe the source connector.
            debug: True if debug mode is enabled.
            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
            config_path: Optional path to the config file.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")
        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages=emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )
        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        self._config = self._migrate_and_transform_config(config_path, config) or {}

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        This method modifies the input manifest in place, resolving references using the
        ManifestReferenceResolver to ensure all references within the manifest are properly handled.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            None
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format.
        - references have been resolved
        - the commonly used definitions are extracted to the `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            self.message_repository.emit_message(
                create_connector_config_control_message(mutable_config)
            )
            # We have no mechanism for consuming the queue, so we print the messages to stdout
            for message in self.message_repository.consume_queue():
                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        config = self._config or config
        return super().configure(config, temp_dir)

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.

        The original manifest is returned if any error occurs during migration.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    @property
    def message_repository(self) -> MessageRepository:
        return self._message_repository

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            config=self._config,
            with_dynamic_stream_name=True,
        )

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        return self._constructor.get_model_deprecations()

    @property
    def connection_checker(self) -> ConnectionChecker:
        check = self._source_config["check"]
        if "type" not in check:
            check["type"] = "CheckStream"
        check_stream = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if isinstance(check_stream, ConnectionChecker):
            return check_stream
        else:
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
            )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        if self._spec_component:
            self._spec_component.validate_config(config)

        self._emit_manifest_debug_message(
            extra_args={
                "source_name": self.name,
                "parsed_config": json.dumps(self._source_config),
            }
        )

        stream_configs = (
            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
        )

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, config)

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]
        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        parent_streams = set()

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                else:
                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                else:
                    stream_config["retriever"]["requester"]["use_cache"] = True
        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        self._configure_logger_level(logger)
        self._emit_manifest_debug_message(
            extra_args={
                "source_name": self.name,
                "parsed_config": json.dumps(self._source_config),
            }
        )

        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        self._configure_logger_level(logger)
        return super().check(logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        self._configure_logger_level(logger)
        yield from super().read(logger, config, catalog, state)

    def _configure_logger_level(self, logger: logging.Logger) -> None:
        """
        Set the log level to logging.DEBUG if debug mode is enabled
        """
        if self._debug:
            logger.setLevel(logging.DEBUG)

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

        cdk_version_str = metadata.version("airbyte_cdk")
        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
        manifest_version_str = self._source_config.get("version")
        if manifest_version_str is None:
            raise RuntimeError(
                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
            )
        manifest_version = self._parse_version(manifest_version_str, "manifest")

        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
            # Skipping version compatibility check on unreleased dev branch
            pass
        elif (cdk_version.major, cdk_version.minor) < (
            manifest_version.major,
            manifest_version.minor,
        ):
            raise ValidationError(
                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
                f"manifest may contain features that are not in the current CDK version."
            )
        elif (manifest_version.major, manifest_version.minor) < (0, 29):
            raise ValidationError(
                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
                f"{cdk_version!s} which contains these breaking changes."
            )

    @staticmethod
    def _parse_version(
        version: str,
        version_type: str,
    ) -> Version:
        """Takes a semantic version represented as a string and splits it into a tuple.

        The fourth part (prerelease) is not returned in the tuple.

        Returns:
            Version: the parsed version object
        """
        try:
            parsed_version = Version(version)
        except InvalidVersion as ex:
            raise ValidationError(
                f"The {version_type} version '{version}' is not a valid version format."
            ) from ex
        else:
            # No exception
            return parsed_version

    def _stream_configs(
        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
    ) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=config):
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        config: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            components_resolver = self._constructor.create_component(
                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                components_resolver_config,
                config,
            )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=True
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        self.logger.debug("declarative source created from manifest", extra=extra_args)
Declarative source defined by a manifest of low-code components that define source connector behavior
ManifestDeclarativeSource(
    source_config: ConnectionDefinition,
    *,
    config: Mapping[str, Any] | None = None,
    debug: bool = False,
    emit_connector_builder_messages: bool = False,
    component_factory: Optional[ModelToComponentFactory] = None,
    migrate_manifest: Optional[bool] = False,
    normalize_manifest: Optional[bool] = False,
    config_path: Optional[str] = None,
)
Arguments:
- config: The provided config dict.
- source_config: The manifest of low-code components that describe the source connector.
- debug: True if debug mode is enabled.
- emit_connector_builder_messages: True if messages should be emitted to the connector builder.
- component_factory: Optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
- migrate_manifest: Optional flag to indicate if the manifest should be migrated.
- normalize_manifest: Optional flag to indicate if the manifest should be normalized.
- config_path: Optional path to the config file.
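For illustration, a minimal sketch of constructing the source from an in-memory manifest. The manifest below is hypothetical and heavily abbreviated; a real manifest must validate against declarative_component_schema.yaml, otherwise the constructor raises a validation error.

from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource

# Hypothetical, abbreviated manifest for a single-stream HTTP API (field values are illustrative only).
manifest = {
    "version": "6.0.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["pokemon"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "pokemon",
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://pokeapi.co/api/v2",
                    "path": "/pokemon/ditto",
                    "http_method": "GET",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": []},
                },
            },
        }
    ],
}

# __init__ resolves references, validates against the component schema, and post-processes the manifest.
source = ManifestDeclarativeSource(source_config=manifest, config={}, debug=True)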
resolved_manifest: Mapping[str, Any]
Returns the resolved manifest configuration for the source.
This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.
Returns:
Mapping[str, Any]: The resolved source configuration manifest.
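A short sketch, assuming `source` is the ManifestDeclarativeSource built in the constructor example above:

# References have been resolved and component types propagated by the time this is read.
resolved = source.resolved_manifest
print(resolved["type"])                          # "DeclarativeSource"
print([s["name"] for s in resolved["streams"]])  # names of the declared streams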
deprecation_warnings() -> List[ConnectorBuilderLogMessage]
Returns a list of deprecation warnings for the source.
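A minimal sketch, reusing the `source` from the constructor example; each entry is a connector-builder LogMessage describing a deprecated component or field encountered while building the manifest's models:

for warning in source.deprecation_warnings():
    print(warning)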
connection_checker: ConnectionChecker
Returns the ConnectionChecker to use for the check operation.
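A sketch of driving the checker directly, assuming `source` from the constructor example; the check_connection call below follows the ConnectionChecker interface and is an assumption here rather than something documented on this page.

import logging

logger = logging.getLogger("airbyte")
checker = source.connection_checker  # falls back to a CheckStream component when no type is set
succeeded, error = checker.check_connection(source, logger, config={})  # assumed ConnectionChecker signature
print(succeeded, error)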
streams(config: Mapping[str, Any]) -> List[Stream]
Parameters:
- config: The user-provided configuration as specified by the source's spec. Any operation related to stream construction should happen here.
Returns:
A list of the streams in this source connector.
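A minimal sketch, assuming `source` from the constructor example and an empty config for a connector whose spec requires no fields:

config = {}  # user-provided configuration matching the connector's spec
for stream in source.streams(config=config):
    print(stream.name)  # e.g. "pokemon"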
spec(logger: logging.Logger) -> ConnectorSpecification
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block; otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
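A short sketch, assuming `source` from the constructor example:

import logging

logger = logging.getLogger("airbyte")
specification = source.spec(logger)
print(specification.connectionSpecification)  # JSON schema describing the connector's config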
check(logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
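A minimal sketch, assuming `source` from the constructor example:

import logging

logger = logging.getLogger("airbyte")
status = source.check(logger, config={})
print(status.status)  # Status.SUCCEEDED or Status.FAILED, with an optional message on failure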
read(logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None) -> Iterator[AirbyteMessage]
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
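A sketch of a full-refresh read, assuming `source` from the constructor example; the catalog below selects the hypothetical "pokemon" stream:

import logging

from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

logger = logging.getLogger("airbyte")

# Hypothetical catalog selecting a single stream for a full-refresh sync.
catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="pokemon",
                json_schema={},
                supported_sync_modes=[SyncMode.full_refresh],
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
    ]
)

for message in source.read(logger, config={}, catalog=catalog, state=None):
    if message.record:
        print(message.record.data)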