airbyte_cdk.sources.declarative.concurrent_declarative_source
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import json
import logging
import pkgutil
from copy import deepcopy
from dataclasses import dataclass, field
from queue import Queue
from types import ModuleType
from typing import (
    Any,
    ClassVar,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Set,
)

import orjson
import yaml
from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.connector_builder.models import (
    LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.manifest_migrations.migration_handler import (
    ManifestMigrationHandler,
)
from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
    Status,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
    ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


@dataclass
class TestLimits:
    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
    DEFAULT_MAX_SLICES: ClassVar[int] = 5
    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
    DEFAULT_MAX_STREAMS: ClassVar[int] = 100

    max_records: int = field(default=DEFAULT_MAX_RECORDS)
    max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
    max_slices: int = field(default=DEFAULT_MAX_SLICES)
    max_streams: int = field(default=DEFAULT_MAX_STREAMS)


def _get_declarative_component_schema() -> Dict[str, Any]:
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        )


class ConcurrentDeclarativeSource(Source):
    # By default, we defer to a value of 2. A lower value could cause a PartitionEnqueuer to be stuck in a state of deadlock
    # because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        *,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        migrate_manifest: bool = False,
        normalize_manifest: bool = False,
        limits: Optional[TestLimits] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self.logger = logging.getLogger(f"airbyte.{self.name}")

        self._limits = limits

        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        # no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # We set a maxsize so that the main thread can process record items as the queue grows. This assumes that there are
        # fewer threads generating partitions than the maximum number of workers. If that weren't the case, we could have
        # threads only generating partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will
        # probably need to be changed given more information and might even need to be configurable depending on the source
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        message_repository = InMemoryMessageRepository(
            Level.DEBUG if emit_connector_builder_messages else Level.INFO
        )

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            message_repository=ConcurrentMessageRepository(queue, message_repository),
            configured_catalog=catalog,
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
            limit_slices_fetched=limits.max_slices if limits else None,
            disable_retries=True if limits else False,
            disable_cache=True if limits else False,
        )

        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages=emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )

        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        self._config = self._migrate_and_transform_config(config_path, config) or {}

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using a range based on this value. If it were floored to zero, we would end up in a deadlock during start-up.
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            queue=queue,
            message_repository=self._message_repository,
        )

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        This method resolves references using the ManifestReferenceResolver and then propagates
        types and parameters throughout the manifest using the ManifestComponentTransformer.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: The preprocessed manifest with references resolved and types and parameters propagated.
        """
        # For ease of use, we don't require the type to be specified at the top level of the manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.

        The original manifest is returned if any error occurs during migration.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format:
         - references have been resolved
         - commonly used definitions are extracted to `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against the JSON schema defined in declarative_component_schema.yaml failed"
            ) from e

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            control_message = create_connector_config_control_message(mutable_config)
            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        config = self._config or config
        return super().configure(config, temp_dir)

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        return self._constructor.get_model_deprecations()

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """
        Returns a generator of the AirbyteMessages generated by reading the source with the given
        configuration, catalog, and state.
        """
        selected_concurrent_streams = self._select_streams(
            streams=self.streams(config=self._config),  # type: ignore # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
            configured_catalog=catalog,
        )

        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
        if len(selected_concurrent_streams) > 0:
            yield from self._concurrent_source.read(selected_concurrent_streams)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Returns an AirbyteCatalog representing the available streams and fields in this integration.
        For example, given valid credentials to a Postgres database, returns an Airbyte catalog where
        each Postgres table is a stream, and each table column is a field.
        """
        return AirbyteCatalog(
            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
        )

    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog, which excludes concurrent streams, so a read does not necessarily consume all of the streams returned by `streams`)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """

        if self._spec_component:
            self._spec_component.validate_config(self._config)

        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, self._config)

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]
        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Enable caching for parent streams unless explicitly disabled.

        Caching is enabled by default for parent streams to optimize performance when the same
        parent data is needed by multiple child streams. However, explicit `use_cache: false`
        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
        APIs where caching causes duplicate records).
        """
        parent_streams = set()

        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
            """Set use_cache to True only if not explicitly disabled."""
            if requester.get("use_cache") is not False:
                requester["use_cache"] = True

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    _set_cache_if_not_disabled(
                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
                    )
                    _set_cache_if_not_disabled(
                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
                    )
                else:
                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                _set_cache_if_not_disabled(
                    stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
                )

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    _set_cache_if_not_disabled(
                        stream_config["full_refresh_stream"]["retriever"]["requester"]
                    )
                    _set_cache_if_not_disabled(
                        stream_config["incremental_stream"]["retriever"]["requester"]
                    )
                else:
                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g.: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        check = self._source_config.get("check")
        if not check:
            raise ValueError("Missing 'check' component definition within the manifest.")

        if "type" not in check:
            check["type"] = "CheckStream"
        connection_checker = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if not isinstance(connection_checker, ConnectionChecker):
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
            )

        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
        if not check_succeeded:
            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            with_dynamic_stream_name=True,
        )

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=self._config):
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            if resolver_type == "HttpComponentsResolver":
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                    stream_name=dynamic_definition.get("name"),
                )
            else:
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                # Get the use_parent_parameters configuration from the dynamic definition
                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)

                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _select_streams(
        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)
            else:
                # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if
                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
                # but we can do so if we feel it necessary. With the current behavior, we should still result
                # in a partial failure since missing streams will be marked as INCOMPLETE.
                self._message_repository.emit_message(
                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
                )
        return abstract_streams
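Usage sketch (illustrative, not part of the module): constructing a source from an in-memory manifest and discovering its streams. The manifest below is a hypothetical example following the declarative component schema; the endpoint, stream name, and field paths are assumptions for illustration. The manifest is pre-processed and validated against the bundled schema at construction time, and discover() does not require network access here because the schema is inlined.

import logging

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

# Hypothetical minimal manifest; all field values are illustrative assumptions.
manifest = {
    "version": "6.0.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["pokemon"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "pokemon",
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://pokeapi.co/api/v2",
                    "path": "/pokemon",
                    "http_method": "GET",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": ["results"]},
                },
            },
            "schema_loader": {
                "type": "InlineSchemaLoader",
                "schema": {"type": "object", "properties": {"name": {"type": "string"}}},
            },
        }
    ],
}

logger = logging.getLogger("airbyte")
source = ConcurrentDeclarativeSource(source_config=manifest, config={}, catalog=None, state=None)
# discover() builds every stream from the pre-processed, validated manifest.
catalog = source.discover(logger, config={})
print([stream.name for stream in catalog.streams])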
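Reading is catalog-driven: read() intersects the configured catalog with the manifest's streams and emits an INCOMPLETE stream status for any configured stream the source does not know about. A sketch that continues the usage example above (reusing its hypothetical source and logger, and assuming the endpoint is reachable):

from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

configured_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="pokemon",
                json_schema={"type": "object", "properties": {}},
                supported_sync_modes=[SyncMode.full_refresh],
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
    ]
)

# Records, state, and log messages all flow back as AirbyteMessages.
for message in source.read(logger, config={}, catalog=configured_catalog, state=None):
    print(message.type)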
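For Connector Builder test reads, a TestLimits instance caps the work a read performs; as __init__ above shows, passing limits also builds the ModelToComponentFactory with disable_retries=True and disable_cache=True. A minimal sketch, reusing the hypothetical manifest from the usage example:

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
    TestLimits,
)

limits = TestLimits(max_records=10, max_pages_per_slice=2, max_slices=2, max_streams=5)
builder_source = ConcurrentDeclarativeSource(
    source_config=manifest,  # the hypothetical manifest defined in the usage sketch above
    config={},
    limits=limits,
    emit_connector_builder_messages=True,
)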
134 def __init__( 135 self, 136 catalog: Optional[ConfiguredAirbyteCatalog] = None, 137 config: Optional[Mapping[str, Any]] = None, 138 state: Optional[List[AirbyteStateMessage]] = None, 139 *, 140 source_config: ConnectionDefinition, 141 debug: bool = False, 142 emit_connector_builder_messages: bool = False, 143 migrate_manifest: bool = False, 144 normalize_manifest: bool = False, 145 limits: Optional[TestLimits] = None, 146 config_path: Optional[str] = None, 147 **kwargs: Any, 148 ) -> None: 149 self.logger = logging.getLogger(f"airbyte.{self.name}") 150 151 self._limits = limits 152 153 # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source 154 # no longer needs to store the original incoming state. But maybe there's an edge case? 155 self._connector_state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later 156 157 # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less 158 # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating 159 # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more 160 # information and might even need to be configurable depending on the source 161 queue: Queue[QueueItem] = Queue(maxsize=10_000) 162 message_repository = InMemoryMessageRepository( 163 Level.DEBUG if emit_connector_builder_messages else Level.INFO 164 ) 165 166 # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic 167 # cursors. We do this by no longer automatically instantiating RFR cursors when converting 168 # the declarative models into runtime components. Concurrent sources will continue to checkpoint 169 # incremental streams running in full refresh. 170 component_factory = ModelToComponentFactory( 171 emit_connector_builder_messages=emit_connector_builder_messages, 172 message_repository=ConcurrentMessageRepository(queue, message_repository), 173 configured_catalog=catalog, 174 connector_state_manager=self._connector_state_manager, 175 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 176 limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None, 177 limit_slices_fetched=limits.max_slices if limits else None, 178 disable_retries=True if limits else False, 179 disable_cache=True if limits else False, 180 ) 181 182 self._should_normalize = normalize_manifest 183 self._should_migrate = migrate_manifest 184 self._declarative_component_schema = _get_declarative_component_schema() 185 # If custom components are needed, locate and/or register them. 
186 self.components_module: ModuleType | None = get_registered_components_module(config=config) 187 # set additional attributes 188 self._debug = debug 189 self._emit_connector_builder_messages = emit_connector_builder_messages 190 self._constructor = ( 191 component_factory 192 if component_factory 193 else ModelToComponentFactory( 194 emit_connector_builder_messages=emit_connector_builder_messages, 195 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 196 ) 197 ) 198 199 self._message_repository = self._constructor.get_message_repository() 200 self._slice_logger: SliceLogger = ( 201 AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() 202 ) 203 204 # resolve all components in the manifest 205 self._source_config = self._pre_process_manifest(dict(source_config)) 206 # validate resolved manifest against the declarative component schema 207 self._validate_source() 208 # apply additional post-processing to the manifest 209 self._post_process_manifest() 210 211 spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") 212 self._spec_component: Optional[Spec] = ( 213 self._constructor.create_component(SpecModel, spec, dict()) if spec else None 214 ) 215 self._config = self._migrate_and_transform_config(config_path, config) or {} 216 217 concurrency_level_from_manifest = self._source_config.get("concurrency_level") 218 if concurrency_level_from_manifest: 219 concurrency_level_component = self._constructor.create_component( 220 model_type=ConcurrencyLevelModel, 221 component_definition=concurrency_level_from_manifest, 222 config=config or {}, 223 ) 224 if not isinstance(concurrency_level_component, ConcurrencyLevel): 225 raise ValueError( 226 f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}" 227 ) 228 229 concurrency_level = concurrency_level_component.get_concurrency_level() 230 initial_number_of_partitions_to_generate = max( 231 concurrency_level // 2, 1 232 ) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up 233 else: 234 concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL 235 initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 236 237 self._concurrent_source = ConcurrentSource.create( 238 num_workers=concurrency_level, 239 initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate, 240 logger=self.logger, 241 slice_logger=self._slice_logger, 242 queue=queue, 243 message_repository=self._message_repository, 244 )
@property
def resolved_manifest(self) -> Mapping[str, Any]:
    """
    Returns the resolved manifest configuration for the source.

    This property provides access to the internal source configuration as a mapping,
    which contains all settings and parameters required to define the source's behavior.

    Returns:
        Mapping[str, Any]: The resolved source configuration manifest.
    """
    return self._source_config
Returns the resolved manifest configuration for the source.
This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.
Returns:
Mapping[str, Any]: The resolved source configuration manifest.
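For illustration, a minimal usage sketch (assuming `source` is an already-constructed ConcurrentDeclarativeSource):

# Hypothetical usage: inspect the manifest after pre-processing, validation,
# and post-processing have run in __init__.
manifest = source.resolved_manifest
print(manifest.get("version"), [s.get("name") for s in manifest.get("streams", [])])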
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    selected_concurrent_streams = self._select_streams(
        streams=self.streams(config=self._config),  # type: ignore # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
        configured_catalog=catalog,
    )

    # It would appear that passing an empty set of streams causes an infinite loop in
    # ConcurrentReadProcessor. This is also evident in concurrent_source_adapter.py, so
    # fixing it is left out of scope for now.
    if len(selected_concurrent_streams) > 0:
        yield from self._concurrent_source.read(selected_concurrent_streams)
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
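A hedged usage sketch, assuming `source`, `config`, and a `configured_catalog` already exist (note that the body above resolves streams from the already-migrated internal config rather than the `config` argument):

import logging

from airbyte_cdk.models import Type

logger = logging.getLogger("airbyte")
# RECORD, STATE, LOG, and TRACE messages are interleaved in the same iterator.
for message in source.read(logger, config, configured_catalog, state=None):
    if message.type == Type.RECORD:
        print(message.record.stream, message.record.data)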
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    return AirbyteCatalog(
        streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
    )
Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each Postgres table is a stream, and each table column is a field.
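A brief sketch of driving discovery directly (reusing the assumed `source`, `logger`, and `config` from the previous example):

# Each AbstractStream is converted via as_airbyte_stream(); the resulting
# catalog entries expose the stream name and its JSON schema.
catalog = source.discover(logger, config)
for stream in catalog.streams:
    print(stream.name, sorted(stream.json_schema.get("properties", {})))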
def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore # we are migrating away from the AbstractSource and expect this to only be called by ConcurrentDeclarativeSource or the Connector Builder
    """
    The `streams` method is used as part of the AbstractSource in the following cases:
    * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
    * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so a read does not necessarily consume all the streams returned by `streams`)
    Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

    In both cases, we assume that instantiating a DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
    """

    if self._spec_component:
        self._spec_component.validate_config(self._config)

    stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

    api_budget_model = self._source_config.get("api_budget")
    if api_budget_model:
        self._constructor.set_api_budget(api_budget_model, self._config)

    source_streams = [
        self._constructor.create_component(
            (
                StateDelegatingStreamModel
                if stream_config.get("type") == StateDelegatingStreamModel.__name__
                else DeclarativeStreamModel
            ),
            stream_config,
            self._config,
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
    ]
    return source_streams
The `streams` method is used as part of the AbstractSource in the following cases:

- ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
- ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so a read does not necessarily consume all the streams returned by `streams`)

Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

In both cases, we assume that instantiating a DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
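As a sketch, the per-stream model selection inside `streams` reduces to a check on each stream config's "type" field (the configs below are hypothetical):

# Hypothetical stream configs; only the "type" field drives model selection.
stream_configs = [
    {"type": "DeclarativeStream", "name": "users"},
    {"type": "StateDelegatingStream", "name": "events"},
]
for stream_config in stream_configs:
    model = (
        "StateDelegatingStreamModel"
        if stream_config.get("type") == "StateDelegatingStream"
        else "DeclarativeStreamModel"
    )
    print(stream_config["name"], "->", model)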
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
    configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this
    will first attempt to load the spec from the manifest's spec block; otherwise it will load it from "spec.yaml" or "spec.json"
    in the project root.
    """
    return (
        self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
    )
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block; otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
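A minimal sketch of retrieving the spec (assuming the `source` and `logger` from the earlier examples):

# If the manifest defines a spec block, generate_spec() is used; otherwise the
# base Source implementation falls back to spec.yaml / spec.json on disk.
specification = source.spec(logger)
print(sorted(specification.connectionSpecification.get("properties", {})))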
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    check = self._source_config.get("check")
    if not check:
        raise ValueError("Missing 'check' component definition within the manifest.")

    if "type" not in check:
        check["type"] = "CheckStream"
    connection_checker = self._constructor.create_component(
        COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
        check,
        dict(),
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )
    if not isinstance(connection_checker, ConnectionChecker):
        raise ValueError(
            f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
        )

    check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
    if not check_succeeded:
        return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
    return AirbyteConnectionStatus(status=Status.SUCCEEDED)
Tests whether the input configuration can be used to successfully connect to the integration, e.g., whether a provided Stripe API token can be used to connect to the Stripe API.
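And a usage sketch for the connection check (same assumed `source`, `logger`, and `config`):

from airbyte_cdk.models import Status

status = source.check(logger, config)
if status.status == Status.FAILED:
    print(f"Check failed: {status.message}")
else:
    print("Check succeeded")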