airbyte_cdk.sources.declarative.concurrent_declarative_source

  1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  2
  3import json
  4import logging
  5import pkgutil
  6from copy import deepcopy
  7from dataclasses import dataclass, field
  8from queue import Queue
  9from types import ModuleType
 10from typing import (
 11    Any,
 12    ClassVar,
 13    Dict,
 14    Iterator,
 15    List,
 16    Mapping,
 17    Optional,
 18    Set,
 19)
 20
 21import orjson
 22import yaml
 23from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
 24from jsonschema.exceptions import ValidationError
 25from jsonschema.validators import validate
 26
 27from airbyte_cdk.config_observation import create_connector_config_control_message
 28from airbyte_cdk.connector_builder.models import (
 29    LogMessage as ConnectorBuilderLogMessage,
 30)
 31from airbyte_cdk.manifest_migrations.migration_handler import (
 32    ManifestMigrationHandler,
 33)
 34from airbyte_cdk.models import (
 35    AirbyteCatalog,
 36    AirbyteConnectionStatus,
 37    AirbyteMessage,
 38    AirbyteStateMessage,
 39    ConfiguredAirbyteCatalog,
 40    ConnectorSpecification,
 41    FailureType,
 42    Status,
 43)
 44from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
 45from airbyte_cdk.sources import Source
 46from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 47from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 48from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 49from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 50from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 51from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
 52from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 53    ConcurrencyLevel as ConcurrencyLevelModel,
 54)
 55from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 56    DeclarativeStream as DeclarativeStreamModel,
 57)
 58from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 59    Spec as SpecModel,
 60)
 61from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 62    StateDelegatingStream as StateDelegatingStreamModel,
 63)
 64from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 65    get_registered_components_module,
 66)
 67from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 68    ManifestComponentTransformer,
 69)
 70from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
 71    ManifestNormalizer,
 72)
 73from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 74    ManifestReferenceResolver,
 75)
 76from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 77    ModelToComponentFactory,
 78)
 79from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 80from airbyte_cdk.sources.declarative.spec.spec import Spec
 81from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
 82from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
 83from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
 84from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 85from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
 86from airbyte_cdk.sources.utils.slice_logger import (
 87    AlwaysLogSliceLogger,
 88    DebugSliceLogger,
 89    SliceLogger,
 90)
 91from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
 92from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 93
 94
 95@dataclass
 96class TestLimits:
 97    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name
 98
 99    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
100    DEFAULT_MAX_SLICES: ClassVar[int] = 5
101    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
102    DEFAULT_MAX_STREAMS: ClassVar[int] = 100
103
104    max_records: int = field(default=DEFAULT_MAX_RECORDS)
105    max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
106    max_slices: int = field(default=DEFAULT_MAX_SLICES)
107    max_streams: int = field(default=DEFAULT_MAX_STREAMS)
108
109
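# Usage sketch (illustrative, not part of the upstream module): TestLimits caps
# Connector Builder test reads. Omitted fields fall back to the class defaults
# above; the values here are hypothetical.
def _example_limits() -> TestLimits:
    return TestLimits(max_records=10, max_pages_per_slice=2, max_slices=2, max_streams=5)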
110def _get_declarative_component_schema() -> Dict[str, Any]:
111    try:
112        raw_component_schema = pkgutil.get_data(
113            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
114        )
115        if raw_component_schema is not None:
116            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
117            return declarative_component_schema  # type: ignore
118        else:
119            raise RuntimeError(
120                "Failed to read manifest component json schema required for deduplication"
121            )
122    except FileNotFoundError as e:
123        raise FileNotFoundError(
124            f"Failed to read manifest component json schema required for deduplication: {e}"
125        )
126
127
128class ConcurrentDeclarativeSource(Source):
129    # By default, we defer to a value of 2. A value lower than this could cause a PartitionEnqueuer to be stuck in a deadlock
130    # because it has hit the limit of futures while no partition reader is consuming them.
131    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
132
133    def __init__(
134        self,
135        catalog: Optional[ConfiguredAirbyteCatalog] = None,
136        config: Optional[Mapping[str, Any]] = None,
137        state: Optional[List[AirbyteStateMessage]] = None,
138        *,
139        source_config: ConnectionDefinition,
140        debug: bool = False,
141        emit_connector_builder_messages: bool = False,
142        migrate_manifest: bool = False,
143        normalize_manifest: bool = False,
144        limits: Optional[TestLimits] = None,
145        config_path: Optional[str] = None,
146        **kwargs: Any,
147    ) -> None:
148        self.logger = logging.getLogger(f"airbyte.{self.name}")
149
150        self._limits = limits
151
152        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
153        #  no longer needs to store the original incoming state. But maybe there's an edge case?
154        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
155
156        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are fewer
157        # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
158        # partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
159        # information, and might even need to be configurable depending on the source.
160        queue: Queue[QueueItem] = Queue(maxsize=10_000)
161        message_repository = InMemoryMessageRepository(
162            Level.DEBUG if emit_connector_builder_messages else Level.INFO
163        )
164
165        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
166        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
167        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
168        # incremental streams running in full refresh.
169        component_factory = ModelToComponentFactory(
170            emit_connector_builder_messages=emit_connector_builder_messages,
171            message_repository=ConcurrentMessageRepository(queue, message_repository),
172            connector_state_manager=self._connector_state_manager,
173            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
174            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
175            limit_slices_fetched=limits.max_slices if limits else None,
176            disable_retries=True if limits else False,
177            disable_cache=True if limits else False,
178        )
179
180        self._should_normalize = normalize_manifest
181        self._should_migrate = migrate_manifest
182        self._declarative_component_schema = _get_declarative_component_schema()
183        # If custom components are needed, locate and/or register them.
184        self.components_module: ModuleType | None = get_registered_components_module(config=config)
185        # set additional attributes
186        self._debug = debug
187        self._emit_connector_builder_messages = emit_connector_builder_messages
188        self._constructor = (
189            component_factory
190            if component_factory
191            else ModelToComponentFactory(
192                emit_connector_builder_messages=emit_connector_builder_messages,
193                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
194            )
195        )
196
197        self._message_repository = self._constructor.get_message_repository()
198        self._slice_logger: SliceLogger = (
199            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
200        )
201
202        # resolve all components in the manifest
203        self._source_config = self._pre_process_manifest(dict(source_config))
204        # validate resolved manifest against the declarative component schema
205        self._validate_source()
206        # apply additional post-processing to the manifest
207        self._post_process_manifest()
208
209        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
210        self._spec_component: Optional[Spec] = (
211            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
212        )
213        self._config = self._migrate_and_transform_config(config_path, config) or {}
214
215        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
216        if concurrency_level_from_manifest:
217            concurrency_level_component = self._constructor.create_component(
218                model_type=ConcurrencyLevelModel,
219                component_definition=concurrency_level_from_manifest,
220                config=config or {},
221            )
222            if not isinstance(concurrency_level_component, ConcurrencyLevel):
223                raise ValueError(
224                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
225                )
226
227            concurrency_level = concurrency_level_component.get_concurrency_level()
228            initial_number_of_partitions_to_generate = max(
229                concurrency_level // 2, 1
230            )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during startup
231        else:
232            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
233            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
234
235        self._concurrent_source = ConcurrentSource.create(
236            num_workers=concurrency_level,
237            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
238            logger=self.logger,
239            slice_logger=self._slice_logger,
240            queue=queue,
241            message_repository=self._message_repository,
242        )
243
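    # Illustrative sketch (not upstream code): a minimal instantiation. The
    # manifest and config are hypothetical placeholders; a real manifest also
    # needs `streams` and `check` sections (and usually `spec`).
    #
    #     source = ConcurrentDeclarativeSource(
    #         source_config=manifest,
    #         config={"api_key": "..."},
    #         state=None,
    #     )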
244    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
245        """
246        Preprocesses the provided manifest dictionary by resolving any manifest references.
247
248        This method modifies the input manifest in place, resolving references using the
249        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
250
251        Args:
252            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
253
254        Returns:
255            None
256        """
257        # For ease of use, we don't require the type to be specified at the top level of the manifest, but it should be included during processing
258        manifest = self._fix_source_type(manifest)
259        # Resolve references in the manifest
260        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
261        # Propagate types and parameters throughout the manifest
262        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
263            "", resolved_manifest, {}
264        )
265
266        return propagated_manifest
267
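    # Illustrative sketch (not upstream code): what preprocessing does to a
    # manifest that uses `$ref`. All names below are hypothetical.
    #
    #     manifest = {
    #         "definitions": {"base_requester": {"type": "HttpRequester", "url_base": "https://api.example.com"}},
    #         "streams": [{"retriever": {"requester": {"$ref": "#/definitions/base_requester"}}}],
    #     }
    #     resolved = ManifestReferenceResolver().preprocess_manifest(manifest)
    #     propagated = ManifestComponentTransformer().propagate_types_and_parameters("", resolved, {})
    #     # `$ref` is replaced inline by the referenced definition, then types and
    #     # $parameters are pushed down into nested components.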
268    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
269        """
270        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
271        """
272        if "type" not in manifest:
273            manifest["type"] = "DeclarativeSource"
274
275        return manifest
276
277    def _post_process_manifest(self) -> None:
278        """
279        Post-processes the manifest after validation.
280        This method is responsible for any additional modifications or transformations needed
281        after the manifest has been validated and before it is used in the source.
282        """
283        # apply manifest migration, if required
284        self._migrate_manifest()
285        # apply manifest normalization, if required
286        self._normalize_manifest()
287
288    def _migrate_manifest(self) -> None:
289        """
290        This method is used to migrate the manifest. It should be called after the manifest has been validated.
291        The migration is done in place, so the original manifest is modified.
292
293        The original manifest is returned if any error occurs during migration.
294        """
295        if self._should_migrate:
296            manifest_migrator = ManifestMigrationHandler(self._source_config)
297            self._source_config = manifest_migrator.apply_migrations()
298            # validate migrated manifest against the declarative component schema
299            self._validate_source()
300
301    def _normalize_manifest(self) -> None:
302        """
303        This method is used to normalize the manifest. It should be called after the manifest has been validated.
304
305        Connector Builder UI rendering requires the manifest to be in a specific format.
306         - references have been resolved
307         - commonly used definitions are extracted into `definitions.linked.*`
308        """
309        if self._should_normalize:
310            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
311            self._source_config = normalizer.normalize()
312
313    def _validate_source(self) -> None:
314        """
315        Validates the connector manifest against the declarative component schema
316        """
317
318        try:
319            validate(self._source_config, self._declarative_component_schema)
320        except ValidationError as e:
321            raise ValidationError(
322                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
323            ) from e
324
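    # Illustrative sketch (not upstream code): the same check run standalone
    # against the packaged schema; `my_manifest` is a hypothetical placeholder.
    #
    #     schema = _get_declarative_component_schema()
    #     validate(my_manifest, schema)  # raises jsonschema ValidationError on failure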
325    def _migrate_and_transform_config(
326        self,
327        config_path: Optional[str],
328        config: Optional[Config],
329    ) -> Optional[Config]:
330        if not config:
331            return None
332        if not self._spec_component:
333            return config
334        mutable_config = dict(config)
335        self._spec_component.migrate_config(mutable_config)
336        if mutable_config != config:
337            if config_path:
338                with open(config_path, "w") as f:
339                    json.dump(mutable_config, f)
340            control_message = create_connector_config_control_message(mutable_config)
341            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
342        self._spec_component.transform_config(mutable_config)
343        return mutable_config
344
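    # Note (hedged, derived from the method above): when `migrate_config`
    # changes the config, the migrated copy is written back to `config_path`
    # (if one was provided) and a connector-config control message is printed
    # to stdout so the platform can persist it; `transform_config` then applies
    # in-memory transformations. Hypothetical call:
    #
    #     migrated = source._migrate_and_transform_config(config_path=None, config={"api_key": "..."})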
345    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
346        config = self._config or config
347        return super().configure(config, temp_dir)
348
349    @property
350    def resolved_manifest(self) -> Mapping[str, Any]:
351        """
352        Returns the resolved manifest configuration for the source.
353
354        This property provides access to the internal source configuration as a mapping,
355        which contains all settings and parameters required to define the source's behavior.
356
357        Returns:
358            Mapping[str, Any]: The resolved source configuration manifest.
359        """
360        return self._source_config
361
362    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
363        return self._constructor.get_model_deprecations()
364
365    def read(
366        self,
367        logger: logging.Logger,
368        config: Mapping[str, Any],
369        catalog: ConfiguredAirbyteCatalog,
370        state: Optional[List[AirbyteStateMessage]] = None,
371    ) -> Iterator[AirbyteMessage]:
372        selected_concurrent_streams = self._select_streams(
373            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
374            configured_catalog=catalog,
375        )
376
377        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
378        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
379        if len(selected_concurrent_streams) > 0:
380            yield from self._concurrent_source.read(selected_concurrent_streams)
381
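    # Illustrative sketch (not upstream code): consuming a sync. `catalog` is a
    # hypothetical ConfiguredAirbyteCatalog selecting at least one stream.
    #
    #     for message in source.read(logging.getLogger("airbyte"), config={}, catalog=catalog):
    #         print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())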
382    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
383        return AirbyteCatalog(
384            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
385        )
386
387    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
388    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
389        """
390        The `streams` method is used as part of the AbstractSource in the following cases:
391        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
392        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter against the configured catalog, which can exclude streams, so not all streams returned by `streams` are actually read)
393        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.
394
395        In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
396        """
397
398        if self._spec_component:
399            self._spec_component.validate_config(self._config)
400
401        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
402
403        api_budget_model = self._source_config.get("api_budget")
404        if api_budget_model:
405            self._constructor.set_api_budget(api_budget_model, self._config)
406
407        source_streams = [
408            self._constructor.create_component(
409                (
410                    StateDelegatingStreamModel
411                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
412                    else DeclarativeStreamModel
413                ),
414                stream_config,
415                self._config,
416                emit_connector_builder_messages=self._emit_connector_builder_messages,
417            )
418            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
419        ]
420        return source_streams
421
422    @staticmethod
423    def _initialize_cache_for_parent_streams(
424        stream_configs: List[Dict[str, Any]],
425    ) -> List[Dict[str, Any]]:
426        parent_streams = set()
427
428        def update_with_cache_parent_configs(
429            parent_configs: list[dict[str, Any]],
430        ) -> None:
431            for parent_config in parent_configs:
432                parent_streams.add(parent_config["stream"]["name"])
433                if parent_config["stream"]["type"] == "StateDelegatingStream":
434                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
435                        "use_cache"
436                    ] = True
437                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
438                        "use_cache"
439                    ] = True
440                else:
441                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
442
443        for stream_config in stream_configs:
444            if stream_config.get("incremental_sync", {}).get("parent_stream"):
445                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
446                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
447                    "use_cache"
448                ] = True
449
450            elif stream_config.get("retriever", {}).get("partition_router", {}):
451                partition_router = stream_config["retriever"]["partition_router"]
452
453                if isinstance(partition_router, dict) and partition_router.get(
454                    "parent_stream_configs"
455                ):
456                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
457                elif isinstance(partition_router, list):
458                    for router in partition_router:
459                        if router.get("parent_stream_configs"):
460                            update_with_cache_parent_configs(router["parent_stream_configs"])
461
462        for stream_config in stream_configs:
463            if stream_config["name"] in parent_streams:
464                if stream_config["type"] == "StateDelegatingStream":
465                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
466                        True
467                    )
468                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
469                        True
470                    )
471                else:
472                    stream_config["retriever"]["requester"]["use_cache"] = True
473        return stream_configs
474
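    # Illustrative sketch (not upstream code): a child stream whose
    # partition_router references a parent causes `use_cache: True` to be set on
    # the parent's requester, both in the parent_stream_configs copy and on the
    # top-level parent definition. Shapes below are hypothetical and minimal.
    #
    #     configs = [
    #         {"name": "parent", "type": "DeclarativeStream", "retriever": {"requester": {}}},
    #         {"name": "child", "type": "DeclarativeStream", "retriever": {"partition_router": {
    #             "parent_stream_configs": [{"stream": {"name": "parent", "type": "DeclarativeStream",
    #                                                   "retriever": {"requester": {}}}}]}}},
    #     ]
    #     ConcurrentDeclarativeSource._initialize_cache_for_parent_streams(configs)
    #     # configs[0]["retriever"]["requester"] is now {"use_cache": True}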
475    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
476        """
477        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
478        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
479        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
480        in the project root.
481        """
482        return (
483            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
484        )
485
486    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
487        check = self._source_config.get("check")
488        if not check:
489            raise ValueError(f"Missing 'check' component definition within the manifest.")
490
491        if "type" not in check:
492            check["type"] = "CheckStream"
493        connection_checker = self._constructor.create_component(
494            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
495            check,
496            dict(),
497            emit_connector_builder_messages=self._emit_connector_builder_messages,
498        )
499        if not isinstance(connection_checker, ConnectionChecker):
500            raise ValueError(
501                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
502            )
503
504        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
505        if not check_succeeded:
506            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
507        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
508
509    @property
510    def dynamic_streams(self) -> List[Dict[str, Any]]:
511        return self._dynamic_stream_configs(
512            manifest=self._source_config,
513            with_dynamic_stream_name=True,
514        )
515
516    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
517        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
518        stream_configs = []
519        for current_stream_config in manifest.get("streams", []):
520            if (
521                "type" in current_stream_config
522                and current_stream_config["type"] == "ConditionalStreams"
523            ):
524                interpolated_boolean = InterpolatedBoolean(
525                    condition=current_stream_config.get("condition"),
526                    parameters={},
527                )
528
529                if interpolated_boolean.eval(config=self._config):
530                    stream_configs.extend(current_stream_config.get("streams", []))
531            else:
532                if "type" not in current_stream_config:
533                    current_stream_config["type"] = "DeclarativeStream"
534                stream_configs.append(current_stream_config)
535        return stream_configs
536
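    # Illustrative sketch (not upstream code): how a ConditionalStreams entry is
    # gated. The condition is a Jinja expression interpolated against the user
    # config; the key name below is hypothetical.
    #
    #     InterpolatedBoolean(
    #         condition="{{ config['include_beta_streams'] }}", parameters={}
    #     ).eval(config={"include_beta_streams": True})  # -> True, so the nested streams are included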
537    def _dynamic_stream_configs(
538        self,
539        manifest: Mapping[str, Any],
540        with_dynamic_stream_name: Optional[bool] = None,
541    ) -> List[Dict[str, Any]]:
542        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
543        dynamic_stream_configs: List[Dict[str, Any]] = []
544        seen_dynamic_streams: Set[str] = set()
545
546        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
547            components_resolver_config = dynamic_definition["components_resolver"]
548
549            if not components_resolver_config:
550                raise ValueError(
551                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
552                )
553
554            resolver_type = components_resolver_config.get("type")
555            if not resolver_type:
556                raise ValueError(
557                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
558                )
559
560            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
561                raise ValueError(
562                    f"Invalid components resolver type '{resolver_type}'. "
563                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
564                )
565
566            if "retriever" in components_resolver_config:
567                components_resolver_config["retriever"]["requester"]["use_cache"] = True
568
569            # Create a resolver for dynamic components based on type
570            if resolver_type == "HttpComponentsResolver":
571                components_resolver = self._constructor.create_component(
572                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
573                    component_definition=components_resolver_config,
574                    config=self._config,
575                    stream_name=dynamic_definition.get("name"),
576                )
577            else:
578                components_resolver = self._constructor.create_component(
579                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
580                    component_definition=components_resolver_config,
581                    config=self._config,
582                )
583
584            stream_template_config = dynamic_definition["stream_template"]
585
586            for dynamic_stream in components_resolver.resolve_components(
587                stream_template_config=stream_template_config
588            ):
589                # Get the use_parent_parameters configuration from the dynamic definition
590                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
591                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)
592
593                dynamic_stream = {
594                    **ManifestComponentTransformer().propagate_types_and_parameters(
595                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
596                    )
597                }
598
599                if "type" not in dynamic_stream:
600                    dynamic_stream["type"] = "DeclarativeStream"
601
602                # Ensure that each stream is created with a unique name
603                name = dynamic_stream.get("name")
604
605                if with_dynamic_stream_name:
606                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
607                        "name", f"dynamic_stream_{dynamic_definition_index}"
608                    )
609
610                if not isinstance(name, str):
611                    raise ValueError(
612                        f"Expected stream name {name} to be a string, got {type(name)}."
613                    )
614
615                if name in seen_dynamic_streams:
616                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
617                    failure_type = FailureType.system_error
618
619                    if resolver_type == "ConfigComponentsResolver":
620                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
621                        failure_type = FailureType.config_error
622
623                    raise AirbyteTracedException(
624                        message=error_message,
625                        internal_message=error_message,
626                        failure_type=failure_type,
627                    )
628
629                seen_dynamic_streams.add(name)
630                dynamic_stream_configs.append(dynamic_stream)
631
632        return dynamic_stream_configs
633
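    # Note (hedged, derived from the method above): each `dynamic_streams` entry
    # must provide a `components_resolver` (whose `type` is a key of
    # COMPONENTS_RESOLVER_TYPE_MAPPING) and a `stream_template`; `name` and
    # `use_parent_parameters` are optional. Every resolved stream must have a
    # unique, string-valued name or an AirbyteTracedException is raised. Minimal
    # hypothetical shape:
    #
    #     {"components_resolver": {"type": "HttpComponentsResolver", ...},
    #      "stream_template": {"type": "DeclarativeStream", ...},
    #      "use_parent_parameters": True}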
634    def _select_streams(
635        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
636    ) -> List[AbstractStream]:
637        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
638        abstract_streams: List[AbstractStream] = []
639        for configured_stream in configured_catalog.streams:
640            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
641            if stream_instance:
642                abstract_streams.append(stream_instance)
643            else:
644                # Previous behavior in the legacy synchronous CDK was to also emit an error TRACE message if
645                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
646                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
647                # but we can do so if we feel it necessary. With the current behavior, we should still result
648                # in a partial failure since missing streams will be marked as INCOMPLETE.
649                self._message_repository.emit_message(
650                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
651                )
652        return abstract_streams
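# End-to-end usage sketch (illustrative, not part of the upstream module).
# `manifest`, `config`, and `configured_catalog` are hypothetical placeholders;
# error handling is omitted for brevity.
def _example_lifecycle(
    manifest: Dict[str, Any],
    config: Dict[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
) -> None:
    logger = logging.getLogger("airbyte")
    source = ConcurrentDeclarativeSource(source_config=manifest, config=config)
    print(source.spec(logger))              # ConnectorSpecification from the manifest's spec block
    print(source.check(logger, config))     # AirbyteConnectionStatus from the manifest's check block
    print(source.discover(logger, config))  # AirbyteCatalog of available streams
    for message in source.read(logger, config, configured_catalog):
        print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())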
628                    )
629
630                seen_dynamic_streams.add(name)
631                dynamic_stream_configs.append(dynamic_stream)
632
633        return dynamic_stream_configs
634
635    def _select_streams(
636        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
637    ) -> List[AbstractStream]:
638        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
639        abstract_streams: List[AbstractStream] = []
640        for configured_stream in configured_catalog.streams:
641            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
642            if stream_instance:
643                abstract_streams.append(stream_instance)
644            else:
645                # The legacy synchronous CDK would also emit an error TRACE message if the source was
646                # configured with raise_exception_on_missing_stream=True. This was used by only a very
647                # few sources like facebook-marketing and google-ads, so we decided not to port the
648                # feature over, though we can if it proves necessary. With the current behavior, we still
649                # end up with a partial failure since missing streams will be marked as INCOMPLETE.
650                self._message_repository.emit_message(
651                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
652                )
653        return abstract_streams
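Since _stream_configs above expands ConditionalStreams entries by evaluating their condition against the connector config, a small illustration may help. The manifest fragment below is hypothetical; field names follow the declarative component schema used throughout this module.

    # Hypothetical manifest fragment: one unconditional stream plus one stream
    # gated behind a config flag via ConditionalStreams.
    manifest_fragment = {
        "streams": [
            {"type": "DeclarativeStream", "name": "users"},
            {
                "type": "ConditionalStreams",
                "condition": "{{ config['include_audit_logs'] }}",
                "streams": [{"type": "DeclarativeStream", "name": "audit_logs"}],
            },
        ]
    }

With include_audit_logs set to true in the config, _stream_configs would return both stream definitions; otherwise only "users" survives, and any entry missing a "type" is defaulted to "DeclarativeStream".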


ConcurrentDeclarativeSource( catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, *, source_config: Mapping[str, Any], debug: bool = False, emit_connector_builder_messages: bool = False, migrate_manifest: bool = False, normalize_manifest: bool = False, limits: Optional[TestLimits] = None, config_path: Optional[str] = None, **kwargs: Any)
134    def __init__(
135        self,
136        catalog: Optional[ConfiguredAirbyteCatalog] = None,
137        config: Optional[Mapping[str, Any]] = None,
138        state: Optional[List[AirbyteStateMessage]] = None,
139        *,
140        source_config: ConnectionDefinition,
141        debug: bool = False,
142        emit_connector_builder_messages: bool = False,
143        migrate_manifest: bool = False,
144        normalize_manifest: bool = False,
145        limits: Optional[TestLimits] = None,
146        config_path: Optional[str] = None,
147        **kwargs: Any,
148    ) -> None:
149        self.logger = logging.getLogger(f"airbyte.{self.name}")
150
151        self._limits = limits
152
153        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
154        #  no longer needs to store the original incoming state. But maybe there's an edge case?
155        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
156
157        # We set a maxsize so that the main thread can process record items as the queue size grows. This assumes
158        # that there are fewer threads generating partitions than the max number of workers. If that weren't the
159        # case, we could have threads only generating partitions, which would fill the queue. The value of 10_000 is
160        # arbitrary and will probably need to change given more information; it might even need to be configurable depending on the source.
161        queue: Queue[QueueItem] = Queue(maxsize=10_000)
162        message_repository = InMemoryMessageRepository(
163            Level.DEBUG if emit_connector_builder_messages else Level.INFO
164        )
165
166        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
167        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
168        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
169        # incremental streams running in full refresh.
170        component_factory = ModelToComponentFactory(
171            emit_connector_builder_messages=emit_connector_builder_messages,
172            message_repository=ConcurrentMessageRepository(queue, message_repository),
173            connector_state_manager=self._connector_state_manager,
174            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
175            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
176            limit_slices_fetched=limits.max_slices if limits else None,
177            disable_retries=bool(limits),
178            disable_cache=bool(limits),
179        )
180
181        self._should_normalize = normalize_manifest
182        self._should_migrate = migrate_manifest
183        self._declarative_component_schema = _get_declarative_component_schema()
184        # If custom components are needed, locate and/or register them.
185        self.components_module: ModuleType | None = get_registered_components_module(config=config)
186        # set additional attributes
187        self._debug = debug
188        self._emit_connector_builder_messages = emit_connector_builder_messages
189        self._constructor = (
190            component_factory
191            if component_factory
192            else ModelToComponentFactory(
193                emit_connector_builder_messages=emit_connector_builder_messages,
194                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
195            )
196        )
197
198        self._message_repository = self._constructor.get_message_repository()
199        self._slice_logger: SliceLogger = (
200            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
201        )
202
203        # resolve all components in the manifest
204        self._source_config = self._pre_process_manifest(dict(source_config))
205        # validate resolved manifest against the declarative component schema
206        self._validate_source()
207        # apply additional post-processing to the manifest
208        self._post_process_manifest()
209
210        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
211        self._spec_component: Optional[Spec] = (
212            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
213        )
214        self._config = self._migrate_and_transform_config(config_path, config) or {}
215
216        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
217        if concurrency_level_from_manifest:
218            concurrency_level_component = self._constructor.create_component(
219                model_type=ConcurrencyLevelModel,
220                component_definition=concurrency_level_from_manifest,
221                config=config or {},
222            )
223            if not isinstance(concurrency_level_component, ConcurrencyLevel):
224                raise ValueError(
225                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
226                )
227
228            concurrency_level = concurrency_level_component.get_concurrency_level()
229            initial_number_of_partitions_to_generate = max(
230                concurrency_level // 2, 1
231            )  # Partition generation iterates using a range based on this value. If it were floored to zero we would deadlock during startup
232        else:
233            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
234            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
235
236        self._concurrent_source = ConcurrentSource.create(
237            num_workers=concurrency_level,
238            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
239            logger=self.logger,
240            slice_logger=self._slice_logger,
241            queue=queue,
242            message_repository=self._message_repository,
243        )
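As a rough usage sketch (not from this module), the constructor is typically handed a manifest dict that has already been loaded from a connector's manifest.yaml; the file path and config values below are hypothetical.

    import yaml

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )

    with open("manifest.yaml") as f:  # hypothetical path
        manifest = yaml.safe_load(f)

    source = ConcurrentDeclarativeSource(
        source_config=manifest,        # keyword-only, per the signature above
        config={"api_key": "..."},     # connector-specific config
    )

Note that construction resolves, optionally migrates/normalizes, and validates the manifest, so an invalid source_config fails here rather than at read time.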
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
350    @property
351    def resolved_manifest(self) -> Mapping[str, Any]:
352        """
353        Returns the resolved manifest configuration for the source.
354
355        This property provides access to the internal source configuration as a mapping,
356        which contains all settings and parameters required to define the source's behavior.
357
358        Returns:
359            Mapping[str, Any]: The resolved source configuration manifest.
360        """
361        return self._source_config

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:
    Mapping[str, Any]: The resolved source configuration manifest.

def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
363    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
364        return self._constructor.get_model_deprecations()
def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
366    def read(
367        self,
368        logger: logging.Logger,
369        config: Mapping[str, Any],
370        catalog: ConfiguredAirbyteCatalog,
371        state: Optional[List[AirbyteStateMessage]] = None,
372    ) -> Iterator[AirbyteMessage]:
373        selected_concurrent_streams = self._select_streams(
374            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
375            configured_catalog=catalog,
376        )
377
378        # Passing an empty list of streams appears to cause an infinite loop in ConcurrentReadProcessor.
379        # The same issue is evident in concurrent_source_adapter.py, so fixing it is out of scope for now.
380        if len(selected_concurrent_streams) > 0:
381            yield from self._concurrent_source.read(selected_concurrent_streams)

Returns an iterator of the AirbyteMessages produced by reading the source with the given configuration, catalog, and state.
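A hedged sketch of driving read(); source, config, and configured_catalog are assumed to exist (for example, built as in the constructor sketch above).

    import logging

    logger = logging.getLogger("airbyte")
    for message in source.read(logger, config, configured_catalog, state=None):
        # Each item is an AirbyteMessage (records, state, logs, traces, ...).
        print(message.type)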

def discover( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
383    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
384        return AirbyteCatalog(
385            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
386        )

Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
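For example, under the same assumptions as the read sketch above:

    catalog = source.discover(logger, config)
    stream_names = [stream.name for stream in catalog.streams]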

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream]:
389    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
390        """
391        The `streams` method is used as part of the AbstractSource in the following cases:
392        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
393        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not every stream returned by `streams` is actually read)
394        Note that `super().streams(config)` is also called when splitting the streams between concurrent and non-concurrent in `_group_streams`.
395
396        In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
397        """
398
399        if self._spec_component:
400            self._spec_component.validate_config(self._config)
401
402        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
403
404        api_budget_model = self._source_config.get("api_budget")
405        if api_budget_model:
406            self._constructor.set_api_budget(api_budget_model, self._config)
407
408        source_streams = [
409            self._constructor.create_component(
410                (
411                    StateDelegatingStreamModel
412                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
413                    else DeclarativeStreamModel
414                ),
415                stream_config,
416                self._config,
417                emit_connector_builder_messages=self._emit_connector_builder_messages,
418            )
419            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
420        ]
421        return source_streams

The streams method is used as part of the AbstractSource in the following cases:

  • ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
  • ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not every stream returned by streams is actually read)

Note that super().streams(config) is also called when splitting the streams between concurrent and non-concurrent in _group_streams.

In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
476    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
477        """
478        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
479        configurations (e.g., username and password) which can be set when running this connector. For low-code connectors, this
480        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
481        in the project root.
482        """
483        return (
484            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
485        )

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g., username and password) which can be set when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
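A hypothetical manifest spec block, in the shape this method consumes when the manifest defines one (field names follow the declarative component schema; the property shown is illustrative):

    spec_block = {
        "type": "Spec",
        "connection_specification": {
            "type": "object",
            "required": ["api_key"],
            "properties": {
                "api_key": {"type": "string", "airbyte_secret": True},
            },
        },
    }

When such a block is present, self._spec_component.generate_spec() produces the ConnectorSpecification; otherwise the method falls back to super().spec(logger) and the spec.yaml/spec.json lookup described above.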

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
487    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
488        check = self._source_config.get("check")
489        if not check:
490            raise ValueError("Missing 'check' component definition within the manifest.")
491
492        if "type" not in check:
493            check["type"] = "CheckStream"
494        connection_checker = self._constructor.create_component(
495            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
496            check,
497            dict(),
498            emit_connector_builder_messages=self._emit_connector_builder_messages,
499        )
500        if not isinstance(connection_checker, ConnectionChecker):
501            raise ValueError(
502                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
503            )
504
505        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
506        if not check_succeeded:
507            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
508        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.
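A hypothetical manifest check block, as consumed by the method above; stream_names follows the CheckStream component's schema:

    check_block = {"type": "CheckStream", "stream_names": ["users"]}

If "type" is omitted, check() defaults it to "CheckStream" before resolving the component through COMPONENTS_CHECKER_TYPE_MAPPING.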

dynamic_streams: List[Dict[str, Any]]
510    @property
511    def dynamic_streams(self) -> List[Dict[str, Any]]:
512        return self._dynamic_stream_configs(
513            manifest=self._source_config,
514            with_dynamic_stream_name=True,
515        )
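To make the expected shape concrete, here is a hedged outline of the top-level keys that _dynamic_stream_configs reads from each dynamic_streams entry; resolver internals are elided because they depend on the resolver type, and every value shown is illustrative.

    dynamic_definition = {
        "name": "per_table_streams",             # falls back to dynamic_stream_<index>
        "use_parent_parameters": True,           # default when omitted
        "components_resolver": {
            "type": "ConfigComponentsResolver",  # must be a key of COMPONENTS_RESOLVER_TYPE_MAPPING
        },
        "stream_template": {"type": "DeclarativeStream"},
    }

Resolved stream names must be unique strings; duplicates raise an AirbyteTracedException whose failure type depends on the resolver (config_error for ConfigComponentsResolver, system_error otherwise).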