airbyte_cdk.sources.declarative.concurrent_declarative_source

  1#
  2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
  7
  8from airbyte_cdk.models import (
  9    AirbyteCatalog,
 10    AirbyteMessage,
 11    AirbyteStateMessage,
 12    ConfiguredAirbyteCatalog,
 13)
 14from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 15from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 16from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 17from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 18from airbyte_cdk.sources.declarative.extractors import RecordSelector
 19from airbyte_cdk.sources.declarative.extractors.record_filter import (
 20    ClientSideIncrementalRecordFilterDecorator,
 21)
 22from airbyte_cdk.sources.declarative.incremental import (
 23    ConcurrentPerPartitionCursor,
 24    GlobalSubstreamCursor,
 25)
 26from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
 27from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
 28    PerPartitionWithGlobalCursor,
 29)
 30from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 31from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 32    ConcurrencyLevel as ConcurrencyLevelModel,
 33)
 34from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 35    DatetimeBasedCursor as DatetimeBasedCursorModel,
 36)
 37from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 38    IncrementingCountCursor as IncrementingCountCursorModel,
 39)
 40from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 41    ModelToComponentFactory,
 42)
 43from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
 44from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
 45from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
 46    DeclarativePartitionFactory,
 47    StreamSlicerPartitionGenerator,
 48)
 49from airbyte_cdk.sources.declarative.types import ConnectionDefinition
 50from airbyte_cdk.sources.source import TState
 51from airbyte_cdk.sources.streams import Stream
 52from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 53from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 54from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
 55    AlwaysAvailableAvailabilityStrategy,
 56)
 57from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
 58from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 59from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
 60
 61
 62class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
 63    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a state of deadlock
 64    # because it has hit the limit of futures while no partition reader is consuming them.
 65    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
 66
 67    def __init__(
 68        self,
 69        catalog: Optional[ConfiguredAirbyteCatalog],
 70        config: Optional[Mapping[str, Any]],
 71        state: TState,
 72        source_config: ConnectionDefinition,
 73        debug: bool = False,
 74        emit_connector_builder_messages: bool = False,
 75        component_factory: Optional[ModelToComponentFactory] = None,
 76        **kwargs: Any,
 77    ) -> None:
 78        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
 79        #  no longer needs to store the original incoming state. But maybe there's an edge case?
 80        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
 81
 82        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
 83        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
 84        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
 85        # incremental streams running in full refresh.
 86        component_factory = component_factory or ModelToComponentFactory(
 87            emit_connector_builder_messages=emit_connector_builder_messages,
 88            disable_resumable_full_refresh=True,
 89            connector_state_manager=self._connector_state_manager,
 90        )
 91
 92        super().__init__(
 93            source_config=source_config,
 94            config=config,
 95            debug=debug,
 96            emit_connector_builder_messages=emit_connector_builder_messages,
 97            component_factory=component_factory,
 98        )
 99
100        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
101        if concurrency_level_from_manifest:
102            concurrency_level_component = self._constructor.create_component(
103                model_type=ConcurrencyLevelModel,
104                component_definition=concurrency_level_from_manifest,
105                config=config or {},
106            )
107            if not isinstance(concurrency_level_component, ConcurrencyLevel):
108                raise ValueError(
109                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
110                )
111
112            concurrency_level = concurrency_level_component.get_concurrency_level()
113            initial_number_of_partitions_to_generate = max(
114                concurrency_level // 2, 1
115        )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during start-up
116        else:
117            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
118            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
119
120        self._concurrent_source = ConcurrentSource.create(
121            num_workers=concurrency_level,
122            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
123            logger=self.logger,
124            slice_logger=self._slice_logger,
125            message_repository=self.message_repository,
126        )
127
128    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
129    @property
130    def is_partially_declarative(self) -> bool:
131        """This flag used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
132        return False
133
134    def read(
135        self,
136        logger: logging.Logger,
137        config: Mapping[str, Any],
138        catalog: ConfiguredAirbyteCatalog,
139        state: Optional[List[AirbyteStateMessage]] = None,
140    ) -> Iterator[AirbyteMessage]:
141        concurrent_streams, _ = self._group_streams(config=config)
142
143        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
144        # the concurrent streams must be saved so that they can be removed from the catalog before starting
145        # the synchronous streams.
146        if len(concurrent_streams) > 0:
147            concurrent_stream_names = set(
148                [concurrent_stream.name for concurrent_stream in concurrent_streams]
149            )
150
151            selected_concurrent_streams = self._select_streams(
152                streams=concurrent_streams, configured_catalog=catalog
153            )
154            # Passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
155            # The same issue exists in concurrent_source_adapter.py, so fixing it is out of scope for now.
156            if selected_concurrent_streams:
157                yield from self._concurrent_source.read(selected_concurrent_streams)
158
159            # Sync all streams that are not concurrent-compatible. We filter out concurrent streams because the
160            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
161            # of which were already synced using the Concurrent CDK.
162            filtered_catalog = self._remove_concurrent_streams_from_catalog(
163                catalog=catalog, concurrent_stream_names=concurrent_stream_names
164            )
165        else:
166            filtered_catalog = catalog
167
168        # There is no need to run read for synchronous streams if none exist.
169        if not filtered_catalog.streams:
170            return
171
172        yield from super().read(logger, config, filtered_catalog, state)
173
174    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
175        concurrent_streams, synchronous_streams = self._group_streams(config=config)
176        return AirbyteCatalog(
177            streams=[
178                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
179            ]
180        )
181
182    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
183        """
184        The `streams` method is used as part of the AbstractSource in the following cases:
185        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
186        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that read() filters the catalog to exclude concurrent streams, so not all of the streams returned by `streams` are actually read)
187        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.
188
189        In both cases, we assume that returning DeclarativeStream instances is fine, since the result is the same regardless of whether a stream is a DeclarativeStream or a DefaultStream (concurrent). This method should simply be removed once we have moved away from the code paths mentioned above.
190        """
191        return super().streams(config)
192
193    def _group_streams(
194        self, config: Mapping[str, Any]
195    ) -> Tuple[List[AbstractStream], List[Stream]]:
196        concurrent_streams: List[AbstractStream] = []
197        synchronous_streams: List[Stream] = []
198
199        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
200        # and this is validated during the initialization of the source.
201        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
202            self._source_config, config
203        )
204
205        name_to_stream_mapping = {stream["name"]: stream for stream in streams}
206
207        for declarative_stream in self.streams(config=config):
208            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
209            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
210            # so we need to treat them as synchronous
211
212            if (
213                isinstance(declarative_stream, DeclarativeStream)
214                and name_to_stream_mapping[declarative_stream.name]["type"]
215                == "StateDelegatingStream"
216            ):
217                stream_state = self._connector_state_manager.get_stream_state(
218                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
219                )
220
221                name_to_stream_mapping[declarative_stream.name] = (
222                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
223                    if stream_state
224                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
225                )
226
227            if isinstance(declarative_stream, DeclarativeStream) and (
228                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
229                == "SimpleRetriever"
230                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
231                == "AsyncRetriever"
232            ):
233                incremental_sync_component_definition = name_to_stream_mapping[
234                    declarative_stream.name
235                ].get("incremental_sync")
236
237                partition_router_component_definition = (
238                    name_to_stream_mapping[declarative_stream.name]
239                    .get("retriever", {})
240                    .get("partition_router")
241                )
242                is_without_partition_router_or_cursor = not bool(
243                    incremental_sync_component_definition
244                ) and not bool(partition_router_component_definition)
245
246                is_substream_without_incremental = (
247                    partition_router_component_definition
248                    and not incremental_sync_component_definition
249                )
250
251                if self._is_concurrent_cursor_incremental_without_partition_routing(
252                    declarative_stream, incremental_sync_component_definition
253                ):
254                    stream_state = self._connector_state_manager.get_stream_state(
255                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
256                    )
257                    stream_state = self._migrate_state(declarative_stream, stream_state)
258
259                    retriever = self._get_retriever(declarative_stream, stream_state)
260
261                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
262                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
263                    ):
264                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer
265
266                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
267                            # This should never happen since we instantiate ConcurrentCursor in
268                            # model_to_component_factory.py
269                            raise ValueError(
270                                f"Expected AsyncJobPartitionRouter stream_slicer to be a ConcurrentCursor or ConcurrentPerPartitionCursor, but received {cursor.__class__}"
271                            )
272
273                        partition_generator = StreamSlicerPartitionGenerator(
274                            partition_factory=DeclarativePartitionFactory(
275                                declarative_stream.name,
276                                declarative_stream.get_json_schema(),
277                                retriever,
278                                self.message_repository,
279                            ),
280                            stream_slicer=declarative_stream.retriever.stream_slicer,
281                        )
282                    else:
283                        if (
284                            incremental_sync_component_definition
285                            and incremental_sync_component_definition.get("type")
286                            == IncrementingCountCursorModel.__name__
287                        ):
288                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
289                                model_type=IncrementingCountCursorModel,
290                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
291                                stream_name=declarative_stream.name,
292                                stream_namespace=declarative_stream.namespace,
293                                config=config or {},
294                            )
295                        else:
296                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
297                                model_type=DatetimeBasedCursorModel,
298                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
299                                stream_name=declarative_stream.name,
300                                stream_namespace=declarative_stream.namespace,
301                                config=config or {},
302                            )
303                        partition_generator = StreamSlicerPartitionGenerator(
304                            partition_factory=DeclarativePartitionFactory(
305                                declarative_stream.name,
306                                declarative_stream.get_json_schema(),
307                                retriever,
308                                self.message_repository,
309                            ),
310                            stream_slicer=cursor,
311                        )
312
313                    concurrent_streams.append(
314                        DefaultStream(
315                            partition_generator=partition_generator,
316                            name=declarative_stream.name,
317                            json_schema=declarative_stream.get_json_schema(),
318                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
319                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
320                            cursor_field=cursor.cursor_field.cursor_field_key
321                            if hasattr(cursor, "cursor_field")
322                            and hasattr(
323                                cursor.cursor_field, "cursor_field_key"
324                            )  # FIXME this will need to be updated once we do the per partition
325                            else None,
326                            logger=self.logger,
327                            cursor=cursor,
328                        )
329                    )
330                elif (
331                    is_substream_without_incremental or is_without_partition_router_or_cursor
332                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
333                    partition_generator = StreamSlicerPartitionGenerator(
334                        DeclarativePartitionFactory(
335                            declarative_stream.name,
336                            declarative_stream.get_json_schema(),
337                            declarative_stream.retriever,
338                            self.message_repository,
339                        ),
340                        declarative_stream.retriever.stream_slicer,
341                    )
342
343                    final_state_cursor = FinalStateCursor(
344                        stream_name=declarative_stream.name,
345                        stream_namespace=declarative_stream.namespace,
346                        message_repository=self.message_repository,
347                    )
348
349                    concurrent_streams.append(
350                        DefaultStream(
351                            partition_generator=partition_generator,
352                            name=declarative_stream.name,
353                            json_schema=declarative_stream.get_json_schema(),
354                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
355                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
356                            cursor_field=None,
357                            logger=self.logger,
358                            cursor=final_state_cursor,
359                        )
360                    )
361                elif (
362                    incremental_sync_component_definition
363                    and incremental_sync_component_definition.get("type", "")
364                    == DatetimeBasedCursorModel.__name__
365                    and hasattr(declarative_stream.retriever, "stream_slicer")
366                    and isinstance(
367                        declarative_stream.retriever.stream_slicer,
368                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
369                    )
370                ):
371                    stream_state = self._connector_state_manager.get_stream_state(
372                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
373                    )
374                    stream_state = self._migrate_state(declarative_stream, stream_state)
375
376                    partition_router = declarative_stream.retriever.stream_slicer._partition_router
377
378                    perpartition_cursor = (
379                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
380                            state_manager=self._connector_state_manager,
381                            model_type=DatetimeBasedCursorModel,
382                            component_definition=incremental_sync_component_definition,
383                            stream_name=declarative_stream.name,
384                            stream_namespace=declarative_stream.namespace,
385                            config=config or {},
386                            stream_state=stream_state,
387                            partition_router=partition_router,
388                        )
389                    )
390
391                    retriever = self._get_retriever(declarative_stream, stream_state)
392
393                    partition_generator = StreamSlicerPartitionGenerator(
394                        DeclarativePartitionFactory(
395                            declarative_stream.name,
396                            declarative_stream.get_json_schema(),
397                            retriever,
398                            self.message_repository,
399                        ),
400                        perpartition_cursor,
401                    )
402
403                    concurrent_streams.append(
404                        DefaultStream(
405                            partition_generator=partition_generator,
406                            name=declarative_stream.name,
407                            json_schema=declarative_stream.get_json_schema(),
408                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
409                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
410                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
411                            logger=self.logger,
412                            cursor=perpartition_cursor,
413                        )
414                    )
415                else:
416                    synchronous_streams.append(declarative_stream)
417            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
418            # Condition below needs to ensure that concurrent support is not lost for sources that already support
419            # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
420            elif (
421                isinstance(declarative_stream, AbstractStreamFacade)
422                and self.is_partially_declarative
423            ):
424                concurrent_streams.append(declarative_stream.get_underlying_stream())
425            else:
426                synchronous_streams.append(declarative_stream)
427
428        return concurrent_streams, synchronous_streams
429
430    def _is_concurrent_cursor_incremental_without_partition_routing(
431        self,
432        declarative_stream: DeclarativeStream,
433        incremental_sync_component_definition: Mapping[str, Any] | None,
434    ) -> bool:
435        return (
436            incremental_sync_component_definition is not None
437            and bool(incremental_sync_component_definition)
438            and (
439                incremental_sync_component_definition.get("type", "")
440                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
441            )
442            and hasattr(declarative_stream.retriever, "stream_slicer")
443            and (
444                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
445                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
446                # add an isinstance check here if we want to create a Declarative IncrementingCountCursor
447                # or isinstance(
448                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
449                # )
450                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
451            )
452        )
453
454    @staticmethod
455    def _get_retriever(
456        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
457    ) -> Retriever:
458        retriever = declarative_stream.retriever
459
460        # This is an optimization so that we don't invoke any cursor or state management flows within the
461        # low-code framework because state management is handled through the ConcurrentCursor.
462        if declarative_stream and isinstance(retriever, SimpleRetriever):
463            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
464            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
465            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
466            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
467            # properly initialized with state.
468            if retriever.cursor:
469                retriever.cursor.set_initial_state(stream_state=stream_state)
470
471            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
472            # from the one initialized on the SimpleRetriever, so it must also have state initialized
473            # for semi-incremental streams using is_client_side_incremental to filter properly
474            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
475                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
476            ):
477                retriever.record_selector.record_filter._cursor.set_initial_state(
478                    stream_state=stream_state
479                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds
480
481            # We zero it out here, but since this is a cursor reference, the state is still properly
482            # instantiated for the other components that reference it
483            retriever.cursor = None
484
485        return retriever
486
487    @staticmethod
488    def _select_streams(
489        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
490    ) -> List[AbstractStream]:
491        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
492        abstract_streams: List[AbstractStream] = []
493        for configured_stream in configured_catalog.streams:
494            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
495            if stream_instance:
496                abstract_streams.append(stream_instance)
497
498        return abstract_streams
499
500    @staticmethod
501    def _remove_concurrent_streams_from_catalog(
502        catalog: ConfiguredAirbyteCatalog,
503        concurrent_stream_names: set[str],
504    ) -> ConfiguredAirbyteCatalog:
505        return ConfiguredAirbyteCatalog(
506            streams=[
507                stream
508                for stream in catalog.streams
509                if stream.stream.name not in concurrent_stream_names
510            ]
511        )
512
513    @staticmethod
514    def _migrate_state(
515        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
516    ) -> MutableMapping[str, Any]:
517        for state_migration in declarative_stream.state_migrations:
518            if state_migration.should_migrate(stream_state):
519                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
520                stream_state = dict(state_migration.migrate(stream_state))
521
522        return stream_state
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):

Declarative source defined by a manifest of low-code components that define source connector behavior
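A minimal usage sketch, assuming placeholders not defined by this module: `manifest` stands in for a real low-code manifest (a ConnectionDefinition mapping), `config` for the connector configuration, and `configured_catalog` for a ConfiguredAirbyteCatalog. The empty state list mirrors the List[AirbyteStateMessage] shape the constructor expects.

    import logging

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )

    # `manifest`, `config`, and `configured_catalog` are hypothetical
    # placeholders, not values provided by this module.
    source = ConcurrentDeclarativeSource(
        catalog=None,
        config=config,
        state=[],  # incoming state as a List[AirbyteStateMessage]
        source_config=manifest,
    )

    # read() first syncs the concurrent-compatible streams through
    # ConcurrentSource, then delegates whatever remains in the filtered
    # catalog to the synchronous super().read() path.
    logger = logging.getLogger("airbyte")
    for message in source.read(logger, config, configured_catalog, state=[]):
        print(message.type)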

is_partially_declarative: bool

This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams.
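A hedged sketch of how a partially migrated source might override this flag (the subclass name is hypothetical). When the property returns True, `_group_streams` routes AbstractStreamFacade instances to the concurrent group via `get_underlying_stream()` instead of treating them as synchronous:

    class PartiallyDeclarativeSource(ConcurrentDeclarativeSource):
        # Hypothetical subclass for a source that still mixes legacy
        # Python streams (AbstractStreamFacade) with declarative ones.
        @property
        def is_partially_declarative(self) -> bool:
            return True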

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:

Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
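For instance, a small sketch of inspecting the discovered catalog (reusing the `source` and `config` placeholders from the earlier example; each entry is built by `as_airbyte_stream()` on the grouped concurrent and synchronous streams):

    import logging

    logger = logging.getLogger("airbyte")
    catalog = source.discover(logger=logger, config=config)
    for stream in catalog.streams:
        print(stream.name, stream.supported_sync_modes)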