airbyte_cdk.legacy.sources.declarative.manifest_declarative_source
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
import logging
import pkgutil
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union

import orjson
import yaml
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate
from packaging.version import InvalidVersion, Version

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.connector_builder.models import (
    LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.legacy.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.manifest_migrations.migration_handler import (
    ManifestMigrationHandler,
)
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConditionalStreams as ConditionalStreamsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
    ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import Config, ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def _get_declarative_component_schema() -> Dict[str, Any]:
    """Load the declarative component JSON schema bundled with the ``airbyte_cdk`` package.

    Returns:
        The schema parsed (with ``yaml.SafeLoader``) from
        ``sources/declarative/declarative_component_schema.yaml``.

    Raises:
        FileNotFoundError: If the schema resource cannot be located.
        RuntimeError: If the package loader returns no data for the resource.
    """
    # Keep the try body to the single call that can raise FileNotFoundError.
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        ) from e

    # pkgutil.get_data returns None when the loader cannot provide resource data.
    if raw_component_schema is None:
        raise RuntimeError(
            "Failed to read manifest component json schema required for deduplication"
        )

    declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
    return declarative_component_schema  # type: ignore


class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    def __init__(
        self,
        source_config: ConnectionDefinition,
        *,
        config: Mapping[str, Any] | None = None,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        migrate_manifest: Optional[bool] = False,
        normalize_manifest: Optional[bool] = False,
        config_path: Optional[str] = None,
    ) -> None:
        """
        Args:
            source_config: The manifest of low-code components that describe the source connector.
            config: The provided config dict.
            debug: True if debug mode is enabled.
            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
            migrate_manifest: Optional flag to indicate if manifest migrations should be applied
                (the migrated manifest is re-validated).
            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
            config_path: Optional path to the config file. When the spec migrates the config,
                the migrated config is written back to this path.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")
        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages=emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )
        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        # The spec component must exist before config migration/transformation can run.
        self._config = self._migrate_and_transform_config(config_path, config) or {}

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocess the provided manifest dictionary before validation.

        Steps:
          1. Default the top-level ``type`` to ``DeclarativeSource`` (mutates ``manifest`` in place).
          2. Resolve manifest references with ManifestReferenceResolver.
          3. Propagate types and parameters with ManifestComponentTransformer.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: The manifest with references resolved and types/parameters propagated.
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format.
         - references have been resolved
         - the commonly used definitions are extracted to the `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        """Apply the spec's config migrations and transformations to ``config``.

        Returns ``None`` for an empty/missing config and the config unchanged when the
        manifest has no spec. When migration changes the config, the migrated config is
        written to ``config_path`` (if given) and a connector config control message is
        emitted and printed to stdout.
        """
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            self.message_repository.emit_message(
                create_connector_config_control_message(mutable_config)
            )
            # We have no mechanism for consuming the queue, so we print the messages to stdout
            for message in self.message_repository.consume_queue():
                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        """Configure the source, preferring the migrated/transformed config computed at init."""
        config = self._config or config
        return super().configure(config, temp_dir)

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migrated manifest replaces ``self._source_config`` and is re-validated against the
        declarative component schema.

        NOTE(review): error handling during migration is delegated to
        ``ManifestMigrationHandler.apply_migrations``; confirm whether it falls back to the
        original manifest on failure.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.

        The manifest is modified in place and the same object is returned.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    @property
    def message_repository(self) -> MessageRepository:
        return self._message_repository

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        """Stream configs generated from the manifest's ``dynamic_streams`` using the init-time config."""
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            config=self._config,
            with_dynamic_stream_name=True,
        )

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        return self._constructor.get_model_deprecations()

    @property
    def connection_checker(self) -> ConnectionChecker:
        """Build the connection checker described by the manifest's ``check`` block.

        Defaults the check ``type`` to ``CheckStream`` (mutating the manifest's check block).

        Raises:
            ValueError: If the factory does not produce a ConnectionChecker.
        """
        check = self._source_config["check"]
        if "type" not in check:
            check["type"] = "CheckStream"
        check_stream = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if isinstance(check_stream, ConnectionChecker):
            return check_stream
        else:
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
            )

    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
        fully decouple this from the AbstractSource.
        """
        if self._spec_component:
            self._spec_component.validate_config(config)

        self._emit_manifest_debug_message(
            extra_args={
                "source_name": self.name,
                "parsed_config": json.dumps(self._source_config),
            }
        )

        stream_configs = (
            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
        )

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, config)

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            # deepcopy so the cache flags set below do not leak back into the manifest
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]
        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Enable caching for parent streams unless explicitly disabled.

        Caching is enabled by default for parent streams to optimize performance when the same
        parent data is needed by multiple child streams. However, explicit `use_cache: false`
        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
        APIs where caching causes duplicate records).

        The configs are modified in place and the same list is returned.
        """
        parent_streams: Set[str] = set()

        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
            """Set use_cache to True only if not explicitly disabled."""
            if requester.get("use_cache") is not False:
                requester["use_cache"] = True

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            """Record each parent stream's name and enable caching on its requester(s)."""
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    _set_cache_if_not_disabled(
                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
                    )
                    _set_cache_if_not_disabled(
                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
                    )
                else:
                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])

        # Pass 1: find parent streams referenced by child streams.
        for stream_config in stream_configs:
            # `or {}` also guards against keys that are present but explicitly null.
            incremental_sync = stream_config.get("incremental_sync") or {}
            retriever = stream_config.get("retriever") or {}
            if incremental_sync.get("parent_stream"):
                parent_streams.add(incremental_sync["parent_stream"]["name"])
                _set_cache_if_not_disabled(
                    incremental_sync["parent_stream"]["retriever"]["requester"]
                )

            elif retriever.get("partition_router"):
                partition_router = retriever["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        # Pass 2: enable caching on the top-level definitions of those parent streams.
        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    _set_cache_if_not_disabled(
                        stream_config["full_refresh_stream"]["retriever"]["requester"]
                    )
                    _set_cache_if_not_disabled(
                        stream_config["incremental_stream"]["retriever"]["requester"]
                    )
                else:
                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it falls back to the parent class's ``spec``
        (which loads "spec.yaml" or "spec.json" from the project root).
        """
        self._configure_logger_level(logger)
        self._emit_manifest_debug_message(
            extra_args={
                "source_name": self.name,
                "parsed_config": json.dumps(self._source_config),
            }
        )

        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Run the connection check after applying the debug log level, if enabled."""
        self._configure_logger_level(logger)
        return super().check(logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """Read records after applying the debug log level, if enabled."""
        self._configure_logger_level(logger)
        yield from super().read(logger, config, catalog, state)

    def _configure_logger_level(self, logger: logging.Logger) -> None:
        """
        Set the log level to logging.DEBUG if debug mode is enabled
        """
        if self._debug:
            logger.setLevel(logging.DEBUG)

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema,
        then checks that the manifest version is compatible with the installed airbyte-cdk.

        Raises:
            ValidationError: If schema validation fails, a version is unparsable, the manifest
                version is newer than the CDK, or the manifest predates 0.29.
            RuntimeError: If the manifest has no ``version``.
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

        cdk_version_str = metadata.version("airbyte_cdk")
        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
        manifest_version_str = self._source_config.get("version")
        if manifest_version_str is None:
            raise RuntimeError(
                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
            )
        manifest_version = self._parse_version(manifest_version_str, "manifest")

        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
            # Skipping version compatibility check on unreleased dev branch
            pass
        elif (cdk_version.major, cdk_version.minor) < (
            manifest_version.major,
            manifest_version.minor,
        ):
            raise ValidationError(
                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
                f"manifest may contain features that are not in the current CDK version."
            )
        elif (manifest_version.major, manifest_version.minor) < (0, 29):
            raise ValidationError(
                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
                f"{cdk_version!s} which contains these breaking changes."
            )

    @staticmethod
    def _parse_version(
        version: str,
        version_type: str,
    ) -> Version:
        """Parse a version string into a ``packaging.version.Version``.

        Args:
            version: The version string to parse.
            version_type: Human-readable label used in the error message (e.g. "manifest").

        Returns:
            Version: the parsed version object

        Raises:
            ValidationError: If ``version`` is not a valid version string.
        """
        try:
            parsed_version = Version(version)
        except InvalidVersion as ex:
            raise ValidationError(
                f"The {version_type} version '{version}' is not a valid version format."
            ) from ex
        else:
            # No exception
            return parsed_version

    def _stream_configs(
        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
    ) -> List[Dict[str, Any]]:
        """Collect the static stream configs from the manifest's ``streams`` list.

        ``ConditionalStreams`` entries contribute their nested ``streams`` only when their
        ``condition`` evaluates truthy against ``config``. Other entries without a ``type``
        are defaulted to ``DeclarativeStream`` in place.
        """
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=config):
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        config: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        """Resolve each ``dynamic_streams`` definition into concrete stream configs.

        Args:
            manifest: The resolved manifest.
            config: The connector config passed to the components resolver.
            with_dynamic_stream_name: When truthy, tag each generated stream with
                ``dynamic_stream_name`` (the definition's ``name`` or ``dynamic_stream_<index>``).

        Returns:
            The generated stream configs, in definition order.

        Raises:
            ValueError: If a definition lacks ``components_resolver`` or its ``type``, the type
                is unknown, or a generated stream name is not a string.
            AirbyteTracedException: If two generated streams share the same name.
        """
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            # Use .get() so a missing key reaches the descriptive ValueError below
            # instead of escaping as a bare KeyError.
            components_resolver_config = dynamic_definition.get("components_resolver")

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            # NOTE: unlike parent-stream caching, this forces caching on unconditionally and
            # mutates the manifest's resolver config in place.
            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            if resolver_type == "HttpComponentsResolver":
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=config,
                    stream_name=dynamic_definition.get("name"),
                )
            else:
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=config,
                )

            stream_template_config = dynamic_definition["stream_template"]

            # Get the use_parent_parameters configuration from the dynamic definition
            # Default to True for backward compatibility, since connectors were already using it by default when this param was added
            use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    # Config-driven resolvers produce names from user config, so duplicates are a config error.
                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """Log a debug message about the manifest, attaching ``extra_args`` as log-record extras."""
        self.logger.debug("declarative source created from manifest", extra=extra_args)
