airbyte_cdk.sources.declarative.concurrent_declarative_source
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2 3import json 4import logging 5import pkgutil 6from copy import deepcopy 7from dataclasses import dataclass, field 8from queue import Queue 9from types import ModuleType 10from typing import ( 11 Any, 12 ClassVar, 13 Dict, 14 Iterator, 15 List, 16 Mapping, 17 Optional, 18 Set, 19) 20 21import orjson 22import yaml 23from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor 24from jsonschema.exceptions import ValidationError 25from jsonschema.validators import validate 26 27from airbyte_cdk.config_observation import create_connector_config_control_message 28from airbyte_cdk.connector_builder.models import ( 29 LogMessage as ConnectorBuilderLogMessage, 30) 31from airbyte_cdk.manifest_migrations.migration_handler import ( 32 ManifestMigrationHandler, 33) 34from airbyte_cdk.models import ( 35 AirbyteCatalog, 36 AirbyteConnectionStatus, 37 AirbyteMessage, 38 AirbyteStateMessage, 39 ConfiguredAirbyteCatalog, 40 ConnectorSpecification, 41 FailureType, 42 Status, 43) 44from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer 45from airbyte_cdk.sources import Source 46from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource 47from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager 48from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING 49from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker 50from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel 51from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean 52from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( 53 ConcurrencyLevel as ConcurrencyLevelModel, 54) 55from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( 56 DeclarativeStream as DeclarativeStreamModel, 57) 58from 
airbyte_cdk.sources.declarative.models.declarative_component_schema import ( 59 Spec as SpecModel, 60) 61from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( 62 StateDelegatingStream as StateDelegatingStreamModel, 63) 64from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( 65 get_registered_components_module, 66) 67from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ( 68 ManifestComponentTransformer, 69) 70from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import ( 71 ManifestNormalizer, 72) 73from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( 74 ManifestReferenceResolver, 75) 76from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( 77 ModelToComponentFactory, 78) 79from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( 80 GroupingPartitionRouter, 81) 82from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( 83 SubstreamPartitionRouter, 84) 85from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING 86from airbyte_cdk.sources.declarative.spec.spec import Spec 87from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition 88from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository 89from airbyte_cdk.sources.message.repository import InMemoryMessageRepository 90from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream 91from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream 92from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem 93from airbyte_cdk.sources.utils.slice_logger import ( 94 AlwaysLogSliceLogger, 95 DebugSliceLogger, 96 SliceLogger, 97) 98from airbyte_cdk.utils.stream_status_utils import as_airbyte_message 99from airbyte_cdk.utils.traced_exception import AirbyteTracedException 100 101 
@dataclass
class TestLimits:
    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
    DEFAULT_MAX_SLICES: ClassVar[int] = 5
    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
    DEFAULT_MAX_STREAMS: ClassVar[int] = 100

    max_records: int = field(default=DEFAULT_MAX_RECORDS)
    max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
    max_slices: int = field(default=DEFAULT_MAX_SLICES)
    max_streams: int = field(default=DEFAULT_MAX_STREAMS)


def _get_declarative_component_schema() -> Dict[str, Any]:
    """Load the declarative component JSON schema bundled with the airbyte_cdk package.

    Returns:
        Dict[str, Any]: The parsed declarative_component_schema.yaml contents.

    Raises:
        RuntimeError: If the packaged schema resource resolves to None.
        FileNotFoundError: If the schema file cannot be read.
    """
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        )


