airbyte_cdk.sources.declarative.concurrent_declarative_source

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
)
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
    ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental import (
    ConcurrentPerPartitionCursor,
    GlobalSubstreamCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models import FileUploader
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    ConcurrencyLevel as ConcurrencyLevelModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DatetimeBasedCursor as DatetimeBasedCursorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    IncrementingCountCursor as IncrementingCountCursorModel,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    DeclarativePartitionFactory,
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream


class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a
    # state of deadlock because it has hit the limit of futures but no partition reader is consuming them.
    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2

    def __init__(
        self,
        catalog: Optional[ConfiguredAirbyteCatalog],
        config: Optional[Mapping[str, Any]],
        state: TState,
        source_config: ConnectionDefinition,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
        #  no longer needs to store the original incoming state. But maybe there's an edge case?
        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
        # incremental streams running in full refresh.
        component_factory = component_factory or ModelToComponentFactory(
            emit_connector_builder_messages=emit_connector_builder_messages,
            disable_resumable_full_refresh=True,
            connector_state_manager=self._connector_state_manager,
        )

        super().__init__(
            source_config=source_config,
            config=config,
            debug=debug,
            emit_connector_builder_messages=emit_connector_builder_messages,
            component_factory=component_factory,
            config_path=config_path,
        )

        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
        if concurrency_level_from_manifest:
            concurrency_level_component = self._constructor.create_component(
                model_type=ConcurrencyLevelModel,
                component_definition=concurrency_level_from_manifest,
                config=config or {},
            )
            if not isinstance(concurrency_level_component, ConcurrencyLevel):
                raise ValueError(
                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
                )

            concurrency_level = concurrency_level_component.get_concurrency_level()
            initial_number_of_partitions_to_generate = max(
                concurrency_level // 2, 1
            )  # Partition generation iterates using a range based on this value. If this is floored to zero we end up in a deadlock during startup
        else:
            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2

        self._concurrent_source = ConcurrentSource.create(
            num_workers=concurrency_level,
            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
            logger=self.logger,
            slice_logger=self._slice_logger,
            message_repository=self.message_repository,
        )

    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
    @property
    def is_partially_declarative(self) -> bool:
        """This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams."""
        return False

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        concurrent_streams, _ = self._group_streams(config=config)

        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
        # the concurrent streams must be saved so that they can be removed from the catalog before starting
        # synchronous streams
        if len(concurrent_streams) > 0:
            concurrent_stream_names = set(
                [concurrent_stream.name for concurrent_stream in concurrent_streams]
            )

            selected_concurrent_streams = self._select_streams(
                streams=concurrent_streams, configured_catalog=catalog
            )
            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
            # This is also evident in concurrent_source_adapter.py, so I'll leave this out of scope to fix for now
            if selected_concurrent_streams:
                yield from self._concurrent_source.read(selected_concurrent_streams)

            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
            # of which were already synced using the Concurrent CDK
            filtered_catalog = self._remove_concurrent_streams_from_catalog(
                catalog=catalog, concurrent_stream_names=concurrent_stream_names
            )
        else:
            filtered_catalog = catalog

        # There is no need to run read for synchronous streams if none exist.
        if not filtered_catalog.streams:
            return

        yield from super().read(logger, config, filtered_catalog, state)

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        concurrent_streams, synchronous_streams = self._group_streams(config=config)
        return AirbyteCatalog(
            streams=[
                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
            ]
        )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        The `streams` method is used as part of the AbstractSource in the following cases:
        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter for a specific catalog which excludes concurrent streams, so not all of the streams returned by `streams` are actually read)
        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.

        In both cases, we will assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
        """
        return super().streams(config)

    def _group_streams(
        self, config: Mapping[str, Any]
    ) -> Tuple[List[AbstractStream], List[Stream]]:
        concurrent_streams: List[AbstractStream] = []
        synchronous_streams: List[Stream] = []

        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
        # and this is validated during the initialization of the source.
        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        name_to_stream_mapping = {stream["name"]: stream for stream in streams}

        for declarative_stream in self.streams(config=config):
            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
            # so we need to treat them as synchronous

            supports_file_transfer = (
                isinstance(declarative_stream, DeclarativeStream)
                and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
            )

            if (
                isinstance(declarative_stream, DeclarativeStream)
                and name_to_stream_mapping[declarative_stream.name]["type"]
                == "StateDelegatingStream"
            ):
                stream_state = self._connector_state_manager.get_stream_state(
                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                )

                name_to_stream_mapping[declarative_stream.name] = (
                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
                    if stream_state
                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
                )

            if isinstance(declarative_stream, DeclarativeStream) and (
                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "SimpleRetriever"
                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                == "AsyncRetriever"
            ):
                incremental_sync_component_definition = name_to_stream_mapping[
                    declarative_stream.name
                ].get("incremental_sync")

                partition_router_component_definition = (
                    name_to_stream_mapping[declarative_stream.name]
                    .get("retriever", {})
                    .get("partition_router")
                )
                is_without_partition_router_or_cursor = not bool(
                    incremental_sync_component_definition
                ) and not bool(partition_router_component_definition)

                is_substream_without_incremental = (
                    partition_router_component_definition
                    and not incremental_sync_component_definition
                )

                if self._is_concurrent_cursor_incremental_without_partition_routing(
                    declarative_stream, incremental_sync_component_definition
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
                    ):
                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer

                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
                            # This should never happen since we instantiate ConcurrentCursor in
                            # model_to_component_factory.py
                            raise ValueError(
                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
                            )

                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=declarative_stream.retriever.stream_slicer,
                        )
                    else:
                        if (
                            incremental_sync_component_definition
                            and incremental_sync_component_definition.get("type")
                            == IncrementingCountCursorModel.__name__
                        ):
                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
                                model_type=IncrementingCountCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                            )
                        else:
                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                                model_type=DatetimeBasedCursorModel,
                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                                stream_name=declarative_stream.name,
                                stream_namespace=declarative_stream.namespace,
                                config=config or {},
                                stream_state_migrations=declarative_stream.state_migrations,
                            )
                        partition_generator = StreamSlicerPartitionGenerator(
                            partition_factory=DeclarativePartitionFactory(
                                declarative_stream.name,
                                declarative_stream.get_json_schema(),
                                retriever,
                                self.message_repository,
                            ),
                            stream_slicer=cursor,
                        )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=cursor.cursor_field.cursor_field_key
                            if hasattr(cursor, "cursor_field")
                            and hasattr(
                                cursor.cursor_field, "cursor_field_key"
                            )  # FIXME this will need to be updated once we do the per partition
                            else None,
                            logger=self.logger,
                            cursor=cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    is_substream_without_incremental or is_without_partition_router_or_cursor
                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            declarative_stream.retriever,
                            self.message_repository,
                        ),
                        declarative_stream.retriever.stream_slicer,
                    )

                    final_state_cursor = FinalStateCursor(
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        message_repository=self.message_repository,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=None,
                            logger=self.logger,
                            cursor=final_state_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                elif (
                    incremental_sync_component_definition
                    and incremental_sync_component_definition.get("type", "")
                    == DatetimeBasedCursorModel.__name__
                    and hasattr(declarative_stream.retriever, "stream_slicer")
                    and isinstance(
                        declarative_stream.retriever.stream_slicer,
                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
                    )
                ):
                    stream_state = self._connector_state_manager.get_stream_state(
                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                    )
                    stream_state = self._migrate_state(declarative_stream, stream_state)

                    partition_router = declarative_stream.retriever.stream_slicer._partition_router

                    perpartition_cursor = (
                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
                            state_manager=self._connector_state_manager,
                            model_type=DatetimeBasedCursorModel,
                            component_definition=incremental_sync_component_definition,
                            stream_name=declarative_stream.name,
                            stream_namespace=declarative_stream.namespace,
                            config=config or {},
                            stream_state=stream_state,
                            partition_router=partition_router,
                        )
                    )

                    retriever = self._get_retriever(declarative_stream, stream_state)

                    partition_generator = StreamSlicerPartitionGenerator(
                        DeclarativePartitionFactory(
                            declarative_stream.name,
                            declarative_stream.get_json_schema(),
                            retriever,
                            self.message_repository,
                        ),
                        perpartition_cursor,
                    )

                    concurrent_streams.append(
                        DefaultStream(
                            partition_generator=partition_generator,
                            name=declarative_stream.name,
                            json_schema=declarative_stream.get_json_schema(),
                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
                            logger=self.logger,
                            cursor=perpartition_cursor,
                            supports_file_transfer=supports_file_transfer,
                        )
                    )
                else:
                    synchronous_streams.append(declarative_stream)
            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
            # Condition below needs to ensure that concurrent support is not lost for sources that already support
            # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
            elif (
                isinstance(declarative_stream, AbstractStreamFacade)
                and self.is_partially_declarative
            ):
                concurrent_streams.append(declarative_stream.get_underlying_stream())
            else:
                synchronous_streams.append(declarative_stream)

        return concurrent_streams, synchronous_streams

    def _is_concurrent_cursor_incremental_without_partition_routing(
        self,
        declarative_stream: DeclarativeStream,
        incremental_sync_component_definition: Mapping[str, Any] | None,
    ) -> bool:
        return (
            incremental_sync_component_definition is not None
            and bool(incremental_sync_component_definition)
            and (
                incremental_sync_component_definition.get("type", "")
                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
            )
            and hasattr(declarative_stream.retriever, "stream_slicer")
            and (
                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
                # add an isinstance check here if we want to create a Declarative IncrementingCountCursor
                # or isinstance(
                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
                # )
                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
            )
        )

    @staticmethod
    def _get_retriever(
        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
    ) -> Retriever:
        retriever = declarative_stream.retriever

        # This is an optimization so that we don't invoke any cursor or state management flows within the
        # low-code framework, because state management is handled through the ConcurrentCursor.
        if declarative_stream and isinstance(retriever, SimpleRetriever):
            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
            # properly initialized with state.
            if retriever.cursor:
                retriever.cursor.set_initial_state(stream_state=stream_state)

            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
            # from the one initialized on the SimpleRetriever, so it must also have state initialized
            # for semi-incremental streams using is_client_side_incremental to filter properly
            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
            ):
                retriever.record_selector.record_filter._cursor.set_initial_state(
                    stream_state=stream_state
                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds

            # We zero it out here, but since this is a cursor reference, the state is still properly
            # instantiated for the other components that reference it
            retriever.cursor = None

        return retriever

    @staticmethod
    def _select_streams(
        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if stream_instance:
                abstract_streams.append(stream_instance)

        return abstract_streams

    @staticmethod
    def _remove_concurrent_streams_from_catalog(
        catalog: ConfiguredAirbyteCatalog,
        concurrent_stream_names: set[str],
    ) -> ConfiguredAirbyteCatalog:
        return ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )

    @staticmethod
    def _migrate_state(
        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        for state_migration in declarative_stream.state_migrations:
            if state_migration.should_migrate(stream_state):
                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
                stream_state = dict(state_migration.migrate(stream_state))

        return stream_state
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):

Declarative source defined by a manifest of low-code components that define source connector behavior
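
Below is a minimal construction sketch. The manifest is illustrative only: its field names follow the declarative component schema as commonly documented, but the exact snippet is an assumption, is not validated here, and the API host is hypothetical.

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

# Illustrative manifest; not taken from this module.
manifest = {
    "version": "6.0.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["users"]},
    "concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 4},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "users",
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.example.com",  # hypothetical API
                    "path": "/users",
                    "http_method": "GET",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": []},
                },
            },
            "schema_loader": {
                "type": "InlineSchemaLoader",
                "schema": {"type": "object", "properties": {}},
            },
        }
    ],
}

# catalog and state may be None/empty when only running discover or check.
source = ConcurrentDeclarativeSource(
    catalog=None,
    config={},
    state=None,
    source_config=manifest,
)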

is_partially_declarative: bool

This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams.
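
A sketch of the intended override, assuming a hypothetical connector that is only partially migrated to the declarative framework (the Stripe situation described in the source comments). With the flag returning True, _group_streams() keeps AbstractStreamFacade streams concurrent:

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

class PartiallyDeclarativeSource(ConcurrentDeclarativeSource):  # hypothetical subclass
    @property
    def is_partially_declarative(self) -> bool:
        # Treat AbstractStreamFacade streams as concurrent during _group_streams().
        return True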

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:

Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
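
Continuing the construction sketch above, a minimal discover call could look like this; the logger name and the printed result are assumptions:

import logging

catalog = source.discover(logging.getLogger("airbyte"), config={})
print([stream.name for stream in catalog.streams])  # e.g. ["users"]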