99class ManifestDeclarativeSource(DeclarativeSource): 100 """Declarative source defined by a manifest of low-code components that define source connector behavior""" 101 102 def __init__( 103 self, 104 source_config: ConnectionDefinition, 105 *, 106 config: Mapping[str, Any] | None = None, 107 debug: bool = False, 108 emit_connector_builder_messages: bool = False, 109 component_factory: Optional[ModelToComponentFactory] = None, 110 migrate_manifest: Optional[bool] = False, 111 normalize_manifest: Optional[bool] = False, 112 config_path: Optional[str] = None, 113 ) -> None: 114 """ 115 Args: 116 config: The provided config dict. 117 source_config: The manifest of low-code components that describe the source connector. 118 debug: True if debug mode is enabled. 119 emit_connector_builder_messages: True if messages should be emitted to the connector builder. 120 component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. 121 normalize_manifest: Optional flag to indicate if the manifest should be normalized. 122 config_path: Optional path to the config file. 123 """ 124 self.logger = logging.getLogger(f"airbyte.{self.name}") 125 self._should_normalize = normalize_manifest 126 self._should_migrate = migrate_manifest 127 self._declarative_component_schema = _get_declarative_component_schema() 128 # If custom components are needed, locate and/or register them. 
129 self.components_module: ModuleType | None = get_registered_components_module(config=config) 130 # set additional attributes 131 self._debug = debug 132 self._emit_connector_builder_messages = emit_connector_builder_messages 133 self._constructor = ( 134 component_factory 135 if component_factory 136 else ModelToComponentFactory( 137 emit_connector_builder_messages=emit_connector_builder_messages, 138 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 139 ) 140 ) 141 self._message_repository = self._constructor.get_message_repository() 142 self._slice_logger: SliceLogger = ( 143 AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() 144 ) 145 146 # resolve all components in the manifest 147 self._source_config = self._pre_process_manifest(dict(source_config)) 148 # validate resolved manifest against the declarative component schema 149 self._validate_source() 150 # apply additional post-processing to the manifest 151 self._post_process_manifest() 152 153 spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") 154 self._spec_component: Optional[Spec] = ( 155 self._constructor.create_component(SpecModel, spec, dict()) if spec else None 156 ) 157 self._config = self._migrate_and_transform_config(config_path, config) or {} 158 159 @property 160 def resolved_manifest(self) -> Mapping[str, Any]: 161 """ 162 Returns the resolved manifest configuration for the source. 163 164 This property provides access to the internal source configuration as a mapping, 165 which contains all settings and parameters required to define the source's behavior. 166 167 Returns: 168 Mapping[str, Any]: The resolved source configuration manifest. 169 """ 170 return self._source_config 171 172 def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: 173 """ 174 Preprocesses the provided manifest dictionary by resolving any manifest references. 
175 176 This method modifies the input manifest in place, resolving references using the 177 ManifestReferenceResolver to ensure all references within the manifest are properly handled. 178 179 Args: 180 manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in. 181 182 Returns: 183 None 184 """ 185 # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing 186 manifest = self._fix_source_type(manifest) 187 # Resolve references in the manifest 188 resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest) 189 # Propagate types and parameters throughout the manifest 190 propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters( 191 "", resolved_manifest, {} 192 ) 193 194 return propagated_manifest 195 196 def _post_process_manifest(self) -> None: 197 """ 198 Post-processes the manifest after validation. 199 This method is responsible for any additional modifications or transformations needed 200 after the manifest has been validated and before it is used in the source. 201 """ 202 # apply manifest migration, if required 203 self._migrate_manifest() 204 # apply manifest normalization, if required 205 self._normalize_manifest() 206 207 def _normalize_manifest(self) -> None: 208 """ 209 This method is used to normalize the manifest. It should be called after the manifest has been validated. 210 211 Connector Builder UI rendering requires the manifest to be in a specific format. 
212 - references have been resolved 213 - the commonly used definitions are extracted to the `definitions.linked.*` 214 """ 215 if self._should_normalize: 216 normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema) 217 self._source_config = normalizer.normalize() 218 219 def _migrate_and_transform_config( 220 self, 221 config_path: Optional[str], 222 config: Optional[Config], 223 ) -> Optional[Config]: 224 if not config: 225 return None 226 if not self._spec_component: 227 return config 228 mutable_config = dict(config) 229 self._spec_component.migrate_config(mutable_config) 230 if mutable_config != config: 231 if config_path: 232 with open(config_path, "w") as f: 233 json.dump(mutable_config, f) 234 self.message_repository.emit_message( 235 create_connector_config_control_message(mutable_config) 236 ) 237 # We have no mechanism for consuming the queue, so we print the messages to stdout 238 for message in self.message_repository.consume_queue(): 239 print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode()) 240 self._spec_component.transform_config(mutable_config) 241 return mutable_config 242 243 def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: 244 config = self._config or config 245 return super().configure(config, temp_dir) 246 247 def _migrate_manifest(self) -> None: 248 """ 249 This method is used to migrate the manifest. It should be called after the manifest has been validated. 250 The migration is done in place, so the original manifest is modified. 251 252 The original manifest is returned if any error occurs during migration. 
253 """ 254 if self._should_migrate: 255 manifest_migrator = ManifestMigrationHandler(self._source_config) 256 self._source_config = manifest_migrator.apply_migrations() 257 # validate migrated manifest against the declarative component schema 258 self._validate_source() 259 260 def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]: 261 """ 262 Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest. 263 """ 264 if "type" not in manifest: 265 manifest["type"] = "DeclarativeSource" 266 267 return manifest 268 269 @property 270 def message_repository(self) -> MessageRepository: 271 return self._message_repository 272 273 @property 274 def dynamic_streams(self) -> List[Dict[str, Any]]: 275 return self._dynamic_stream_configs( 276 manifest=self._source_config, 277 config=self._config, 278 with_dynamic_stream_name=True, 279 ) 280 281 def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]: 282 return self._constructor.get_model_deprecations() 283 284 @property 285 def connection_checker(self) -> ConnectionChecker: 286 check = self._source_config["check"] 287 if "type" not in check: 288 check["type"] = "CheckStream" 289 check_stream = self._constructor.create_component( 290 COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]], 291 check, 292 dict(), 293 emit_connector_builder_messages=self._emit_connector_builder_messages, 294 ) 295 if isinstance(check_stream, ConnectionChecker): 296 return check_stream 297 else: 298 raise ValueError( 299 f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}" 300 ) 301 302 def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder 303 """ 304 As a migration step, this method will return both legacy stream (Stream) and concurrent 
stream (AbstractStream). 305 Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to 306 fully decouple this from the AbstractSource. 307 """ 308 if self._spec_component: 309 self._spec_component.validate_config(config) 310 311 self._emit_manifest_debug_message( 312 extra_args={ 313 "source_name": self.name, 314 "parsed_config": json.dumps(self._source_config), 315 } 316 ) 317 318 stream_configs = ( 319 self._stream_configs(self._source_config, config=config) + self.dynamic_streams 320 ) 321 322 api_budget_model = self._source_config.get("api_budget") 323 if api_budget_model: 324 self._constructor.set_api_budget(api_budget_model, config) 325 326 source_streams = [ 327 self._constructor.create_component( 328 ( 329 StateDelegatingStreamModel 330 if stream_config.get("type") == StateDelegatingStreamModel.__name__ 331 else DeclarativeStreamModel 332 ), 333 stream_config, 334 config, 335 emit_connector_builder_messages=self._emit_connector_builder_messages, 336 ) 337 for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) 338 ] 339 return source_streams 340 341 @staticmethod 342 def _initialize_cache_for_parent_streams( 343 stream_configs: List[Dict[str, Any]], 344 ) -> List[Dict[str, Any]]: 345 """Enable caching for parent streams unless explicitly disabled. 346 347 Caching is enabled by default for parent streams to optimize performance when the same 348 parent data is needed by multiple child streams. However, explicit `use_cache: false` 349 settings are respected for streams that cannot use caching (e.g., scroll-based pagination 350 APIs where caching causes duplicate records). 
351 """ 352 parent_streams = set() 353 354 def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None: 355 """Set use_cache to True only if not explicitly disabled.""" 356 if requester.get("use_cache") is not False: 357 requester["use_cache"] = True 358 359 def update_with_cache_parent_configs( 360 parent_configs: list[dict[str, Any]], 361 ) -> None: 362 for parent_config in parent_configs: 363 parent_streams.add(parent_config["stream"]["name"]) 364 if parent_config["stream"]["type"] == "StateDelegatingStream": 365 _set_cache_if_not_disabled( 366 parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"] 367 ) 368 _set_cache_if_not_disabled( 369 parent_config["stream"]["incremental_stream"]["retriever"]["requester"] 370 ) 371 else: 372 _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"]) 373 374 for stream_config in stream_configs: 375 if stream_config.get("incremental_sync", {}).get("parent_stream"): 376 parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"]) 377 _set_cache_if_not_disabled( 378 stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"] 379 ) 380 381 elif stream_config.get("retriever", {}).get("partition_router", {}): 382 partition_router = stream_config["retriever"]["partition_router"] 383 384 if isinstance(partition_router, dict) and partition_router.get( 385 "parent_stream_configs" 386 ): 387 update_with_cache_parent_configs(partition_router["parent_stream_configs"]) 388 elif isinstance(partition_router, list): 389 for router in partition_router: 390 if router.get("parent_stream_configs"): 391 update_with_cache_parent_configs(router["parent_stream_configs"]) 392 393 for stream_config in stream_configs: 394 if stream_config["name"] in parent_streams: 395 if stream_config["type"] == "StateDelegatingStream": 396 _set_cache_if_not_disabled( 397 stream_config["full_refresh_stream"]["retriever"]["requester"] 398 ) 399 _set_cache_if_not_disabled( 400 
stream_config["incremental_stream"]["retriever"]["requester"] 401 ) 402 else: 403 _set_cache_if_not_disabled(stream_config["retriever"]["requester"]) 404 return stream_configs 405 406 def spec(self, logger: logging.Logger) -> ConnectorSpecification: 407 """ 408 Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible 409 configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this 410 will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" 411 in the project root. 412 """ 413 self._configure_logger_level(logger) 414 self._emit_manifest_debug_message( 415 extra_args={ 416 "source_name": self.name, 417 "parsed_config": json.dumps(self._source_config), 418 } 419 ) 420 421 return ( 422 self._spec_component.generate_spec() if self._spec_component else super().spec(logger) 423 ) 424 425 def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: 426 self._configure_logger_level(logger) 427 return super().check(logger, config) 428 429 def read( 430 self, 431 logger: logging.Logger, 432 config: Mapping[str, Any], 433 catalog: ConfiguredAirbyteCatalog, 434 state: Optional[List[AirbyteStateMessage]] = None, 435 ) -> Iterator[AirbyteMessage]: 436 self._configure_logger_level(logger) 437 yield from super().read(logger, config, catalog, state) 438 439 def _configure_logger_level(self, logger: logging.Logger) -> None: 440 """ 441 Set the log level to logging.DEBUG if debug mode is enabled 442 """ 443 if self._debug: 444 logger.setLevel(logging.DEBUG) 445 446 def _validate_source(self) -> None: 447 """ 448 Validates the connector manifest against the declarative component schema 449 """ 450 451 try: 452 validate(self._source_config, self._declarative_component_schema) 453 except ValidationError as e: 454 raise ValidationError( 455 "Validation 
against json schema defined in declarative_component_schema.yaml schema failed" 456 ) from e 457 458 cdk_version_str = metadata.version("airbyte_cdk") 459 cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk") 460 manifest_version_str = self._source_config.get("version") 461 if manifest_version_str is None: 462 raise RuntimeError( 463 "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support." 464 ) 465 manifest_version = self._parse_version(manifest_version_str, "manifest") 466 467 if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0): 468 # Skipping version compatibility check on unreleased dev branch 469 pass 470 elif (cdk_version.major, cdk_version.minor) < ( 471 manifest_version.major, 472 manifest_version.minor, 473 ): 474 raise ValidationError( 475 f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your " 476 f"manifest may contain features that are not in the current CDK version." 477 ) 478 elif (manifest_version.major, manifest_version.minor) < (0, 29): 479 raise ValidationError( 480 f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the " 481 f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version " 482 f"{cdk_version!s} which contains these breaking changes." 483 ) 484 485 @staticmethod 486 def _parse_version( 487 version: str, 488 version_type: str, 489 ) -> Version: 490 """Takes a semantic version represented as a string and splits it into a tuple. 491 492 The fourth part (prerelease) is not returned in the tuple. 493 494 Returns: 495 Version: the parsed version object 496 """ 497 try: 498 parsed_version = Version(version) 499 except InvalidVersion as ex: 500 raise ValidationError( 501 f"The {version_type} version '{version}' is not a valid version format." 
502 ) from ex 503 else: 504 # No exception 505 return parsed_version 506 507 def _stream_configs( 508 self, manifest: Mapping[str, Any], config: Mapping[str, Any] 509 ) -> List[Dict[str, Any]]: 510 # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config 511 stream_configs = [] 512 for current_stream_config in manifest.get("streams", []): 513 if ( 514 "type" in current_stream_config 515 and current_stream_config["type"] == "ConditionalStreams" 516 ): 517 interpolated_boolean = InterpolatedBoolean( 518 condition=current_stream_config.get("condition"), 519 parameters={}, 520 ) 521 522 if interpolated_boolean.eval(config=config): 523 stream_configs.extend(current_stream_config.get("streams", [])) 524 else: 525 if "type" not in current_stream_config: 526 current_stream_config["type"] = "DeclarativeStream" 527 stream_configs.append(current_stream_config) 528 return stream_configs 529 530 def _dynamic_stream_configs( 531 self, 532 manifest: Mapping[str, Any], 533 config: Mapping[str, Any], 534 with_dynamic_stream_name: Optional[bool] = None, 535 ) -> List[Dict[str, Any]]: 536 dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", []) 537 dynamic_stream_configs: List[Dict[str, Any]] = [] 538 seen_dynamic_streams: Set[str] = set() 539 540 for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions): 541 components_resolver_config = dynamic_definition["components_resolver"] 542 543 if not components_resolver_config: 544 raise ValueError( 545 f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}" 546 ) 547 548 resolver_type = components_resolver_config.get("type") 549 if not resolver_type: 550 raise ValueError( 551 f"Missing 'type' in components resolver configuration: {components_resolver_config}" 552 ) 553 554 if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING: 555 raise ValueError( 556 f"Invalid components resolver type 
'{resolver_type}'. " 557 f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}." 558 ) 559 560 if "retriever" in components_resolver_config: 561 components_resolver_config["retriever"]["requester"]["use_cache"] = True 562 563 # Create a resolver for dynamic components based on type 564 if resolver_type == "HttpComponentsResolver": 565 components_resolver = self._constructor.create_component( 566 model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], 567 component_definition=components_resolver_config, 568 config=config, 569 stream_name=dynamic_definition.get("name"), 570 ) 571 else: 572 components_resolver = self._constructor.create_component( 573 model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], 574 component_definition=components_resolver_config, 575 config=config, 576 ) 577 578 stream_template_config = dynamic_definition["stream_template"] 579 580 for dynamic_stream in components_resolver.resolve_components( 581 stream_template_config=stream_template_config 582 ): 583 # Get the use_parent_parameters configuration from the dynamic definition 584 # Default to True for backward compatibility, since connectors were already using it by default when this param was added 585 use_parent_parameters = dynamic_definition.get("use_parent_parameters", True) 586 587 dynamic_stream = { 588 **ManifestComponentTransformer().propagate_types_and_parameters( 589 "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters 590 ) 591 } 592 593 if "type" not in dynamic_stream: 594 dynamic_stream["type"] = "DeclarativeStream" 595 596 # Ensure that each stream is created with a unique name 597 name = dynamic_stream.get("name") 598 599 if with_dynamic_stream_name: 600 dynamic_stream["dynamic_stream_name"] = dynamic_definition.get( 601 "name", f"dynamic_stream_{dynamic_definition_index}" 602 ) 603 604 if not isinstance(name, str): 605 raise ValueError( 606 f"Expected stream name {name} to be a string, got {type(name)}." 
607 ) 608 609 if name in seen_dynamic_streams: 610 error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support." 611 failure_type = FailureType.system_error 612 613 if resolver_type == "ConfigComponentsResolver": 614 error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration." 615 failure_type = FailureType.config_error 616 617 raise AirbyteTracedException( 618 message=error_message, 619 internal_message=error_message, 620 failure_type=failure_type, 621 ) 622 623 seen_dynamic_streams.add(name) 624 dynamic_stream_configs.append(dynamic_stream) 625 626 return dynamic_stream_configs 627 628 def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None: 629 self.logger.debug("declarative source created from manifest", extra=extra_args)
Declarative source defined by a manifest of low-code components that define source connector behavior
def __init__(
    self,
    source_config: ConnectionDefinition,
    *,
    config: Mapping[str, Any] | None = None,
    debug: bool = False,
    emit_connector_builder_messages: bool = False,
    component_factory: Optional[ModelToComponentFactory] = None,
    migrate_manifest: Optional[bool] = False,
    normalize_manifest: Optional[bool] = False,
    config_path: Optional[str] = None,
) -> None:
    """
    Args:
        source_config: The manifest of low-code components that describe the source connector.
        config: The provided config dict.
        debug: True if debug mode is enabled.
        emit_connector_builder_messages: True if messages should be emitted to the connector builder.
        component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
        migrate_manifest: Optional flag to indicate if the manifest should be migrated.
        normalize_manifest: Optional flag to indicate if the manifest should be normalized.
        config_path: Optional path to the config file. If the spec component migrates the
            config, the migrated config is written back to this path.
    """
    self.logger = logging.getLogger(f"airbyte.{self.name}")
    self._should_normalize = normalize_manifest
    self._should_migrate = migrate_manifest
    self._declarative_component_schema = _get_declarative_component_schema()
    # If custom components are needed, locate and/or register them.
    self.components_module: ModuleType | None = get_registered_components_module(config=config)
    # set additional attributes
    self._debug = debug
    self._emit_connector_builder_messages = emit_connector_builder_messages
    self._constructor = (
        component_factory
        if component_factory
        else ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
        )
    )
    self._message_repository = self._constructor.get_message_repository()
    # Connector Builder sessions log every slice; otherwise slices are only logged at debug level.
    self._slice_logger: SliceLogger = (
        AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
    )

    # resolve all components in the manifest (on a shallow copy of the caller's mapping)
    self._source_config = self._pre_process_manifest(dict(source_config))
    # validate resolved manifest against the declarative component schema
    self._validate_source()
    # apply additional post-processing to the manifest
    self._post_process_manifest()

    spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
    self._spec_component: Optional[Spec] = (
        self._constructor.create_component(SpecModel, spec, dict()) if spec else None
    )
    # An empty/missing config is stored as {} so later `self._config or ...` fallbacks work.
    self._config = self._migrate_and_transform_config(config_path, config) or {}
