airbyte_cdk.sources.declarative.concurrent_declarative_source

  1#
  2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
  7
  8from airbyte_cdk.models import (
  9    AirbyteCatalog,
 10    AirbyteMessage,
 11    AirbyteStateMessage,
 12    ConfiguredAirbyteCatalog,
 13)
 14from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 15from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 16from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 17from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 18from airbyte_cdk.sources.declarative.extractors import RecordSelector
 19from airbyte_cdk.sources.declarative.extractors.record_filter import (
 20    ClientSideIncrementalRecordFilterDecorator,
 21)
 22from airbyte_cdk.sources.declarative.incremental import (
 23    ConcurrentPerPartitionCursor,
 24    GlobalSubstreamCursor,
 25)
 26from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
 27from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
 28    PerPartitionWithGlobalCursor,
 29)
 30from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 31from airbyte_cdk.sources.declarative.models import FileUploader
 32from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 33    ConcurrencyLevel as ConcurrencyLevelModel,
 34)
 35from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 36    DatetimeBasedCursor as DatetimeBasedCursorModel,
 37)
 38from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 39    IncrementingCountCursor as IncrementingCountCursorModel,
 40)
 41from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 42    ModelToComponentFactory,
 43)
 44from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
 45from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
 46from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
 47    DeclarativePartitionFactory,
 48    StreamSlicerPartitionGenerator,
 49)
 50from airbyte_cdk.sources.declarative.types import ConnectionDefinition
 51from airbyte_cdk.sources.source import TState
 52from airbyte_cdk.sources.streams import Stream
 53from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 54from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 55from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
 56    AlwaysAvailableAvailabilityStrategy,
 57)
 58from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
 59from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 60from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
 61
 62
 63class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
 64    # By default, we use a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a state of deadlock
 65    # because it has hit the limit of futures but no partition reader is consuming them.
 66    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
 67
 68    def __init__(
 69        self,
 70        catalog: Optional[ConfiguredAirbyteCatalog],
 71        config: Optional[Mapping[str, Any]],
 72        state: TState,
 73        source_config: ConnectionDefinition,
 74        debug: bool = False,
 75        emit_connector_builder_messages: bool = False,
 76        component_factory: Optional[ModelToComponentFactory] = None,
 77        **kwargs: Any,
 78    ) -> None:
 79        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
 80        #  no longer needs to store the original incoming state. But maybe there's an edge case?
 81        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
 82
 83        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
 84        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
 85        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
 86        # incremental streams running in full refresh.
 87        component_factory = component_factory or ModelToComponentFactory(
 88            emit_connector_builder_messages=emit_connector_builder_messages,
 89            disable_resumable_full_refresh=True,
 90            connector_state_manager=self._connector_state_manager,
 91        )
 92
 93        super().__init__(
 94            source_config=source_config,
 95            config=config,
 96            debug=debug,
 97            emit_connector_builder_messages=emit_connector_builder_messages,
 98            component_factory=component_factory,
 99        )
100
101        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
102        if concurrency_level_from_manifest:
103            concurrency_level_component = self._constructor.create_component(
104                model_type=ConcurrencyLevelModel,
105                component_definition=concurrency_level_from_manifest,
106                config=config or {},
107            )
108            if not isinstance(concurrency_level_component, ConcurrencyLevel):
109                raise ValueError(
110                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
111                )
112
113            concurrency_level = concurrency_level_component.get_concurrency_level()
114            initial_number_of_partitions_to_generate = max(
115                concurrency_level // 2, 1
116        )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during start-up
117        else:
118            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
119            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
120
121        self._concurrent_source = ConcurrentSource.create(
122            num_workers=concurrency_level,
123            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
124            logger=self.logger,
125            slice_logger=self._slice_logger,
126            message_repository=self.message_repository,
127        )
128
129    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
130    @property
131    def is_partially_declarative(self) -> bool:
132        """This flag is used to avoid unexpected AbstractStreamFacade processing as concurrent streams."""
133        return False
134
135    def read(
136        self,
137        logger: logging.Logger,
138        config: Mapping[str, Any],
139        catalog: ConfiguredAirbyteCatalog,
140        state: Optional[List[AirbyteStateMessage]] = None,
141    ) -> Iterator[AirbyteMessage]:
142        concurrent_streams, _ = self._group_streams(config=config)
143
144        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
145        # the concurrent streams must be saved so that they can be removed from the catalog before starting
146        # synchronous streams
147        if len(concurrent_streams) > 0:
148            concurrent_stream_names = set(
149                [concurrent_stream.name for concurrent_stream in concurrent_streams]
150            )
151
152            selected_concurrent_streams = self._select_streams(
153                streams=concurrent_streams, configured_catalog=catalog
154            )
155            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
156        # This is also evident in concurrent_source_adapter.py, so I'll leave fixing this out of scope for now
157            if selected_concurrent_streams:
158                yield from self._concurrent_source.read(selected_concurrent_streams)
159
160            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
161        # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
162        # of which were already synced using the Concurrent CDK
163            filtered_catalog = self._remove_concurrent_streams_from_catalog(
164                catalog=catalog, concurrent_stream_names=concurrent_stream_names
165            )
166        else:
167            filtered_catalog = catalog
168
169        # There is no need to run read for synchronous streams if none exist.
170        if not filtered_catalog.streams:
171            return
172
173        yield from super().read(logger, config, filtered_catalog, state)
174
175    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
176        concurrent_streams, synchronous_streams = self._group_streams(config=config)
177        return AirbyteCatalog(
178            streams=[
179                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
180            ]
181        )
182
183    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
184        """
185        The `streams` method is used as part of the AbstractSource in the following cases:
186        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
187        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter on a catalog which excludes concurrent streams, so not all the streams returned by `streams` are actually read)
188        Note that `super().streams(config)` is also called when splitting the streams between concurrent or not in `_group_streams`.
189
190        In both cases, we will assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
191        """
192        return super().streams(config)
193
194    def _group_streams(
195        self, config: Mapping[str, Any]
196    ) -> Tuple[List[AbstractStream], List[Stream]]:
197        concurrent_streams: List[AbstractStream] = []
198        synchronous_streams: List[Stream] = []
199
200        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
201        # and this is validated during the initialization of the source.
202        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
203            self._source_config, config
204        )
205
206        name_to_stream_mapping = {stream["name"]: stream for stream in streams}
207
208        for declarative_stream in self.streams(config=config):
209            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
210            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
211            # so we need to treat them as synchronous
212
213            supports_file_transfer = (
214                isinstance(declarative_stream, DeclarativeStream)
215                and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
216            )
217
218            if (
219                isinstance(declarative_stream, DeclarativeStream)
220                and name_to_stream_mapping[declarative_stream.name]["type"]
221                == "StateDelegatingStream"
222            ):
223                stream_state = self._connector_state_manager.get_stream_state(
224                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
225                )
226
227                name_to_stream_mapping[declarative_stream.name] = (
228                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
229                    if stream_state
230                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
231                )
232
233            if isinstance(declarative_stream, DeclarativeStream) and (
234                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
235                == "SimpleRetriever"
236                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
237                == "AsyncRetriever"
238            ):
239                incremental_sync_component_definition = name_to_stream_mapping[
240                    declarative_stream.name
241                ].get("incremental_sync")
242
243                partition_router_component_definition = (
244                    name_to_stream_mapping[declarative_stream.name]
245                    .get("retriever", {})
246                    .get("partition_router")
247                )
248                is_without_partition_router_or_cursor = not bool(
249                    incremental_sync_component_definition
250                ) and not bool(partition_router_component_definition)
251
252                is_substream_without_incremental = (
253                    partition_router_component_definition
254                    and not incremental_sync_component_definition
255                )
256
257                if self._is_concurrent_cursor_incremental_without_partition_routing(
258                    declarative_stream, incremental_sync_component_definition
259                ):
260                    stream_state = self._connector_state_manager.get_stream_state(
261                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
262                    )
263                    stream_state = self._migrate_state(declarative_stream, stream_state)
264
265                    retriever = self._get_retriever(declarative_stream, stream_state)
266
267                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
268                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
269                    ):
270                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer
271
272                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
273                            # This should never happen since we instantiate ConcurrentCursor in
274                            # model_to_component_factory.py
275                            raise ValueError(
276                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
277                            )
278
279                        partition_generator = StreamSlicerPartitionGenerator(
280                            partition_factory=DeclarativePartitionFactory(
281                                declarative_stream.name,
282                                declarative_stream.get_json_schema(),
283                                retriever,
284                                self.message_repository,
285                            ),
286                            stream_slicer=declarative_stream.retriever.stream_slicer,
287                        )
288                    else:
289                        if (
290                            incremental_sync_component_definition
291                            and incremental_sync_component_definition.get("type")
292                            == IncrementingCountCursorModel.__name__
293                        ):
294                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
295                                model_type=IncrementingCountCursorModel,
296                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
297                                stream_name=declarative_stream.name,
298                                stream_namespace=declarative_stream.namespace,
299                                config=config or {},
300                            )
301                        else:
302                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
303                                model_type=DatetimeBasedCursorModel,
304                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
305                                stream_name=declarative_stream.name,
306                                stream_namespace=declarative_stream.namespace,
307                                config=config or {},
308                                stream_state_migrations=declarative_stream.state_migrations,
309                            )
310                        partition_generator = StreamSlicerPartitionGenerator(
311                            partition_factory=DeclarativePartitionFactory(
312                                declarative_stream.name,
313                                declarative_stream.get_json_schema(),
314                                retriever,
315                                self.message_repository,
316                            ),
317                            stream_slicer=cursor,
318                        )
319
320                    concurrent_streams.append(
321                        DefaultStream(
322                            partition_generator=partition_generator,
323                            name=declarative_stream.name,
324                            json_schema=declarative_stream.get_json_schema(),
325                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
326                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
327                            cursor_field=cursor.cursor_field.cursor_field_key
328                            if hasattr(cursor, "cursor_field")
329                            and hasattr(
330                                cursor.cursor_field, "cursor_field_key"
331                            )  # FIXME this will need to be updated once we do the per partition
332                            else None,
333                            logger=self.logger,
334                            cursor=cursor,
335                            supports_file_transfer=supports_file_transfer,
336                        )
337                    )
338                elif (
339                    is_substream_without_incremental or is_without_partition_router_or_cursor
340                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
341                    partition_generator = StreamSlicerPartitionGenerator(
342                        DeclarativePartitionFactory(
343                            declarative_stream.name,
344                            declarative_stream.get_json_schema(),
345                            declarative_stream.retriever,
346                            self.message_repository,
347                        ),
348                        declarative_stream.retriever.stream_slicer,
349                    )
350
351                    final_state_cursor = FinalStateCursor(
352                        stream_name=declarative_stream.name,
353                        stream_namespace=declarative_stream.namespace,
354                        message_repository=self.message_repository,
355                    )
356
357                    concurrent_streams.append(
358                        DefaultStream(
359                            partition_generator=partition_generator,
360                            name=declarative_stream.name,
361                            json_schema=declarative_stream.get_json_schema(),
362                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
363                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
364                            cursor_field=None,
365                            logger=self.logger,
366                            cursor=final_state_cursor,
367                            supports_file_transfer=supports_file_transfer,
368                        )
369                    )
370                elif (
371                    incremental_sync_component_definition
372                    and incremental_sync_component_definition.get("type", "")
373                    == DatetimeBasedCursorModel.__name__
374                    and hasattr(declarative_stream.retriever, "stream_slicer")
375                    and isinstance(
376                        declarative_stream.retriever.stream_slicer,
377                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
378                    )
379                ):
380                    stream_state = self._connector_state_manager.get_stream_state(
381                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
382                    )
383                    stream_state = self._migrate_state(declarative_stream, stream_state)
384
385                    partition_router = declarative_stream.retriever.stream_slicer._partition_router
386
387                    perpartition_cursor = (
388                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
389                            state_manager=self._connector_state_manager,
390                            model_type=DatetimeBasedCursorModel,
391                            component_definition=incremental_sync_component_definition,
392                            stream_name=declarative_stream.name,
393                            stream_namespace=declarative_stream.namespace,
394                            config=config or {},
395                            stream_state=stream_state,
396                            partition_router=partition_router,
397                        )
398                    )
399
400                    retriever = self._get_retriever(declarative_stream, stream_state)
401
402                    partition_generator = StreamSlicerPartitionGenerator(
403                        DeclarativePartitionFactory(
404                            declarative_stream.name,
405                            declarative_stream.get_json_schema(),
406                            retriever,
407                            self.message_repository,
408                        ),
409                        perpartition_cursor,
410                    )
411
412                    concurrent_streams.append(
413                        DefaultStream(
414                            partition_generator=partition_generator,
415                            name=declarative_stream.name,
416                            json_schema=declarative_stream.get_json_schema(),
417                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
418                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
419                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
420                            logger=self.logger,
421                            cursor=perpartition_cursor,
422                            supports_file_transfer=supports_file_transfer,
423                        )
424                    )
425                else:
426                    synchronous_streams.append(declarative_stream)
427            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
428            # Condition below needs to ensure that concurrent support is not lost for sources that already support
429            # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe).
430            elif (
431                isinstance(declarative_stream, AbstractStreamFacade)
432                and self.is_partially_declarative
433            ):
434                concurrent_streams.append(declarative_stream.get_underlying_stream())
435            else:
436                synchronous_streams.append(declarative_stream)
437
438        return concurrent_streams, synchronous_streams
439
440    def _is_concurrent_cursor_incremental_without_partition_routing(
441        self,
442        declarative_stream: DeclarativeStream,
443        incremental_sync_component_definition: Mapping[str, Any] | None,
444    ) -> bool:
445        return (
446            incremental_sync_component_definition is not None
447            and bool(incremental_sync_component_definition)
448            and (
449                incremental_sync_component_definition.get("type", "")
450                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
451            )
452            and hasattr(declarative_stream.retriever, "stream_slicer")
453            and (
454                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
455                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
456                # add an isinstance check here if we want to create a Declarative IncrementingCountCursor
457                # or isinstance(
458                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
459                # )
460                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
461            )
462        )
463
464    @staticmethod
465    def _get_retriever(
466        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
467    ) -> Retriever:
468        retriever = declarative_stream.retriever
469
470        # This is an optimization so that we don't invoke any cursor or state management flows within the
471        # low-code framework because state management is handled through the ConcurrentCursor.
472        if declarative_stream and isinstance(retriever, SimpleRetriever):
473            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
474            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
475            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
476            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
477            # properly initialized with state.
478            if retriever.cursor:
479                retriever.cursor.set_initial_state(stream_state=stream_state)
480
481            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
482            # from the one initialized on the SimpleRetriever, so it must also have state initialized
483            # for semi-incremental streams using is_client_side_incremental to filter properly
484            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
485                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
486            ):
487                retriever.record_selector.record_filter._cursor.set_initial_state(
488                    stream_state=stream_state
489                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds
490
491            # We zero it out here, but since this is a cursor reference, the state is still properly
492            # instantiated for the other components that reference it
493            retriever.cursor = None
494
495        return retriever
496
497    @staticmethod
498    def _select_streams(
499        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
500    ) -> List[AbstractStream]:
501        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
502        abstract_streams: List[AbstractStream] = []
503        for configured_stream in configured_catalog.streams:
504            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
505            if stream_instance:
506                abstract_streams.append(stream_instance)
507
508        return abstract_streams
509
510    @staticmethod
511    def _remove_concurrent_streams_from_catalog(
512        catalog: ConfiguredAirbyteCatalog,
513        concurrent_stream_names: set[str],
514    ) -> ConfiguredAirbyteCatalog:
515        return ConfiguredAirbyteCatalog(
516            streams=[
517                stream
518                for stream in catalog.streams
519                if stream.stream.name not in concurrent_stream_names
520            ]
521        )
522
523    @staticmethod
524    def _migrate_state(
525        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
526    ) -> MutableMapping[str, Any]:
527        for state_migration in declarative_stream.state_migrations:
528            if state_migration.should_migrate(stream_state):
529                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
530                stream_state = dict(state_migration.migrate(stream_state))
531
532        return stream_state
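
As a point of reference for the `concurrency_level` handling in `__init__` above, a manifest may declare a ConcurrencyLevel component. The fragment below is an illustrative sketch only; the field values are hypothetical.

# Illustrative manifest fragment (hypothetical values). If the component
# resolves to a concurrency level of 10, ConcurrentSource.create() is called
# with num_workers=10 and max(10 // 2, 1) == 5 initial partitions to generate.
manifest_fragment = {
    "concurrency_level": {
        "type": "ConcurrencyLevel",
        "default_concurrency": "{{ config.get('num_workers', 4) }}",
        "max_concurrency": 10,
    }
}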
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):

Declarative source defined by a manifest of low-code components that define source connector behavior
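
A minimal construction sketch, assuming a hypothetical API endpoint and an empty connector config; the manifest follows the declarative component schema, but every name and value below is illustrative:

import logging

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

# Hypothetical manifest: one full-refresh stream backed by a SimpleRetriever.
manifest = {
    "version": "6.0.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["widgets"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "widgets",
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.example.com",  # hypothetical endpoint
                    "path": "/widgets",
                    "http_method": "GET",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": []},
                },
            },
            "schema_loader": {
                "type": "InlineSchemaLoader",
                "schema": {"type": "object", "properties": {}},
            },
        }
    ],
}

config: dict = {}  # connector-specific configuration would go here
source = ConcurrentDeclarativeSource(
    catalog=None,
    config=config,
    state=[],  # incoming state is a list of AirbyteStateMessage; empty for a fresh sync
    source_config=manifest,
)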

is_partially_declarative: bool

This flag is used to avoid unexpected AbstractStreamFacade processing as concurrent streams.
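
A sketch of how a partially-migrated source (the Stripe-style transition case mentioned in the TODO above) might override this flag so that its AbstractStreamFacade streams are grouped as concurrent streams:

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)


class MyPartiallyDeclarativeSource(ConcurrentDeclarativeSource):
    @property
    def is_partially_declarative(self) -> bool:
        # Opting in means _group_streams() routes AbstractStreamFacade
        # streams to the concurrent group via get_underlying_stream().
        return True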

def discover( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:

Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
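
Continuing the construction sketch above (reusing its `source`, `config`, and `logging` import), the discovered catalog can be turned into a configured catalog for `read`; the sync modes chosen here are illustrative:

from airbyte_cdk.models import (
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

logger = logging.getLogger("airbyte")
catalog = source.discover(logger, config)

configured_catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=stream,
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
        for stream in catalog.streams
    ]
)

for message in source.read(logger, config, configured_catalog, state=None):
    print(message.type)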