airbyte_cdk.sources.declarative.concurrent_declarative_source

  1#
  2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
  7
  8from airbyte_cdk.models import (
  9    AirbyteCatalog,
 10    AirbyteMessage,
 11    AirbyteStateMessage,
 12    ConfiguredAirbyteCatalog,
 13)
 14from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 15from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 16from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 17from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 18from airbyte_cdk.sources.declarative.extractors import RecordSelector
 19from airbyte_cdk.sources.declarative.extractors.record_filter import (
 20    ClientSideIncrementalRecordFilterDecorator,
 21)
 22from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor
 23from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
 24from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
 25    PerPartitionWithGlobalCursor,
 26)
 27from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 28from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 29    ConcurrencyLevel as ConcurrencyLevelModel,
 30)
 31from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 32    DatetimeBasedCursor as DatetimeBasedCursorModel,
 33)
 34from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 35    IncrementingCountCursor as IncrementingCountCursorModel,
 36)
 37from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 38    ModelToComponentFactory,
 39)
 40from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
 41from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
 42from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
 43    DeclarativePartitionFactory,
 44    StreamSlicerPartitionGenerator,
 45)
 46from airbyte_cdk.sources.declarative.types import ConnectionDefinition
 47from airbyte_cdk.sources.source import TState
 48from airbyte_cdk.sources.streams import Stream
 49from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 50from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 51from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
 52    AlwaysAvailableAvailabilityStrategy,
 53)
 54from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
 55from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 56from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
 57
 58
 59class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
 60    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a state of deadlock
 61    # because it has hit the limit of futures but no partition reader is consuming them.
 62    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
 63
 64    def __init__(
 65        self,
 66        catalog: Optional[ConfiguredAirbyteCatalog],
 67        config: Optional[Mapping[str, Any]],
 68        state: TState,
 69        source_config: ConnectionDefinition,
 70        debug: bool = False,
 71        emit_connector_builder_messages: bool = False,
 72        component_factory: Optional[ModelToComponentFactory] = None,
 73        **kwargs: Any,
 74    ) -> None:
 75        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
 76        #  no longer needs to store the original incoming state. But maybe there's an edge case?
 77        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
 78
 79        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
 80        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
 81        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
 82        # incremental streams running in full refresh.
 83        component_factory = component_factory or ModelToComponentFactory(
 84            emit_connector_builder_messages=emit_connector_builder_messages,
 85            disable_resumable_full_refresh=True,
 86            connector_state_manager=self._connector_state_manager,
 87        )
 88
 89        super().__init__(
 90            source_config=source_config,
 91            config=config,
 92            debug=debug,
 93            emit_connector_builder_messages=emit_connector_builder_messages,
 94            component_factory=component_factory,
 95        )
 96
 97        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
 98        if concurrency_level_from_manifest:
 99            concurrency_level_component = self._constructor.create_component(
100                model_type=ConcurrencyLevelModel,
101                component_definition=concurrency_level_from_manifest,
102                config=config or {},
103            )
104            if not isinstance(concurrency_level_component, ConcurrencyLevel):
105                raise ValueError(
106                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
107                )
108
109            concurrency_level = concurrency_level_component.get_concurrency_level()
110            initial_number_of_partitions_to_generate = max(
111                concurrency_level // 2, 1
112            )  # Partition generation iterates over a range based on this value. If it is floored to zero, we end up in a deadlock during start-up
113        else:
114            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
115            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
116
117        self._concurrent_source = ConcurrentSource.create(
118            num_workers=concurrency_level,
119            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
120            logger=self.logger,
121            slice_logger=self._slice_logger,
122            message_repository=self.message_repository,
123        )
124
125    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
126    @property
127    def is_partially_declarative(self) -> bool:
128        """This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams."""
129        return False
130
131    def read(
132        self,
133        logger: logging.Logger,
134        config: Mapping[str, Any],
135        catalog: ConfiguredAirbyteCatalog,
136        state: Optional[List[AirbyteStateMessage]] = None,
137    ) -> Iterator[AirbyteMessage]:
138        concurrent_streams, _ = self._group_streams(config=config)
139
140        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
141        # the concurrent streams must be saved so that they can be removed from the catalog before starting
142        # synchronous streams
143        if len(concurrent_streams) > 0:
144            concurrent_stream_names = set(
145                [concurrent_stream.name for concurrent_stream in concurrent_streams]
146            )
147
148            selected_concurrent_streams = self._select_streams(
149                streams=concurrent_streams, configured_catalog=catalog
150            )
151            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
152            # This is also evident in concurrent_source_adapter.py, so the fix is left out of scope for now
153            if selected_concurrent_streams:
154                yield from self._concurrent_source.read(selected_concurrent_streams)
155
156            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
157            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
158            # of which were already synced using the Concurrent CDK.
159            filtered_catalog = self._remove_concurrent_streams_from_catalog(
160                catalog=catalog, concurrent_stream_names=concurrent_stream_names
161            )
162        else:
163            filtered_catalog = catalog
164
165        # There is no need to run read for synchronous streams if none exist.
166        if not filtered_catalog.streams:
167            return
168
169        yield from super().read(logger, config, filtered_catalog, state)
170
171    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
172        concurrent_streams, synchronous_streams = self._group_streams(config=config)
173        return AirbyteCatalog(
174            streams=[
175                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
176            ]
177        )
178
179    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
180        """
181        The `streams` method is used as part of the AbstractSource in the following cases:
182        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
183        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter the catalog to exclude concurrent streams, so not all of the streams returned by `streams` are actually read)
184        Note that `super().streams(config)` is also called when splitting the streams into concurrent and synchronous groups in `_group_streams`.
185
186        In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
187        """
188        return super().streams(config)
189
190    def _group_streams(
191        self, config: Mapping[str, Any]
192    ) -> Tuple[List[AbstractStream], List[Stream]]:
193        concurrent_streams: List[AbstractStream] = []
194        synchronous_streams: List[Stream] = []
195
196        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
197        # and this is validated during the initialization of the source.
198        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
199            self._source_config, config
200        )
201
202        name_to_stream_mapping = {stream["name"]: stream for stream in streams}
203
204        for declarative_stream in self.streams(config=config):
205            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
206            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
207            # so we need to treat them as synchronous
208
209            if (
210                isinstance(declarative_stream, DeclarativeStream)
211                and name_to_stream_mapping[declarative_stream.name]["type"]
212                == "StateDelegatingStream"
213            ):
214                stream_state = self._connector_state_manager.get_stream_state(
215                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
216                )
217
218                name_to_stream_mapping[declarative_stream.name] = (
219                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
220                    if stream_state
221                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
222                )
223
224            if isinstance(declarative_stream, DeclarativeStream) and (
225                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
226                == "SimpleRetriever"
227                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
228                == "AsyncRetriever"
229            ):
230                incremental_sync_component_definition = name_to_stream_mapping[
231                    declarative_stream.name
232                ].get("incremental_sync")
233
234                partition_router_component_definition = (
235                    name_to_stream_mapping[declarative_stream.name]
236                    .get("retriever", {})
237                    .get("partition_router")
238                )
239                is_without_partition_router_or_cursor = not bool(
240                    incremental_sync_component_definition
241                ) and not bool(partition_router_component_definition)
242
243                is_substream_without_incremental = (
244                    partition_router_component_definition
245                    and not incremental_sync_component_definition
246                )
247
248                if self._is_concurrent_cursor_incremental_without_partition_routing(
249                    declarative_stream, incremental_sync_component_definition
250                ):
251                    stream_state = self._connector_state_manager.get_stream_state(
252                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
253                    )
254                    stream_state = self._migrate_state(declarative_stream, stream_state)
255
256                    retriever = self._get_retriever(declarative_stream, stream_state)
257
258                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
259                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
260                    ):
261                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer
262
263                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
264                            # This should never happen since we instantiate ConcurrentCursor in
265                            # model_to_component_factory.py
266                            raise ValueError(
267                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor or ConcurrentPerPartitionCursor, but received {cursor.__class__}"
268                            )
269
270                        partition_generator = StreamSlicerPartitionGenerator(
271                            partition_factory=DeclarativePartitionFactory(
272                                declarative_stream.name,
273                                declarative_stream.get_json_schema(),
274                                retriever,
275                                self.message_repository,
276                            ),
277                            stream_slicer=declarative_stream.retriever.stream_slicer,
278                        )
279                    else:
280                        if (
281                            incremental_sync_component_definition
282                            and incremental_sync_component_definition.get("type")
283                            == IncrementingCountCursorModel.__name__
284                        ):
285                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
286                                model_type=IncrementingCountCursorModel,
287                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
288                                stream_name=declarative_stream.name,
289                                stream_namespace=declarative_stream.namespace,
290                                config=config or {},
291                            )
292                        else:
293                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
294                                model_type=DatetimeBasedCursorModel,
295                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
296                                stream_name=declarative_stream.name,
297                                stream_namespace=declarative_stream.namespace,
298                                config=config or {},
299                            )
300                        partition_generator = StreamSlicerPartitionGenerator(
301                            partition_factory=DeclarativePartitionFactory(
302                                declarative_stream.name,
303                                declarative_stream.get_json_schema(),
304                                retriever,
305                                self.message_repository,
306                            ),
307                            stream_slicer=cursor,
308                        )
309
310                    concurrent_streams.append(
311                        DefaultStream(
312                            partition_generator=partition_generator,
313                            name=declarative_stream.name,
314                            json_schema=declarative_stream.get_json_schema(),
315                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
316                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
317                            cursor_field=cursor.cursor_field.cursor_field_key
318                            if hasattr(cursor, "cursor_field")
319                            and hasattr(
320                                cursor.cursor_field, "cursor_field_key"
321                            )  # FIXME this will need to be updated once we do the per partition
322                            else None,
323                            logger=self.logger,
324                            cursor=cursor,
325                        )
326                    )
327                elif (
328                    is_substream_without_incremental or is_without_partition_router_or_cursor
329                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
330                    partition_generator = StreamSlicerPartitionGenerator(
331                        DeclarativePartitionFactory(
332                            declarative_stream.name,
333                            declarative_stream.get_json_schema(),
334                            declarative_stream.retriever,
335                            self.message_repository,
336                        ),
337                        declarative_stream.retriever.stream_slicer,
338                    )
339
340                    final_state_cursor = FinalStateCursor(
341                        stream_name=declarative_stream.name,
342                        stream_namespace=declarative_stream.namespace,
343                        message_repository=self.message_repository,
344                    )
345
346                    concurrent_streams.append(
347                        DefaultStream(
348                            partition_generator=partition_generator,
349                            name=declarative_stream.name,
350                            json_schema=declarative_stream.get_json_schema(),
351                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
352                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
353                            cursor_field=None,
354                            logger=self.logger,
355                            cursor=final_state_cursor,
356                        )
357                    )
358                elif (
359                    incremental_sync_component_definition
360                    and incremental_sync_component_definition.get("type", "")
361                    == DatetimeBasedCursorModel.__name__
362                    and hasattr(declarative_stream.retriever, "stream_slicer")
363                    and isinstance(
364                        declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
365                    )
366                ):
367                    stream_state = self._connector_state_manager.get_stream_state(
368                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
369                    )
370                    stream_state = self._migrate_state(declarative_stream, stream_state)
371
372                    partition_router = declarative_stream.retriever.stream_slicer._partition_router
373
374                    perpartition_cursor = (
375                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
376                            state_manager=self._connector_state_manager,
377                            model_type=DatetimeBasedCursorModel,
378                            component_definition=incremental_sync_component_definition,
379                            stream_name=declarative_stream.name,
380                            stream_namespace=declarative_stream.namespace,
381                            config=config or {},
382                            stream_state=stream_state,
383                            partition_router=partition_router,
384                        )
385                    )
386
387                    retriever = self._get_retriever(declarative_stream, stream_state)
388
389                    partition_generator = StreamSlicerPartitionGenerator(
390                        DeclarativePartitionFactory(
391                            declarative_stream.name,
392                            declarative_stream.get_json_schema(),
393                            retriever,
394                            self.message_repository,
395                        ),
396                        perpartition_cursor,
397                    )
398
399                    concurrent_streams.append(
400                        DefaultStream(
401                            partition_generator=partition_generator,
402                            name=declarative_stream.name,
403                            json_schema=declarative_stream.get_json_schema(),
404                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
405                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
406                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
407                            logger=self.logger,
408                            cursor=perpartition_cursor,
409                        )
410                    )
411                else:
412                    synchronous_streams.append(declarative_stream)
413            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
414            # Condition below needs to ensure that concurrent support is not lost for sources that already support
415            # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
416            elif (
417                isinstance(declarative_stream, AbstractStreamFacade)
418                and self.is_partially_declarative
419            ):
420                concurrent_streams.append(declarative_stream.get_underlying_stream())
421            else:
422                synchronous_streams.append(declarative_stream)
423
424        return concurrent_streams, synchronous_streams
425
426    def _is_concurrent_cursor_incremental_without_partition_routing(
427        self,
428        declarative_stream: DeclarativeStream,
429        incremental_sync_component_definition: Mapping[str, Any] | None,
430    ) -> bool:
431        return (
432            incremental_sync_component_definition is not None
433            and bool(incremental_sync_component_definition)
434            and (
435                incremental_sync_component_definition.get("type", "")
436                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
437            )
438            and hasattr(declarative_stream.retriever, "stream_slicer")
439            and (
440                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
441                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
442                # add an isinstance check here if we want to create a Declarative IncrementingCountCursor
443                # or isinstance(
444                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
445                # )
446                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
447            )
448        )
449
450    @staticmethod
451    def _get_retriever(
452        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
453    ) -> Retriever:
454        retriever = declarative_stream.retriever
455
456        # This is an optimization so that we don't invoke any cursor or state management flows within the
457        # low-code framework because state management is handled through the ConcurrentCursor.
458        if declarative_stream and isinstance(retriever, SimpleRetriever):
459            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
460            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
461            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
462            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
463            # properly initialized with state.
464            if retriever.cursor:
465                retriever.cursor.set_initial_state(stream_state=stream_state)
466
467            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
468            # from the one initialized on the SimpleRetriever, so it must also have state initialized
469            # for semi-incremental streams using is_client_side_incremental to filter properly
470            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
471                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
472            ):
473                retriever.record_selector.record_filter._cursor.set_initial_state(
474                    stream_state=stream_state
475                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds
476
477            # We zero it out here, but since this is a cursor reference, the state is still properly
478            # instantiated for the other components that reference it
479            retriever.cursor = None
480
481        return retriever
482
483    @staticmethod
484    def _select_streams(
485        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
486    ) -> List[AbstractStream]:
487        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
488        abstract_streams: List[AbstractStream] = []
489        for configured_stream in configured_catalog.streams:
490            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
491            if stream_instance:
492                abstract_streams.append(stream_instance)
493
494        return abstract_streams
495
496    @staticmethod
497    def _remove_concurrent_streams_from_catalog(
498        catalog: ConfiguredAirbyteCatalog,
499        concurrent_stream_names: set[str],
500    ) -> ConfiguredAirbyteCatalog:
501        return ConfiguredAirbyteCatalog(
502            streams=[
503                stream
504                for stream in catalog.streams
505                if stream.stream.name not in concurrent_stream_names
506            ]
507        )
508
509    @staticmethod
510    def _migrate_state(
511        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
512    ) -> MutableMapping[str, Any]:
513        for state_migration in declarative_stream.state_migrations:
514            if state_migration.should_migrate(stream_state):
515                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
516                stream_state = dict(state_migration.migrate(stream_state))
517
518        return stream_state
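
To make the start-up arithmetic in __init__ concrete, here is a small runnable sketch of the partition-count floor described in the comments above. The helper name initial_partitions is illustrative and not part of the CDK.

def initial_partitions(concurrency_level: int) -> int:
    # Mirrors __init__: partition generation iterates over a range based on
    # this value, so it must never be floored to zero or start-up deadlocks.
    return max(concurrency_level // 2, 1)

assert initial_partitions(1) == 1  # clamped up from zero
assert initial_partitions(2) == 1  # the lowest safe concurrency level
assert initial_partitions(5) == 2  # integer division floors 2.5 down to 2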
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):

Declarative source defined by a manifest of low-code components that define source connector behavior
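
As a rough usage sketch: the manifest below is a truncated placeholder, and a real manifest must satisfy the declarative component schema and define at least one stream or dynamic stream before construction succeeds, so this shows the shape of the call rather than a runnable configuration.

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

manifest = {
    "version": "6.0.0",  # placeholder version
    "check": {"type": "CheckStream", "stream_names": ["users"]},
    "concurrency_level": {
        "type": "ConcurrencyLevel",
        "default_concurrency": 4,  # max(4 // 2, 1) == 2 initial partitions
    },
    "streams": [],  # stream definitions omitted for brevity
}

source = ConcurrentDeclarativeSource(
    catalog=None,               # no configured catalog needed for check/discover
    config={"api_key": "..."},  # hypothetical connector config
    state=[],                   # incoming state as a List[AirbyteStateMessage]
    source_config=manifest,
)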

is_partially_declarative: bool

This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams.
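
A partially migrated source (the Stripe-style transition case referenced in the TODO comments) might opt in by overriding the flag. The subclass below is a hypothetical sketch:

class PartiallyDeclarativeSource(ConcurrentDeclarativeSource):
    @property
    def is_partially_declarative(self) -> bool:
        # With this flag set, _group_streams routes AbstractStreamFacade
        # instances into the concurrent list via get_underlying_stream()
        # instead of treating them as synchronous streams.
        return True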

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:

Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
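
Assuming the source sketched above, the discovered catalog can be inspected like so:

import logging

catalog = source.discover(logging.getLogger("airbyte"), config={"api_key": "..."})
for stream in catalog.streams:
    print(stream.name, stream.supported_sync_modes)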