airbyte_cdk.legacy.sources.declarative.manifest_declarative_source
1# 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 3# 4 5import json 6import logging 7import pkgutil 8from copy import deepcopy 9from importlib import metadata 10from types import ModuleType 11from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union 12 13import orjson 14import yaml 15from jsonschema.exceptions import ValidationError 16from jsonschema.validators import validate 17from packaging.version import InvalidVersion, Version 18 19from airbyte_cdk.config_observation import create_connector_config_control_message 20from airbyte_cdk.connector_builder.models import ( 21 LogMessage as ConnectorBuilderLogMessage, 22) 23from airbyte_cdk.legacy.sources.declarative.declarative_source import DeclarativeSource 24from airbyte_cdk.manifest_migrations.migration_handler import ( 25 ManifestMigrationHandler, 26) 27from airbyte_cdk.models import ( 28 AirbyteConnectionStatus, 29 AirbyteMessage, 30 AirbyteStateMessage, 31 ConfiguredAirbyteCatalog, 32 ConnectorSpecification, 33 FailureType, 34) 35from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer 36from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING 37from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker 38from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean 39from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( 40 ConditionalStreams as ConditionalStreamsModel, 41) 42from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( 43 DeclarativeStream as DeclarativeStreamModel, 44) 45from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( 46 Spec as SpecModel, 47) 48from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( 49 StateDelegatingStream as StateDelegatingStreamModel, 50) 51from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( 52 get_registered_components_module, 53) 54from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ( 55 ManifestComponentTransformer, 56) 57from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import ( 58 ManifestNormalizer, 59) 60from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( 61 ManifestReferenceResolver, 62) 63from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( 64 ModelToComponentFactory, 65) 66from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING 67from airbyte_cdk.sources.declarative.spec.spec import Spec 68from airbyte_cdk.sources.message import MessageRepository 69from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream 70from airbyte_cdk.sources.streams.core import Stream 71from airbyte_cdk.sources.types import Config, ConnectionDefinition 72from airbyte_cdk.sources.utils.slice_logger import ( 73 AlwaysLogSliceLogger, 74 DebugSliceLogger, 75 SliceLogger, 76) 77from airbyte_cdk.utils.traced_exception import AirbyteTracedException 78 79 80def _get_declarative_component_schema() -> Dict[str, Any]: 81 try: 82 raw_component_schema = pkgutil.get_data( 83 "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml" 84 ) 85 if raw_component_schema is not None: 86 declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader) 87 return declarative_component_schema # type: ignore 88 else: 89 raise RuntimeError( 90 "Failed to read manifest component json schema required for deduplication" 91 ) 92 except FileNotFoundError as e: 93 raise FileNotFoundError( 94 f"Failed to read manifest component json schema required for deduplication: {e}" 95 ) 96 97 98class ManifestDeclarativeSource(DeclarativeSource): 99 """Declarative source defined by a manifest of low-code components that define source connector behavior""" 100 101 def __init__( 102 self, 103 source_config: ConnectionDefinition, 104 *, 105 config: Mapping[str, Any] | None = None, 106 debug: bool = False, 107 emit_connector_builder_messages: bool = False, 108 component_factory: Optional[ModelToComponentFactory] = None, 109 migrate_manifest: Optional[bool] = False, 110 normalize_manifest: Optional[bool] = False, 111 config_path: Optional[str] = None, 112 ) -> None: 113 """ 114 Args: 115 config: The provided config dict. 116 source_config: The manifest of low-code components that describe the source connector. 117 debug: True if debug mode is enabled. 118 emit_connector_builder_messages: True if messages should be emitted to the connector builder. 119 component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. 120 normalize_manifest: Optional flag to indicate if the manifest should be normalized. 121 config_path: Optional path to the config file. 122 """ 123 self.logger = logging.getLogger(f"airbyte.{self.name}") 124 self._should_normalize = normalize_manifest 125 self._should_migrate = migrate_manifest 126 self._declarative_component_schema = _get_declarative_component_schema() 127 # If custom components are needed, locate and/or register them. 128 self.components_module: ModuleType | None = get_registered_components_module(config=config) 129 # set additional attributes 130 self._debug = debug 131 self._emit_connector_builder_messages = emit_connector_builder_messages 132 self._constructor = ( 133 component_factory 134 if component_factory 135 else ModelToComponentFactory( 136 emit_connector_builder_messages=emit_connector_builder_messages, 137 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 138 ) 139 ) 140 self._message_repository = self._constructor.get_message_repository() 141 self._slice_logger: SliceLogger = ( 142 AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() 143 ) 144 145 # resolve all components in the manifest 146 self._source_config = self._pre_process_manifest(dict(source_config)) 147 # validate resolved manifest against the declarative component schema 148 self._validate_source() 149 # apply additional post-processing to the manifest 150 self._post_process_manifest() 151 152 spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") 153 self._spec_component: Optional[Spec] = ( 154 self._constructor.create_component(SpecModel, spec, dict()) if spec else None 155 ) 156 self._config = self._migrate_and_transform_config(config_path, config) or {} 157 158 @property 159 def resolved_manifest(self) -> Mapping[str, Any]: 160 """ 161 Returns the resolved manifest configuration for the source. 162 163 This property provides access to the internal source configuration as a mapping, 164 which contains all settings and parameters required to define the source's behavior. 165 166 Returns: 167 Mapping[str, Any]: The resolved source configuration manifest. 168 """ 169 return self._source_config 170 171 def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: 172 """ 173 Preprocesses the provided manifest dictionary by resolving any manifest references. 174 175 This method modifies the input manifest in place, resolving references using the 176 ManifestReferenceResolver to ensure all references within the manifest are properly handled. 177 178 Args: 179 manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in. 180 181 Returns: 182 None 183 """ 184 # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing 185 manifest = self._fix_source_type(manifest) 186 # Resolve references in the manifest 187 resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest) 188 # Propagate types and parameters throughout the manifest 189 propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters( 190 "", resolved_manifest, {} 191 ) 192 193 return propagated_manifest 194 195 def _post_process_manifest(self) -> None: 196 """ 197 Post-processes the manifest after validation. 198 This method is responsible for any additional modifications or transformations needed 199 after the manifest has been validated and before it is used in the source. 200 """ 201 # apply manifest migration, if required 202 self._migrate_manifest() 203 # apply manifest normalization, if required 204 self._normalize_manifest() 205 206 def _normalize_manifest(self) -> None: 207 """ 208 This method is used to normalize the manifest. It should be called after the manifest has been validated. 209 210 Connector Builder UI rendering requires the manifest to be in a specific format. 211 - references have been resolved 212 - the commonly used definitions are extracted to the `definitions.linked.*` 213 """ 214 if self._should_normalize: 215 normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema) 216 self._source_config = normalizer.normalize() 217 218 def _migrate_and_transform_config( 219 self, 220 config_path: Optional[str], 221 config: Optional[Config], 222 ) -> Optional[Config]: 223 if not config: 224 return None 225 if not self._spec_component: 226 return config 227 mutable_config = dict(config) 228 self._spec_component.migrate_config(mutable_config) 229 if mutable_config != config: 230 if config_path: 231 with open(config_path, "w") as f: 232 json.dump(mutable_config, f) 233 self.message_repository.emit_message( 234 create_connector_config_control_message(mutable_config) 235 ) 236 # We have no mechanism for consuming the queue, so we print the messages to stdout 237 for message in self.message_repository.consume_queue(): 238 print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode()) 239 self._spec_component.transform_config(mutable_config) 240 return mutable_config 241 242 def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: 243 config = self._config or config 244 return super().configure(config, temp_dir) 245 246 def _migrate_manifest(self) -> None: 247 """ 248 This method is used to migrate the manifest. It should be called after the manifest has been validated. 249 The migration is done in place, so the original manifest is modified. 250 251 The original manifest is returned if any error occurs during migration. 252 """ 253 if self._should_migrate: 254 manifest_migrator = ManifestMigrationHandler(self._source_config) 255 self._source_config = manifest_migrator.apply_migrations() 256 # validate migrated manifest against the declarative component schema 257 self._validate_source() 258 259 def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]: 260 """ 261 Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest. 262 """ 263 if "type" not in manifest: 264 manifest["type"] = "DeclarativeSource" 265 266 return manifest 267 268 @property 269 def message_repository(self) -> MessageRepository: 270 return self._message_repository 271 272 @property 273 def dynamic_streams(self) -> List[Dict[str, Any]]: 274 return self._dynamic_stream_configs( 275 manifest=self._source_config, 276 config=self._config, 277 with_dynamic_stream_name=True, 278 ) 279 280 def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]: 281 return self._constructor.get_model_deprecations() 282 283 @property 284 def connection_checker(self) -> ConnectionChecker: 285 check = self._source_config["check"] 286 if "type" not in check: 287 check["type"] = "CheckStream" 288 check_stream = self._constructor.create_component( 289 COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]], 290 check, 291 dict(), 292 emit_connector_builder_messages=self._emit_connector_builder_messages, 293 ) 294 if isinstance(check_stream, ConnectionChecker): 295 return check_stream 296 else: 297 raise ValueError( 298 f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}" 299 ) 300 301 def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder 302 """ 303 As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream). 304 Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to 305 fully decouple this from the AbstractSource. 306 """ 307 if self._spec_component: 308 self._spec_component.validate_config(config) 309 310 self._emit_manifest_debug_message( 311 extra_args={ 312 "source_name": self.name, 313 "parsed_config": json.dumps(self._source_config), 314 } 315 ) 316 317 stream_configs = ( 318 self._stream_configs(self._source_config, config=config) + self.dynamic_streams 319 ) 320 321 api_budget_model = self._source_config.get("api_budget") 322 if api_budget_model: 323 self._constructor.set_api_budget(api_budget_model, config) 324 325 source_streams = [ 326 self._constructor.create_component( 327 ( 328 StateDelegatingStreamModel 329 if stream_config.get("type") == StateDelegatingStreamModel.__name__ 330 else DeclarativeStreamModel 331 ), 332 stream_config, 333 config, 334 emit_connector_builder_messages=self._emit_connector_builder_messages, 335 ) 336 for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) 337 ] 338 return source_streams 339 340 @staticmethod 341 def _initialize_cache_for_parent_streams( 342 stream_configs: List[Dict[str, Any]], 343 ) -> List[Dict[str, Any]]: 344 parent_streams = set() 345 346 def update_with_cache_parent_configs( 347 parent_configs: list[dict[str, Any]], 348 ) -> None: 349 for parent_config in parent_configs: 350 parent_streams.add(parent_config["stream"]["name"]) 351 if parent_config["stream"]["type"] == "StateDelegatingStream": 352 parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][ 353 "use_cache" 354 ] = True 355 parent_config["stream"]["incremental_stream"]["retriever"]["requester"][ 356 "use_cache" 357 ] = True 358 else: 359 parent_config["stream"]["retriever"]["requester"]["use_cache"] = True 360 361 for stream_config in stream_configs: 362 if stream_config.get("incremental_sync", {}).get("parent_stream"): 363 parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"]) 364 stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][ 365 "use_cache" 366 ] = True 367 368 elif stream_config.get("retriever", {}).get("partition_router", {}): 369 partition_router = stream_config["retriever"]["partition_router"] 370 371 if isinstance(partition_router, dict) and partition_router.get( 372 "parent_stream_configs" 373 ): 374 update_with_cache_parent_configs(partition_router["parent_stream_configs"]) 375 elif isinstance(partition_router, list): 376 for router in partition_router: 377 if router.get("parent_stream_configs"): 378 update_with_cache_parent_configs(router["parent_stream_configs"]) 379 380 for stream_config in stream_configs: 381 if stream_config["name"] in parent_streams: 382 if stream_config["type"] == "StateDelegatingStream": 383 stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = ( 384 True 385 ) 386 stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = ( 387 True 388 ) 389 else: 390 stream_config["retriever"]["requester"]["use_cache"] = True 391 return stream_configs 392 393 def spec(self, logger: logging.Logger) -> ConnectorSpecification: 394 """ 395 Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible 396 configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this 397 will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" 398 in the project root. 399 """ 400 self._configure_logger_level(logger) 401 self._emit_manifest_debug_message( 402 extra_args={ 403 "source_name": self.name, 404 "parsed_config": json.dumps(self._source_config), 405 } 406 ) 407 408 return ( 409 self._spec_component.generate_spec() if self._spec_component else super().spec(logger) 410 ) 411 412 def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: 413 self._configure_logger_level(logger) 414 return super().check(logger, config) 415 416 def read( 417 self, 418 logger: logging.Logger, 419 config: Mapping[str, Any], 420 catalog: ConfiguredAirbyteCatalog, 421 state: Optional[List[AirbyteStateMessage]] = None, 422 ) -> Iterator[AirbyteMessage]: 423 self._configure_logger_level(logger) 424 yield from super().read(logger, config, catalog, state) 425 426 def _configure_logger_level(self, logger: logging.Logger) -> None: 427 """ 428 Set the log level to logging.DEBUG if debug mode is enabled 429 """ 430 if self._debug: 431 logger.setLevel(logging.DEBUG) 432 433 def _validate_source(self) -> None: 434 """ 435 Validates the connector manifest against the declarative component schema 436 """ 437 438 try: 439 validate(self._source_config, self._declarative_component_schema) 440 except ValidationError as e: 441 raise ValidationError( 442 "Validation against json schema defined in declarative_component_schema.yaml schema failed" 443 ) from e 444 445 cdk_version_str = metadata.version("airbyte_cdk") 446 cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk") 447 manifest_version_str = self._source_config.get("version") 448 if manifest_version_str is None: 449 raise RuntimeError( 450 "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support." 451 ) 452 manifest_version = self._parse_version(manifest_version_str, "manifest") 453 454 if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0): 455 # Skipping version compatibility check on unreleased dev branch 456 pass 457 elif (cdk_version.major, cdk_version.minor) < ( 458 manifest_version.major, 459 manifest_version.minor, 460 ): 461 raise ValidationError( 462 f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your " 463 f"manifest may contain features that are not in the current CDK version." 464 ) 465 elif (manifest_version.major, manifest_version.minor) < (0, 29): 466 raise ValidationError( 467 f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the " 468 f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version " 469 f"{cdk_version!s} which contains these breaking changes." 470 ) 471 472 @staticmethod 473 def _parse_version( 474 version: str, 475 version_type: str, 476 ) -> Version: 477 """Takes a semantic version represented as a string and splits it into a tuple. 478 479 The fourth part (prerelease) is not returned in the tuple. 480 481 Returns: 482 Version: the parsed version object 483 """ 484 try: 485 parsed_version = Version(version) 486 except InvalidVersion as ex: 487 raise ValidationError( 488 f"The {version_type} version '{version}' is not a valid version format." 489 ) from ex 490 else: 491 # No exception 492 return parsed_version 493 494 def _stream_configs( 495 self, manifest: Mapping[str, Any], config: Mapping[str, Any] 496 ) -> List[Dict[str, Any]]: 497 # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config 498 stream_configs = [] 499 for current_stream_config in manifest.get("streams", []): 500 if ( 501 "type" in current_stream_config 502 and current_stream_config["type"] == "ConditionalStreams" 503 ): 504 interpolated_boolean = InterpolatedBoolean( 505 condition=current_stream_config.get("condition"), 506 parameters={}, 507 ) 508 509 if interpolated_boolean.eval(config=config): 510 stream_configs.extend(current_stream_config.get("streams", [])) 511 else: 512 if "type" not in current_stream_config: 513 current_stream_config["type"] = "DeclarativeStream" 514 stream_configs.append(current_stream_config) 515 return stream_configs 516 517 def _dynamic_stream_configs( 518 self, 519 manifest: Mapping[str, Any], 520 config: Mapping[str, Any], 521 with_dynamic_stream_name: Optional[bool] = None, 522 ) -> List[Dict[str, Any]]: 523 dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", []) 524 dynamic_stream_configs: List[Dict[str, Any]] = [] 525 seen_dynamic_streams: Set[str] = set() 526 527 for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions): 528 components_resolver_config = dynamic_definition["components_resolver"] 529 530 if not components_resolver_config: 531 raise ValueError( 532 f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}" 533 ) 534 535 resolver_type = components_resolver_config.get("type") 536 if not resolver_type: 537 raise ValueError( 538 f"Missing 'type' in components resolver configuration: {components_resolver_config}" 539 ) 540 541 if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING: 542 raise ValueError( 543 f"Invalid components resolver type '{resolver_type}'. " 544 f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}." 545 ) 546 547 if "retriever" in components_resolver_config: 548 components_resolver_config["retriever"]["requester"]["use_cache"] = True 549 550 # Create a resolver for dynamic components based on type 551 if resolver_type == "HttpComponentsResolver": 552 components_resolver = self._constructor.create_component( 553 model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], 554 component_definition=components_resolver_config, 555 config=config, 556 stream_name=dynamic_definition.get("name"), 557 ) 558 else: 559 components_resolver = self._constructor.create_component( 560 model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], 561 component_definition=components_resolver_config, 562 config=config, 563 ) 564 565 stream_template_config = dynamic_definition["stream_template"] 566 567 for dynamic_stream in components_resolver.resolve_components( 568 stream_template_config=stream_template_config 569 ): 570 # Get the use_parent_parameters configuration from the dynamic definition 571 # Default to True for backward compatibility, since connectors were already using it by default when this param was added 572 use_parent_parameters = dynamic_definition.get("use_parent_parameters", True) 573 574 dynamic_stream = { 575 **ManifestComponentTransformer().propagate_types_and_parameters( 576 "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters 577 ) 578 } 579 580 if "type" not in dynamic_stream: 581 dynamic_stream["type"] = "DeclarativeStream" 582 583 # Ensure that each stream is created with a unique name 584 name = dynamic_stream.get("name") 585 586 if with_dynamic_stream_name: 587 dynamic_stream["dynamic_stream_name"] = dynamic_definition.get( 588 "name", f"dynamic_stream_{dynamic_definition_index}" 589 ) 590 591 if not isinstance(name, str): 592 raise ValueError( 593 f"Expected stream name {name} to be a string, got {type(name)}." 594 ) 595 596 if name in seen_dynamic_streams: 597 error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support." 598 failure_type = FailureType.system_error 599 600 if resolver_type == "ConfigComponentsResolver": 601 error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration." 602 failure_type = FailureType.config_error 603 604 raise AirbyteTracedException( 605 message=error_message, 606 internal_message=error_message, 607 failure_type=failure_type, 608 ) 609 610 seen_dynamic_streams.add(name) 611 dynamic_stream_configs.append(dynamic_stream) 612 613 return dynamic_stream_configs 614 615 def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None: 616 self.logger.debug("declarative source created from manifest", extra=extra_args)
99class ManifestDeclarativeSource(DeclarativeSource): 100 """Declarative source defined by a manifest of low-code components that define source connector behavior""" 101 102 def __init__( 103 self, 104 source_config: ConnectionDefinition, 105 *, 106 config: Mapping[str, Any] | None = None, 107 debug: bool = False, 108 emit_connector_builder_messages: bool = False, 109 component_factory: Optional[ModelToComponentFactory] = None, 110 migrate_manifest: Optional[bool] = False, 111 normalize_manifest: Optional[bool] = False, 112 config_path: Optional[str] = None, 113 ) -> None: 114 """ 115 Args: 116 config: The provided config dict. 117 source_config: The manifest of low-code components that describe the source connector. 118 debug: True if debug mode is enabled. 119 emit_connector_builder_messages: True if messages should be emitted to the connector builder. 120 component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. 121 normalize_manifest: Optional flag to indicate if the manifest should be normalized. 122 config_path: Optional path to the config file. 123 """ 124 self.logger = logging.getLogger(f"airbyte.{self.name}") 125 self._should_normalize = normalize_manifest 126 self._should_migrate = migrate_manifest 127 self._declarative_component_schema = _get_declarative_component_schema() 128 # If custom components are needed, locate and/or register them. 129 self.components_module: ModuleType | None = get_registered_components_module(config=config) 130 # set additional attributes 131 self._debug = debug 132 self._emit_connector_builder_messages = emit_connector_builder_messages 133 self._constructor = ( 134 component_factory 135 if component_factory 136 else ModelToComponentFactory( 137 emit_connector_builder_messages=emit_connector_builder_messages, 138 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 139 ) 140 ) 141 self._message_repository = self._constructor.get_message_repository() 142 self._slice_logger: SliceLogger = ( 143 AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() 144 ) 145 146 # resolve all components in the manifest 147 self._source_config = self._pre_process_manifest(dict(source_config)) 148 # validate resolved manifest against the declarative component schema 149 self._validate_source() 150 # apply additional post-processing to the manifest 151 self._post_process_manifest() 152 153 spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") 154 self._spec_component: Optional[Spec] = ( 155 self._constructor.create_component(SpecModel, spec, dict()) if spec else None 156 ) 157 self._config = self._migrate_and_transform_config(config_path, config) or {} 158 159 @property 160 def resolved_manifest(self) -> Mapping[str, Any]: 161 """ 162 Returns the resolved manifest configuration for the source. 163 164 This property provides access to the internal source configuration as a mapping, 165 which contains all settings and parameters required to define the source's behavior. 166 167 Returns: 168 Mapping[str, Any]: The resolved source configuration manifest. 169 """ 170 return self._source_config 171 172 def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: 173 """ 174 Preprocesses the provided manifest dictionary by resolving any manifest references. 175 176 This method modifies the input manifest in place, resolving references using the 177 ManifestReferenceResolver to ensure all references within the manifest are properly handled. 178 179 Args: 180 manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in. 181 182 Returns: 183 None 184 """ 185 # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing 186 manifest = self._fix_source_type(manifest) 187 # Resolve references in the manifest 188 resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest) 189 # Propagate types and parameters throughout the manifest 190 propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters( 191 "", resolved_manifest, {} 192 ) 193 194 return propagated_manifest 195 196 def _post_process_manifest(self) -> None: 197 """ 198 Post-processes the manifest after validation. 199 This method is responsible for any additional modifications or transformations needed 200 after the manifest has been validated and before it is used in the source. 201 """ 202 # apply manifest migration, if required 203 self._migrate_manifest() 204 # apply manifest normalization, if required 205 self._normalize_manifest() 206 207 def _normalize_manifest(self) -> None: 208 """ 209 This method is used to normalize the manifest. It should be called after the manifest has been validated. 210 211 Connector Builder UI rendering requires the manifest to be in a specific format. 212 - references have been resolved 213 - the commonly used definitions are extracted to the `definitions.linked.*` 214 """ 215 if self._should_normalize: 216 normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema) 217 self._source_config = normalizer.normalize() 218 219 def _migrate_and_transform_config( 220 self, 221 config_path: Optional[str], 222 config: Optional[Config], 223 ) -> Optional[Config]: 224 if not config: 225 return None 226 if not self._spec_component: 227 return config 228 mutable_config = dict(config) 229 self._spec_component.migrate_config(mutable_config) 230 if mutable_config != config: 231 if config_path: 232 with open(config_path, "w") as f: 233 json.dump(mutable_config, f) 234 self.message_repository.emit_message( 235 create_connector_config_control_message(mutable_config) 236 ) 237 # We have no mechanism for consuming the queue, so we print the messages to stdout 238 for message in self.message_repository.consume_queue(): 239 print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode()) 240 self._spec_component.transform_config(mutable_config) 241 return mutable_config 242 243 def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]: 244 config = self._config or config 245 return super().configure(config, temp_dir) 246 247 def _migrate_manifest(self) -> None: 248 """ 249 This method is used to migrate the manifest. It should be called after the manifest has been validated. 250 The migration is done in place, so the original manifest is modified. 251 252 The original manifest is returned if any error occurs during migration. 253 """ 254 if self._should_migrate: 255 manifest_migrator = ManifestMigrationHandler(self._source_config) 256 self._source_config = manifest_migrator.apply_migrations() 257 # validate migrated manifest against the declarative component schema 258 self._validate_source() 259 260 def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]: 261 """ 262 Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest. 263 """ 264 if "type" not in manifest: 265 manifest["type"] = "DeclarativeSource" 266 267 return manifest 268 269 @property 270 def message_repository(self) -> MessageRepository: 271 return self._message_repository 272 273 @property 274 def dynamic_streams(self) -> List[Dict[str, Any]]: 275 return self._dynamic_stream_configs( 276 manifest=self._source_config, 277 config=self._config, 278 with_dynamic_stream_name=True, 279 ) 280 281 def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]: 282 return self._constructor.get_model_deprecations() 283 284 @property 285 def connection_checker(self) -> ConnectionChecker: 286 check = self._source_config["check"] 287 if "type" not in check: 288 check["type"] = "CheckStream" 289 check_stream = self._constructor.create_component( 290 COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]], 291 check, 292 dict(), 293 emit_connector_builder_messages=self._emit_connector_builder_messages, 294 ) 295 if isinstance(check_stream, ConnectionChecker): 296 return check_stream 297 else: 298 raise ValueError( 299 f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}" 300 ) 301 302 def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder 303 """ 304 As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream). 305 Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to 306 fully decouple this from the AbstractSource. 307 """ 308 if self._spec_component: 309 self._spec_component.validate_config(config) 310 311 self._emit_manifest_debug_message( 312 extra_args={ 313 "source_name": self.name, 314 "parsed_config": json.dumps(self._source_config), 315 } 316 ) 317 318 stream_configs = ( 319 self._stream_configs(self._source_config, config=config) + self.dynamic_streams 320 ) 321 322 api_budget_model = self._source_config.get("api_budget") 323 if api_budget_model: 324 self._constructor.set_api_budget(api_budget_model, config) 325 326 source_streams = [ 327 self._constructor.create_component( 328 ( 329 StateDelegatingStreamModel 330 if stream_config.get("type") == StateDelegatingStreamModel.__name__ 331 else DeclarativeStreamModel 332 ), 333 stream_config, 334 config, 335 emit_connector_builder_messages=self._emit_connector_builder_messages, 336 ) 337 for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) 338 ] 339 return source_streams 340 341 @staticmethod 342 def _initialize_cache_for_parent_streams( 343 stream_configs: List[Dict[str, Any]], 344 ) -> List[Dict[str, Any]]: 345 parent_streams = set() 346 347 def update_with_cache_parent_configs( 348 parent_configs: list[dict[str, Any]], 349 ) -> None: 350 for parent_config in parent_configs: 351 parent_streams.add(parent_config["stream"]["name"]) 352 if parent_config["stream"]["type"] == "StateDelegatingStream": 353 parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][ 354 "use_cache" 355 ] = True 356 parent_config["stream"]["incremental_stream"]["retriever"]["requester"][ 357 "use_cache" 358 ] = True 359 else: 360 parent_config["stream"]["retriever"]["requester"]["use_cache"] = True 361 362 for stream_config in stream_configs: 363 if stream_config.get("incremental_sync", {}).get("parent_stream"): 364 parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"]) 365 stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][ 366 "use_cache" 367 ] = True 368 369 elif stream_config.get("retriever", {}).get("partition_router", {}): 370 partition_router = stream_config["retriever"]["partition_router"] 371 372 if isinstance(partition_router, dict) and partition_router.get( 373 "parent_stream_configs" 374 ): 375 update_with_cache_parent_configs(partition_router["parent_stream_configs"]) 376 elif isinstance(partition_router, list): 377 for router in partition_router: 378 if router.get("parent_stream_configs"): 379 update_with_cache_parent_configs(router["parent_stream_configs"]) 380 381 for stream_config in stream_configs: 382 if stream_config["name"] in parent_streams: 383 if stream_config["type"] == "StateDelegatingStream": 384 stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = ( 385 True 386 ) 387 stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = ( 388 True 389 ) 390 else: 391 stream_config["retriever"]["requester"]["use_cache"] = True 392 return stream_configs 393 394 def spec(self, logger: logging.Logger) -> ConnectorSpecification: 395 """ 396 Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible 397 configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this 398 will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" 399 in the project root. 400 """ 401 self._configure_logger_level(logger) 402 self._emit_manifest_debug_message( 403 extra_args={ 404 "source_name": self.name, 405 "parsed_config": json.dumps(self._source_config), 406 } 407 ) 408 409 return ( 410 self._spec_component.generate_spec() if self._spec_component else super().spec(logger) 411 ) 412 413 def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: 414 self._configure_logger_level(logger) 415 return super().check(logger, config) 416 417 def read( 418 self, 419 logger: logging.Logger, 420 config: Mapping[str, Any], 421 catalog: ConfiguredAirbyteCatalog, 422 state: Optional[List[AirbyteStateMessage]] = None, 423 ) -> Iterator[AirbyteMessage]: 424 self._configure_logger_level(logger) 425 yield from super().read(logger, config, catalog, state) 426 427 def _configure_logger_level(self, logger: logging.Logger) -> None: 428 """ 429 Set the log level to logging.DEBUG if debug mode is enabled 430 """ 431 if self._debug: 432 logger.setLevel(logging.DEBUG) 433 434 def _validate_source(self) -> None: 435 """ 436 Validates the connector manifest against the declarative component schema 437 """ 438 439 try: 440 validate(self._source_config, self._declarative_component_schema) 441 except ValidationError as e: 442 raise ValidationError( 443 "Validation against json schema defined in declarative_component_schema.yaml schema failed" 444 ) from e 445 446 cdk_version_str = metadata.version("airbyte_cdk") 447 cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk") 448 manifest_version_str = self._source_config.get("version") 449 if manifest_version_str is None: 450 raise RuntimeError( 451 "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support." 452 ) 453 manifest_version = self._parse_version(manifest_version_str, "manifest") 454 455 if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0): 456 # Skipping version compatibility check on unreleased dev branch 457 pass 458 elif (cdk_version.major, cdk_version.minor) < ( 459 manifest_version.major, 460 manifest_version.minor, 461 ): 462 raise ValidationError( 463 f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your " 464 f"manifest may contain features that are not in the current CDK version." 465 ) 466 elif (manifest_version.major, manifest_version.minor) < (0, 29): 467 raise ValidationError( 468 f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the " 469 f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version " 470 f"{cdk_version!s} which contains these breaking changes." 471 ) 472 473 @staticmethod 474 def _parse_version( 475 version: str, 476 version_type: str, 477 ) -> Version: 478 """Takes a semantic version represented as a string and splits it into a tuple. 479 480 The fourth part (prerelease) is not returned in the tuple. 481 482 Returns: 483 Version: the parsed version object 484 """ 485 try: 486 parsed_version = Version(version) 487 except InvalidVersion as ex: 488 raise ValidationError( 489 f"The {version_type} version '{version}' is not a valid version format." 490 ) from ex 491 else: 492 # No exception 493 return parsed_version 494 495 def _stream_configs( 496 self, manifest: Mapping[str, Any], config: Mapping[str, Any] 497 ) -> List[Dict[str, Any]]: 498 # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config 499 stream_configs = [] 500 for current_stream_config in manifest.get("streams", []): 501 if ( 502 "type" in current_stream_config 503 and current_stream_config["type"] == "ConditionalStreams" 504 ): 505 interpolated_boolean = InterpolatedBoolean( 506 condition=current_stream_config.get("condition"), 507 parameters={}, 508 ) 509 510 if interpolated_boolean.eval(config=config): 511 stream_configs.extend(current_stream_config.get("streams", [])) 512 else: 513 if "type" not in current_stream_config: 514 current_stream_config["type"] = "DeclarativeStream" 515 stream_configs.append(current_stream_config) 516 return stream_configs 517 518 def _dynamic_stream_configs( 519 self, 520 manifest: Mapping[str, Any], 521 config: Mapping[str, Any], 522 with_dynamic_stream_name: Optional[bool] = None, 523 ) -> List[Dict[str, Any]]: 524 dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", []) 525 dynamic_stream_configs: List[Dict[str, Any]] = [] 526 seen_dynamic_streams: Set[str] = set() 527 528 for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions): 529 components_resolver_config = dynamic_definition["components_resolver"] 530 531 if not components_resolver_config: 532 raise ValueError( 533 f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}" 534 ) 535 536 resolver_type = components_resolver_config.get("type") 537 if not resolver_type: 538 raise ValueError( 539 f"Missing 'type' in components resolver configuration: {components_resolver_config}" 540 ) 541 542 if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING: 543 raise ValueError( 544 f"Invalid components resolver type '{resolver_type}'. " 545 f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}." 546 ) 547 548 if "retriever" in components_resolver_config: 549 components_resolver_config["retriever"]["requester"]["use_cache"] = True 550 551 # Create a resolver for dynamic components based on type 552 if resolver_type == "HttpComponentsResolver": 553 components_resolver = self._constructor.create_component( 554 model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], 555 component_definition=components_resolver_config, 556 config=config, 557 stream_name=dynamic_definition.get("name"), 558 ) 559 else: 560 components_resolver = self._constructor.create_component( 561 model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], 562 component_definition=components_resolver_config, 563 config=config, 564 ) 565 566 stream_template_config = dynamic_definition["stream_template"] 567 568 for dynamic_stream in components_resolver.resolve_components( 569 stream_template_config=stream_template_config 570 ): 571 # Get the use_parent_parameters configuration from the dynamic definition 572 # Default to True for backward compatibility, since connectors were already using it by default when this param was added 573 use_parent_parameters = dynamic_definition.get("use_parent_parameters", True) 574 575 dynamic_stream = { 576 **ManifestComponentTransformer().propagate_types_and_parameters( 577 "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters 578 ) 579 } 580 581 if "type" not in dynamic_stream: 582 dynamic_stream["type"] = "DeclarativeStream" 583 584 # Ensure that each stream is created with a unique name 585 name = dynamic_stream.get("name") 586 587 if with_dynamic_stream_name: 588 dynamic_stream["dynamic_stream_name"] = dynamic_definition.get( 589 "name", f"dynamic_stream_{dynamic_definition_index}" 590 ) 591 592 if not isinstance(name, str): 593 raise ValueError( 594 f"Expected stream name {name} to be a string, got {type(name)}." 595 ) 596 597 if name in seen_dynamic_streams: 598 error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support." 599 failure_type = FailureType.system_error 600 601 if resolver_type == "ConfigComponentsResolver": 602 error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration." 603 failure_type = FailureType.config_error 604 605 raise AirbyteTracedException( 606 message=error_message, 607 internal_message=error_message, 608 failure_type=failure_type, 609 ) 610 611 seen_dynamic_streams.add(name) 612 dynamic_stream_configs.append(dynamic_stream) 613 614 return dynamic_stream_configs 615 616 def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None: 617 self.logger.debug("declarative source created from manifest", extra=extra_args)
Declarative source defined by a manifest of low-code components that define source connector behavior
102 def __init__( 103 self, 104 source_config: ConnectionDefinition, 105 *, 106 config: Mapping[str, Any] | None = None, 107 debug: bool = False, 108 emit_connector_builder_messages: bool = False, 109 component_factory: Optional[ModelToComponentFactory] = None, 110 migrate_manifest: Optional[bool] = False, 111 normalize_manifest: Optional[bool] = False, 112 config_path: Optional[str] = None, 113 ) -> None: 114 """ 115 Args: 116 config: The provided config dict. 117 source_config: The manifest of low-code components that describe the source connector. 118 debug: True if debug mode is enabled. 119 emit_connector_builder_messages: True if messages should be emitted to the connector builder. 120 component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. 121 normalize_manifest: Optional flag to indicate if the manifest should be normalized. 122 config_path: Optional path to the config file. 123 """ 124 self.logger = logging.getLogger(f"airbyte.{self.name}") 125 self._should_normalize = normalize_manifest 126 self._should_migrate = migrate_manifest 127 self._declarative_component_schema = _get_declarative_component_schema() 128 # If custom components are needed, locate and/or register them. 129 self.components_module: ModuleType | None = get_registered_components_module(config=config) 130 # set additional attributes 131 self._debug = debug 132 self._emit_connector_builder_messages = emit_connector_builder_messages 133 self._constructor = ( 134 component_factory 135 if component_factory 136 else ModelToComponentFactory( 137 emit_connector_builder_messages=emit_connector_builder_messages, 138 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 139 ) 140 ) 141 self._message_repository = self._constructor.get_message_repository() 142 self._slice_logger: SliceLogger = ( 143 AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() 144 ) 145 146 # resolve all components in the manifest 147 self._source_config = self._pre_process_manifest(dict(source_config)) 148 # validate resolved manifest against the declarative component schema 149 self._validate_source() 150 # apply additional post-processing to the manifest 151 self._post_process_manifest() 152 153 spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") 154 self._spec_component: Optional[Spec] = ( 155 self._constructor.create_component(SpecModel, spec, dict()) if spec else None 156 ) 157 self._config = self._migrate_and_transform_config(config_path, config) or {}
Arguments:
- config: The provided config dict.
- source_config: The manifest of low-code components that describe the source connector.
- debug: True if debug mode is enabled.
- emit_connector_builder_messages: True if messages should be emitted to the connector builder.
- component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
- normalize_manifest: Optional flag to indicate if the manifest should be normalized.
- config_path: Optional path to the config file.
159 @property 160 def resolved_manifest(self) -> Mapping[str, Any]: 161 """ 162 Returns the resolved manifest configuration for the source. 163 164 This property provides access to the internal source configuration as a mapping, 165 which contains all settings and parameters required to define the source's behavior. 166 167 Returns: 168 Mapping[str, Any]: The resolved source configuration manifest. 169 """ 170 return self._source_config
Returns the resolved manifest configuration for the source.
This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.
Returns:
Mapping[str, Any]: The resolved source configuration manifest.
281 def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]: 282 return self._constructor.get_model_deprecations()
Returns a list of deprecation warnings for the source.
284 @property 285 def connection_checker(self) -> ConnectionChecker: 286 check = self._source_config["check"] 287 if "type" not in check: 288 check["type"] = "CheckStream" 289 check_stream = self._constructor.create_component( 290 COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]], 291 check, 292 dict(), 293 emit_connector_builder_messages=self._emit_connector_builder_messages, 294 ) 295 if isinstance(check_stream, ConnectionChecker): 296 return check_stream 297 else: 298 raise ValueError( 299 f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}" 300 )
Returns the ConnectionChecker to use for the check
operation
302 def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder 303 """ 304 As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream). 305 Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to 306 fully decouple this from the AbstractSource. 307 """ 308 if self._spec_component: 309 self._spec_component.validate_config(config) 310 311 self._emit_manifest_debug_message( 312 extra_args={ 313 "source_name": self.name, 314 "parsed_config": json.dumps(self._source_config), 315 } 316 ) 317 318 stream_configs = ( 319 self._stream_configs(self._source_config, config=config) + self.dynamic_streams 320 ) 321 322 api_budget_model = self._source_config.get("api_budget") 323 if api_budget_model: 324 self._constructor.set_api_budget(api_budget_model, config) 325 326 source_streams = [ 327 self._constructor.create_component( 328 ( 329 StateDelegatingStreamModel 330 if stream_config.get("type") == StateDelegatingStreamModel.__name__ 331 else DeclarativeStreamModel 332 ), 333 stream_config, 334 config, 335 emit_connector_builder_messages=self._emit_connector_builder_messages, 336 ) 337 for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) 338 ] 339 return source_streams
As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream). Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to fully decouple this from the AbstractSource.
394 def spec(self, logger: logging.Logger) -> ConnectorSpecification: 395 """ 396 Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible 397 configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this 398 will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" 399 in the project root. 400 """ 401 self._configure_logger_level(logger) 402 self._emit_manifest_debug_message( 403 extra_args={ 404 "source_name": self.name, 405 "parsed_config": json.dumps(self._source_config), 406 } 407 ) 408 409 return ( 410 self._spec_component.generate_spec() if self._spec_component else super().spec(logger) 411 )
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
413 def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: 414 self._configure_logger_level(logger) 415 return super().check(logger, config)
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
417 def read( 418 self, 419 logger: logging.Logger, 420 config: Mapping[str, Any], 421 catalog: ConfiguredAirbyteCatalog, 422 state: Optional[List[AirbyteStateMessage]] = None, 423 ) -> Iterator[AirbyteMessage]: 424 self._configure_logger_level(logger) 425 yield from super().read(logger, config, catalog, state)
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.