Arguments:
- config: The provided config dict.
- source_config: The manifest of low-code components that describe the source connector.
- debug: True if debug mode is enabled.
- emit_connector_builder_messages: True if messages should be emitted to the connector builder.
- component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
- normalize_manifest: Optional flag to indicate if the manifest should be normalized.
- config_path: Optional path to the config file.
- migrate_manifest: Optional flag to indicate if the manifest should be migrated.
@property
def resolved_manifest(self) -> Mapping[str, Any]:
    """
    Expose the source's resolved manifest.

    The returned mapping is the internal source configuration itself (not a copy), holding
    every setting and parameter that defines how the source behaves.

    Returns:
        Mapping[str, Any]: The resolved source configuration manifest.
    """
    manifest = self._source_config
    return manifest
Returns the resolved manifest configuration for the source.
This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.
Returns:
Mapping[str, Any]: The resolved source configuration manifest.
def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
    """Return the model deprecation warnings reported by this source's component factory."""
    warnings = self._constructor.get_model_deprecations()
    return warnings
Returns a list of deprecation warnings for the source.
@property
def connection_checker(self) -> ConnectionChecker:
    """
    Build the ConnectionChecker declared by the manifest's ``check`` block.

    A ``check`` block without a ``type`` is treated as ``CheckStream``, and that default is
    written back into the manifest.

    Raises:
        ValueError: If the component created from the ``check`` block is not a ConnectionChecker.
    """
    check_definition = self._source_config["check"]
    if "type" not in check_definition:
        check_definition["type"] = "CheckStream"
    checker_model = COMPONENTS_CHECKER_TYPE_MAPPING[check_definition["type"]]
    checker = self._constructor.create_component(
        checker_model,
        check_definition,
        dict(),
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )
    if not isinstance(checker, ConnectionChecker):
        raise ValueError(
            f"Expected to generate a ConnectionChecker component, but received {checker.__class__}"
        )
    return checker
