airbyte_cdk.sources.declarative.concurrent_declarative_source

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2
  3import json
  4import logging
  5import pkgutil
  6from copy import deepcopy
  7from dataclasses import dataclass, field
  8from queue import Queue
  9from types import ModuleType
 10from typing import (
 11    Any,
 12    ClassVar,
 13    Dict,
 14    Iterator,
 15    List,
 16    Mapping,
 17    Optional,
 18    Set,
 19)
 20
 21import orjson
 22import yaml
 23from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
 24from jsonschema.exceptions import ValidationError
 25from jsonschema.validators import validate
 26
 27from airbyte_cdk.config_observation import create_connector_config_control_message
 28from airbyte_cdk.connector_builder.models import (
 29    LogMessage as ConnectorBuilderLogMessage,
 30)
 31from airbyte_cdk.manifest_migrations.migration_handler import (
 32    ManifestMigrationHandler,
 33)
 34from airbyte_cdk.models import (
 35    AirbyteCatalog,
 36    AirbyteConnectionStatus,
 37    AirbyteMessage,
 38    AirbyteStateMessage,
 39    ConfiguredAirbyteCatalog,
 40    ConnectorSpecification,
 41    FailureType,
 42    Status,
 43)
 44from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
 45from airbyte_cdk.sources import Source
 46from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 47from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 48from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 49from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 50from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 51from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
 52from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 53    ConcurrencyLevel as ConcurrencyLevelModel,
 54)
 55from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 56    DeclarativeStream as DeclarativeStreamModel,
 57)
 58from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 59    Spec as SpecModel,
 60)
 61from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 62    StateDelegatingStream as StateDelegatingStreamModel,
 63)
 64from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 65    get_registered_components_module,
 66)
 67from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 68    ManifestComponentTransformer,
 69)
 70from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
 71    ManifestNormalizer,
 72)
 73from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 74    ManifestReferenceResolver,
 75)
 76from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 77    ModelToComponentFactory,
 78)
 79from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
 80    GroupingPartitionRouter,
 81)
 82from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
 83    SubstreamPartitionRouter,
 84)
 85from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 86from airbyte_cdk.sources.declarative.spec.spec import Spec
 87from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
 88from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
 89from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
 90from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 91from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 92from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
 93from airbyte_cdk.sources.utils.slice_logger import (
 94    AlwaysLogSliceLogger,
 95    DebugSliceLogger,
 96    SliceLogger,
 97)
 98from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
 99from airbyte_cdk.utils.traced_exception import AirbyteTracedException
100
101
@dataclass
class TestLimits:
    """Caps applied to reads performed on behalf of the Connector Builder test harness."""

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
    DEFAULT_MAX_SLICES: ClassVar[int] = 5
    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
    DEFAULT_MAX_STREAMS: ClassVar[int] = 100

    # Per-read caps; each defaults to the matching class-level constant above.
    max_records: int = DEFAULT_MAX_RECORDS
    max_pages_per_slice: int = DEFAULT_MAX_PAGES_PER_SLICE
    max_slices: int = DEFAULT_MAX_SLICES
    max_streams: int = DEFAULT_MAX_STREAMS
116
def _get_declarative_component_schema() -> Dict[str, Any]:
    """Load the declarative component JSON schema bundled with the airbyte_cdk package.

    Returns:
        Dict[str, Any]: The parsed contents of declarative_component_schema.yaml.

    Raises:
        RuntimeError: If the schema resource was located but contained no data.
        FileNotFoundError: If the schema resource could not be found; the original
            error is chained as the cause so the failing path is preserved.
    """
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            # SafeLoader: the schema ships with the CDK, but avoid arbitrary object construction anyway.
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        # Chain the original exception so callers see where the lookup failed.
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        ) from e
134
135class ConcurrentDeclarativeSource(Source):
    # By default, we defer to a value of 2. A value lower than this could cause a PartitionEnqueuer to be stuck in a state of deadlock
    # because it has hit the limit of futures but no partition reader is consuming them.