class ConcurrentDeclarativeSource(Source):
    # By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock
    # because it has hit the limit of futures but not partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        *,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        migrate_manifest: bool = False,
        normalize_manifest: bool = False,
        limits: Optional[TestLimits] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Build a concurrent declarative source from a manifest (``source_config``).

        Resolves, validates, and post-processes the manifest, constructs the component
        factory, migrates/transforms the connector config, and creates the underlying
        ConcurrentSource with a concurrency level taken from the manifest (or a safe default).
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")

        self._limits = limits

        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        # no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        message_repository = InMemoryMessageRepository(
            Level.DEBUG if emit_connector_builder_messages else Level.INFO
        )

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            message_repository=ConcurrentMessageRepository(queue, message_repository),
            configured_catalog=catalog,
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
            limit_slices_fetched=limits.max_slices if limits else None,
            disable_retries=True if limits else False,
            disable_cache=True if limits else False,
        )

        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        # NOTE(review): component_factory is always constructed above, so the else branch
        # below is never taken — presumably kept from an earlier optional-factory signature.
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages=emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )

        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        self._config = self._migrate_and_transform_config(config_path, config) or {}

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            queue=queue,
            message_repository=self._message_repository,
        )

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        This method modifies the input manifest in place, resolving references using the
        ManifestReferenceResolver to ensure all references within the manifest are properly handled.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: The manifest with references resolved and types/parameters propagated.
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.

        The original manifest is returned if any error occurs during migration.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format.
        - references have been resolved
        - the commonly used definitions are extracted to the `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        """Apply spec-defined config migrations and transformations.

        If the migration changed the config, the new config is persisted to ``config_path``
        (when provided) and a connector-config control message is printed to stdout.
        """
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            control_message = create_connector_config_control_message(mutable_config)
            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        # Prefer the already-migrated/transformed config when one was produced at init time.
        config = self._config or config
        return super().configure(config, temp_dir)

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        """Return deprecation log messages collected by the component factory."""
        return self._constructor.get_model_deprecations()

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """Read messages for the streams selected by the configured catalog via the concurrent source."""
        selected_concurrent_streams = self._select_streams(
            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
            configured_catalog=catalog,
        )

        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
        if len(selected_concurrent_streams) > 0:
            yield from self._concurrent_source.read(selected_concurrent_streams)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """Build an AirbyteCatalog from every stream generated from the manifest."""
        return AirbyteCatalog(
            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
        )

    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams so not all streams actually read from all the streams returned by `streams`)
        Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both case, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
        """

        if self._spec_component:
            self._spec_component.validate_config(self._config)

        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, self._config)

        prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in prepared_configs
        ]

        self._apply_stream_groups(source_streams)

        return source_streams

    def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
        """Set block_simultaneous_read on streams based on the manifest's stream_groups config.

        Iterates over the resolved manifest's stream_groups and matches group membership
        against actual created stream instances by name. Validates that no stream shares a
        group with any of its parent streams, which would cause a deadlock.
        """
        stream_groups = self._source_config.get("stream_groups", {})
        if not stream_groups:
            return

        # Build stream_name -> group_name mapping from the resolved manifest
        stream_name_to_group: Dict[str, str] = {}
        for group_name, group_config in stream_groups.items():
            for stream_ref in group_config.get("streams", []):
                if isinstance(stream_ref, dict):
                    stream_name = stream_ref.get("name", "")
                    if stream_name:
                        stream_name_to_group[stream_name] = group_name

        # Validate no stream shares a group with any of its ancestor streams
        stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

        def _collect_all_ancestor_names(stream_name: str) -> Set[str]:
            """Recursively collect all ancestor stream names."""
            ancestors: Set[str] = set()
            inst = stream_name_to_instance.get(stream_name)
            if not isinstance(inst, DefaultStream):
                return ancestors
            router = inst.get_partition_router()
            if isinstance(router, GroupingPartitionRouter):
                router = router.underlying_partition_router
            if not isinstance(router, SubstreamPartitionRouter):
                return ancestors
            for parent_config in router.parent_stream_configs:
                parent_name = parent_config.stream.name
                ancestors.add(parent_name)
                ancestors.update(_collect_all_ancestor_names(parent_name))
            return ancestors

        for stream in streams:
            if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
                continue
            group_name = stream_name_to_group[stream.name]
            for ancestor_name in _collect_all_ancestor_names(stream.name):
                if stream_name_to_group.get(ancestor_name) == group_name:
                    raise ValueError(
                        f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
                        f"are both in group '{group_name}'. "
                        f"A child stream must not share a group with its parent to avoid deadlock."
                    )

        # Apply group to matching stream instances
        for stream in streams:
            if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:
                stream.block_simultaneous_read = stream_name_to_group[stream.name]

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Enable caching for parent streams unless explicitly disabled.

        Caching is enabled by default for parent streams to optimize performance when the same
        parent data is needed by multiple child streams. However, explicit `use_cache: false`
        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
        APIs where caching causes duplicate records).
        """
        parent_streams = set()

        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
            """Set use_cache to True only if not explicitly disabled."""
            if requester.get("use_cache") is not False:
                requester["use_cache"] = True

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    _set_cache_if_not_disabled(
                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
                    )
                    _set_cache_if_not_disabled(
                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
                    )
                else:
                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                _set_cache_if_not_disabled(
                    stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
                )

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    _set_cache_if_not_disabled(
                        stream_config["full_refresh_stream"]["retriever"]["requester"]
                    )
                    _set_cache_if_not_disabled(
                        stream_config["incremental_stream"]["retriever"]["requester"]
                    )
                else:
                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Verify connectivity by building and running the manifest's `check` component."""
        check = self._source_config.get("check")
        if not check:
            raise ValueError(f"Missing 'check' component definition within the manifest.")

        if "type" not in check:
            check["type"] = "CheckStream"
        connection_checker = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if not isinstance(connection_checker, ConnectionChecker):
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
            )

        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
        if not check_succeeded:
            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        """Stream configs produced by resolving the manifest's `dynamic_streams` definitions."""
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            with_dynamic_stream_name=True,
        )

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=self._config):
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        """Resolve `dynamic_streams` definitions in the manifest into concrete stream configs.

        Raises:
            ValueError: For a missing/invalid components resolver or a non-string stream name.
            AirbyteTracedException: When resolved streams contain duplicate names.
        """
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            if resolver_type == "HttpComponentsResolver":
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                    stream_name=dynamic_definition.get("name"),
                )
            else:
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                # Get the use_parent_parameters configuration from the dynamic definition
                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)

                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _select_streams(
        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        """Filter `streams` down to those present in the configured catalog, by name."""
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)
            else:
                # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if
                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
                # but we can do so if we feel it necessary. With the current behavior,we should still result
                # in a partial failure since missing streams will be marked as INCOMPLETE.
                self._message_repository.emit_message(
                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
                )
        return abstract_streams
@dataclass
class TestLimits:
    """Caps on records, pages, slices, and streams applied when test-reading a source."""

    # Prevent pytest from collecting this class as a test case despite the "Test" prefix.
    __test__: ClassVar[bool] = False

    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
    DEFAULT_MAX_SLICES: ClassVar[int] = 5
    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
    DEFAULT_MAX_STREAMS: ClassVar[int] = 100

    # Instance limits fall back to the class-level defaults above.
    max_records: int = DEFAULT_MAX_RECORDS
    max_pages_per_slice: int = DEFAULT_MAX_PAGES_PER_SLICE
    max_slices: int = DEFAULT_MAX_SLICES
    max_streams: int = DEFAULT_MAX_STREAMS
class ConcurrentDeclarativeSource(Source):
    """Declarative (low-code) source that runs every stream on the concurrent framework.

    The manifest passed as ``source_config`` is reference-resolved, validated against the
    declarative component schema, and optionally migrated and normalized before any
    streams are built.
    """

    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in
    # a state of deadlock because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        *,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        migrate_manifest: bool = False,
        normalize_manifest: bool = False,
        limits: Optional[TestLimits] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Build the source from a manifest, resolving, validating and post-processing it.

        Args:
            catalog: Optional configured catalog forwarded to the component factory.
            config: Optional connector configuration.
            state: Optional incoming state messages.
            source_config: The raw manifest (connection definition) for this source.
            debug: Enables debug-level behavior on attached components.
            emit_connector_builder_messages: Emit extra messages for the Connector Builder UI.
            migrate_manifest: Apply manifest migrations after validation.
            normalize_manifest: Apply manifest normalization after validation.
            limits: Test-mode caps; also disables retries and caching when provided.
            config_path: When set, migrated config is persisted back to this path.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")

        self._limits = limits

        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        # no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # We set a maxsize so the main thread can process record items when the queue size grows. This assumes that
        # fewer threads generate partitions than the max number of workers. If that weren't the case, we could have
        # threads only generating partitions which would fill the queue. This number is arbitrarily set to 10_000 but
        # will probably need to be changed given more information and might even need to be configurable depending
        # on the source.
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        message_repository = InMemoryMessageRepository(
            Level.DEBUG if emit_connector_builder_messages else Level.INFO
        )

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        # NOTE: the factory is always constructed here, so it is assigned directly; the previous
        # `component_factory if component_factory else ModelToComponentFactory(...)` fallback was dead code.
        self._constructor = ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            message_repository=ConcurrentMessageRepository(queue, message_repository),
            configured_catalog=catalog,
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
            limit_slices_fetched=limits.max_slices if limits else None,
            disable_retries=limits is not None,
            disable_cache=limits is not None,
        )

        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages

        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, {}) if spec else None
        )
        self._config = self._migrate_and_transform_config(config_path, config) or {}

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            # Partition generation iterates using range based on this value. If this were floored
            # to zero we would end up in a deadlock during start up, hence the max(..., 1).
            initial_number_of_partitions_to_generate = max(concurrency_level // 2, 1)
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            queue=queue,
            message_repository=self._message_repository,
        )

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        References are resolved with the ManifestReferenceResolver, then types and parameters
        are propagated throughout the manifest with the ManifestComponentTransformer.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: The manifest with references resolved and types/parameters propagated.
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format:
         - references have been resolved
         - the commonly used definitions are extracted to the `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema.

        Raises:
            ValidationError: If the manifest does not conform to the schema.
        """
        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        """Apply spec-defined config migrations and transformations.

        When the migrated config differs from the incoming one, it is persisted back to
        ``config_path`` (if provided) and a connector-config control message is printed.
        Returns the migrated/transformed config, or None when no config was provided.
        """
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            control_message = create_connector_config_control_message(mutable_config)
            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        # Prefer the already migrated/transformed config when one exists.
        config = self._config or config
        return super().configure(config, temp_dir)

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        """Return deprecation warnings collected while constructing components from the manifest."""
        return self._constructor.get_model_deprecations()

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """Yield AirbyteMessages produced by concurrently reading the catalog-selected streams."""
        selected_concurrent_streams = self._select_streams(
            streams=self.streams(config=self._config),  # type: ignore # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
            configured_catalog=catalog,
        )

        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
        if len(selected_concurrent_streams) > 0:
            yield from self._concurrent_source.read(selected_concurrent_streams)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """Return the catalog of available streams, each converted to its protocol representation."""
        return AirbyteCatalog(
            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
        )

    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams so not all streams actually read from all the streams returned by `streams`)
        Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
        """

        if self._spec_component:
            self._spec_component.validate_config(self._config)

        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, self._config)

        prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in prepared_configs
        ]

        self._apply_stream_groups(source_streams)

        return source_streams

    def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
        """Set block_simultaneous_read on streams based on the manifest's stream_groups config.

        Iterates over the resolved manifest's stream_groups and matches group membership
        against actual created stream instances by name. Validates that no stream shares a
        group with any of its parent streams, which would cause a deadlock.
        """
        stream_groups = self._source_config.get("stream_groups", {})
        if not stream_groups:
            return

        # Build stream_name -> group_name mapping from the resolved manifest
        stream_name_to_group: Dict[str, str] = {}
        for group_name, group_config in stream_groups.items():
            for stream_ref in group_config.get("streams", []):
                if isinstance(stream_ref, dict):
                    stream_name = stream_ref.get("name", "")
                    if stream_name:
                        stream_name_to_group[stream_name] = group_name

        # Validate no stream shares a group with any of its ancestor streams
        stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

        def _collect_all_ancestor_names(stream_name: str) -> Set[str]:
            """Recursively collect all ancestor stream names."""
            ancestors: Set[str] = set()
            inst = stream_name_to_instance.get(stream_name)
            if not isinstance(inst, DefaultStream):
                return ancestors
            router = inst.get_partition_router()
            if isinstance(router, GroupingPartitionRouter):
                router = router.underlying_partition_router
            if not isinstance(router, SubstreamPartitionRouter):
                return ancestors
            for parent_config in router.parent_stream_configs:
                parent_name = parent_config.stream.name
                ancestors.add(parent_name)
                ancestors.update(_collect_all_ancestor_names(parent_name))
            return ancestors

        for stream in streams:
            if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
                continue
            group_name = stream_name_to_group[stream.name]
            for ancestor_name in _collect_all_ancestor_names(stream.name):
                if stream_name_to_group.get(ancestor_name) == group_name:
                    raise ValueError(
                        f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
                        f"are both in group '{group_name}'. "
                        f"A child stream must not share a group with its parent to avoid deadlock."
                    )

        # Apply group to matching stream instances
        for stream in streams:
            if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:
                stream.block_simultaneous_read = stream_name_to_group[stream.name]

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Enable caching for parent streams unless explicitly disabled.

        Caching is enabled by default for parent streams to optimize performance when the same
        parent data is needed by multiple child streams. However, explicit `use_cache: false`
        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
        APIs where caching causes duplicate records).
        """
        parent_streams = set()

        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
            """Set use_cache to True only if not explicitly disabled."""
            if requester.get("use_cache") is not False:
                requester["use_cache"] = True

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    _set_cache_if_not_disabled(
                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
                    )
                    _set_cache_if_not_disabled(
                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
                    )
                else:
                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                _set_cache_if_not_disabled(
                    stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
                )

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    _set_cache_if_not_disabled(
                        stream_config["full_refresh_stream"]["retriever"]["requester"]
                    )
                    _set_cache_if_not_disabled(
                        stream_config["incremental_stream"]["retriever"]["requester"]
                    )
                else:
                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Run the manifest's `check` component and report connection success or failure."""
        check = self._source_config.get("check")
        if not check:
            # NOTE: plain string — the previous f-string had no placeholders.
            raise ValueError("Missing 'check' component definition within the manifest.")

        if "type" not in check:
            check["type"] = "CheckStream"
        connection_checker = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            {},
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if not isinstance(connection_checker, ConnectionChecker):
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
            )

        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
        if not check_succeeded:
            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        """Stream configs generated from the manifest's `dynamic_streams` definitions."""
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            with_dynamic_stream_name=True,
        )

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        """Collect static stream configs from the manifest, expanding ConditionalStreams whose condition evaluates to true."""
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=self._config):
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        """Materialize stream configs from each dynamic stream definition via its components resolver.

        Raises:
            ValueError: On a missing/invalid components resolver configuration or a non-string stream name.
            AirbyteTracedException: When two resolved streams share the same name.
        """
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            # Use .get() so a missing key raises the descriptive ValueError below instead of a
            # bare KeyError (the previous direct indexing made this check unreachable).
            components_resolver_config = dynamic_definition.get("components_resolver")

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            if resolver_type == "HttpComponentsResolver":
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                    stream_name=dynamic_definition.get("name"),
                )
            else:
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                # Get the use_parent_parameters configuration from the dynamic definition
                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)

                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _select_streams(
        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        """Filter `streams` down to those present in the configured catalog, emitting INCOMPLETE for missing ones."""
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)
            else:
                # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if
                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
                # but we can do so if we feel it necessary. With the current behavior, we should still result
                # in a partial failure since missing streams will be marked as INCOMPLETE.
                self._message_repository.emit_message(
                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
                )
        return abstract_streams
