airbyte_cdk.sources.declarative.concurrent_declarative_source
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import json
import logging
import pkgutil
from copy import deepcopy
from dataclasses import dataclass, field
from queue import Queue
from types import ModuleType
from typing import (
    Any,
    ClassVar,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Set,
)

import orjson
import yaml
from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.connector_builder.models import (
    LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.manifest_migrations.migration_handler import (
    ManifestMigrationHandler,
)
from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
    Status,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
    ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


@dataclass
class TestLimits:
    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
    DEFAULT_MAX_SLICES: ClassVar[int] = 5
    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
    DEFAULT_MAX_STREAMS: ClassVar[int] = 100

    max_records: int = field(default=DEFAULT_MAX_RECORDS)
    max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
    max_slices: int = field(default=DEFAULT_MAX_SLICES)
    max_streams: int = field(default=DEFAULT_MAX_STREAMS)


def _get_declarative_component_schema() -> Dict[str, Any]:
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        )


class ConcurrentDeclarativeSource(Source):
    # By default, we defer to a value of 2. A lower value could cause a PartitionEnqueuer to be stuck in a state of
    # deadlock because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        *,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        migrate_manifest: bool = False,
        normalize_manifest: bool = False,
        limits: Optional[TestLimits] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self.logger = logging.getLogger(f"airbyte.{self.name}")

        self._limits = limits

        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        #  no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that
        # fewer threads are generating partitions than the max number of workers. If that weren't the case, we could have
        # threads only generating partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will
        # probably need to be changed given more information and might even need to be configurable depending on the source.
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        message_repository = InMemoryMessageRepository(
            Level.DEBUG if emit_connector_builder_messages else Level.INFO
        )

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            message_repository=ConcurrentMessageRepository(queue, message_repository),
            configured_catalog=catalog,
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
            limit_slices_fetched=limits.max_slices if limits else None,
            disable_retries=True if limits else False,
            disable_cache=True if limits else False,
        )

        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages=emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )

        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        self._config = self._migrate_and_transform_config(config_path, config) or {}

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during startup.
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            queue=queue,
            message_repository=self._message_repository,
        )

    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        This method resolves references using the ManifestReferenceResolver to ensure all references
        within the manifest are properly handled, then propagates types and parameters throughout the
        resulting manifest.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: The manifest with references resolved and types and parameters propagated.
        """
        # For ease of use, we don't require the type to be specified at the top level of the manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.

        The original manifest is returned if any error occurs during migration.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format:
        - references have been resolved
        - the commonly used definitions are extracted to `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema.
        """
        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against the json schema defined in declarative_component_schema.yaml failed"
            ) from e

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            control_message = create_connector_config_control_message(mutable_config)
            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        config = self._config or config
        return super().configure(config, temp_dir)

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        return self._constructor.get_model_deprecations()

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        selected_concurrent_streams = self._select_streams(
            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
            configured_catalog=catalog,
        )

        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
        # This is also evident in concurrent_source_adapter.py, so I'll leave this out of scope to fix for now.
        if len(selected_concurrent_streams) > 0:
            yield from self._concurrent_source.read(selected_concurrent_streams)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        return AirbyteCatalog(
            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
        )

    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter against the configured catalog, so a read does not necessarily use all the streams returned by `streams`)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """

        if self._spec_component:
            self._spec_component.validate_config(self._config)

        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, self._config)

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]
        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        parent_streams = set()

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                else:
                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                else:
                    stream_config["retriever"]["requester"]["use_cache"] = True
        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block; otherwise, it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        check = self._source_config.get("check")
        if not check:
            raise ValueError("Missing 'check' component definition within the manifest.")

        if "type" not in check:
            check["type"] = "CheckStream"
        connection_checker = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if not isinstance(connection_checker, ConnectionChecker):
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
            )

        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
        if not check_succeeded:
            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            with_dynamic_stream_name=True,
        )

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=self._config):
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            if resolver_type == "HttpComponentsResolver":
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                    stream_name=dynamic_definition.get("name"),
                )
            else:
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                # Get the use_parent_parameters configuration from the dynamic definition.
                # Default to True for backward compatibility, since connectors were already using it by default when this param was added.
                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)

                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _select_streams(
        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)
            else:
                # Previous behavior in the legacy synchronous CDK was to also emit an error TRACE message if
                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
                # but we can do so if we feel it necessary. With the current behavior, we should still result
                # in a partial failure since missing streams will be marked as INCOMPLETE.
                self._message_repository.emit_message(
                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
                )
        return abstract_streams
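
For orientation, here is a minimal sketch of driving this source directly. The manifest below is illustrative and abbreviated: the component names follow the declarative component schema, but any real manifest must pass validation against declarative_component_schema.yaml, so treat the exact fields as assumptions rather than a template.

    import logging

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )

    # Hypothetical minimal manifest; simplified for readability.
    manifest = {
        "version": "6.0.0",
        "type": "DeclarativeSource",
        "check": {"type": "CheckStream", "stream_names": ["pokemon"]},
        "streams": [
            {
                "type": "DeclarativeStream",
                "name": "pokemon",
                "retriever": {
                    "type": "SimpleRetriever",
                    "requester": {
                        "type": "HttpRequester",
                        "url_base": "https://pokeapi.co/api/v2",
                        "path": "/pokemon/ditto",
                        "http_method": "GET",
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {"type": "DpathExtractor", "field_path": []},
                    },
                },
                "schema_loader": {
                    "type": "InlineSchemaLoader",
                    "schema": {"type": "object", "properties": {}},
                },
            }
        ],
    }

    logger = logging.getLogger("airbyte")
    # The constructor preprocesses, validates, and post-processes the manifest.
    source = ConcurrentDeclarativeSource(source_config=manifest, config={})
    print(source.check(logger, config={}))     # AirbyteConnectionStatus from the manifest's check block
    print(source.discover(logger, config={}))  # AirbyteCatalog built from streams()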
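read() selects only the streams named in the configured catalog (via _select_streams) and hands them to the underlying ConcurrentSource. A sketch of building a catalog with the standard protocol models and consuming the resulting messages, continuing from the source defined in the previous sketch:

    from airbyte_cdk.models import (
        AirbyteStream,
        ConfiguredAirbyteCatalog,
        ConfiguredAirbyteStream,
        DestinationSyncMode,
        SyncMode,
    )

    catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=AirbyteStream(
                    name="pokemon",
                    json_schema={},
                    supported_sync_modes=[SyncMode.full_refresh],
                ),
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
        ]
    )

    # Streams present in the catalog but missing from the manifest are reported
    # as INCOMPLETE rather than failing the whole sync (see _select_streams).
    for message in source.read(logger, config={}, catalog=catalog, state=None):
        print(message.type)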
Helper class that provides a standard way to create an ABC using inheritance.
134 def __init__( 135 self, 136 catalog: Optional[ConfiguredAirbyteCatalog] = None, 137 config: Optional[Mapping[str, Any]] = None, 138 state: Optional[List[AirbyteStateMessage]] = None, 139 *, 140 source_config: ConnectionDefinition, 141 debug: bool = False, 142 emit_connector_builder_messages: bool = False, 143 migrate_manifest: bool = False, 144 normalize_manifest: bool = False, 145 limits: Optional[TestLimits] = None, 146 config_path: Optional[str] = None, 147 **kwargs: Any, 148 ) -> None: 149 self.logger = logging.getLogger(f"airbyte.{self.name}") 150 151 self._limits = limits 152 153 # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source 154 # no longer needs to store the original incoming state. But maybe there's an edge case? 155 self._connector_state_manager = ConnectorStateManager(state=state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later 156 157 # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less 158 # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating 159 # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more 160 # information and might even need to be configurable depending on the source 161 queue: Queue[QueueItem] = Queue(maxsize=10_000) 162 message_repository = InMemoryMessageRepository( 163 Level.DEBUG if emit_connector_builder_messages else Level.INFO 164 ) 165 166 # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic 167 # cursors. We do this by no longer automatically instantiating RFR cursors when converting 168 # the declarative models into runtime components. Concurrent sources will continue to checkpoint 169 # incremental streams running in full refresh. 170 component_factory = ModelToComponentFactory( 171 emit_connector_builder_messages=emit_connector_builder_messages, 172 message_repository=ConcurrentMessageRepository(queue, message_repository), 173 configured_catalog=catalog, 174 connector_state_manager=self._connector_state_manager, 175 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 176 limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None, 177 limit_slices_fetched=limits.max_slices if limits else None, 178 disable_retries=True if limits else False, 179 disable_cache=True if limits else False, 180 ) 181 182 self._should_normalize = normalize_manifest 183 self._should_migrate = migrate_manifest 184 self._declarative_component_schema = _get_declarative_component_schema() 185 # If custom components are needed, locate and/or register them. 
186 self.components_module: ModuleType | None = get_registered_components_module(config=config) 187 # set additional attributes 188 self._debug = debug 189 self._emit_connector_builder_messages = emit_connector_builder_messages 190 self._constructor = ( 191 component_factory 192 if component_factory 193 else ModelToComponentFactory( 194 emit_connector_builder_messages=emit_connector_builder_messages, 195 max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"), 196 ) 197 ) 198 199 self._message_repository = self._constructor.get_message_repository() 200 self._slice_logger: SliceLogger = ( 201 AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger() 202 ) 203 204 # resolve all components in the manifest 205 self._source_config = self._pre_process_manifest(dict(source_config)) 206 # validate resolved manifest against the declarative component schema 207 self._validate_source() 208 # apply additional post-processing to the manifest 209 self._post_process_manifest() 210 211 spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") 212 self._spec_component: Optional[Spec] = ( 213 self._constructor.create_component(SpecModel, spec, dict()) if spec else None 214 ) 215 self._config = self._migrate_and_transform_config(config_path, config) or {} 216 217 concurrency_level_from_manifest = self._source_config.get("concurrency_level") 218 if concurrency_level_from_manifest: 219 concurrency_level_component = self._constructor.create_component( 220 model_type=ConcurrencyLevelModel, 221 component_definition=concurrency_level_from_manifest, 222 config=config or {}, 223 ) 224 if not isinstance(concurrency_level_component, ConcurrencyLevel): 225 raise ValueError( 226 f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}" 227 ) 228 229 concurrency_level = concurrency_level_component.get_concurrency_level() 230 initial_number_of_partitions_to_generate = max( 231 concurrency_level // 2, 1 232 ) # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up 233 else: 234 concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL 235 initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 236 237 self._concurrent_source = ConcurrentSource.create( 238 num_workers=concurrency_level, 239 initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate, 240 logger=self.logger, 241 slice_logger=self._slice_logger, 242 queue=queue, 243 message_repository=self._message_repository, 244 )
@property
def resolved_manifest(self) -> Mapping[str, Any]:
    """
    Returns the resolved manifest configuration for the source.

    This property provides access to the internal source configuration as a mapping,
    which contains all settings and parameters required to define the source's behavior.

    Returns:
        Mapping[str, Any]: The resolved source configuration manifest.
    """
    return self._source_config
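A small usage sketch, assuming `source` was constructed as in the example above:

# Inspect the fully resolved manifest (references resolved and, if requested,
# migrated/normalized during __init__).
resolved = source.resolved_manifest
print(resolved.get("version"))
print(len(resolved.get("streams", [])))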
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    selected_concurrent_streams = self._select_streams(
        streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
        configured_catalog=catalog,
    )

    # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
    # This is also evident in concurrent_source_adapter.py, so fixing it is left out of scope for now.
    if len(selected_concurrent_streams) > 0:
        yield from self._concurrent_source.read(selected_concurrent_streams)
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
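A hypothetical read loop; `configured_catalog` is assumed to be a ConfiguredAirbyteCatalog selecting the streams to sync:

import logging

logger = logging.getLogger("airbyte")

# Streams absent from the configured catalog are filtered out before reading.
for message in source.read(logger, config={"api_key": "..."}, catalog=configured_catalog):
    print(message.type)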
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    return AirbyteCatalog(
        streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
    )
Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, this returns an Airbyte catalog where each Postgres table is a stream and each table column is a field.
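A usage sketch (note that, per the implementation above, discovery uses the config that was migrated and transformed during construction rather than the `config` argument):

catalog = source.discover(logger, config={"api_key": "..."})
for stream in catalog.streams:
    print(stream.name, sorted(stream.json_schema.get("properties", {})))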
def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
    """
    The `streams` method is used as part of the AbstractSource in the following cases:
    * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
    * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter based on the configured catalog, so not all of the streams returned by `streams` are actually read)
    Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

    In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
    """

    if self._spec_component:
        self._spec_component.validate_config(self._config)

    stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

    api_budget_model = self._source_config.get("api_budget")
    if api_budget_model:
        self._constructor.set_api_budget(api_budget_model, self._config)

    source_streams = [
        self._constructor.create_component(
            (
                StateDelegatingStreamModel
                if stream_config.get("type") == StateDelegatingStreamModel.__name__
                else DeclarativeStreamModel
            ),
            stream_config,
            self._config,
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
    ]
    return source_streams
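A usage sketch, again assuming a `source` built as above:

# Returns the concurrent-compatible AbstractStream instances built from the
# manifest's stream definitions (static and dynamic).
streams = source.streams(config={"api_key": "..."})
print([stream.name for stream in streams])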
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
    configurations (e.g. username and password) which can be set when running this connector. For low-code connectors, this
    will first attempt to load the spec from the manifest's spec block; otherwise it will load it from "spec.yaml" or "spec.json"
    in the project root.
    """
    return (
        self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
    )
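A usage sketch:

specification = source.spec(logger)
# connectionSpecification is the JSON schema describing the user-facing config.
print(specification.connectionSpecification)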
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    check = self._source_config.get("check")
    if not check:
        raise ValueError("Missing 'check' component definition within the manifest.")

    if "type" not in check:
        check["type"] = "CheckStream"
    connection_checker = self._constructor.create_component(
        COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
        check,
        dict(),
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )
    if not isinstance(connection_checker, ConnectionChecker):
        raise ValueError(
            f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
        )

    check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
    if not check_succeeded:
        return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
    return AirbyteConnectionStatus(status=Status.SUCCEEDED)
Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.
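A usage sketch, assuming the hypothetical `source` and config from the earlier examples:

from airbyte_cdk.models import Status

status = source.check(logger, config={"api_key": "..."})
if status.status == Status.SUCCEEDED:
    print("Connection check succeeded")
else:
    print(f"Connection check failed: {status.message}")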