138    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
139
140    def __init__(
141        self,
142        catalog: Optional[ConfiguredAirbyteCatalog] = None,
143        config: Optional[Mapping[str, Any]] = None,
144        state: Optional[List[AirbyteStateMessage]] = None,
145        *,
146        source_config: ConnectionDefinition,
147        debug: bool = False,
148        emit_connector_builder_messages: bool = False,
149        migrate_manifest: bool = False,
150        normalize_manifest: bool = False,
151        limits: Optional[TestLimits] = None,
152        config_path: Optional[str] = None,
153        **kwargs: Any,
154    ) -> None:
155        self.logger = logging.getLogger(f"airbyte.{self.name}")
156
157        self._limits = limits
158
159        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
160        #  no longer needs to store the original incoming state. But maybe there's an edge case?
161        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
162
163        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
164        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
165        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
166        # information and might even need to be configurable depending on the source
167        queue: Queue[QueueItem] = Queue(maxsize=10_000)
168        message_repository = InMemoryMessageRepository(
169            Level.DEBUG if emit_connector_builder_messages else Level.INFO
170        )
171
172        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
173        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
174        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
175        # incremental streams running in full refresh.
176        component_factory = ModelToComponentFactory(
177            emit_connector_builder_messages=emit_connector_builder_messages,
178            message_repository=ConcurrentMessageRepository(queue, message_repository),
179            configured_catalog=catalog,
180            connector_state_manager=self._connector_state_manager,
181            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
182            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
183            limit_slices_fetched=limits.max_slices if limits else None,
184            disable_retries=True if limits else False,
185            disable_cache=True if limits else False,
186        )
187
188        self._should_normalize = normalize_manifest
189        self._should_migrate = migrate_manifest
190        self._declarative_component_schema = _get_declarative_component_schema()
191        # If custom components are needed, locate and/or register them.
192        self.components_module: ModuleType | None = get_registered_components_module(config=config)
193        # set additional attributes
194        self._debug = debug
195        self._emit_connector_builder_messages = emit_connector_builder_messages
196        self._constructor = (
197            component_factory
198            if component_factory
199            else ModelToComponentFactory(
200                emit_connector_builder_messages=emit_connector_builder_messages,
201                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
202            )
203        )
204
205        self._message_repository = self._constructor.get_message_repository()
206        self._slice_logger: SliceLogger = (
207            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
208        )
209
210        # resolve all components in the manifest
211        self._source_config = self._pre_process_manifest(dict(source_config))
212        # validate resolved manifest against the declarative component schema
213        self._validate_source()
214        # apply additional post-processing to the manifest
215        self._post_process_manifest()
216
217        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
218        self._spec_component: Optional[Spec] = (
219            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
220        )
221        self._config = self._migrate_and_transform_config(config_path, config) or {}
222
223        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
224        if concurrency_level_from_manifest:
225            concurrency_level_component = self._constructor.create_component(
226                model_type=ConcurrencyLevelModel,
227                component_definition=concurrency_level_from_manifest,
228                config=config or {},
229            )
230            if not isinstance(concurrency_level_component, ConcurrencyLevel):
231                raise ValueError(
232                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
233                )
234
235            concurrency_level = concurrency_level_component.get_concurrency_level()
236            initial_number_of_partitions_to_generate = max(
237                concurrency_level // 2, 1
238            )  # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up
239        else:
240            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
241            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
242
243        self._concurrent_source = ConcurrentSource.create(
244            num_workers=concurrency_level,
245            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
246            logger=self.logger,
247            slice_logger=self._slice_logger,
248            queue=queue,
249            message_repository=self._message_repository,
250        )
251
    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        A default top-level "type" is set on the input when missing, then references are
        resolved via the ManifestReferenceResolver, and types/parameters are propagated
        throughout the document via the ManifestComponentTransformer.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: A new manifest with references resolved and types/parameters
            propagated throughout.
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest
275
276    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
277        """
278        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
279        """
280        if "type" not in manifest:
281            manifest["type"] = "DeclarativeSource"
282
283        return manifest
284
285    def _post_process_manifest(self) -> None:
286        """
287        Post-processes the manifest after validation.
288        This method is responsible for any additional modifications or transformations needed
289        after the manifest has been validated and before it is used in the source.
290        """
291        # apply manifest migration, if required
292        self._migrate_manifest()
293        # apply manifest normalization, if required
294        self._normalize_manifest()
295
296    def _migrate_manifest(self) -> None:
297        """
298        This method is used to migrate the manifest. It should be called after the manifest has been validated.
299        The migration is done in place, so the original manifest is modified.
300
301        The original manifest is returned if any error occurs during migration.
302        """
303        if self._should_migrate:
304            manifest_migrator = ManifestMigrationHandler(self._source_config)
305            self._source_config = manifest_migrator.apply_migrations()
306            # validate migrated manifest against the declarative component schema
307            self._validate_source()
308
309    def _normalize_manifest(self) -> None:
310        """
311        This method is used to normalize the manifest. It should be called after the manifest has been validated.
312
313        Connector Builder UI rendering requires the manifest to be in a specific format.
314         - references have been resolved
315         - the commonly used definitions are extracted to the `definitions.linked.*`
316        """
317        if self._should_normalize:
318            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
319            self._source_config = normalizer.normalize()
320
    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema.

        Raises:
            ValidationError: If ``self._source_config`` does not conform to the schema;
                the original jsonschema error is attached as the cause.
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e
332
    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        """
        Apply spec-defined config migrations and transformations to the incoming config.

        Args:
            config_path: Optional path of the config file; when a migration changed the
                config, the migrated version is written back to this path.
            config: The raw connector configuration, or None.

        Returns:
            None when no config was provided; the unmodified config when the manifest
            defines no spec component; otherwise a migrated and transformed copy.
        """
        if not config:
            return None
        if not self._spec_component:
            return config
        # Work on a copy so the caller's mapping is never mutated.
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            # Persist the migrated config (when a path is known) and emit a control
            # message on stdout so the platform can update the stored configuration.
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            control_message = create_connector_config_control_message(mutable_config)
            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config
352
353    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
354        config = self._config or config
355        return super().configure(config, temp_dir)
356
357    @property
358    def resolved_manifest(self) -> Mapping[str, Any]:
359        """
360        Returns the resolved manifest configuration for the source.
361
362        This property provides access to the internal source configuration as a mapping,
363        which contains all settings and parameters required to define the source's behavior.
364
365        Returns:
366            Mapping[str, Any]: The resolved source configuration manifest.
367        """
368        return self._source_config
369
    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        """Return the deprecation log messages collected while building components from the manifest."""
        return self._constructor.get_model_deprecations()
372
    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """
        Read records for the streams selected by the configured catalog.

        Note: the `config` and `state` arguments are not used here; this source reads with
        the config resolved at construction time (`self._config`) and the state passed to
        the constructor.
        """
        selected_concurrent_streams = self._select_streams(
            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
            configured_catalog=catalog,
        )

        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
        if len(selected_concurrent_streams) > 0:
            yield from self._concurrent_source.read(selected_concurrent_streams)
