airbyte_cdk.sources.declarative.manifest_declarative_source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
import pkgutil
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import yaml
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate
from packaging.version import InvalidVersion, Version

from airbyte_cdk.connector_builder.models import (
    LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.manifest_migrations.migration_handler import (
    ManifestMigrationHandler,
)
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
)
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
    ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def _get_declarative_component_schema() -> Dict[str, Any]:
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        )


class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    def __init__(
        self,
        source_config: ConnectionDefinition,
        *,
        config: Mapping[str, Any] | None = None,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        migrate_manifest: Optional[bool] = False,
        normalize_manifest: Optional[bool] = False,
    ) -> None:
        """
        Args:
            config: The provided config dict.
            source_config: The manifest of low-code components that describe the source connector.
            debug: True if debug mode is enabled.
            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")
        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )
        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )
        self._config = config or {}

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        This method modifies the input manifest in place, resolving references using the
        ManifestReferenceResolver to ensure all references within the manifest are properly handled.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            None
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format.
        - references have been resolved
        - the commonly used definitions are extracted to the `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.

        The original manifest is returned if any error occurs during migration.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    @property
    def message_repository(self) -> MessageRepository:
        return self._message_repository

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            config=self._config,
            with_dynamic_stream_name=True,
        )

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        return self._constructor.get_model_deprecations()

    @property
    def connection_checker(self) -> ConnectionChecker:
        check = self._source_config["check"]
        if "type" not in check:
            check["type"] = "CheckStream"
        check_stream = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if isinstance(check_stream, ConnectionChecker):
            return check_stream
        else:
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
            )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        self._emit_manifest_debug_message(
            extra_args={
                "source_name": self.name,
                "parsed_config": json.dumps(self._source_config),
            }
        )

        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, config)

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]

        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        parent_streams = set()

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                else:
                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                else:
                    stream_config["retriever"]["requester"]["use_cache"] = True

        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        self._configure_logger_level(logger)
        self._emit_manifest_debug_message(
            extra_args={
                "source_name": self.name,
                "parsed_config": json.dumps(self._source_config),
            }
        )

        spec = self._source_config.get("spec")
        if spec:
            if "type" not in spec:
                spec["type"] = "Spec"
            spec_component = self._constructor.create_component(SpecModel, spec, dict())
            return spec_component.generate_spec()
        else:
            return super().spec(logger)

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        self._configure_logger_level(logger)
        return super().check(logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        self._configure_logger_level(logger)
        yield from super().read(logger, config, catalog, state)

    def _configure_logger_level(self, logger: logging.Logger) -> None:
        """
        Set the log level to logging.DEBUG if debug mode is enabled
        """
        if self._debug:
            logger.setLevel(logging.DEBUG)

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

        cdk_version_str = metadata.version("airbyte_cdk")
        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
        manifest_version_str = self._source_config.get("version")
        if manifest_version_str is None:
            raise RuntimeError(
                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
            )
        manifest_version = self._parse_version(manifest_version_str, "manifest")

        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
            # Skipping version compatibility check on unreleased dev branch
            pass
        elif (cdk_version.major, cdk_version.minor) < (
            manifest_version.major,
            manifest_version.minor,
        ):
            raise ValidationError(
                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
                f"manifest may contain features that are not in the current CDK version."
            )
        elif (manifest_version.major, manifest_version.minor) < (0, 29):
            raise ValidationError(
                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
                f"{cdk_version!s} which contains these breaking changes."
            )

    @staticmethod
    def _parse_version(
        version: str,
        version_type: str,
    ) -> Version:
        """Takes a semantic version represented as a string and splits it into a tuple.

        The fourth part (prerelease) is not returned in the tuple.

        Returns:
            Version: the parsed version object
        """
        try:
            parsed_version = Version(version)
        except InvalidVersion as ex:
            raise ValidationError(
                f"The {version_type} version '{version}' is not a valid version format."
            ) from ex
        else:
            # No exception
            return parsed_version

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
        for s in stream_configs:
            if "type" not in s:
                s["type"] = "DeclarativeStream"
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        config: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            components_resolver = self._constructor.create_component(
                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                components_resolver_config,
                config,
            )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=True
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        self.logger.debug("declarative source created from manifest", extra=extra_args)
class ManifestDeclarativeSource(DeclarativeSource):
Declarative source defined by a manifest of low-code components that define source connector behavior
ManifestDeclarativeSource(source_config: ConnectionDefinition, *, config: Mapping[str, Any] | None = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[ModelToComponentFactory] = None, migrate_manifest: Optional[bool] = False, normalize_manifest: Optional[bool] = False)
Arguments:
- config: The provided config dict.
- source_config: The manifest of low-code components that describe the source connector.
- debug: True if debug mode is enabled.
- emit_connector_builder_messages: True if messages should be emitted to the connector builder.
- component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
- normalize_manifest: Optional flag to indicate if the manifest should be normalized.
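A minimal usage sketch, assuming a manifest.yaml file that holds a complete low-code manifest (one that validates against declarative_component_schema.yaml); the file name and the api_key config field are illustrative only.

import yaml

from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource

# Hypothetical manifest file; any mapping that validates against the declarative
# component schema (version, check, streams, ...) can be passed as source_config.
with open("manifest.yaml") as f:
    manifest = yaml.safe_load(f)

source = ManifestDeclarativeSource(
    source_config=manifest,
    config={"api_key": "..."},   # illustrative user config; its shape is defined by the connector's spec
    debug=False,
    normalize_manifest=False,    # set True to normalize the manifest for Connector Builder rendering
)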
resolved_manifest: Mapping[str, Any]
Returns the resolved manifest configuration for the source.
This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.
Returns:
Mapping[str, Any]: The resolved source configuration manifest.
def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
Returns a list of deprecation warnings for the source.
connection_checker: ConnectionChecker
Returns the ConnectionChecker to use for the check operation
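For illustration, and assuming the ConnectionChecker interface exposes check_connection(source, logger, config) as in recent CDK versions, the returned checker can also be invoked directly (continuing the sketch above):

import logging

logger = logging.getLogger("airbyte")

# `source` is the ManifestDeclarativeSource built in the earlier sketch.
checker = source.connection_checker
succeeded, error = checker.check_connection(source, logger, {"api_key": "..."})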
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Parameters:
- config: The user-provided configuration as specified by the source's spec. Any stream-construction-related operation should happen here.
Returns:
A list of the streams in this source connector.
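Continuing the earlier sketch, a hedged example of enumerating the concrete Stream objects built from the manifest's stream definitions (name and get_json_schema come from the standard Stream interface):

# `source` is the ManifestDeclarativeSource built earlier; the config shape is illustrative.
for stream in source.streams(config={"api_key": "..."}):
    print(stream.name, stream.get_json_schema())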
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g. username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
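A short sketch of calling spec, continuing the earlier example; connectionSpecification is the JSON-schema field of the protocol's ConnectorSpecification model.

import logging

logger = logging.getLogger("airbyte")
connector_spec = source.spec(logger)           # built from the manifest's `spec` block when present
print(connector_spec.connectionSpecification)  # JSON schema describing the user-provided config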
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
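An illustrative call, assuming the protocol's AirbyteConnectionStatus model with its status field; `source` and `logger` are from the earlier sketches.

status = source.check(logger, {"api_key": "..."})  # returns an AirbyteConnectionStatus
print(status.status)                               # Status.SUCCEEDED or Status.FAILED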
def read(self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None) -> Iterator[AirbyteMessage]:
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
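A hedged sketch of a read loop; it assumes a ConfiguredAirbyteCatalog built elsewhere (for example, parsed from the platform-provided configured_catalog.json) and the protocol's AirbyteMessage model.

from airbyte_cdk.models import Type

# `catalog` is a ConfiguredAirbyteCatalog whose construction is omitted here;
# `source` and `logger` are from the earlier sketches.
for message in source.read(logger, {"api_key": "..."}, catalog, state=None):
    if message.type == Type.RECORD:
        print(message.record.data)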