airbyte_cdk.sources.declarative.concurrent_declarative_source
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import json
import logging
import pkgutil
from copy import deepcopy
from dataclasses import dataclass, field
from queue import Queue
from types import ModuleType
from typing import (
    Any,
    ClassVar,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Set,
)

import orjson
import yaml
from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.connector_builder.models import (
    LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.manifest_migrations.migration_handler import (
    ManifestMigrationHandler,
)
from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
    Status,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
    ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


@dataclass
class TestLimits:
    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
    DEFAULT_MAX_SLICES: ClassVar[int] = 5
    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
    DEFAULT_MAX_STREAMS: ClassVar[int] = 100

    max_records: int = field(default=DEFAULT_MAX_RECORDS)
    max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
    max_slices: int = field(default=DEFAULT_MAX_SLICES)
    max_streams: int = field(default=DEFAULT_MAX_STREAMS)


def _get_declarative_component_schema() -> Dict[str, Any]:
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        )


class ConcurrentDeclarativeSource(Source):
    # By default, we defer to a value of 2. A value lower than this could cause a PartitionEnqueuer to be stuck in a
    # state of deadlock because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        *,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        migrate_manifest: bool = False,
        normalize_manifest: bool = False,
        limits: Optional[TestLimits] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self.logger = logging.getLogger(f"airbyte.{self.name}")

        self._limits = limits

        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        #  no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # We set a maxsize so that the main thread processes record items when the queue size grows. This assumes that there are
        # fewer threads generating partitions than the max number of workers. If that weren't the case, we could have threads only
        # generating partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be
        # changed given more information and might even need to be configurable depending on the source
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        message_repository = InMemoryMessageRepository(
            Level.DEBUG if emit_connector_builder_messages else Level.INFO
        )

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            message_repository=ConcurrentMessageRepository(queue, message_repository),
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
            limit_slices_fetched=limits.max_slices if limits else None,
            disable_retries=True if limits else False,
            disable_cache=True if limits else False,
        )

        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages=emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )

        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        self._config = self._migrate_and_transform_config(config_path, config) or {}

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using range based on this value. If this is floored to zero we end up in a deadlock during startup
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            queue=queue,
            message_repository=self._message_repository,
        )

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        References are resolved using the ManifestReferenceResolver, and component types and
        parameters are then propagated throughout the manifest by the ManifestComponentTransformer.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: The preprocessed manifest with references resolved and types and parameters propagated.
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.

        The original manifest is returned if any error occurs during migration.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format:
        - references have been resolved
        - the commonly used definitions are extracted to `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            control_message = create_connector_config_control_message(mutable_config)
            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        config = self._config or config
        return super().configure(config, temp_dir)

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        return self._constructor.get_model_deprecations()

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """
        Returns a generator of the AirbyteMessages generated by reading the source with the given
        configuration, catalog, and state.
        """
        selected_concurrent_streams = self._select_streams(
            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
            configured_catalog=catalog,
        )

        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
        if len(selected_concurrent_streams) > 0:
            yield from self._concurrent_source.read(selected_concurrent_streams)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """
        Returns an AirbyteCatalog representing the available streams and fields in this integration.
        For example, given valid credentials to a Postgres database, returns an Airbyte catalog where
        each Postgres table is a stream, and each table column is a field.
        """
        return AirbyteCatalog(
            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
        )

    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams so not all streams actually read from all the streams returned by `streams`)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
        """

        if self._spec_component:
            self._spec_component.validate_config(self._config)

        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, self._config)

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]
        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        parent_streams = set()

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                else:
                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                else:
                    stream_config["retriever"]["requester"]["use_cache"] = True
        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        check = self._source_config.get("check")
        if not check:
            raise ValueError("Missing 'check' component definition within the manifest.")

        if "type" not in check:
            check["type"] = "CheckStream"
        connection_checker = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if not isinstance(connection_checker, ConnectionChecker):
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
            )

        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
        if not check_succeeded:
            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            with_dynamic_stream_name=True,
        )

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=self._config):
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            if resolver_type == "HttpComponentsResolver":
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                    stream_name=dynamic_definition.get("name"),
                )
            else:
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                # Get the use_parent_parameters configuration from the dynamic definition
                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)

                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _select_streams(
        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)
            else:
                # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if
                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
                # but we can do so if we feel it necessary. With the current behavior, we should still result
                # in a partial failure since missing streams will be marked as INCOMPLETE.
                self._message_repository.emit_message(
                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
                )
        return abstract_streams
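The class is normally driven through the standard Airbyte entrypoint, but it can also be exercised directly. Below is a minimal sketch, not a definitive recipe: the manifest, stream name, URL, and config fields are all illustrative assumptions, and a real manifest must validate against declarative_component_schema.yaml.

import logging

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

# Hypothetical single-stream manifest, for illustration only.
manifest = {
    "version": "6.0.0",
    "type": "DeclarativeSource",  # optional: _fix_source_type() fills this in when omitted
    "spec": {
        "type": "Spec",
        "connection_specification": {"type": "object", "properties": {}},
    },
    "check": {"type": "CheckStream", "stream_names": ["widgets"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "widgets",
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.example.com",  # illustrative endpoint
                    "path": "/widgets",
                    "http_method": "GET",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": ["data"]},
                },
            },
        }
    ],
}

config = {"api_key": "..."}  # illustrative connector config
source = ConcurrentDeclarativeSource(source_config=manifest, config=config)

logger = logging.getLogger("airbyte")
print(source.spec(logger))  # spec generated from the manifest's spec block
print(source.check(logger, config))  # runs the CheckStream connection check against the API
catalog = source.discover(logger, config)  # one AirbyteStream per manifest stream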
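TestLimits exists for Connector Builder-style test reads: when a limits object is passed, the factory caps pages fetched per slice and slices fetched, and disables retries and caching (see the ModelToComponentFactory construction in __init__ above). A small sketch, reusing the illustrative manifest and config from the previous example:

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
    TestLimits,
)

# Cap the volume of data a test read can pull; unset fields keep their defaults
# (e.g. TestLimits.DEFAULT_MAX_RECORDS == 100, DEFAULT_MAX_STREAMS == 100).
limits = TestLimits(max_pages_per_slice=2, max_slices=3, max_records=50)

test_source = ConcurrentDeclarativeSource(
    source_config=manifest,  # the illustrative manifest from the sketch above
    config=config,
    emit_connector_builder_messages=True,  # log every slice; DEBUG-level message repository
    limits=limits,  # also flips disable_retries and disable_cache to True in the factory
)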
134 def __init__( 135 self, 136 catalog: Optional[ConfiguredAirbyteCatalog] = None, 137 config: Optional[Mapping[str, Any]] = None, 138 state: Optional[List[AirbyteStateMessage]] = None, 139 *, 140 source_config: ConnectionDefinition, 141 debug: bool = False, 142 emit_connector_builder_messages: bool = False, 143 migrate_manifest: bool = False, 144 normalize_manifest: bool = False, 145 limits: Optional[TestLimits] = None, 146 config_path: Optional[str] = None, 147 **kwargs: Any, 148 ) -> None: 149 self.logger = logging.getLogger(f"airbyte.{self.name}") 150 151 self._limits = limits 152 153 # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source 154 # no longer needs to store the original incoming state. But maybe there's an edge case? 155 self._connector_state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later 156 157 # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less 158 # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating 159 # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more 160 # information and might even need to be configurable depending on the source 161 queue: Queue[QueueItem] = Queue(maxsize=10_000) 162 message_repository = InMemoryMessageRepository( 163 Level.DEBUG if emit_connector_builder_messages else Level.INFO 164 ) 165 166 # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic 167 # cursors. We do this by no longer automatically instantiating RFR cursors when converting 168 # the declarative models into runtime components. Concurrent sources will continue to checkpoint 169 # incremental streams running in full refresh. 170 component_factory = ModelToComponentFactory( 171 emit_connector_builder_messages=emit_connector_builder_messages, 172 message_repository=ConcurrentMessageRepository(queue, message_repository), 173 connector_state_manager=self._connector_state_manager, 174 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 175 limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None, 176 limit_slices_fetched=limits.max_slices if limits else None, 177 disable_retries=True if limits else False, 178 disable_cache=True if limits else False, 179 ) 180 181 self._should_normalize = normalize_manifest 182 self._should_migrate = migrate_manifest 183 self._declarative_component_schema = _get_declarative_component_schema() 184 # If custom components are needed, locate and/or register them. 
185 self.components_module: ModuleType | None = get_registered_components_module(config=config) 186 # set additional attributes 187 self._debug = debug 188 self._emit_connector_builder_messages = emit_connector_builder_messages 189 self._constructor = ( 190 component_factory 191 if component_factory 192 else ModelToComponentFactory( 193 emit_connector_builder_messages=emit_connector_builder_messages, 194 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 195 ) 196 ) 197 198 self._message_repository = self._constructor.get_message_repository() 199 self._slice_logger: SliceLogger = ( 200 AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() 201 ) 202 203 # resolve all components in the manifest 204 self._source_config = self._pre_process_manifest(dict(source_config)) 205 # validate resolved manifest against the declarative component schema 206 self._validate_source() 207 # apply additional post-processing to the manifest 208 self._post_process_manifest() 209 210 spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") 211 self._spec_component: Optional[Spec] = ( 212 self._constructor.create_component(SpecModel, spec, dict()) if spec else None 213 ) 214 self._config = self._migrate_and_transform_config(config_path, config) or {} 215 216 concurrency_level_from_manifest = self._source_config.get("concurrency_level") 217 if concurrency_level_from_manifest: 218 concurrency_level_component = self._constructor.create_component( 219 model_type=ConcurrencyLevelModel, 220 component_definition=concurrency_level_from_manifest, 221 config=config or {}, 222 ) 223 if not isinstance(concurrency_level_component, ConcurrencyLevel): 224 raise ValueError( 225 f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}" 226 ) 227 228 concurrency_level = concurrency_level_component.get_concurrency_level() 229 initial_number_of_partitions_to_generate = max( 230 concurrency_level // 2, 1 231 ) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up 232 else: 233 concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL 234 initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 235 236 self._concurrent_source = ConcurrentSource.create( 237 num_workers=concurrency_level, 238 initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate, 239 logger=self.logger, 240 slice_logger=self._slice_logger, 241 queue=queue, 242 message_repository=self._message_repository, 243 )
@property
def resolved_manifest(self) -> Mapping[str, Any]:
    """
    Returns the resolved manifest configuration for the source.

    This property provides access to the internal source configuration as a mapping,
    which contains all settings and parameters required to define the source's behavior.

    Returns:
        Mapping[str, Any]: The resolved source configuration manifest.
    """
    return self._source_config
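A quick sketch of inspecting the resolved manifest, assuming the `source` object from the constructor example above:

    import json

    # The resolved manifest is a plain mapping, so it can be serialized for inspection.
    print(json.dumps(dict(source.resolved_manifest), indent=2, default=str))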
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    selected_concurrent_streams = self._select_streams(
        streams=self.streams(config=self._config),  # type: ignore # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
        configured_catalog=catalog,
    )

    # Passing an empty set of streams appears to cause an infinite loop in ConcurrentReadProcessor.
    # This is also evident in concurrent_source_adapter.py, so the fix is left out of scope for now.
    if len(selected_concurrent_streams) > 0:
        yield from self._concurrent_source.read(selected_concurrent_streams)
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
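As a hedged sketch of driving a read, reusing the `source` and `config` names assumed in the constructor example: the configured catalog below simply selects every discovered stream for a full-refresh sync, which is what `_select_streams()` filters against inside `read()`.

    import logging

    from airbyte_cdk.models import (
        ConfiguredAirbyteCatalog,
        ConfiguredAirbyteStream,
        DestinationSyncMode,
        SyncMode,
    )

    logger = logging.getLogger("airbyte")

    # Select every discovered stream for a full-refresh read.
    configured_catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=stream,
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
            for stream in source.discover(logger, config).streams
        ]
    )

    for message in source.read(logger, config, configured_catalog, state=None):
        print(message.type)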
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    return AirbyteCatalog(
        streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
    )
Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each Postgres table is a stream, and each table column is a field.
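A minimal sketch of inspecting the discovered catalog, again reusing the `source`, `logger`, and `config` names assumed above:

    catalog = source.discover(logger, config)
    for stream in catalog.streams:
        # Each stream advertises a JSON schema; its top-level properties are the fields.
        print(stream.name, sorted((stream.json_schema or {}).get("properties", {})))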
def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
    """
    The `streams` method is used as part of the AbstractSource in the following cases:
    * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
    * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all streams returned by `streams` are actually read)
    Note that `super().streams(config)` is also called when splitting the streams between concurrent and not in `_group_streams`.

    In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
    """

    if self._spec_component:
        self._spec_component.validate_config(self._config)

    stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

    api_budget_model = self._source_config.get("api_budget")
    if api_budget_model:
        self._constructor.set_api_budget(api_budget_model, self._config)

    source_streams = [
        self._constructor.create_component(
            (
                StateDelegatingStreamModel
                if stream_config.get("type") == StateDelegatingStreamModel.__name__
                else DeclarativeStreamModel
            ),
            stream_config,
            self._config,
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
    ]
    return source_streams
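For illustration only (per the comment above, callers are expected to be this class or the Connector Builder), a sketch of enumerating the runtime streams with the names assumed earlier:

    # streams() returns concurrent-compatible AbstractStream instances.
    for stream in source.streams(config):
        print(stream.name)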
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
    configurations (e.g. username and password) which can be configured when running this connector. For low-code connectors, this
    will first attempt to load the spec from the manifest's spec block; otherwise it will load it from "spec.yaml" or "spec.json"
    in the project root.
    """
    return (
        self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
    )
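A small sketch of retrieving the spec, assuming the `source` and `logger` names from the earlier examples; `connectionSpecification` holds the JSON schema that user-supplied configs are validated against.

    connector_spec = source.spec(logger)
    print(connector_spec.connectionSpecification)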
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    check = self._source_config.get("check")
    if not check:
        raise ValueError("Missing 'check' component definition within the manifest.")

    if "type" not in check:
        check["type"] = "CheckStream"
    connection_checker = self._constructor.create_component(
        COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
        check,
        dict(),
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )
    if not isinstance(connection_checker, ConnectionChecker):
        raise ValueError(
            f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
        )

    check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
    if not check_succeeded:
        return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
    return AirbyteConnectionStatus(status=Status.SUCCEEDED)
Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.
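A minimal sketch of running the connection check and branching on the protocol-level status, with names assumed from the earlier examples:

    from airbyte_cdk.models import Status

    connection_status = source.check(logger, config)
    if connection_status.status == Status.SUCCEEDED:
        print("Connection check passed")
    else:
        print(f"Connection check failed: {connection_status.message}")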