389
390    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
391        return AirbyteCatalog(
392            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
393        )
394
    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        Build the runtime stream components (static and dynamic) declared in the manifest.

        Note: the `config` argument is currently not used; the config resolved at
        construction time (`self._config`) is used throughout instead.

        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams so not all streams actually read from all the streams returned by `streams`)
        Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both case, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
        """

        if self._spec_component:
            self._spec_component.validate_config(self._config)

        # Static stream definitions plus the configs produced by dynamic stream resolvers
        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, self._config)

        # deepcopy so that enabling parent-stream caching does not mutate the resolved manifest
        prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in prepared_configs
        ]

        self._apply_stream_groups(source_streams)

        return source_streams
434
    def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
        """Set block_simultaneous_read on streams based on the manifest's stream_groups config.

        Iterates over the resolved manifest's stream_groups and matches group membership
        against actual created stream instances by name. Validates that no stream shares a
        group with any of its parent streams, which would cause a deadlock.

        Args:
            streams: The stream instances built from the manifest; only DefaultStream
                instances participate in grouping.

        Raises:
            ValueError: If a stream and one of its ancestor (parent) streams belong to
                the same group.
        """
        stream_groups = self._source_config.get("stream_groups", {})
        if not stream_groups:
            return

        # Build stream_name -> group_name mapping from the resolved manifest
        stream_name_to_group: Dict[str, str] = {}
        for group_name, group_config in stream_groups.items():
            for stream_ref in group_config.get("streams", []):
                if isinstance(stream_ref, dict):
                    stream_name = stream_ref.get("name", "")
                    if stream_name:
                        stream_name_to_group[stream_name] = group_name

        # Validate no stream shares a group with any of its ancestor streams
        stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

        def _collect_all_ancestor_names(stream_name: str) -> Set[str]:
            """Recursively collect all ancestor stream names."""
            ancestors: Set[str] = set()
            inst = stream_name_to_instance.get(stream_name)
            if not isinstance(inst, DefaultStream):
                return ancestors
            router = inst.get_partition_router()
            # A GroupingPartitionRouter wraps another router; inspect the wrapped one.
            if isinstance(router, GroupingPartitionRouter):
                router = router.underlying_partition_router
            # Only substream routers express a parent/child relationship.
            if not isinstance(router, SubstreamPartitionRouter):
                return ancestors
            for parent_config in router.parent_stream_configs:
                parent_name = parent_config.stream.name
                ancestors.add(parent_name)
                ancestors.update(_collect_all_ancestor_names(parent_name))
            return ancestors

        for stream in streams:
            if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
                continue
            group_name = stream_name_to_group[stream.name]
            for ancestor_name in _collect_all_ancestor_names(stream.name):
                if stream_name_to_group.get(ancestor_name) == group_name:
                    raise ValueError(
                        f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
                        f"are both in group '{group_name}'. "
                        f"A child stream must not share a group with its parent to avoid deadlock."
                    )

        # Apply group to matching stream instances
        for stream in streams:
            if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:
                stream.block_simultaneous_read = stream_name_to_group[stream.name]
491
492    @staticmethod
493    def _initialize_cache_for_parent_streams(
494        stream_configs: List[Dict[str, Any]],
495    ) -> List[Dict[str, Any]]:
496        """Enable caching for parent streams unless explicitly disabled.
497
498        Caching is enabled by default for parent streams to optimize performance when the same
499        parent data is needed by multiple child streams. However, explicit `use_cache: false`
500        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
501        APIs where caching causes duplicate records).
502        """
503        parent_streams = set()
504
505        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
506            """Set use_cache to True only if not explicitly disabled."""
507            if requester.get("use_cache") is not False:
508                requester["use_cache"] = True
509
510        def update_with_cache_parent_configs(
511            parent_configs: list[dict[str, Any]],
512        ) -> None:
513            for parent_config in parent_configs:
514                parent_streams.add(parent_config["stream"]["name"])
515                if parent_config["stream"]["type"] == "StateDelegatingStream":
516                    _set_cache_if_not_disabled(
517                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
518                    )
519                    _set_cache_if_not_disabled(
520                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
521                    )
522                else:
523                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
524
525        for stream_config in stream_configs:
526            if stream_config.get("incremental_sync", {}).get("parent_stream"):
527                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
528                _set_cache_if_not_disabled(
529                    stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
530                )
531
532            elif stream_config.get("retriever", {}).get("partition_router", {}):
533                partition_router = stream_config["retriever"]["partition_router"]
534
535                if isinstance(partition_router, dict) and partition_router.get(
536                    "parent_stream_configs"
537                ):
538                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
539                elif isinstance(partition_router, list):
540                    for router in partition_router:
541                        if router.get("parent_stream_configs"):
542                            update_with_cache_parent_configs(router["parent_stream_configs"])
543
544        for stream_config in stream_configs:
545            if stream_config["name"] in parent_streams:
546                if stream_config["type"] == "StateDelegatingStream":
547                    _set_cache_if_not_disabled(
548                        stream_config["full_refresh_stream"]["retriever"]["requester"]
549                    )
550                    _set_cache_if_not_disabled(
551                        stream_config["incremental_stream"]["retriever"]["requester"]
552                    )
553                else:
554                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
555        return stream_configs
556
557    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
558        """
559        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
560        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
561        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
562        in the project root.
563        """
564        return (
565            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
566        )
567
568    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
569        check = self._source_config.get("check")
570        if not check:
571            raise ValueError(f"Missing 'check' component definition within the manifest.")
572
573        if "type" not in check:
574            check["type"] = "CheckStream"
575        connection_checker = self._constructor.create_component(
576            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
577            check,
578            dict(),
579            emit_connector_builder_messages=self._emit_connector_builder_messages,
580        )
581        if not isinstance(connection_checker, ConnectionChecker):
582            raise ValueError(
583                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
584            )
585
586        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
587        if not check_succeeded:
588            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
589        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
590
    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        """Stream configs produced by resolving the manifest's `dynamic_streams` definitions."""
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            with_dynamic_stream_name=True,
        )
