airbyte_cdk.sources.declarative.concurrent_declarative_source

  1#
  2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
  7
  8from airbyte_cdk.models import (
  9    AirbyteCatalog,
 10    AirbyteMessage,
 11    AirbyteStateMessage,
 12    ConfiguredAirbyteCatalog,
 13)
 14from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 15from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 16from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 17from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 18from airbyte_cdk.sources.declarative.extractors import RecordSelector
 19from airbyte_cdk.sources.declarative.extractors.record_filter import (
 20    ClientSideIncrementalRecordFilterDecorator,
 21)
 22from airbyte_cdk.sources.declarative.incremental import (
 23    ConcurrentPerPartitionCursor,
 24    GlobalSubstreamCursor,
 25)
 26from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
 27from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
 28    PerPartitionWithGlobalCursor,
 29)
 30from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 31from airbyte_cdk.sources.declarative.models import FileUploader
 32from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 33    ConcurrencyLevel as ConcurrencyLevelModel,
 34)
 35from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 36    DatetimeBasedCursor as DatetimeBasedCursorModel,
 37)
 38from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 39    IncrementingCountCursor as IncrementingCountCursorModel,
 40)
 41from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 42    ModelToComponentFactory,
 43)
 44from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
 45from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
 46from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
 47    DeclarativePartitionFactory,
 48    StreamSlicerPartitionGenerator,
 49)
 50from airbyte_cdk.sources.declarative.types import ConnectionDefinition
 51from airbyte_cdk.sources.source import TState
 52from airbyte_cdk.sources.streams import Stream
 53from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 54from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 55from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
 56    AlwaysAvailableAvailabilityStrategy,
 57)
 58from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
 59from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 60from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
 61
 62
 63class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
 64    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a deadlock
 65    # because it has hit the limit of futures while no partition reader is consuming them.
 66    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
 67
 68    def __init__(
 69        self,
 70        catalog: Optional[ConfiguredAirbyteCatalog],
 71        config: Optional[Mapping[str, Any]],
 72        state: TState,
 73        source_config: ConnectionDefinition,
 74        debug: bool = False,
 75        emit_connector_builder_messages: bool = False,
 76        component_factory: Optional[ModelToComponentFactory] = None,
 77        config_path: Optional[str] = None,
 78        **kwargs: Any,
 79    ) -> None:
 80        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
 81        #  no longer needs to store the original incoming state. But maybe there's an edge case?
 82        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
 83
 84        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
 85        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
 86        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
 87        # incremental streams running in full refresh.
 88        component_factory = component_factory or ModelToComponentFactory(
 89            emit_connector_builder_messages=emit_connector_builder_messages,
 90            disable_resumable_full_refresh=True,
 91            connector_state_manager=self._connector_state_manager,
 92            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
 93        )
 94
 95        super().__init__(
 96            source_config=source_config,
 97            config=config,
 98            debug=debug,
 99            emit_connector_builder_messages=emit_connector_builder_messages,
100            component_factory=component_factory,
101            config_path=config_path,
102        )
103
104        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
105        if concurrency_level_from_manifest:
106            concurrency_level_component = self._constructor.create_component(
107                model_type=ConcurrencyLevelModel,
108                component_definition=concurrency_level_from_manifest,
109                config=config or {},
110            )
111            if not isinstance(concurrency_level_component, ConcurrencyLevel):
112                raise ValueError(
113                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
114                )
115
116            concurrency_level = concurrency_level_component.get_concurrency_level()
117            initial_number_of_partitions_to_generate = max(
118                concurrency_level // 2, 1
119        )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during startup.
120        else:
121            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
122            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
123
124        self._concurrent_source = ConcurrentSource.create(
125            num_workers=concurrency_level,
126            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
127            logger=self.logger,
128            slice_logger=self._slice_logger,
129            message_repository=self.message_repository,
130        )
131
132    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
133    @property
134    def is_partially_declarative(self) -> bool:
135        """This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams."""
136        return False
137
138    def read(
139        self,
140        logger: logging.Logger,
141        config: Mapping[str, Any],
142        catalog: ConfiguredAirbyteCatalog,
143        state: Optional[List[AirbyteStateMessage]] = None,
144    ) -> Iterator[AirbyteMessage]:
145        concurrent_streams, _ = self._group_streams(config=config)
146
147        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
148        # the concurrent streams must be saved so that they can be removed from the catalog before starting
149        # the synchronous streams.
150        if len(concurrent_streams) > 0:
151            concurrent_stream_names = set(
152                [concurrent_stream.name for concurrent_stream in concurrent_streams]
153            )
154
155            selected_concurrent_streams = self._select_streams(
156                streams=concurrent_streams, configured_catalog=catalog
157            )
158            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
159            # This is also evident in concurrent_source_adapter.py, so fixing it is left out of scope for now.
160            if selected_concurrent_streams:
161                yield from self._concurrent_source.read(selected_concurrent_streams)
162
163            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
164            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
165            # of which were already synced using the Concurrent CDK.
166            filtered_catalog = self._remove_concurrent_streams_from_catalog(
167                catalog=catalog, concurrent_stream_names=concurrent_stream_names
168            )
169        else:
170            filtered_catalog = catalog
171
172        # There is no need to run read for synchronous streams if none exist.
173        if not filtered_catalog.streams:
174            return
175
176        yield from super().read(logger, config, filtered_catalog, state)
177
178    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
179        concurrent_streams, synchronous_streams = self._group_streams(config=config)
180        return AirbyteCatalog(
181            streams=[
182                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
183            ]
184        )
185
186    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
187        """
188        The `streams` method is used as part of the AbstractSource in the following cases:
189        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
190        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter using a catalog that excludes concurrent streams, so not all the streams returned by `streams` are actually read)
191        Note that `super().streams(config)` is also called when splitting the streams between concurrent and synchronous in `_group_streams`.
192
193        In both cases, we assume that calling the DeclarativeStream is perfectly fine, as the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should simply be removed once we have moved away from the code paths mentioned above.
194        """
195        return super().streams(config)
196
197    def _group_streams(
198        self, config: Mapping[str, Any]
199    ) -> Tuple[List[AbstractStream], List[Stream]]:
200        concurrent_streams: List[AbstractStream] = []
201        synchronous_streams: List[Stream] = []
202
203        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
204        # and this is validated during the initialization of the source.
205        streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs(
206            self._source_config, config
207        )
208
209        name_to_stream_mapping = {stream["name"]: stream for stream in streams}
210
211        for declarative_stream in self.streams(config=config):
212            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
213            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
214            # so we need to treat them as synchronous
215
216            supports_file_transfer = (
217                isinstance(declarative_stream, DeclarativeStream)
218                and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
219            )
220
221            if (
222                isinstance(declarative_stream, DeclarativeStream)
223                and name_to_stream_mapping[declarative_stream.name]["type"]
224                == "StateDelegatingStream"
225            ):
226                stream_state = self._connector_state_manager.get_stream_state(
227                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
228                )
229
230                name_to_stream_mapping[declarative_stream.name] = (
231                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
232                    if stream_state
233                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
234                )
235
236            if isinstance(declarative_stream, DeclarativeStream) and (
237                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
238                == "SimpleRetriever"
239                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
240                == "AsyncRetriever"
241            ):
242                incremental_sync_component_definition = name_to_stream_mapping[
243                    declarative_stream.name
244                ].get("incremental_sync")
245
246                partition_router_component_definition = (
247                    name_to_stream_mapping[declarative_stream.name]
248                    .get("retriever", {})
249                    .get("partition_router")
250                )
251                is_without_partition_router_or_cursor = not bool(
252                    incremental_sync_component_definition
253                ) and not bool(partition_router_component_definition)
254
255                is_substream_without_incremental = (
256                    partition_router_component_definition
257                    and not incremental_sync_component_definition
258                )
259
260                if self._is_concurrent_cursor_incremental_without_partition_routing(
261                    declarative_stream, incremental_sync_component_definition
262                ):
263                    stream_state = self._connector_state_manager.get_stream_state(
264                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
265                    )
266                    stream_state = self._migrate_state(declarative_stream, stream_state)
267
268                    retriever = self._get_retriever(declarative_stream, stream_state)
269
270                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
271                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
272                    ):
273                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer
274
275                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
276                            # This should never happen since we instantiate ConcurrentCursor in
277                            # model_to_component_factory.py
278                            raise ValueError(
279                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received {cursor.__class__}"
280                            )
281
282                        partition_generator = StreamSlicerPartitionGenerator(
283                            partition_factory=DeclarativePartitionFactory(
284                                declarative_stream.name,
285                                declarative_stream.get_json_schema(),
286                                retriever,
287                                self.message_repository,
288                            ),
289                            stream_slicer=declarative_stream.retriever.stream_slicer,
290                        )
291                    else:
292                        if (
293                            incremental_sync_component_definition
294                            and incremental_sync_component_definition.get("type")
295                            == IncrementingCountCursorModel.__name__
296                        ):
297                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
298                                model_type=IncrementingCountCursorModel,
299                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
300                                stream_name=declarative_stream.name,
301                                stream_namespace=declarative_stream.namespace,
302                                config=config or {},
303                            )
304                        else:
305                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
306                                model_type=DatetimeBasedCursorModel,
307                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
308                                stream_name=declarative_stream.name,
309                                stream_namespace=declarative_stream.namespace,
310                                config=config or {},
311                                stream_state_migrations=declarative_stream.state_migrations,
312                            )
313                        partition_generator = StreamSlicerPartitionGenerator(
314                            partition_factory=DeclarativePartitionFactory(
315                                declarative_stream.name,
316                                declarative_stream.get_json_schema(),
317                                retriever,
318                                self.message_repository,
319                            ),
320                            stream_slicer=cursor,
321                        )
322
323                    concurrent_streams.append(
324                        DefaultStream(
325                            partition_generator=partition_generator,
326                            name=declarative_stream.name,
327                            json_schema=declarative_stream.get_json_schema(),
328                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
329                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
330                            cursor_field=cursor.cursor_field.cursor_field_key
331                            if hasattr(cursor, "cursor_field")
332                            and hasattr(
333                                cursor.cursor_field, "cursor_field_key"
334                            )  # FIXME this will need to be updated once we do the per partition
335                            else None,
336                            logger=self.logger,
337                            cursor=cursor,
338                            supports_file_transfer=supports_file_transfer,
339                        )
340                    )
341                elif (
342                    is_substream_without_incremental or is_without_partition_router_or_cursor
343                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
344                    partition_generator = StreamSlicerPartitionGenerator(
345                        DeclarativePartitionFactory(
346                            declarative_stream.name,
347                            declarative_stream.get_json_schema(),
348                            declarative_stream.retriever,
349                            self.message_repository,
350                        ),
351                        declarative_stream.retriever.stream_slicer,
352                    )
353
354                    final_state_cursor = FinalStateCursor(
355                        stream_name=declarative_stream.name,
356                        stream_namespace=declarative_stream.namespace,
357                        message_repository=self.message_repository,
358                    )
359
360                    concurrent_streams.append(
361                        DefaultStream(
362                            partition_generator=partition_generator,
363                            name=declarative_stream.name,
364                            json_schema=declarative_stream.get_json_schema(),
365                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
366                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
367                            cursor_field=None,
368                            logger=self.logger,
369                            cursor=final_state_cursor,
370                            supports_file_transfer=supports_file_transfer,
371                        )
372                    )
373                elif (
374                    incremental_sync_component_definition
375                    and incremental_sync_component_definition.get("type", "")
376                    == DatetimeBasedCursorModel.__name__
377                    and hasattr(declarative_stream.retriever, "stream_slicer")
378                    and isinstance(
379                        declarative_stream.retriever.stream_slicer,
380                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
381                    )
382                ):
383                    stream_state = self._connector_state_manager.get_stream_state(
384                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
385                    )
386                    stream_state = self._migrate_state(declarative_stream, stream_state)
387
388                    partition_router = declarative_stream.retriever.stream_slicer._partition_router
389
390                    perpartition_cursor = (
391                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
392                            state_manager=self._connector_state_manager,
393                            model_type=DatetimeBasedCursorModel,
394                            component_definition=incremental_sync_component_definition,
395                            stream_name=declarative_stream.name,
396                            stream_namespace=declarative_stream.namespace,
397                            config=config or {},
398                            stream_state=stream_state,
399                            partition_router=partition_router,
400                        )
401                    )
402
403                    retriever = self._get_retriever(declarative_stream, stream_state)
404
405                    partition_generator = StreamSlicerPartitionGenerator(
406                        DeclarativePartitionFactory(
407                            declarative_stream.name,
408                            declarative_stream.get_json_schema(),
409                            retriever,
410                            self.message_repository,
411                        ),
412                        perpartition_cursor,
413                    )
414
415                    concurrent_streams.append(
416                        DefaultStream(
417                            partition_generator=partition_generator,
418                            name=declarative_stream.name,
419                            json_schema=declarative_stream.get_json_schema(),
420                            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
421                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
422                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
423                            logger=self.logger,
424                            cursor=perpartition_cursor,
425                            supports_file_transfer=supports_file_transfer,
426                        )
427                    )
428                else:
429                    synchronous_streams.append(declarative_stream)
430            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
431            # The condition below ensures that concurrent support is not lost for sources that supported
432            # it before migration but are now only partially migrated to the declarative implementation (e.g., Stripe).
433            elif (
434                isinstance(declarative_stream, AbstractStreamFacade)
435                and self.is_partially_declarative
436            ):
437                concurrent_streams.append(declarative_stream.get_underlying_stream())
438            else:
439                synchronous_streams.append(declarative_stream)
440
441        return concurrent_streams, synchronous_streams
442
443    def _is_concurrent_cursor_incremental_without_partition_routing(
444        self,
445        declarative_stream: DeclarativeStream,
446        incremental_sync_component_definition: Mapping[str, Any] | None,
447    ) -> bool:
448        return (
449            incremental_sync_component_definition is not None
450            and bool(incremental_sync_component_definition)
451            and (
452                incremental_sync_component_definition.get("type", "")
453                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
454            )
455            and hasattr(declarative_stream.retriever, "stream_slicer")
456            and (
457                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
458                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
459                # add an isinstance check here if we want to create a Declarative IncrementingCountCursor
460                # or isinstance(
461                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
462                # )
463                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
464            )
465        )
466
467    @staticmethod
468    def _get_retriever(
469        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
470    ) -> Retriever:
471        retriever = declarative_stream.retriever
472
473        # This is an optimization so that we don't invoke any cursor or state management flows within the
474        # low-code framework because state management is handled through the ConcurrentCursor.
475        if declarative_stream and isinstance(retriever, SimpleRetriever):
476            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
477            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
478            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
479            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
480            # properly initialized with state.
481            if retriever.cursor:
482                retriever.cursor.set_initial_state(stream_state=stream_state)
483
484            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
485            # from the one initialized on the SimpleRetriever, so it must also have state initialized
486            # for semi-incremental streams using is_client_side_incremental to filter properly
487            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
488                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
489            ):
490                retriever.record_selector.record_filter._cursor.set_initial_state(
491                    stream_state=stream_state
492                )  # type: ignore  # After non-concurrent cursors are deprecated we can remove these cursor workarounds
493
494            # We zero it out here, but since this is a cursor reference, the state is still properly
495            # instantiated for the other components that reference it
496            retriever.cursor = None
497
498        return retriever
499
500    @staticmethod
501    def _select_streams(
502        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
503    ) -> List[AbstractStream]:
504        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
505        abstract_streams: List[AbstractStream] = []
506        for configured_stream in configured_catalog.streams:
507            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
508            if stream_instance:
509                abstract_streams.append(stream_instance)
510
511        return abstract_streams
512
513    @staticmethod
514    def _remove_concurrent_streams_from_catalog(
515        catalog: ConfiguredAirbyteCatalog,
516        concurrent_stream_names: set[str],
517    ) -> ConfiguredAirbyteCatalog:
518        return ConfiguredAirbyteCatalog(
519            streams=[
520                stream
521                for stream in catalog.streams
522                if stream.stream.name not in concurrent_stream_names
523            ]
524        )
525
526    @staticmethod
527    def _migrate_state(
528        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
529    ) -> MutableMapping[str, Any]:
530        for state_migration in declarative_stream.state_migrations:
531            if state_migration.should_migrate(stream_state):
532                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
533                stream_state = dict(state_migration.migrate(stream_state))
534
535        return stream_state
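
A worked example of the worker/partition sizing in __init__ above. The concurrency value below is illustrative, not a default taken from any particular manifest:

concurrency_level = 10  # hypothetical value from the manifest's concurrency_level component
num_workers = concurrency_level
# Partition generation iterates over a range based on this value, so it is
# clamped to at least 1 to avoid a startup deadlock; here it evaluates to 5.
initial_partitions = max(concurrency_level // 2, 1)
# Without a concurrency_level in the manifest, the source falls back to
# _LOWEST_SAFE_CONCURRENCY_LEVEL = 2 workers and 2 // 2 = 1 initial partition.
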
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):

Declarative source defined by a manifest of low-code components that define source connector behavior
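
A minimal usage sketch. The manifest path, config contents, and logger name are hypothetical, and PyYAML is assumed for parsing the manifest; in production the connector entrypoint constructs and drives the source, so direct construction like this is mainly useful for local testing.

import logging

import yaml  # assumption: PyYAML is available to parse the manifest

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

# Hypothetical manifest file and connector config, for illustration only.
with open("manifest.yaml") as f:
    manifest = yaml.safe_load(f)
config = {"api_key": "..."}

source = ConcurrentDeclarativeSource(
    catalog=None,  # a ConfiguredAirbyteCatalog is only needed for read()
    config=config,
    state=[],  # incoming state, as a list of AirbyteStateMessage
    source_config=manifest,
)

# discover() groups streams into concurrent and synchronous and advertises both.
catalog = source.discover(logging.getLogger("airbyte"), config)
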

is_partially_declarative: bool

This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams.
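
A hedged sketch of the transition hook described above: a source that is only partially migrated to the declarative framework (the Stripe-style case noted in the source) can override this property so that _group_streams routes its AbstractStreamFacade streams to the concurrent path. The subclass name is hypothetical.

class PartiallyDeclarativeSource(ConcurrentDeclarativeSource):
    # Hypothetical subclass, for illustration only.
    @property
    def is_partially_declarative(self) -> bool:
        # When True, _group_streams() appends the underlying stream of each
        # AbstractStreamFacade to the concurrent group instead of treating
        # the stream as synchronous.
        return True
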

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:

Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
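
Continuing the hypothetical source and config from the sketch above, the discovered catalog can be inspected directly; name and supported_sync_modes are standard AirbyteStream fields.

import logging

catalog = source.discover(logging.getLogger("airbyte"), config)
for stream in catalog.streams:
    # Each entry was produced by as_airbyte_stream() on a grouped stream.
    print(stream.name, stream.supported_sync_modes)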