Helper class that provides a standard way to create an abstract base class (ABC) using inheritance.
def __init__(
    self,
    catalog: Optional[ConfiguredAirbyteCatalog] = None,
    config: Optional[Mapping[str, Any]] = None,
    state: Optional[List[AirbyteStateMessage]] = None,
    *,
    source_config: ConnectionDefinition,
    debug: bool = False,
    emit_connector_builder_messages: bool = False,
    migrate_manifest: bool = False,
    normalize_manifest: bool = False,
    limits: Optional[TestLimits] = None,
    config_path: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Build the source from a manifest, resolving, validating and post-processing it.

    Args:
        catalog: Optional configured catalog forwarded to the component factory.
        config: Optional connector configuration.
        state: Optional incoming state messages.
        source_config: The raw manifest (connection definition) for this source.
        debug: Enables debug-level behavior on attached components.
        emit_connector_builder_messages: Emit extra messages for the Connector Builder UI.
        migrate_manifest: Apply manifest migrations after validation.
        normalize_manifest: Apply manifest normalization after validation.
        limits: Test-mode caps; also disables retries and caching when provided.
        config_path: When set, migrated config is persisted back to this path.
    """
    self.logger = logging.getLogger(f"airbyte.{self.name}")

    self._limits = limits

    # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
    # no longer needs to store the original incoming state. But maybe there's an edge case?
    self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

    # We set a maxsize so the main thread can process record items when the queue size grows. This assumes that
    # fewer threads generate partitions than the max number of workers. If that weren't the case, we could have
    # threads only generating partitions which would fill the queue. This number is arbitrarily set to 10_000 but
    # will probably need to be changed given more information and might even need to be configurable depending
    # on the source.
    queue: Queue[QueueItem] = Queue(maxsize=10_000)
    message_repository = InMemoryMessageRepository(
        Level.DEBUG if emit_connector_builder_messages else Level.INFO
    )

    # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
    # cursors. We do this by no longer automatically instantiating RFR cursors when converting
    # the declarative models into runtime components. Concurrent sources will continue to checkpoint
    # incremental streams running in full refresh.
    # NOTE: the factory is always constructed here, so it is assigned directly; the previous
    # `component_factory if component_factory else ModelToComponentFactory(...)` fallback was dead code.
    self._constructor = ModelToComponentFactory(
        emit_connector_builder_messages=emit_connector_builder_messages,
        message_repository=ConcurrentMessageRepository(queue, message_repository),
        configured_catalog=catalog,
        connector_state_manager=self._connector_state_manager,
        max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
        limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
        limit_slices_fetched=limits.max_slices if limits else None,
        disable_retries=limits is not None,
        disable_cache=limits is not None,
    )

    self._should_normalize = normalize_manifest
    self._should_migrate = migrate_manifest
    self._declarative_component_schema = _get_declarative_component_schema()
    # If custom components are needed, locate and/or register them.
    self.components_module: ModuleType | None = get_registered_components_module(config=config)
    # set additional attributes
    self._debug = debug
    self._emit_connector_builder_messages = emit_connector_builder_messages

    self._message_repository = self._constructor.get_message_repository()
    self._slice_logger: SliceLogger = (
        AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
    )

    # resolve all components in the manifest
    self._source_config = self._pre_process_manifest(dict(source_config))
    # validate resolved manifest against the declarative component schema
    self._validate_source()
    # apply additional post-processing to the manifest
    self._post_process_manifest()

    spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
    self._spec_component: Optional[Spec] = (
        self._constructor.create_component(SpecModel, spec, {}) if spec else None
    )
    self._config = self._migrate_and_transform_config(config_path, config) or {}

    concurrency_level_from_manifest = self._source_config.get("concurrency_level")
    if concurrency_level_from_manifest:
        concurrency_level_component = self._constructor.create_component(
            model_type=ConcurrencyLevelModel,
            component_definition=concurrency_level_from_manifest,
            config=config or {},
        )
        if not isinstance(concurrency_level_component, ConcurrencyLevel):
            raise ValueError(
                f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
            )

        concurrency_level = concurrency_level_component.get_concurrency_level()
        # Partition generation iterates using range based on this value. If this were floored
        # to zero we would end up in a deadlock during start up, hence the max(..., 1).
        initial_number_of_partitions_to_generate = max(concurrency_level // 2, 1)
    else:
        concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
        initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

    self._concurrent_source = ConcurrentSource.create(
        num_workers=concurrency_level,
        initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
        logger=self.logger,
        slice_logger=self._slice_logger,
        queue=queue,
        message_repository=self._message_repository,
    )
@property
def resolved_manifest(self) -> Mapping[str, Any]:
    """Expose the fully resolved manifest backing this source.

    The returned mapping holds every setting and parameter that defines
    the source's behavior after manifest resolution has completed.

    Returns:
        Mapping[str, Any]: The resolved source configuration manifest.
    """
    manifest = self._source_config
    return manifest
Returns the resolved manifest configuration for the source.
This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.
Returns:
Mapping[str, Any]: The resolved source configuration manifest.
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """Read records from the streams selected by the configured catalog.

    Yields the AirbyteMessages produced by running the concurrent source
    over the catalog-selected streams.
    """
    # We are migrating away from the DeclarativeStream implementation and
    # streams() only returns the concurrent-compatible AbstractStream. The
    # type: ignore preserves the existing method interface for compatibility.
    available_streams = self.streams(config=self._config)  # type: ignore
    selected = self._select_streams(
        streams=available_streams,
        configured_catalog=catalog,
    )

    # Passing an empty set of streams causes an infinite loop in
    # ConcurrentReadProcessor (also evident in concurrent_source_adapter.py,
    # out of scope to fix here), so bail out early instead.
    if not selected:
        return
    yield from self._concurrent_source.read(selected)
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    """Describe the streams available from this source as an AirbyteCatalog.

    Each stream produced by `streams()` is converted to its Airbyte stream
    representation via `as_airbyte_stream()`.
    """
    airbyte_streams = []
    for stream in self.streams(config=self._config):
        airbyte_streams.append(stream.as_airbyte_stream())
    return AirbyteCatalog(streams=airbyte_streams)
Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
    """
    Build every stream defined by the manifest (static and dynamic).

    `streams` is still used as part of the AbstractSource in these cases:
    * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
    * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (we filter for a specific catalog which excludes concurrent streams, so a read does not necessarily consume every stream returned here)
    Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

    In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
    """

    if self._spec_component:
        self._spec_component.validate_config(self._config)

    # Static manifest streams plus any dynamically generated ones.
    all_stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

    budget_definition = self._source_config.get("api_budget")
    if budget_definition:
        self._constructor.set_api_budget(budget_definition, self._config)

    # Deepcopy so cache initialization cannot mutate the original configs.
    prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(all_stream_configs))

    built_streams = []
    for stream_definition in prepared_configs:
        if stream_definition.get("type") == StateDelegatingStreamModel.__name__:
            model_type = StateDelegatingStreamModel
        else:
            model_type = DeclarativeStreamModel
        built_streams.append(
            self._constructor.create_component(
                model_type,
                stream_definition,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
        )

    self._apply_stream_groups(built_streams)

    return built_streams
The streams method is used as part of the AbstractSource in the following cases:
- ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
- ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all streams are actually read from the streams returned by `streams`)

Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.
In both cases, we will assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Return the connector specification (spec) as defined in the Airbyte Protocol.

    The spec describes the possible configurations (e.g: username and password)
    which can be configured when running this connector. For low-code connectors,
    the manifest's spec block takes precedence; when absent, the parent
    implementation loads it from "spec.yaml" or "spec.json" in the project root.
    """
    if self._spec_component:
        return self._spec_component.generate_spec()
    return super().spec(logger)
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> "AirbyteConnectionStatus":
    """
    Test whether the input configuration can successfully connect to the integration.

    Builds the ConnectionChecker declared by the manifest's `check` component
    and runs it against `self._config` (used instead of the raw `config`
    argument because it has already been migrated and transformed).

    Args:
        logger: Logger for check-time diagnostics.
        config: Raw user-provided configuration (unused; see above).

    Returns:
        AirbyteConnectionStatus: SUCCEEDED, or FAILED with the error's repr.

    Raises:
        ValueError: If the manifest lacks a `check` component, or the factory
            produces something other than a ConnectionChecker.
    """
    check_definition = self._source_config.get("check")
    if not check_definition:
        # Fix: this message had a stray f-string prefix with no placeholders.
        raise ValueError("Missing 'check' component definition within the manifest.")

    if "type" not in check_definition:
        # Manifests may omit the type; CheckStream is the default checker.
        check_definition["type"] = "CheckStream"
    connection_checker = self._constructor.create_component(
        COMPONENTS_CHECKER_TYPE_MAPPING[check_definition["type"]],
        check_definition,
        dict(),
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )
    if not isinstance(connection_checker, ConnectionChecker):
        raise ValueError(
            f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
        )

    check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
    if not check_succeeded:
        return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
    return AirbyteConnectionStatus(status=Status.SUCCEEDED)
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.