597
598    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
599        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
600        stream_configs = []
601        for current_stream_config in manifest.get("streams", []):
602            if (
603                "type" in current_stream_config
604                and current_stream_config["type"] == "ConditionalStreams"
605            ):
606                interpolated_boolean = InterpolatedBoolean(
607                    condition=current_stream_config.get("condition"),
608                    parameters={},
609                )
610
611                if interpolated_boolean.eval(config=self._config):
612                    stream_configs.extend(current_stream_config.get("streams", []))
613            else:
614                if "type" not in current_stream_config:
615                    current_stream_config["type"] = "DeclarativeStream"
616                stream_configs.append(current_stream_config)
617        return stream_configs
618
619    def _dynamic_stream_configs(
620        self,
621        manifest: Mapping[str, Any],
622        with_dynamic_stream_name: Optional[bool] = None,
623    ) -> List[Dict[str, Any]]:
624        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
625        dynamic_stream_configs: List[Dict[str, Any]] = []
626        seen_dynamic_streams: Set[str] = set()
627
628        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
629            components_resolver_config = dynamic_definition["components_resolver"]
630
631            if not components_resolver_config:
632                raise ValueError(
633                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
634                )
635
636            resolver_type = components_resolver_config.get("type")
637            if not resolver_type:
638                raise ValueError(
639                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
640                )
641
642            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
643                raise ValueError(
644                    f"Invalid components resolver type '{resolver_type}'. "
645                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
646                )
647
648            if "retriever" in components_resolver_config:
649                components_resolver_config["retriever"]["requester"]["use_cache"] = True
650
651            # Create a resolver for dynamic components based on type
652            if resolver_type == "HttpComponentsResolver":
653                components_resolver = self._constructor.create_component(
654                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
655                    component_definition=components_resolver_config,
656                    config=self._config,
657                    stream_name=dynamic_definition.get("name"),
658                )
659            else:
660                components_resolver = self._constructor.create_component(
661                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
662                    component_definition=components_resolver_config,
663                    config=self._config,
664                )
665
666            stream_template_config = dynamic_definition["stream_template"]
667
668            for dynamic_stream in components_resolver.resolve_components(
669                stream_template_config=stream_template_config
670            ):
671                # Get the use_parent_parameters configuration from the dynamic definition
672                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
673                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)
674
675                dynamic_stream = {
676                    **ManifestComponentTransformer().propagate_types_and_parameters(
677                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
678                    )
679                }
680
681                if "type" not in dynamic_stream:
682                    dynamic_stream["type"] = "DeclarativeStream"
683
684                # Ensure that each stream is created with a unique name
685                name = dynamic_stream.get("name")
686
687                if with_dynamic_stream_name:
688                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
689                        "name", f"dynamic_stream_{dynamic_definition_index}"
690                    )
691
692                if not isinstance(name, str):
693                    raise ValueError(
694                        f"Expected stream name {name} to be a string, got {type(name)}."
695                    )
696
697                if name in seen_dynamic_streams:
698                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
699                    failure_type = FailureType.system_error
700
701                    if resolver_type == "ConfigComponentsResolver":
702                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
703                        failure_type = FailureType.config_error
704
705                    raise AirbyteTracedException(
706                        message=error_message,
707                        internal_message=error_message,
708                        failure_type=failure_type,
709                    )
710
711                seen_dynamic_streams.add(name)
712                dynamic_stream_configs.append(dynamic_stream)
713
714        return dynamic_stream_configs
715
716    def _select_streams(
717        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
718    ) -> List[AbstractStream]:
719        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
720        abstract_streams: List[AbstractStream] = []
721        for configured_stream in configured_catalog.streams:
722            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
723            if stream_instance:
724                abstract_streams.append(stream_instance)
725            else:
726                # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if
727                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
728                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
729                # but we can do so if we feel it necessary. With the current behavior,we should still result
730                # in a partial failure since missing streams will be marked as INCOMPLETE.
731                self._message_repository.emit_message(
732                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
733                )
734        return abstract_streams
@dataclass
class TestLimits:
    """Ceilings applied when the source is constructed with test limits.

    When a ``TestLimits`` instance is passed to the source, page/slice/record/stream
    counts are capped and retries and caching are disabled.
    """

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
    DEFAULT_MAX_SLICES: ClassVar[int] = 5
    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
    DEFAULT_MAX_STREAMS: ClassVar[int] = 100

    # Per-read ceilings; each defaults to the matching DEFAULT_* class constant.
    max_records: int = field(default=DEFAULT_MAX_RECORDS)
    max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
    max_slices: int = field(default=DEFAULT_MAX_SLICES)
    max_streams: int = field(default=DEFAULT_MAX_STREAMS)