Returns the ConnectionChecker to use for the check operation
def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
    """
    Build every stream declared by the manifest, static and dynamic.

    As a migration step, the result may mix legacy streams (Stream) and concurrent streams
    (AbstractStream). Once the migration is done, this method can probably raise
    "not implemented" while it is decoupled from the AbstractSource.
    """
    if self._spec_component:
        self._spec_component.validate_config(config)

    self._emit_manifest_debug_message(
        extra_args={
            "source_name": self.name,
            "parsed_config": json.dumps(self._source_config),
        }
    )

    static_configs = self._stream_configs(self._source_config, config=config)
    dynamic_configs = self.dynamic_streams
    stream_configs = static_configs + dynamic_configs

    api_budget_model = self._source_config.get("api_budget")
    if api_budget_model:
        self._constructor.set_api_budget(api_budget_model, config)

    # Parent-stream caching is enabled on a deep copy so the manifest itself stays untouched.
    cached_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
    source_streams = []
    for stream_config in cached_configs:
        is_state_delegating = stream_config.get("type") == StateDelegatingStreamModel.__name__
        model_type = StateDelegatingStreamModel if is_state_delegating else DeclarativeStreamModel
        source_streams.append(
            self._constructor.create_component(
                model_type,
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
        )
    return source_streams
As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream). Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to fully decouple this from the AbstractSource.
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Return the connector specification (spec) defined by the Airbyte Protocol.

    The spec describes the configuration options (e.g., username and password) accepted by
    this connector. The manifest's spec block is used when present; otherwise the parent
    implementation loads "spec.yaml" or "spec.json" from the project root.
    """
    self._configure_logger_level(logger)
    debug_args = {
        "source_name": self.name,
        "parsed_config": json.dumps(self._source_config),
    }
    self._emit_manifest_debug_message(extra_args=debug_args)

    if self._spec_component:
        return self._spec_component.generate_spec()
    return super().spec(logger)
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """Run the Check Connection operation after applying the debug log level to ``logger``."""
    self._configure_logger_level(logger)
    connection_status = super().check(logger, config)
    return connection_status
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """
    Run the Read operation and yield every message produced by the parent implementation.

    Because this is a generator, the debug log level is applied to ``logger`` only once
    iteration starts.
    """
    self._configure_logger_level(logger)
    upstream_messages = super().read(logger, config, catalog, state)
    yield from upstream_messages
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.