airbyte_cdk.sources.declarative.concurrent_declarative_source

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2
  3import json
  4import logging
  5import pkgutil
  6from copy import deepcopy
  7from dataclasses import dataclass, field
  8from queue import Queue
  9from types import ModuleType
 10from typing import (
 11    Any,
 12    ClassVar,
 13    Dict,
 14    Iterator,
 15    List,
 16    Mapping,
 17    Optional,
 18    Set,
 19)
 20
 21import orjson
 22import yaml
 23from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
 24from jsonschema.exceptions import ValidationError
 25from jsonschema.validators import validate
 26
 27from airbyte_cdk.config_observation import create_connector_config_control_message
 28from airbyte_cdk.connector_builder.models import (
 29    LogMessage as ConnectorBuilderLogMessage,
 30)
 31from airbyte_cdk.manifest_migrations.migration_handler import (
 32    ManifestMigrationHandler,
 33)
 34from airbyte_cdk.models import (
 35    AirbyteCatalog,
 36    AirbyteConnectionStatus,
 37    AirbyteMessage,
 38    AirbyteStateMessage,
 39    ConfiguredAirbyteCatalog,
 40    ConnectorSpecification,
 41    FailureType,
 42    Status,
 43)
 44from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
 45from airbyte_cdk.sources import Source
 46from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 47from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 48from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 49from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 50from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 51from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
 52from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 53    ConcurrencyLevel as ConcurrencyLevelModel,
 54)
 55from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 56    DeclarativeStream as DeclarativeStreamModel,
 57)
 58from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 59    Spec as SpecModel,
 60)
 61from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 62    StateDelegatingStream as StateDelegatingStreamModel,
 63)
 64from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 65    get_registered_components_module,
 66)
 67from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 68    ManifestComponentTransformer,
 69)
 70from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
 71    ManifestNormalizer,
 72)
 73from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 74    ManifestReferenceResolver,
 75)
 76from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 77    ModelToComponentFactory,
 78)
 79from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 80from airbyte_cdk.sources.declarative.spec.spec import Spec
 81from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
 82from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
 83from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
 84from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 85from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
 86from airbyte_cdk.sources.utils.slice_logger import (
 87    AlwaysLogSliceLogger,
 88    DebugSliceLogger,
 89    SliceLogger,
 90)
 91from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
 92from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 93
 94
 95@dataclass
 96class TestLimits:
 97    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name
 98
 99    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
100    DEFAULT_MAX_SLICES: ClassVar[int] = 5
101    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
102    DEFAULT_MAX_STREAMS: ClassVar[int] = 100
103
104    max_records: int = field(default=DEFAULT_MAX_RECORDS)
105    max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
106    max_slices: int = field(default=DEFAULT_MAX_SLICES)
107    max_streams: int = field(default=DEFAULT_MAX_STREAMS)
108
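# Illustrative usage (editor's sketch, not part of the module source): TestLimits caps
# how much data a Connector Builder test read will fetch. Any field left unset falls
# back to the ClassVar defaults above; the override values below are arbitrary.
#
#     limits = TestLimits(max_records=10, max_slices=2)
#     assert limits.max_pages_per_slice == TestLimits.DEFAULT_MAX_PAGES_PER_SLICE
#     assert limits.max_records == 10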
109
110def _get_declarative_component_schema() -> Dict[str, Any]:
111    try:
112        raw_component_schema = pkgutil.get_data(
113            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
114        )
115        if raw_component_schema is not None:
116            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
117            return declarative_component_schema  # type: ignore
118        else:
119            raise RuntimeError(
120                "Failed to read manifest component json schema required for deduplication"
121            )
122    except FileNotFoundError as e:
123        raise FileNotFoundError(
124            f"Failed to read manifest component json schema required for deduplication: {e}"
125        )
126
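# Editor's sketch (assuming it is used the same way _validate_source does below): the
# returned dict is a standard JSON-schema mapping and can be fed to jsonschema.validate.
#
#     schema = _get_declarative_component_schema()
#     try:
#         validate(manifest_dict, schema)  # `manifest_dict` is a hypothetical manifest mapping
#     except ValidationError:
#         ...  # the manifest does not conform to declarative_component_schema.yaml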
127
128class ConcurrentDeclarativeSource(Source):
129    # By default, we use a value of 2. A value lower than this could cause a PartitionEnqueuer to be stuck in a state of deadlock
130    # because it has hit the limit of futures but no partition reader is consuming them.
131    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
132
133    def __init__(
134        self,
135        catalog: Optional[ConfiguredAirbyteCatalog] = None,
136        config: Optional[Mapping[str, Any]] = None,
137        state: Optional[List[AirbyteStateMessage]] = None,
138        *,
139        source_config: ConnectionDefinition,
140        debug: bool = False,
141        emit_connector_builder_messages: bool = False,
142        migrate_manifest: bool = False,
143        normalize_manifest: bool = False,
144        limits: Optional[TestLimits] = None,
145        config_path: Optional[str] = None,
146        **kwargs: Any,
147    ) -> None:
148        self.logger = logging.getLogger(f"airbyte.{self.name}")
149
150        self._limits = limits
151
152        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
153        #  no longer needs to store the original incoming state. But maybe there's an edge case?
154        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
155
156        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are
157        # fewer threads generating partitions than the max number of workers. If that weren't the case, we could have threads only
158        # generating partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be
159        # changed given more information and might even need to be configurable depending on the source
160        queue: Queue[QueueItem] = Queue(maxsize=10_000)
161        message_repository = InMemoryMessageRepository(
162            Level.DEBUG if emit_connector_builder_messages else Level.INFO
163        )
164
165        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
166        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
167        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
168        # incremental streams running in full refresh.
169        component_factory = ModelToComponentFactory(
170            emit_connector_builder_messages=emit_connector_builder_messages,
171            message_repository=ConcurrentMessageRepository(queue, message_repository),
172            configured_catalog=catalog,
173            connector_state_manager=self._connector_state_manager,
174            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
175            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
176            limit_slices_fetched=limits.max_slices if limits else None,
177            disable_retries=True if limits else False,
178            disable_cache=True if limits else False,
179        )
180
181        self._should_normalize = normalize_manifest
182        self._should_migrate = migrate_manifest
183        self._declarative_component_schema = _get_declarative_component_schema()
184        # If custom components are needed, locate and/or register them.
185        self.components_module: ModuleType | None = get_registered_components_module(config=config)
186        # set additional attributes
187        self._debug = debug
188        self._emit_connector_builder_messages = emit_connector_builder_messages
189        self._constructor = (
190            component_factory
191            if component_factory
192            else ModelToComponentFactory(
193                emit_connector_builder_messages=emit_connector_builder_messages,
194                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
195            )
196        )
197
198        self._message_repository = self._constructor.get_message_repository()
199        self._slice_logger: SliceLogger = (
200            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
201        )
202
203        # resolve all components in the manifest
204        self._source_config = self._pre_process_manifest(dict(source_config))
205        # validate resolved manifest against the declarative component schema
206        self._validate_source()
207        # apply additional post-processing to the manifest
208        self._post_process_manifest()
209
210        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
211        self._spec_component: Optional[Spec] = (
212            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
213        )
214        self._config = self._migrate_and_transform_config(config_path, config) or {}
215
216        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
217        if concurrency_level_from_manifest:
218            concurrency_level_component = self._constructor.create_component(
219                model_type=ConcurrencyLevelModel,
220                component_definition=concurrency_level_from_manifest,
221                config=config or {},
222            )
223            if not isinstance(concurrency_level_component, ConcurrencyLevel):
224                raise ValueError(
225                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
226                )
227
228            concurrency_level = concurrency_level_component.get_concurrency_level()
229            initial_number_of_partitions_to_generate = max(
230                concurrency_level // 2, 1
231            )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during start-up
232        else:
233            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
234            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
235
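        # Worked example (editor's note): with `concurrency_level: 10` in the manifest, 10 workers
        # and max(10 // 2, 1) = 5 initial partition generators are used; with the default level of
        # 2, 2 // 2 = 1 generator is used, so the count can never be floored to zero.
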
236        self._concurrent_source = ConcurrentSource.create(
237            num_workers=concurrency_level,
238            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
239            logger=self.logger,
240            slice_logger=self._slice_logger,
241            queue=queue,
242            message_repository=self._message_repository,
243        )
244
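    # Illustrative construction (editor's sketch; `manifest_dict` and the config values are
    # hypothetical, and a real manifest also needs `version`, `check`, `streams`, etc. to pass
    # schema validation):
    #
    #     source = ConcurrentDeclarativeSource(
    #         source_config=manifest_dict,
    #         config={"api_key": "..."},
    #         catalog=None,
    #         state=None,
    #     )
    #     spec_message = source.spec(source.logger)
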
245    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
246        """
247        Preprocesses the provided manifest dictionary by resolving any manifest references.
248
249        This method resolves references using the ManifestReferenceResolver and then propagates
250        types and parameters throughout the manifest using the ManifestComponentTransformer.
251
252        Args:
253            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
254
255        Returns:
256            Dict[str, Any]: The preprocessed manifest with references resolved and types and parameters propagated.
257        """
258        # For ease of use we don't require the type to be specified at the top level of the manifest, but it should be included during processing
259        manifest = self._fix_source_type(manifest)
260        # Resolve references in the manifest
261        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
262        # Propagate types and parameters throughout the manifest
263        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
264            "", resolved_manifest, {}
265        )
266
267        return propagated_manifest
268
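    # Editor's sketch of preprocessing, assuming a minimal hypothetical manifest:
    #
    #     {"definitions": {"req": {"type": "HttpRequester", "url_base": "https://api.example.com"}},
    #      "streams": [{"retriever": {"requester": {"$ref": "#/definitions/req"}}}]}
    #
    # After _pre_process_manifest, the `$ref` is replaced by the referenced definition, and
    # `type`/`$parameters` values are propagated down into nested components.
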
269    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
270        """
271        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
272        """
273        if "type" not in manifest:
274            manifest["type"] = "DeclarativeSource"
275
276        return manifest
277
278    def _post_process_manifest(self) -> None:
279        """
280        Post-processes the manifest after validation.
281        This method is responsible for any additional modifications or transformations needed
282        after the manifest has been validated and before it is used in the source.
283        """
284        # apply manifest migration, if required
285        self._migrate_manifest()
286        # apply manifest normalization, if required
287        self._normalize_manifest()
288
289    def _migrate_manifest(self) -> None:
290        """
291        This method is used to migrate the manifest. It should be called after the manifest has been validated.
292        The migration is done in place, so the original manifest is modified.
293
294        If any error occurs during migration, the original manifest is kept unchanged.
295        """
296        if self._should_migrate:
297            manifest_migrator = ManifestMigrationHandler(self._source_config)
298            self._source_config = manifest_migrator.apply_migrations()
299            # validate migrated manifest against the declarative component schema
300            self._validate_source()
301
302    def _normalize_manifest(self) -> None:
303        """
304        This method is used to normalize the manifest. It should be called after the manifest has been validated.
305
306        Connector Builder UI rendering requires the manifest to be in a specific format.
307         - references have been resolved
308         - commonly used definitions are extracted into `definitions.linked.*`
309        """
310        if self._should_normalize:
311            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
312            self._source_config = normalizer.normalize()
313
314    def _validate_source(self) -> None:
315        """
316        Validates the connector manifest against the declarative component schema
317        """
318
319        try:
320            validate(self._source_config, self._declarative_component_schema)
321        except ValidationError as e:
322            raise ValidationError(
323                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
324            ) from e
325
326    def _migrate_and_transform_config(
327        self,
328        config_path: Optional[str],
329        config: Optional[Config],
330    ) -> Optional[Config]:
331        if not config:
332            return None
333        if not self._spec_component:
334            return config
335        mutable_config = dict(config)
336        self._spec_component.migrate_config(mutable_config)
337        if mutable_config != config:
338            if config_path:
339                with open(config_path, "w") as f:
340                    json.dump(mutable_config, f)
341            control_message = create_connector_config_control_message(mutable_config)
342            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
343        self._spec_component.transform_config(mutable_config)
344        return mutable_config
345
346    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
347        config = self._config or config
348        return super().configure(config, temp_dir)
349
350    @property
351    def resolved_manifest(self) -> Mapping[str, Any]:
352        """
353        Returns the resolved manifest configuration for the source.
354
355        This property provides access to the internal source configuration as a mapping,
356        which contains all settings and parameters required to define the source's behavior.
357
358        Returns:
359            Mapping[str, Any]: The resolved source configuration manifest.
360        """
361        return self._source_config
362
363    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
364        return self._constructor.get_model_deprecations()
365
366    def read(
367        self,
368        logger: logging.Logger,
369        config: Mapping[str, Any],
370        catalog: ConfiguredAirbyteCatalog,
371        state: Optional[List[AirbyteStateMessage]] = None,
372    ) -> Iterator[AirbyteMessage]:
373        selected_concurrent_streams = self._select_streams(
374            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
375            configured_catalog=catalog,
376        )
377
378        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
379        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
380        if len(selected_concurrent_streams) > 0:
381            yield from self._concurrent_source.read(selected_concurrent_streams)
382
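    # Editor's sketch: driving a read end-to-end, assuming `source` and `configured_catalog`
    # were built elsewhere (hypothetical names):
    #
    #     for message in source.read(source.logger, config={}, catalog=configured_catalog):
    #         print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
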
383    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
384        return AirbyteCatalog(
385            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
386        )
387
388    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
389    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
390        """
391        The `streams` method is used as part of the AbstractSource in the following cases:
392        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
393        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog, so not all of the streams returned by `streams` are actually read)
394        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.
395
396        In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
397        """
398
399        if self._spec_component:
400            self._spec_component.validate_config(self._config)
401
402        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
403
404        api_budget_model = self._source_config.get("api_budget")
405        if api_budget_model:
406            self._constructor.set_api_budget(api_budget_model, self._config)
407
408        source_streams = [
409            self._constructor.create_component(
410                (
411                    StateDelegatingStreamModel
412                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
413                    else DeclarativeStreamModel
414                ),
415                stream_config,
416                self._config,
417                emit_connector_builder_messages=self._emit_connector_builder_messages,
418            )
419            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
420        ]
421        return source_streams
422
423    @staticmethod
424    def _initialize_cache_for_parent_streams(
425        stream_configs: List[Dict[str, Any]],
426    ) -> List[Dict[str, Any]]:
427        """Enable caching for parent streams unless explicitly disabled.
428
429        Caching is enabled by default for parent streams to optimize performance when the same
430        parent data is needed by multiple child streams. However, explicit `use_cache: false`
431        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
432        APIs where caching causes duplicate records).
433        """
434        parent_streams = set()
435
436        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
437            """Set use_cache to True only if not explicitly disabled."""
438            if requester.get("use_cache") is not False:
439                requester["use_cache"] = True
440
441        def update_with_cache_parent_configs(
442            parent_configs: list[dict[str, Any]],
443        ) -> None:
444            for parent_config in parent_configs:
445                parent_streams.add(parent_config["stream"]["name"])
446                if parent_config["stream"]["type"] == "StateDelegatingStream":
447                    _set_cache_if_not_disabled(
448                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
449                    )
450                    _set_cache_if_not_disabled(
451                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
452                    )
453                else:
454                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
455
456        for stream_config in stream_configs:
457            if stream_config.get("incremental_sync", {}).get("parent_stream"):
458                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
459                _set_cache_if_not_disabled(
460                    stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
461                )
462
463            elif stream_config.get("retriever", {}).get("partition_router", {}):
464                partition_router = stream_config["retriever"]["partition_router"]
465
466                if isinstance(partition_router, dict) and partition_router.get(
467                    "parent_stream_configs"
468                ):
469                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
470                elif isinstance(partition_router, list):
471                    for router in partition_router:
472                        if router.get("parent_stream_configs"):
473                            update_with_cache_parent_configs(router["parent_stream_configs"])
474
475        for stream_config in stream_configs:
476            if stream_config["name"] in parent_streams:
477                if stream_config["type"] == "StateDelegatingStream":
478                    _set_cache_if_not_disabled(
479                        stream_config["full_refresh_stream"]["retriever"]["requester"]
480                    )
481                    _set_cache_if_not_disabled(
482                        stream_config["incremental_stream"]["retriever"]["requester"]
483                    )
484                else:
485                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
486        return stream_configs
487
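    # Editor's sketch of the caching rule above, on minimal hypothetical configs:
    #
    #     parent = {"name": "projects", "type": "DeclarativeStream",
    #               "retriever": {"requester": {}}}
    #     child = {"name": "tasks", "type": "DeclarativeStream",
    #              "retriever": {"requester": {},
    #                            "partition_router": {"parent_stream_configs": [{"stream": parent}]}}}
    #     ConcurrentDeclarativeSource._initialize_cache_for_parent_streams([parent, child])
    #
    # Afterwards parent["retriever"]["requester"]["use_cache"] is True, the child's requester is
    # untouched, and an explicit `"use_cache": False` on the parent would have been preserved.
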
488    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
489        """
490        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
491        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
492        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
493        in the project root.
494        """
495        return (
496            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
497        )
498
499    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
500        check = self._source_config.get("check")
501        if not check:
502            raise ValueError(f"Missing 'check' component definition within the manifest.")
503
504        if "type" not in check:
505            check["type"] = "CheckStream"
506        connection_checker = self._constructor.create_component(
507            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
508            check,
509            dict(),
510            emit_connector_builder_messages=self._emit_connector_builder_messages,
511        )
512        if not isinstance(connection_checker, ConnectionChecker):
513            raise ValueError(
514                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
515            )
516
517        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
518        if not check_succeeded:
519            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
520        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
521
522    @property
523    def dynamic_streams(self) -> List[Dict[str, Any]]:
524        return self._dynamic_stream_configs(
525            manifest=self._source_config,
526            with_dynamic_stream_name=True,
527        )
528
529    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
530        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
531        stream_configs = []
532        for current_stream_config in manifest.get("streams", []):
533            if (
534                "type" in current_stream_config
535                and current_stream_config["type"] == "ConditionalStreams"
536            ):
537                interpolated_boolean = InterpolatedBoolean(
538                    condition=current_stream_config.get("condition"),
539                    parameters={},
540                )
541
542                if interpolated_boolean.eval(config=self._config):
543                    stream_configs.extend(current_stream_config.get("streams", []))
544            else:
545                if "type" not in current_stream_config:
546                    current_stream_config["type"] = "DeclarativeStream"
547                stream_configs.append(current_stream_config)
548        return stream_configs
549
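    # Editor's sketch of a ConditionalStreams entry (hypothetical manifest fragment): the
    # condition is interpolated against the connector config, and the nested streams are only
    # kept when it evaluates to true.
    #
    #     {"type": "ConditionalStreams",
    #      "condition": "{{ config['include_beta_streams'] }}",
    #      "streams": [...]}  # list of DeclarativeStream definitions
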
550    def _dynamic_stream_configs(
551        self,
552        manifest: Mapping[str, Any],
553        with_dynamic_stream_name: Optional[bool] = None,
554    ) -> List[Dict[str, Any]]:
555        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
556        dynamic_stream_configs: List[Dict[str, Any]] = []
557        seen_dynamic_streams: Set[str] = set()
558
559        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
560            components_resolver_config = dynamic_definition["components_resolver"]
561
562            if not components_resolver_config:
563                raise ValueError(
564                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
565                )
566
567            resolver_type = components_resolver_config.get("type")
568            if not resolver_type:
569                raise ValueError(
570                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
571                )
572
573            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
574                raise ValueError(
575                    f"Invalid components resolver type '{resolver_type}'. "
576                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
577                )
578
579            if "retriever" in components_resolver_config:
580                components_resolver_config["retriever"]["requester"]["use_cache"] = True
581
582            # Create a resolver for dynamic components based on type
583            if resolver_type == "HttpComponentsResolver":
584                components_resolver = self._constructor.create_component(
585                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
586                    component_definition=components_resolver_config,
587                    config=self._config,
588                    stream_name=dynamic_definition.get("name"),
589                )
590            else:
591                components_resolver = self._constructor.create_component(
592                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
593                    component_definition=components_resolver_config,
594                    config=self._config,
595                )
596
597            stream_template_config = dynamic_definition["stream_template"]
598
599            for dynamic_stream in components_resolver.resolve_components(
600                stream_template_config=stream_template_config
601            ):
602                # Get the use_parent_parameters configuration from the dynamic definition
603                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
604                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)
605
606                dynamic_stream = {
607                    **ManifestComponentTransformer().propagate_types_and_parameters(
608                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
609                    )
610                }
611
612                if "type" not in dynamic_stream:
613                    dynamic_stream["type"] = "DeclarativeStream"
614
615                # Ensure that each stream is created with a unique name
616                name = dynamic_stream.get("name")
617
618                if with_dynamic_stream_name:
619                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
620                        "name", f"dynamic_stream_{dynamic_definition_index}"
621                    )
622
623                if not isinstance(name, str):
624                    raise ValueError(
625                        f"Expected stream name {name} to be a string, got {type(name)}."
626                    )
627
628                if name in seen_dynamic_streams:
629                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
630                    failure_type = FailureType.system_error
631
632                    if resolver_type == "ConfigComponentsResolver":
633                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
634                        failure_type = FailureType.config_error
635
636                    raise AirbyteTracedException(
637                        message=error_message,
638                        internal_message=error_message,
639                        failure_type=failure_type,
640                    )
641
642                seen_dynamic_streams.add(name)
643                dynamic_stream_configs.append(dynamic_stream)
644
645        return dynamic_stream_configs
646
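    # Editor's sketch of a dynamic_streams definition (hypothetical, abbreviated fragment):
    # the components resolver yields one concrete stream per resolved component, each stamped
    # from `stream_template`, and every generated stream must end up with a unique name.
    #
    #     {"name": "per_project_streams",
    #      "use_parent_parameters": True,
    #      "components_resolver": {"type": "ConfigComponentsResolver", ...},
    #      "stream_template": {...}}  # a DeclarativeStream definition used as the template
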
647    def _select_streams(
648        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
649    ) -> List[AbstractStream]:
650        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
651        abstract_streams: List[AbstractStream] = []
652        for configured_stream in configured_catalog.streams:
653            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
654            if stream_instance:
655                abstract_streams.append(stream_instance)
656            else:
657                # Previous behavior in the legacy synchronous CDK was to also emit an error TRACE message if
658                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
659                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
660                # but we can do so if we feel it is necessary. With the current behavior, we should still result
661                # in a partial failure since missing streams will be marked as INCOMPLETE.
662                self._message_repository.emit_message(
663                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
664                )
665        return abstract_streams
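
# End-to-end sketch (editor's addition; names are hypothetical and the manifest is abbreviated,
# since a real one must validate against declarative_component_schema.yaml):
#
#     manifest = yaml.safe_load(open("manifest.yaml"))
#     user_config = {"api_key": "..."}
#     source = ConcurrentDeclarativeSource(source_config=manifest, config=user_config)
#
#     status = source.check(source.logger, user_config)      # AirbyteConnectionStatus
#     catalog = source.discover(source.logger, user_config)  # AirbyteCatalog
#     for message in source.read(source.logger, user_config, configured_catalog):
#         ...  # serialize each AirbyteMessage to stdout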
606
607                dynamic_stream = {
608                    **ManifestComponentTransformer().propagate_types_and_parameters(
609                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
610                    )
611                }
612
613                if "type" not in dynamic_stream:
614                    dynamic_stream["type"] = "DeclarativeStream"
615
616                # Ensure that each stream is created with a unique name
617                name = dynamic_stream.get("name")
618
619                if with_dynamic_stream_name:
620                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
621                        "name", f"dynamic_stream_{dynamic_definition_index}"
622                    )
623
624                if not isinstance(name, str):
625                    raise ValueError(
626                        f"Expected stream name {name} to be a string, got {type(name)}."
627                    )
628
629                if name in seen_dynamic_streams:
630                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
631                    failure_type = FailureType.system_error
632
633                    if resolver_type == "ConfigComponentsResolver":
634                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
635                        failure_type = FailureType.config_error
636
637                    raise AirbyteTracedException(
638                        message=error_message,
639                        internal_message=error_message,
640                        failure_type=failure_type,
641                    )
642
643                seen_dynamic_streams.add(name)
644                dynamic_stream_configs.append(dynamic_stream)
645
646        return dynamic_stream_configs
647
648    def _select_streams(
649        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
650    ) -> List[AbstractStream]:
651        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
652        abstract_streams: List[AbstractStream] = []
653        for configured_stream in configured_catalog.streams:
654            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
655            if stream_instance:
656                abstract_streams.append(stream_instance)
657            else:
658                # Previous behavior in the legacy synchronous CDK was to also emit an error TRACE message if
659                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
660                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
661                # but we can do so if we feel it is necessary. With the current behavior, we still end up with
662                # a partial failure since missing streams will be marked as INCOMPLETE.
663                self._message_repository.emit_message(
664                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
665                )
666        return abstract_streams

ConcurrentDeclarativeSource is the manifest-driven (low-code) source implementation: it builds its streams from a declarative manifest and reads them through the concurrent framework.

ConcurrentDeclarativeSource( catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, *, source_config: Mapping[str, Any], debug: bool = False, emit_connector_builder_messages: bool = False, migrate_manifest: bool = False, normalize_manifest: bool = False, limits: Optional[TestLimits] = None, config_path: Optional[str] = None, **kwargs: Any)
134    def __init__(
135        self,
136        catalog: Optional[ConfiguredAirbyteCatalog] = None,
137        config: Optional[Mapping[str, Any]] = None,
138        state: Optional[List[AirbyteStateMessage]] = None,
139        *,
140        source_config: ConnectionDefinition,
141        debug: bool = False,
142        emit_connector_builder_messages: bool = False,
143        migrate_manifest: bool = False,
144        normalize_manifest: bool = False,
145        limits: Optional[TestLimits] = None,
146        config_path: Optional[str] = None,
147        **kwargs: Any,
148    ) -> None:
149        self.logger = logging.getLogger(f"airbyte.{self.name}")
150
151        self._limits = limits
152
153        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
154        #  no longer needs to store the original incoming state. But maybe there's an edge case?
155        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
156
157        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are
158        # fewer threads generating partitions than the max number of workers. If that weren't the case, we could have threads only
159        # generating partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be
160        # changed given more information and might even need to be configurable depending on the source
161        queue: Queue[QueueItem] = Queue(maxsize=10_000)
162        message_repository = InMemoryMessageRepository(
163            Level.DEBUG if emit_connector_builder_messages else Level.INFO
164        )
165
166        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
167        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
168        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
169        # incremental streams running in full refresh.
170        component_factory = ModelToComponentFactory(
171            emit_connector_builder_messages=emit_connector_builder_messages,
172            message_repository=ConcurrentMessageRepository(queue, message_repository),
173            configured_catalog=catalog,
174            connector_state_manager=self._connector_state_manager,
175            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
176            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
177            limit_slices_fetched=limits.max_slices if limits else None,
178            disable_retries=True if limits else False,
179            disable_cache=True if limits else False,
180        )
181
182        self._should_normalize = normalize_manifest
183        self._should_migrate = migrate_manifest
184        self._declarative_component_schema = _get_declarative_component_schema()
185        # If custom components are needed, locate and/or register them.
186        self.components_module: ModuleType | None = get_registered_components_module(config=config)
187        # set additional attributes
188        self._debug = debug
189        self._emit_connector_builder_messages = emit_connector_builder_messages
190        self._constructor = (
191            component_factory
192            if component_factory
193            else ModelToComponentFactory(
194                emit_connector_builder_messages=emit_connector_builder_messages,
195                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
196            )
197        )
198
199        self._message_repository = self._constructor.get_message_repository()
200        self._slice_logger: SliceLogger = (
201            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
202        )
203
204        # resolve all components in the manifest
205        self._source_config = self._pre_process_manifest(dict(source_config))
206        # validate resolved manifest against the declarative component schema
207        self._validate_source()
208        # apply additional post-processing to the manifest
209        self._post_process_manifest()
210
211        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
212        self._spec_component: Optional[Spec] = (
213            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
214        )
215        self._config = self._migrate_and_transform_config(config_path, config) or {}
216
217        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
218        if concurrency_level_from_manifest:
219            concurrency_level_component = self._constructor.create_component(
220                model_type=ConcurrencyLevelModel,
221                component_definition=concurrency_level_from_manifest,
222                config=config or {},
223            )
224            if not isinstance(concurrency_level_component, ConcurrencyLevel):
225                raise ValueError(
226                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
227                )
228
229            concurrency_level = concurrency_level_component.get_concurrency_level()
230            initial_number_of_partitions_to_generate = max(
231                concurrency_level // 2, 1
232        )  # Partition generation iterates using a range based on this value. If this is floored to zero we end up in a deadlock during startup
233        else:
234            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
235            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
236
237        self._concurrent_source = ConcurrentSource.create(
238            num_workers=concurrency_level,
239            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
240            logger=self.logger,
241            slice_logger=self._slice_logger,
242            queue=queue,
243            message_repository=self._message_repository,
244        )
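
For orientation, a minimal usage sketch follows. It assumes a manifest file on disk and placeholder config values; both the path and the config keys are hypothetical, not part of this module.

    import yaml

    # Hypothetical: load a low-code manifest from disk (path is a placeholder).
    with open("manifest.yaml") as f:
        manifest = yaml.safe_load(f)

    source = ConcurrentDeclarativeSource(
        source_config=manifest,       # required, keyword-only
        config={"api_key": "..."},    # connector configuration (placeholder keys)
        catalog=None,                 # optional ConfiguredAirbyteCatalog
        state=None,                   # optional list of AirbyteStateMessage
    )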
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
351    @property
352    def resolved_manifest(self) -> Mapping[str, Any]:
353        """
354        Returns the resolved manifest configuration for the source.
355
356        This property provides access to the internal source configuration as a mapping,
357        which contains all settings and parameters required to define the source's behavior.
358
359        Returns:
360            Mapping[str, Any]: The resolved source configuration manifest.
361        """
362        return self._source_config

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:

Mapping[str, Any]: The resolved source configuration manifest.
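
As a quick illustration (assuming `source` was constructed as in the sketch above), the resolved manifest can be inspected directly; by this point it has been reference-resolved, optionally migrated/normalized, and validated against the declarative component schema:

    resolved = source.resolved_manifest
    print(resolved.get("version"))
    print([s.get("name") for s in resolved.get("streams", [])])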

def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
364    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
365        return self._constructor.get_model_deprecations()
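
Returns the deprecation notices collected while the component factory built the manifest's components. A small sketch, assuming `source` from above:

    # Each entry is a Connector Builder LogMessage describing a deprecated
    # manifest field or component.
    for warning in source.deprecation_warnings():
        print(warning)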
def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
367    def read(
368        self,
369        logger: logging.Logger,
370        config: Mapping[str, Any],
371        catalog: ConfiguredAirbyteCatalog,
372        state: Optional[List[AirbyteStateMessage]] = None,
373    ) -> Iterator[AirbyteMessage]:
374        selected_concurrent_streams = self._select_streams(
375            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
376            configured_catalog=catalog,
377        )
378
379        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
380        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
381        if len(selected_concurrent_streams) > 0:
382            yield from self._concurrent_source.read(selected_concurrent_streams)

Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
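
A hedged sketch of driving a read, assuming `source` and `config` from above plus an already-built ConfiguredAirbyteCatalog named configured_catalog:

    import logging

    logger = logging.getLogger("airbyte")

    # read() yields AirbyteMessage objects (records, state, logs, traces).
    # Note that the implementation reads using the config captured at
    # construction time (self._config), not the config argument.
    for message in source.read(logger, config=config, catalog=configured_catalog, state=None):
        print(message.type)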

def discover( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
384    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
385        return AirbyteCatalog(
386            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
387        )

Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each Postgres table is a stream, and each table column is a field.
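
For example, under the same assumptions as the sketches above:

    catalog = source.discover(logger, config=config)
    for stream in catalog.streams:
        # Each entry is an AirbyteStream built via as_airbyte_stream().
        print(stream.name)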

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream]:
390    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
391        """
392        The `streams` method is used as part of the AbstractSource in the following cases:
393        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
394        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not every stream returned by `streams` is actually read)
395        Note that `super.streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.
396
397        In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
398        """
399
400        if self._spec_component:
401            self._spec_component.validate_config(self._config)
402
403        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
404
405        api_budget_model = self._source_config.get("api_budget")
406        if api_budget_model:
407            self._constructor.set_api_budget(api_budget_model, self._config)
408
409        source_streams = [
410            self._constructor.create_component(
411                (
412                    StateDelegatingStreamModel
413                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
414                    else DeclarativeStreamModel
415                ),
416                stream_config,
417                self._config,
418                emit_connector_builder_messages=self._emit_connector_builder_messages,
419            )
420            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
421        ]
422        return source_streams

The streams method is used as part of the AbstractSource in the following cases:

  • ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
  • ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not every stream returned by streams is actually read)

Note that super.streams(config) is also called when splitting the streams between concurrent or not in _group_streams.

In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
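
A short sketch of calling it directly (same assumptions as above; like read, it validates and uses the config captured at construction time):

    # Returns concurrent-compatible AbstractStream instances built from the
    # static, conditional, and dynamic stream definitions in the manifest.
    for stream in source.streams(config=config):
        print(stream.name)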

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
489    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
490        """
491        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
492        configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this
493        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
494        in the project root.
495        """
496        return (
497            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
498        )

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
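
For example:

    specification = source.spec(logger)
    # connectionSpecification holds the JSON Schema describing valid user config.
    print(specification.connectionSpecification)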

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
500    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
501        check = self._source_config.get("check")
502        if not check:
503            raise ValueError("Missing 'check' component definition within the manifest.")
504
505        if "type" not in check:
506            check["type"] = "CheckStream"
507        connection_checker = self._constructor.create_component(
508            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
509            check,
510            dict(),
511            emit_connector_builder_messages=self._emit_connector_builder_messages,
512        )
513        if not isinstance(connection_checker, ConnectionChecker):
514            raise ValueError(
515                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
516            )
517
518        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
519        if not check_succeeded:
520            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
521        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

Tests if the input configuration can be used to successfully connect to the integration, e.g., whether a provided Stripe API token can be used to connect to the Stripe API.
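
A hedged sketch, assuming `source` and `config` from above:

    from airbyte_cdk.models import Status

    status = source.check(logger, config=config)
    if status.status == Status.FAILED:
        # message carries the repr of the error returned by the checker.
        print(f"Check failed: {status.message}")
    else:
        print("Check succeeded")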

dynamic_streams: List[Dict[str, Any]]
523    @property
524    def dynamic_streams(self) -> List[Dict[str, Any]]:
525        return self._dynamic_stream_configs(
526            manifest=self._source_config,
527            with_dynamic_stream_name=True,
528        )
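
To make the expected shape concrete, here is a hypothetical sketch of a single dynamic_streams entry that _dynamic_stream_configs can process; every value below is illustrative, not a schema:

    dynamic_definition = {
        "name": "tables",
        # Defaults to True for backward compatibility when omitted.
        "use_parent_parameters": True,
        # "type" must map to an entry in COMPONENTS_RESOLVER_TYPE_MAPPING,
        # e.g. HttpComponentsResolver or ConfigComponentsResolver.
        "components_resolver": {
            "type": "HttpComponentsResolver",
            # resolver-specific fields (e.g. a retriever) elided
        },
        # Template instantiated once per resolved component; each resulting
        # stream must end up with a unique string "name".
        "stream_template": {
            "type": "DeclarativeStream",
            # stream fields elided
        },
    }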