TestLimits( max_records: int = 100, max_pages_per_slice: int = 5, max_slices: int = 5, max_streams: int = 100)
DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
DEFAULT_MAX_SLICES: ClassVar[int] = 5
DEFAULT_MAX_RECORDS: ClassVar[int] = 100
DEFAULT_MAX_STREAMS: ClassVar[int] = 100
max_records: int = 100
max_pages_per_slice: int = 5
max_slices: int = 5
max_streams: int = 100
136class ConcurrentDeclarativeSource(Source):
137    # By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock
138    # because it has hit the limit of futures but not partition reader is consuming them.
139    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
140
    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        *,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        migrate_manifest: bool = False,
        normalize_manifest: bool = False,
        limits: Optional[TestLimits] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Build the source: resolve and validate the manifest, construct the component
        factory, migrate/transform the config, and set up the concurrent execution engine.

        Args:
            catalog: Configured catalog, when already known (passed to the component factory).
            config: Connector configuration; migrated and transformed via the spec component.
            state: Incoming stream state messages.
            source_config: The declarative manifest definition for this connector.
            debug: Stored on the source as `_debug`.
            emit_connector_builder_messages: Emit DEBUG-level messages and always log slices.
            migrate_manifest: Apply manifest migrations after validation.
            normalize_manifest: Normalize the manifest for Connector Builder rendering.
            limits: Test-read ceilings; when set, also disables retries and caching.
            config_path: Path where a migrated config should be written back.
            **kwargs: Ignored; accepted for backward compatibility.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")

        self._limits = limits

        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        #  no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        message_repository = InMemoryMessageRepository(
            Level.DEBUG if emit_connector_builder_messages else Level.INFO
        )

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            message_repository=ConcurrentMessageRepository(queue, message_repository),
            configured_catalog=catalog,
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
            limit_slices_fetched=limits.max_slices if limits else None,
            disable_retries=True if limits else False,
            disable_cache=True if limits else False,
        )

        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        # NOTE(review): component_factory is always constructed above, so the fallback branch
        # below appears unreachable — confirm before simplifying.
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages=emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )

        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        # Falls back to {} when no config was provided.
        self._config = self._migrate_and_transform_config(config_path, config) or {}

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            queue=queue,
            message_repository=self._message_repository,
        )
252
    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        The top-level source type is defaulted if absent, references are resolved using the
        ManifestReferenceResolver, and types/parameters are propagated throughout the manifest
        by the ManifestComponentTransformer.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: The preprocessed manifest with references resolved and
            types/parameters propagated.
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest
276
277    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
278        """
279        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
280        """
281        if "type" not in manifest:
282            manifest["type"] = "DeclarativeSource"
283
284        return manifest
285
286    def _post_process_manifest(self) -> None:
287        """
288        Post-processes the manifest after validation.
289        This method is responsible for any additional modifications or transformations needed
290        after the manifest has been validated and before it is used in the source.
291        """
292        # apply manifest migration, if required
293        self._migrate_manifest()
294        # apply manifest normalization, if required
295        self._normalize_manifest()
296
297    def _migrate_manifest(self) -> None:
298        """
299        This method is used to migrate the manifest. It should be called after the manifest has been validated.
300        The migration is done in place, so the original manifest is modified.
301
302        The original manifest is returned if any error occurs during migration.
303        """
304        if self._should_migrate:
305            manifest_migrator = ManifestMigrationHandler(self._source_config)
306            self._source_config = manifest_migrator.apply_migrations()
307            # validate migrated manifest against the declarative component schema
308            self._validate_source()
309
310    def _normalize_manifest(self) -> None:
311        """
312        This method is used to normalize the manifest. It should be called after the manifest has been validated.
313
314        Connector Builder UI rendering requires the manifest to be in a specific format.
315         - references have been resolved
316         - the commonly used definitions are extracted to the `definitions.linked.*`
317        """
318        if self._should_normalize:
319            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
320            self._source_config = normalizer.normalize()
321
322    def _validate_source(self) -> None:
323        """
324        Validates the connector manifest against the declarative component schema
325        """
326
327        try:
328            validate(self._source_config, self._declarative_component_schema)
329        except ValidationError as e:
330            raise ValidationError(
331                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
332            ) from e
333
334    def _migrate_and_transform_config(
335        self,
336        config_path: Optional[str],
337        config: Optional[Config],
338    ) -> Optional[Config]:
339        if not config:
340            return None
341        if not self._spec_component:
342            return config
343        mutable_config = dict(config)
344        self._spec_component.migrate_config(mutable_config)
345        if mutable_config != config:
346            if config_path:
347                with open(config_path, "w") as f:
348                    json.dump(mutable_config, f)
349            control_message = create_connector_config_control_message(mutable_config)
350            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
351        self._spec_component.transform_config(mutable_config)
352        return mutable_config
353
354    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
355        config = self._config or config
356        return super().configure(config, temp_dir)
357
358    @property
359    def resolved_manifest(self) -> Mapping[str, Any]:
360        """
361        Returns the resolved manifest configuration for the source.
362
363        This property provides access to the internal source configuration as a mapping,
364        which contains all settings and parameters required to define the source's behavior.
365
366        Returns:
367            Mapping[str, Any]: The resolved source configuration manifest.
368        """
369        return self._source_config
370
371    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
372        return self._constructor.get_model_deprecations()
373
374    def read(
375        self,
376        logger: logging.Logger,
377        config: Mapping[str, Any],
378        catalog: ConfiguredAirbyteCatalog,
379        state: Optional[List[AirbyteStateMessage]] = None,
380    ) -> Iterator[AirbyteMessage]:
381        selected_concurrent_streams = self._select_streams(
382            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
383            configured_catalog=catalog,
384        )
385
386        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
387        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
388        if len(selected_concurrent_streams) > 0:
389            yield from self._concurrent_source.read(selected_concurrent_streams)
390
391    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
392        return AirbyteCatalog(
393            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
394        )
395
396    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
397    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
398        """
399        The `streams` method is used as part of the AbstractSource in the following cases:
400        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
401        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams so not all streams actually read from all the streams returned by `streams`)
402        Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.
403
404        In both case, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
405        """
406
407        if self._spec_component:
408            self._spec_component.validate_config(self._config)
409
410        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
411
412        api_budget_model = self._source_config.get("api_budget")
413        if api_budget_model:
414            self._constructor.set_api_budget(api_budget_model, self._config)
415
416        prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
417
418        source_streams = [
419            self._constructor.create_component(
420                (
421                    StateDelegatingStreamModel
422                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
423                    else DeclarativeStreamModel
424                ),
425                stream_config,
426                self._config,
427                emit_connector_builder_messages=self._emit_connector_builder_messages,
428            )
429            for stream_config in prepared_configs
430        ]
431
432        self._apply_stream_groups(source_streams)
433
434        return source_streams
435
    def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
        """Set block_simultaneous_read on streams based on the manifest's stream_groups config.

        Iterates over the resolved manifest's stream_groups and matches group membership
        against actual created stream instances by name. Validates that no stream shares a
        group with any of its parent streams, which would cause a deadlock.
        """
        stream_groups = self._source_config.get("stream_groups", {})
        if not stream_groups:
            return

        # Build stream_name -> group_name mapping from the resolved manifest
        stream_name_to_group: Dict[str, str] = {}
        for group_name, group_config in stream_groups.items():
            for stream_ref in group_config.get("streams", []):
                if isinstance(stream_ref, dict):
                    stream_name = stream_ref.get("name", "")
                    if stream_name:
                        stream_name_to_group[stream_name] = group_name

        # Validate no stream shares a group with any of its ancestor streams
        stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

        def _collect_all_ancestor_names(stream_name: str) -> Set[str]:
            """Recursively collect all ancestor stream names."""
            ancestors: Set[str] = set()
            inst = stream_name_to_instance.get(stream_name)
            # Only DefaultStream instances expose a partition router to inspect for parents.
            if not isinstance(inst, DefaultStream):
                return ancestors
            router = inst.get_partition_router()
            # Unwrap a GroupingPartitionRouter to reach the actual parent/child relationship.
            if isinstance(router, GroupingPartitionRouter):
                router = router.underlying_partition_router
            if not isinstance(router, SubstreamPartitionRouter):
                return ancestors
            for parent_config in router.parent_stream_configs:
                parent_name = parent_config.stream.name
                ancestors.add(parent_name)
                # Walk transitively so grandparents (and beyond) are also validated.
                ancestors.update(_collect_all_ancestor_names(parent_name))
            return ancestors

        for stream in streams:
            if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
                continue
            group_name = stream_name_to_group[stream.name]
            for ancestor_name in _collect_all_ancestor_names(stream.name):
                if stream_name_to_group.get(ancestor_name) == group_name:
                    raise ValueError(
                        f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
                        f"are both in group '{group_name}'. "
                        f"A child stream must not share a group with its parent to avoid deadlock."
                    )

        # Apply group to matching stream instances
        for stream in streams:
            if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:
                stream.block_simultaneous_read = stream_name_to_group[stream.name]
