airbyte_cdk.sources.declarative.concurrent_declarative_source

# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import json
import logging
import pkgutil
from copy import deepcopy
from dataclasses import dataclass, field
from queue import Queue
from types import ModuleType
from typing import (
    Any,
    ClassVar,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Set,
)

import orjson
import yaml
from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.connector_builder.models import (
    LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.manifest_migrations.migration_handler import (
    ManifestMigrationHandler,
)
from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
    Status,
)
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    Spec as SpecModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
    ManifestNormalizer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

@dataclass
class TestLimits:
    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    DEFAULT_MAX_PAGES_PER_SLICE: ClassVar[int] = 5
    DEFAULT_MAX_SLICES: ClassVar[int] = 5
    DEFAULT_MAX_RECORDS: ClassVar[int] = 100
    DEFAULT_MAX_STREAMS: ClassVar[int] = 100

    max_records: int = field(default=DEFAULT_MAX_RECORDS)
    max_pages_per_slice: int = field(default=DEFAULT_MAX_PAGES_PER_SLICE)
    max_slices: int = field(default=DEFAULT_MAX_SLICES)
    max_streams: int = field(default=DEFAULT_MAX_STREAMS)

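For illustration, a minimal sketch of how these limits might be used to bound a test read; the `manifest` and `config` values are hypothetical stand-ins for a real declarative manifest and connector config, and `ConcurrentDeclarativeSource` is defined later in this module:

    # Hypothetical example: cap the data volume fetched during a test read.
    limits = TestLimits(max_records=50, max_slices=2)
    source = ConcurrentDeclarativeSource(
        source_config=manifest,  # declarative manifest dict (assumed to exist)
        config=config,           # connector config dict (assumed to exist)
        limits=limits,           # passing limits also disables retries and caching
    )
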
def _get_declarative_component_schema() -> Dict[str, Any]:
    try:
        raw_component_schema = pkgutil.get_data(
            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
        )
        if raw_component_schema is not None:
            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
            return declarative_component_schema  # type: ignore
        else:
            raise RuntimeError(
                "Failed to read manifest component json schema required for deduplication"
            )
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Failed to read manifest component json schema required for deduplication: {e}"
        )

class ConcurrentDeclarativeSource(Source):
    # By default, we use a value of 2. A lower value could leave a PartitionEnqueuer stuck in a deadlock
    # because it has hit the limit of futures while no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        *,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        migrate_manifest: bool = False,
        normalize_manifest: bool = False,
        limits: Optional[TestLimits] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self.logger = logging.getLogger(f"airbyte.{self.name}")

        self._limits = limits

        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        #  no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
        # We set a maxsize so that the main thread processes record items when the queue size grows. This assumes that there are fewer
        # threads generating partitions than the max number of workers. If that weren't the case, we could have threads only generating
        # partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be changed given more
        # information and might even need to be configurable depending on the source
        queue: Queue[QueueItem] = Queue(maxsize=10_000)
        message_repository = InMemoryMessageRepository(
            Level.DEBUG if emit_connector_builder_messages else Level.INFO
        )

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            message_repository=ConcurrentMessageRepository(queue, message_repository),
            configured_catalog=catalog,
            connector_state_manager=self._connector_state_manager,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
            limit_slices_fetched=limits.max_slices if limits else None,
            disable_retries=True if limits else False,
            disable_cache=True if limits else False,
        )

        self._should_normalize = normalize_manifest
        self._should_migrate = migrate_manifest
        self._declarative_component_schema = _get_declarative_component_schema()
        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)
        # set additional attributes
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages=emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )

        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        # resolve all components in the manifest
        self._source_config = self._pre_process_manifest(dict(source_config))
        # validate resolved manifest against the declarative component schema
        self._validate_source()
        # apply additional post-processing to the manifest
        self._post_process_manifest()

        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
        self._spec_component: Optional[Spec] = (
            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
        )
        self._config = self._migrate_and_transform_config(config_path, config) or {}

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using a range based on this value. If this were floored to zero, we would end up in a deadlock during startup
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            queue=queue,
            message_repository=self._message_repository,
        )

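As a sketch, a manifest that tunes the worker count might carry a `concurrency_level` block like the one below; the numbers are illustrative, and the default could also be interpolated from config (e.g. "{{ config['num_workers'] }}"):

    # Illustrative manifest fragment: with this definition, __init__ above would
    # create 10 workers and initially generate 10 // 2 == 5 partitions.
    manifest_fragment = {
        "concurrency_level": {
            "type": "ConcurrencyLevel",
            "default_concurrency": 10,
            "max_concurrency": 25,
        },
    }
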
    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocesses the provided manifest dictionary by resolving any manifest references.

        References are resolved using the ManifestReferenceResolver to ensure all references
        within the manifest are properly handled, and types and parameters are then propagated
        throughout the manifest with the ManifestComponentTransformer.

        Args:
            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.

        Returns:
            Dict[str, Any]: The preprocessed manifest with resolved references and propagated types and parameters.
        """
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = self._fix_source_type(manifest)
        # Resolve references in the manifest
        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
        # Propagate types and parameters throughout the manifest
        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_manifest, {}
        )

        return propagated_manifest

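A rough sketch of what reference resolution does; the manifest content here is invented for illustration:

    before = {
        "definitions": {"requester_base": {"url_base": "https://api.example.com"}},
        "streams": [
            {"retriever": {"requester": {"$ref": "#/definitions/requester_base"}}}
        ],
    }
    # After ManifestReferenceResolver().preprocess_manifest(before), the stream's
    # requester would contain the referenced fields inline:
    # {"retriever": {"requester": {"url_base": "https://api.example.com"}}}
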
    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
        """
        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
        """
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        return manifest

    def _post_process_manifest(self) -> None:
        """
        Post-processes the manifest after validation.
        This method is responsible for any additional modifications or transformations needed
        after the manifest has been validated and before it is used in the source.
        """
        # apply manifest migration, if required
        self._migrate_manifest()
        # apply manifest normalization, if required
        self._normalize_manifest()

    def _migrate_manifest(self) -> None:
        """
        This method is used to migrate the manifest. It should be called after the manifest has been validated.
        The migration is done in place, so the original manifest is modified.

        The original manifest is returned if any error occurs during migration.
        """
        if self._should_migrate:
            manifest_migrator = ManifestMigrationHandler(self._source_config)
            self._source_config = manifest_migrator.apply_migrations()
            # validate migrated manifest against the declarative component schema
            self._validate_source()

    def _normalize_manifest(self) -> None:
        """
        This method is used to normalize the manifest. It should be called after the manifest has been validated.

        Connector Builder UI rendering requires the manifest to be in a specific format:
         - references have been resolved
         - commonly used definitions are extracted into `definitions.linked.*`
        """
        if self._should_normalize:
            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
            self._source_config = normalizer.normalize()

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema
        """

        try:
            validate(self._source_config, self._declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against the JSON schema defined in declarative_component_schema.yaml failed"
            ) from e

    def _migrate_and_transform_config(
        self,
        config_path: Optional[str],
        config: Optional[Config],
    ) -> Optional[Config]:
        if not config:
            return None
        if not self._spec_component:
            return config
        mutable_config = dict(config)
        self._spec_component.migrate_config(mutable_config)
        if mutable_config != config:
            if config_path:
                with open(config_path, "w") as f:
                    json.dump(mutable_config, f)
            control_message = create_connector_config_control_message(mutable_config)
            print(orjson.dumps(AirbyteMessageSerializer.dump(control_message)).decode())
        self._spec_component.transform_config(mutable_config)
        return mutable_config

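To sketch the behavior with an invented migration: if the spec component's migrations nest a flat token under a credentials object, the updated config is persisted and announced before any transformations run:

    # Hypothetical before/after of migrate_config:
    #   config         == {"api_token": "abc"}
    #   mutable_config == {"credentials": {"api_token": "abc"}}
    # Because the two differ, the migrated config is written back to config_path
    # (when one is given) and a connector config control message is printed to
    # stdout; transform_config then applies any spec-defined transformations.
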
    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        config = self._config or config
        return super().configure(config, temp_dir)

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """
        Returns the resolved manifest configuration for the source.

        This property provides access to the internal source configuration as a mapping,
        which contains all settings and parameters required to define the source's behavior.

        Returns:
            Mapping[str, Any]: The resolved source configuration manifest.
        """
        return self._source_config

    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
        return self._constructor.get_model_deprecations()

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        selected_concurrent_streams = self._select_streams(
            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
            configured_catalog=catalog,
        )

        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
        if len(selected_concurrent_streams) > 0:
            yield from self._concurrent_source.read(selected_concurrent_streams)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        return AirbyteCatalog(
            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
        )

    # todo: add PR comment about whether we can change the signature to List[AbstractStream]
    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter based on the configured catalog, so a read does not necessarily consume every stream returned by `streams`)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent and non-concurrent in `_group_streams`.

        In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """

        if self._spec_component:
            self._spec_component.validate_config(self._config)

        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, self._config)

        source_streams = [
            self._constructor.create_component(
                (
                    StateDelegatingStreamModel
                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
                    else DeclarativeStreamModel
                ),
                stream_config,
                self._config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]
        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        parent_streams = set()

        def update_with_cache_parent_configs(
            parent_configs: list[dict[str, Any]],
        ) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                else:
                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                else:
                    stream_config["retriever"]["requester"]["use_cache"] = True
        return stream_configs

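To illustrate the effect, assume a child stream whose partition router points at a parent stream named "companies" (the configs below are invented); the helper marks both the inline parent definition and the matching top-level stream for caching:

    stream_configs = [
        {"type": "DeclarativeStream", "name": "companies",
         "retriever": {"requester": {"url_path": "/companies"}}},
        {"type": "DeclarativeStream", "name": "employees",
         "retriever": {
             "requester": {"url_path": "/employees"},
             "partition_router": {
                 "parent_stream_configs": [
                     {"stream": {"type": "DeclarativeStream", "name": "companies",
                                 "retriever": {"requester": {"url_path": "/companies"}}}}
                 ]
             },
         }},
    ]
    # _initialize_cache_for_parent_streams(stream_configs) would set
    # requester["use_cache"] = True on the inline parent config and on the
    # top-level "companies" stream, so the parent's HTTP responses are cached.
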
    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g.: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        return (
            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        check = self._source_config.get("check")
        if not check:
            raise ValueError("Missing 'check' component definition within the manifest.")

        if "type" not in check:
            check["type"] = "CheckStream"
        connection_checker = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if not isinstance(connection_checker, ConnectionChecker):
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
            )

        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
        if not check_succeeded:
            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

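For reference, a typical `check` block in a manifest verifies connectivity by reading from one or more streams; the stream name below is illustrative, and when `type` is omitted the code above defaults it to CheckStream:

    manifest_check_fragment = {
        "check": {
            "type": "CheckStream",
            "stream_names": ["customers"],
        }
    }
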
    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        return self._dynamic_stream_configs(
            manifest=self._source_config,
            with_dynamic_stream_name=True,
        )

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs = []
        for current_stream_config in manifest.get("streams", []):
            if (
                "type" in current_stream_config
                and current_stream_config["type"] == "ConditionalStreams"
            ):
                interpolated_boolean = InterpolatedBoolean(
                    condition=current_stream_config.get("condition"),
                    parameters={},
                )

                if interpolated_boolean.eval(config=self._config):
                    stream_configs.extend(current_stream_config.get("streams", []))
            else:
                if "type" not in current_stream_config:
                    current_stream_config["type"] = "DeclarativeStream"
                stream_configs.append(current_stream_config)
        return stream_configs

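A sketch of a ConditionalStreams entry as handled above; the condition is interpolated against the connector config, and the nested streams are only included when it evaluates to true (names and fields invented):

    conditional_entry = {
        "type": "ConditionalStreams",
        "condition": "{{ config['include_audit_logs'] }}",
        "streams": [
            # included only when the condition is truthy; retriever omitted for brevity
            {"type": "DeclarativeStream", "name": "audit_logs"},
        ],
    }
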
    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            if resolver_type == "HttpComponentsResolver":
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                    stream_name=dynamic_definition.get("name"),
                )
            else:
                components_resolver = self._constructor.create_component(
                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
                    component_definition=components_resolver_config,
                    config=self._config,
                )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                # Get the use_parent_parameters configuration from the dynamic definition
                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)

                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

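For orientation, a `dynamic_streams` definition consumed by this method might look roughly like the following; the resolver and mapping fields are illustrative rather than a complete, working definition:

    dynamic_streams_fragment = {
        "dynamic_streams": [
            {
                "type": "DynamicDeclarativeStream",
                "name": "tables",
                "components_resolver": {
                    "type": "HttpComponentsResolver",
                    # retriever that lists the entities to turn into streams
                    "retriever": {"...": "..."},
                    "components_mapping": [
                        {
                            "type": "ComponentMappingDefinition",
                            "field_path": ["name"],
                            "value": "{{ components_values['name'] }}",
                        }
                    ],
                },
                # template that each resolved entity is merged into
                "stream_template": {"type": "DeclarativeStream", "name": ""},
            }
        ]
    }
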
    def _select_streams(
        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)
            else:
                # Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if
                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
                # but we can do so if we feel it necessary. With the current behavior, we should still result
                # in a partial failure since missing streams will be marked as INCOMPLETE.
                self._message_repository.emit_message(
                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
                )
        return abstract_streams
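
Finally, a hypothetical end-to-end usage sketch; `manifest`, `config`, and `configured_catalog` are assumed to exist and match a real connector:

    import logging

    source = ConcurrentDeclarativeSource(
        catalog=configured_catalog,
        config=config,
        state=None,
        source_config=manifest,
    )
    for message in source.read(logging.getLogger("airbyte"), config, configured_catalog):
        print(message)  # AirbyteMessage objects; a real entrypoint would serialize them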
624
625                    raise AirbyteTracedException(
626                        message=error_message,
627                        internal_message=error_message,
628                        failure_type=failure_type,
629                    )
630
631                seen_dynamic_streams.add(name)
632                dynamic_stream_configs.append(dynamic_stream)
633
634        return dynamic_stream_configs
635
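
For orientation, a dynamic stream definition as consumed by the loop above has roughly the following shape. Field names mirror the lookups in the code; the values are purely illustrative and not a validated manifest:

    dynamic_definition = {
        # Used for "dynamic_stream_name" tagging; falls back to "dynamic_stream_<index>".
        "name": "per_table_streams",
        # Defaults to True for backward compatibility (see above).
        "use_parent_parameters": True,
        "components_resolver": {
            # Must be a key of COMPONENTS_RESOLVER_TYPE_MAPPING, e.g.
            # "HttpComponentsResolver" or "ConfigComponentsResolver".
            "type": "ConfigComponentsResolver",
            # For HTTP resolvers, the retriever's requester cache is force-enabled above.
        },
        # Template each resolved component set is merged into. "type" defaults to
        # "DeclarativeStream", and the resulting "name" must be a unique string.
        "stream_template": {"type": "DeclarativeStream", "name": "placeholder"},
    }
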
636    def _select_streams(
637        self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
638    ) -> List[AbstractStream]:
639        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
640        abstract_streams: List[AbstractStream] = []
641        for configured_stream in configured_catalog.streams:
642            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
643            if stream_instance:
644                abstract_streams.append(stream_instance)
645            else:
646                # Previous behavior in the legacy synchronous CDK was to also emit an error TRACE message if
647                # the source was configured with raise_exception_on_missing_stream=True. This was used on very
648                # few sources like facebook-marketing and google-ads. We decided not to port this feature over,
649                # but we can do so if we feel it necessary. With the current behavior, we should still end up
650                # with a partial failure since missing streams will be marked as INCOMPLETE.
651                self._message_repository.emit_message(
652                    as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
653                )
654        return abstract_streams
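
A self-contained sketch of the selection rule above, with hypothetical stream names: streams present in the configured catalog are kept, while missing ones are reported as INCOMPLETE instead of failing the whole sync:

    available = {"users": object(), "orders": object()}  # stands in for name -> AbstractStream
    configured = ["users", "payments"]                   # stream names from the configured catalog

    selected = [available[name] for name in configured if name in available]
    missing = [name for name in configured if name not in available]

    assert len(selected) == 1       # only "users" is read
    assert missing == ["payments"]  # would be emitted with status INCOMPLETE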

Helper class that provides a standard way to create an ABC using inheritance.

ConcurrentDeclarativeSource( catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, *, source_config: Mapping[str, Any], debug: bool = False, emit_connector_builder_messages: bool = False, migrate_manifest: bool = False, normalize_manifest: bool = False, limits: Optional[TestLimits] = None, config_path: Optional[str] = None, **kwargs: Any)
134    def __init__(
135        self,
136        catalog: Optional[ConfiguredAirbyteCatalog] = None,
137        config: Optional[Mapping[str, Any]] = None,
138        state: Optional[List[AirbyteStateMessage]] = None,
139        *,
140        source_config: ConnectionDefinition,
141        debug: bool = False,
142        emit_connector_builder_messages: bool = False,
143        migrate_manifest: bool = False,
144        normalize_manifest: bool = False,
145        limits: Optional[TestLimits] = None,
146        config_path: Optional[str] = None,
147        **kwargs: Any,
148    ) -> None:
149        self.logger = logging.getLogger(f"airbyte.{self.name}")
150
151        self._limits = limits
152
153        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
154        #  no longer needs to store the original incoming state. But maybe there's an edge case?
155        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
156
157        # We set a maxsize so that the main thread can process record items when the queue size grows. This assumes that there are
158        # fewer threads generating partitions than the max number of workers. If that weren't the case, we could have threads only
159        # generating partitions, which would fill the queue. This number is arbitrarily set to 10_000 but will probably need to be
160        # changed given more information and might even need to be configurable depending on the source.
161        queue: Queue[QueueItem] = Queue(maxsize=10_000)
162        message_repository = InMemoryMessageRepository(
163            Level.DEBUG if emit_connector_builder_messages else Level.INFO
164        )
165
166        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
167        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
168        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
169        # incremental streams running in full refresh.
170        component_factory = ModelToComponentFactory(
171            emit_connector_builder_messages=emit_connector_builder_messages,
172            message_repository=ConcurrentMessageRepository(queue, message_repository),
173            configured_catalog=catalog,
174            connector_state_manager=self._connector_state_manager,
175            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
176            limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
177            limit_slices_fetched=limits.max_slices if limits else None,
178            disable_retries=True if limits else False,
179            disable_cache=True if limits else False,
180        )
181
182        self._should_normalize = normalize_manifest
183        self._should_migrate = migrate_manifest
184        self._declarative_component_schema = _get_declarative_component_schema()
185        # If custom components are needed, locate and/or register them.
186        self.components_module: ModuleType | None = get_registered_components_module(config=config)
187        # set additional attributes
188        self._debug = debug
189        self._emit_connector_builder_messages = emit_connector_builder_messages
190        self._constructor = (
191            component_factory
192            if component_factory
193            else ModelToComponentFactory(
194                emit_connector_builder_messages=emit_connector_builder_messages,
195                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
196            )
197        )
198
199        self._message_repository = self._constructor.get_message_repository()
200        self._slice_logger: SliceLogger = (
201            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
202        )
203
204        # resolve all components in the manifest
205        self._source_config = self._pre_process_manifest(dict(source_config))
206        # validate resolved manifest against the declarative component schema
207        self._validate_source()
208        # apply additional post-processing to the manifest
209        self._post_process_manifest()
210
211        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
212        self._spec_component: Optional[Spec] = (
213            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
214        )
215        self._config = self._migrate_and_transform_config(config_path, config) or {}
216
217        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
218        if concurrency_level_from_manifest:
219            concurrency_level_component = self._constructor.create_component(
220                model_type=ConcurrencyLevelModel,
221                component_definition=concurrency_level_from_manifest,
222                config=config or {},
223            )
224            if not isinstance(concurrency_level_component, ConcurrencyLevel):
225                raise ValueError(
226                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
227                )
228
229            concurrency_level = concurrency_level_component.get_concurrency_level()
230            initial_number_of_partitions_to_generate = max(
231                concurrency_level // 2, 1
232            )  # Partition generation iterates using range based on this value. If this is floored to zero we end up in a deadlock during startup.
233        else:
234            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
235            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
236
237        self._concurrent_source = ConcurrentSource.create(
238            num_workers=concurrency_level,
239            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
240            logger=self.logger,
241            slice_logger=self._slice_logger,
242            queue=queue,
243            message_repository=self._message_repository,
244        )
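
A minimal construction sketch, assuming `manifest` is a valid declarative manifest dict and `config` a matching connector config (both hypothetical):

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )

    source = ConcurrentDeclarativeSource(source_config=manifest, config=config)

    # Worked example of the partition math above: with a manifest concurrency_level
    # of 4, the source starts 4 workers and max(4 // 2, 1) == 2 partition generators.
    # The floor of 1 keeps concurrency_level == 1 from deadlocking startup, since
    # max(1 // 2, 1) == 1 rather than 0.
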
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
351    @property
352    def resolved_manifest(self) -> Mapping[str, Any]:
353        """
354        Returns the resolved manifest configuration for the source.
355
356        This property provides access to the internal source configuration as a mapping,
357        which contains all settings and parameters required to define the source's behavior.
358
359        Returns:
360            Mapping[str, Any]: The resolved source configuration manifest.
361        """
362        return self._source_config

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:

Mapping[str, Any]: The resolved source configuration manifest.
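
Usage sketch, reusing the hypothetical `source` from the construction sketch above:

    resolved = source.resolved_manifest
    print(resolved.get("version"))
    print([s.get("name") for s in resolved.get("streams", [])])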

def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
364    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
365        return self._constructor.get_model_deprecations()
def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
367    def read(
368        self,
369        logger: logging.Logger,
370        config: Mapping[str, Any],
371        catalog: ConfiguredAirbyteCatalog,
372        state: Optional[List[AirbyteStateMessage]] = None,
373    ) -> Iterator[AirbyteMessage]:
374        selected_concurrent_streams = self._select_streams(
375            streams=self.streams(config=self._config),  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
376            configured_catalog=catalog,
377        )
378
379        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
380        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
381        if len(selected_concurrent_streams) > 0:
382            yield from self._concurrent_source.read(selected_concurrent_streams)

Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
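
A minimal usage sketch, reusing the hypothetical `source` from the construction sketch above and assuming `catalog` is a ConfiguredAirbyteCatalog that selects some of its streams:

    import logging

    for message in source.read(logging.getLogger("airbyte"), config, catalog, state=None):
        print(message.type)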

def discover( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
384    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
385        return AirbyteCatalog(
386            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
387        )

Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each Postgres table is a stream, and each table column is a field.
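
For example, reusing the hypothetical `source` from the construction sketch above:

    import logging

    catalog = source.discover(logging.getLogger("airbyte"), config)
    print([stream.name for stream in catalog.streams])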

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream]:
390    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
391        """
392        The `streams` method is used as part of the AbstractSource in the following cases:
393        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
394        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all streams returned by `streams` are actually read)
395        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.
396
397        In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
398        """
399
400        if self._spec_component:
401            self._spec_component.validate_config(self._config)
402
403        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
404
405        api_budget_model = self._source_config.get("api_budget")
406        if api_budget_model:
407            self._constructor.set_api_budget(api_budget_model, self._config)
408
409        source_streams = [
410            self._constructor.create_component(
411                (
412                    StateDelegatingStreamModel
413                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
414                    else DeclarativeStreamModel
415                ),
416                stream_config,
417                self._config,
418                emit_connector_builder_messages=self._emit_connector_builder_messages,
419            )
420            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
421        ]
422        return source_streams

The streams method is used as part of the AbstractSource in the following cases:

  • ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
  • ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all streams returned by streams are actually read)

Note that super().streams(config) is also called when splitting the streams between concurrent or not in _group_streams.

In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
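
Illustrative call, reusing the hypothetical `source` from the construction sketch above. Note that the method body reads the config captured at construction time rather than the `config` argument:

    streams = source.streams(config)
    print([stream.name for stream in streams])  # AbstractStream instances expose .name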

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
477    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
478        """
479        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
480        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
481        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
482        in the project root.
483        """
484        return (
485            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
486        )

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g. username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block; otherwise, it will load it from "spec.yaml" or "spec.json" in the project root.
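
Usage sketch, reusing the hypothetical `source` from the construction sketch above:

    import logging

    specification = source.spec(logging.getLogger("airbyte"))
    print(specification.connectionSpecification)  # the JSON schema for the connector config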

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
488    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
489        check = self._source_config.get("check")
490        if not check:
491            raise ValueError("Missing 'check' component definition within the manifest.")
492
493        if "type" not in check:
494            check["type"] = "CheckStream"
495        connection_checker = self._constructor.create_component(
496            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
497            check,
498            dict(),
499            emit_connector_builder_messages=self._emit_connector_builder_messages,
500        )
501        if not isinstance(connection_checker, ConnectionChecker):
502            raise ValueError(
503                f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}"
504            )
505
506        check_succeeded, error = connection_checker.check_connection(self, logger, self._config)
507        if not check_succeeded:
508            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
509        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

Tests if the input configuration can be used to successfully connect to the integration, e.g. if a provided Stripe API token can be used to connect to the Stripe API.
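
Usage sketch, reusing the hypothetical `source` from the construction sketch above:

    import logging

    from airbyte_cdk.models import Status

    status = source.check(logging.getLogger("airbyte"), config)
    if status.status != Status.SUCCEEDED:
        print(f"Connection check failed: {status.message}")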

dynamic_streams: List[Dict[str, Any]]
511    @property
512    def dynamic_streams(self) -> List[Dict[str, Any]]:
513        return self._dynamic_stream_configs(
514            manifest=self._source_config,
515            with_dynamic_stream_name=True,
516        )
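
Usage sketch, reusing the hypothetical `source` from the construction sketch above. Each resolved config is tagged with the generating definition's name (or dynamic_stream_<index> when the definition is unnamed):

    for stream_config in source.dynamic_streams:
        print(stream_config["name"], "<-", stream_config.get("dynamic_stream_name"))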