492
493    @staticmethod
494    def _initialize_cache_for_parent_streams(
495        stream_configs: List[Dict[str, Any]],
496    ) -> List[Dict[str, Any]]:
497        """Enable caching for parent streams unless explicitly disabled.
498
499        Caching is enabled by default for parent streams to optimize performance when the same
500        parent data is needed by multiple child streams. However, explicit `use_cache: false`
501        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
502        APIs where caching causes duplicate records).
503        """
504        parent_streams = set()
505
506        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
507            """Set use_cache to True only if not explicitly disabled."""
508            if requester.get("use_cache") is not False:
509                requester["use_cache"] = True
510
511        def update_with_cache_parent_configs(
512            parent_configs: list[dict[str, Any]],
513        ) -> None:
514            for parent_config in parent_configs:
515                parent_streams.add(parent_config["stream"]["name"])
516                if parent_config["stream"]["type"] == "StateDelegatingStream":
517                    _set_cache_if_not_disabled(
518                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
519                    )
520                    _set_cache_if_not_disabled(
521                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
522                    )
523                else:
524                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
525
526        for stream_config in stream_configs:
527            if stream_config.get("incremental_sync", {}).get("parent_stream"):
528                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
529                _set_cache_if_not_disabled(
530                    stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
531                )
532
533            elif stream_config.get("retriever", {}).get("partition_router", {}):
534                partition_router = stream_config["retriever"]["partition_router"]
535
536                if isinstance(partition_router, dict) and partition_router.get(
537                    "parent_stream_configs"
538                ):
539                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
540                elif isinstance(partition_router, list):
541                    for router in partition_router:
542                        if router.get("parent_stream_configs"):
543                            update_with_cache_parent_configs(router["parent_stream_configs"])
544
545        for stream_config in stream_configs:
546            if stream_config["name"] in parent_streams:
547                if stream_config["type"] == "StateDelegatingStream":
548                    _set_cache_if_not_disabled(
549                        stream_config["full_refresh_stream"]["retriever"]["requester"]
550                    )
551                    _set_cache_if_not_disabled(
552                        stream_config["incremental_stream"]["retriever"]["requester"]
553                    )
554                else:
555                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
556        return stream_configs
557
558    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
559        """
560        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
561        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
562        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
563        in the project root.
564        """
565        return (
566            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
567        )
568
569    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
570        check = self._source_config.get("check")
571        if not check:
572            raise ValueError(f"Missing 'check' component definition within the manifest.")
573
574        if "type" not in check:
575            check["type"] = "CheckStream"
576        connection_checker = self._constructor.create_component(
577            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
578            check,
579            dict(),
580            emit_connector_builder_messages=self._emit_connector_builder_messages,
581        )
582        if not isinstance(connection_checker, ConnectionChecker):
583            raise ValueError(
584                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
585            )
586
587        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
588        if not check_succeeded:
589            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
590        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
591
592    @property
593    def dynamic_streams(self) -> List[Dict[str, Any]]:
594        return self._dynamic_stream_configs(
595            manifest=self._source_config,
596            with_dynamic_stream_name=True,
597        )
598
    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        """Extract the static stream configurations from the manifest.

        ConditionalStreams entries contribute their nested streams only when their
        interpolated condition evaluates to True against the connector config; every
        other entry is defaulted to type "DeclarativeStream" when no type is set.
        """
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=self._config):
                    # NOTE(review): nested streams are appended as-is — they do not receive the
                    # default "DeclarativeStream" type below; confirm they always declare one.
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs
619
620    def _dynamic_stream_configs(
621        self,
622        manifest: Mapping[str, Any],
623        with_dynamic_stream_name: Optional[bool] = None,
624    ) -> List[Dict[str, Any]]:
625        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
626        dynamic_stream_configs: List[Dict[str, Any]] = []
627        seen_dynamic_streams: Set[str] = set()
628
629        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
630            components_resolver_config = dynamic_definition["components_resolver"]
631
632            if not components_resolver_config:
633                raise ValueError(
634                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
635                )
636
637            resolver_type = components_resolver_config.get("type")
638            if not resolver_type:
639                raise ValueError(
640                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
641                )
642
643            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
644                raise ValueError(
645                    f"Invalid components resolver type '{resolver_type}'. "
646                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
647                )
648
649            if "retriever" in components_resolver_config:
650                components_resolver_config["retriever"]["requester"]["use_cache"] = True
651
652            # Create a resolver for dynamic components based on type
653            if resolver_type == "HttpComponentsResolver":
654                components_resolver = self._constructor.create_component(
655                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
656                    component_definition=components_resolver_config,
657                    config=self._config,
658                    stream_name=dynamic_definition.get("name"),
659                )
660            else:
661                components_resolver = self._constructor.create_component(
662                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
663                    component_definition=components_resolver_config,
664                    config=self._config,
665                )
666
667            stream_template_config = dynamic_definition["stream_template"]
668
669            for dynamic_stream in components_resolver.resolve_components(
670                stream_template_config=stream_template_config
671            ):
672                # Get the use_parent_parameters configuration from the dynamic definition
673                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
674                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)
675
676                dynamic_stream = {
677                    **ManifestComponentTransformer().propagate_types_and_parameters(
678                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
679                    )
680                }
681
682                if "type" not in dynamic_stream:
683                    dynamic_stream["type"] = "DeclarativeStream"
684
685                # Ensure that each stream is created with a unique name
686                name = dynamic_stream.get("name")
687
688                if with_dynamic_stream_name:
689                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
690                        "name", f"dynamic_stream_{dynamic_definition_index}"
691                    )
692
693                if not isinstance(name, str):
694                    raise ValueError(
695                        f"Expected stream name {name} to be a string, got {type(name)}."
696                    )
697
698                if name in seen_dynamic_streams:
699                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
700                    failure_type = FailureType.system_error
701
702                    if resolver_type == "ConfigComponentsResolver":
703                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
704                        failure_type = FailureType.config_error
705
706                    raise AirbyteTracedException(
707                        message=error_message,
708                        internal_message=error_message,
709                        failure_type=failure_type,
710                    )
711
712                seen_dynamic_streams.add(name)
713                dynamic_stream_configs.append(dynamic_stream)
714
715        return dynamic_stream_configs
716
717    def _select_streams(
718        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
719    ) -> List[AbstractStream]:
720        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
721        abstract_streams: List[AbstractStream] = []
722        for configured_stream in configured_catalog.streams:
723            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
724            if stream_instance:
725                abstract_streams.append(stream_instance)
726            else:
727                # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if
728                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
729                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
730                # but we can do so if we feel it necessary. With the current behavior,we should still result
731                # in a partial failure since missing streams will be marked as INCOMPLETE.
732                self._message_repository.emit_message(
733                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
734                )
735        return abstract_streams

Declarative source that builds its streams from a low-code manifest and reads them through the concurrent source framework.

ConcurrentDeclarativeSource( catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, *, source_config: Mapping[str, Any], debug: bool = False, emit_connector_builder_messages: bool = False, migrate_manifest: bool = False, normalize_manifest: bool = False, limits: Optional[TestLimits] = None, config_path: Optional[str] = None, **kwargs: Any)
141    def __init__(
142        self,
143        catalog: Optional[ConfiguredAirbyteCatalog] = None,
144        config: Optional[Mapping[str, Any]] = None,
145        state: Optional[List[AirbyteStateMessage]] = None,
146        *,
147        source_config: ConnectionDefinition,
148        debug: bool = False,
149        emit_connector_builder_messages: bool = False,
150        migrate_manifest: bool = False,
151        normalize_manifest: bool = False,
152        limits: Optional[TestLimits] = None,
153        config_path: Optional[str] = None,
154        **kwargs: Any,
155    ) -> None:
156        self.logger = logging.getLogger(f"airbyte.{self.name}")
157
158        self._limits = limits
159
160        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
161        #  no longer needs to store the original incoming state. But maybe there's an edge case?
162        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
163
164        # We set a maxsize to for the main thread to process record items when the queue size grows. This assumes that there are less
165        # threads generating partitions that than are max number of workers. If it weren't the case, we could have threads only generating
166        # partitions which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
167        # information and might even need to be configurable depending on the source
168        queue: Queue[QueueItem] = Queue(maxsize=10_000)
169        message_repository = InMemoryMessageRepository(
170            Level.DEBUG if emit_connector_builder_messages else Level.INFO
171        )
172
173        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
174        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
175        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
176        # incremental streams running in full refresh.
177        component_factory = ModelToComponentFactory(
178            emit_connector_builder_messages=emit_connector_builder_messages,
179            message_repository=ConcurrentMessageRepository(queue, message_repository),
180            configured_catalog=catalog,
181            connector_state_manager=self._connector_state_manager,
182            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
183            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
184            limit_slices_fetched=limits.max_slices if limits else None,
185            disable_retries=True if limits else False,
186            disable_cache=True if limits else False,
187        )
188
189        self._should_normalize = normalize_manifest
190        self._should_migrate = migrate_manifest
191        self._declarative_component_schema = _get_declarative_component_schema()
192        # If custom components are needed, locate and/or register them.
193        self.components_module: ModuleType | None = get_registered_components_module(config=config)
194        # set additional attributes
195        self._debug = debug
196        self._emit_connector_builder_messages = emit_connector_builder_messages
197        self._constructor = (
198            component_factory
199            if component_factory
200            else ModelToComponentFactory(
201                emit_connector_builder_messages=emit_connector_builder_messages,
202                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
203            )
204        )
205
206        self._message_repository = self._constructor.get_message_repository()
207        self._slice_logger: SliceLogger = (
208            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
209        )
210
211        # resolve all components in the manifest
212        self._source_config = self._pre_process_manifest(dict(source_config))
213        # validate resolved manifest against the declarative component schema
214        self._validate_source()
215        # apply additional post-processing to the manifest
216        self._post_process_manifest()
217
218        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
219        self._spec_component: Optional[Spec] = (
220            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
221        )
222        self._config = self._migrate_and_transform_config(config_path, config) or {}
223
224        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
225        if concurrency_level_from_manifest:
226            concurrency_level_component = self._constructor.create_component(
227                model_type=ConcurrencyLevelModel,
228                component_definition=concurrency_level_from_manifest,
229                config=config or {},
230            )
231            if not isinstance(concurrency_level_component, ConcurrencyLevel):
232                raise ValueError(
233                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
234                )
235
236            concurrency_level = concurrency_level_component.get_concurrency_level()
237            initial_number_of_partitions_to_generate = max(
238                concurrency_level // 2, 1
239            )  # Partition_generation iterates using range based on this value. If this is floored to zero we end up in a dead lock during start up
240        else:
241            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
242            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
243
244        self._concurrent_source = ConcurrentSource.create(
245            num_workers=concurrency_level,
246            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
247            logger=self.logger,
248            slice_logger=self._slice_logger,
249            queue=queue,
250            message_repository=self._message_repository,
251        )
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
358    @property
359    def resolved_manifest(self) -> Mapping[str, Any]:
360        """
361        Returns the resolved manifest configuration for the source.
362
363        This property provides access to the internal source configuration as a mapping,
364        which contains all settings and parameters required to define the source's behavior.
365
366        Returns:
367            Mapping[str, Any]: The resolved source configuration manifest.
368        """
369        return self._source_config

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:

Mapping[str, Any]: The resolved source configuration manifest.

def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
371    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
372        return self._constructor.get_model_deprecations()
def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
374    def read(
375        self,
376        logger: logging.Logger,
377        config: Mapping[str, Any],
378        catalog: ConfiguredAirbyteCatalog,
379        state: Optional[List[AirbyteStateMessage]] = None,
380    ) -> Iterator[AirbyteMessage]:
381        selected_concurrent_streams = self._select_streams(
382            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
383            configured_catalog=catalog,
384        )
385
386        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
387        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
388        if len(selected_concurrent_streams) > 0:
389            yield from self._concurrent_source.read(selected_concurrent_streams)

Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.

def discover( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
391    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
392        return AirbyteCatalog(
393            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
394        )

Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream]:
    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        Build and return all runtime stream components defined by the manifest.

        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams so not all streams actually read from all the streams returned by `streams`)
        Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine as the result for these is the same regardless of if it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the mentioned code paths above.
        """

        # Fail fast if the provided config does not satisfy the manifest's spec.
        if self._spec_component:
            self._spec_component.validate_config(self._config)

        # Static stream definitions plus the configs resolved from dynamic_streams.
        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        # An api_budget, when declared, is registered on the factory before components are built.
        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, self._config)

        # deepcopy presumably keeps the raw configs intact if cache initialization
        # mutates them — TODO confirm against _initialize_cache_for_parent_streams.
        prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))

        # Dispatch on the declared "type": StateDelegatingStream gets its own model,
        # everything else is built as a DeclarativeStream.
        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in prepared_configs
        ]

        self._apply_stream_groups(source_streams)

        return source_streams

The streams method is used as part of the AbstractSource in the following cases:

  • ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
  • ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all of the streams returned by streams are actually read). Note that super.streams(config) is also called when splitting the streams between concurrent or not in _group_streams.

In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.

def spec( self, logger: logging.Logger) -> airbyte_cdk.ConnectorSpecification:
558    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
559        """
560        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
561        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
562        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
563        in the project root.
564        """
565        return (
566            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
567        )

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
569    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
570        check = self._source_config.get("check")
571        if not check:
572            raise ValueError(f"Missing 'check' component definition within the manifest.")
573
574        if "type" not in check:
575            check["type"] = "CheckStream"
576        connection_checker = self._constructor.create_component(
577            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
578            check,
579            dict(),
580            emit_connector_builder_messages=self._emit_connector_builder_messages,
581        )
582        if not isinstance(connection_checker, ConnectionChecker):
583            raise ValueError(
584                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
585            )
586
587        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
588        if not check_succeeded:
589            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
590        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.

dynamic_streams: List[Dict[str, Any]]
    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        """Resolve and return the manifest's dynamic stream configurations.

        Each returned config is annotated with a `dynamic_stream_name` key
        identifying the dynamic definition that produced it.
        """
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            with_dynamic_stream_name=True,
        )