airbyte_cdk.sources.declarative.concurrent_declarative_source

  1#
  2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6from typing import Any, Generic, Iterator, List, Mapping, MutableMapping, Optional, Tuple
  7
  8from airbyte_cdk.models import (
  9    AirbyteCatalog,
 10    AirbyteMessage,
 11    AirbyteStateMessage,
 12    ConfiguredAirbyteCatalog,
 13)
 14from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 15from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 16from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
 17from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
 18from airbyte_cdk.sources.declarative.extractors import RecordSelector
 19from airbyte_cdk.sources.declarative.extractors.record_filter import (
 20    ClientSideIncrementalRecordFilterDecorator,
 21)
 22from airbyte_cdk.sources.declarative.incremental import (
 23    ConcurrentPerPartitionCursor,
 24    GlobalSubstreamCursor,
 25)
 26from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
 27from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
 28    PerPartitionWithGlobalCursor,
 29)
 30from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 31from airbyte_cdk.sources.declarative.models import FileUploader
 32from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 33    ConcurrencyLevel as ConcurrencyLevelModel,
 34)
 35from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 36    DatetimeBasedCursor as DatetimeBasedCursorModel,
 37)
 38from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 39    IncrementingCountCursor as IncrementingCountCursorModel,
 40)
 41from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 42    ModelToComponentFactory,
 43)
 44from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
 45from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever
 46from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
 47    DeclarativePartitionFactory,
 48    StreamSlicerPartitionGenerator,
 49)
 50from airbyte_cdk.sources.declarative.types import ConnectionDefinition
 51from airbyte_cdk.sources.source import TState
 52from airbyte_cdk.sources.streams import Stream
 53from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 54from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 55from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
 56from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 57from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
 58
 59
 60class ConcurrentDeclarativeSource(ManifestDeclarativeSource, Generic[TState]):
 61    # By default, we defer to a value of 2. A value lower than that could cause a PartitionEnqueuer to be stuck in a
 62    # deadlock because it has hit the limit of futures but no partition reader is consuming them.
 63    _LOWEST_SAFE_CONCURRENCY_LEVEL = 2
 64
 65    def __init__(
 66        self,
 67        catalog: Optional[ConfiguredAirbyteCatalog],
 68        config: Optional[Mapping[str, Any]],
 69        state: TState,
 70        source_config: ConnectionDefinition,
 71        debug: bool = False,
 72        emit_connector_builder_messages: bool = False,
 73        component_factory: Optional[ModelToComponentFactory] = None,
 74        config_path: Optional[str] = None,
 75        **kwargs: Any,
 76    ) -> None:
 77        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
 78        #  no longer needs to store the original incoming state. But maybe there's an edge case?
 79        self._connector_state_manager = ConnectorStateManager(state=state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
 80
 81        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
 82        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
 83        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
 84        # incremental streams running in full refresh.
 85        component_factory = component_factory or ModelToComponentFactory(
 86            emit_connector_builder_messages=emit_connector_builder_messages,
 87            disable_resumable_full_refresh=True,
 88            connector_state_manager=self._connector_state_manager,
 89            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
 90        )
 91
 92        super().__init__(
 93            source_config=source_config,
 94            config=config,
 95            debug=debug,
 96            emit_connector_builder_messages=emit_connector_builder_messages,
 97            component_factory=component_factory,
 98            config_path=config_path,
 99        )
100
101        concurrency_level_from_manifest = self._source_config.get("concurrency_level")
102        if concurrency_level_from_manifest:
103            concurrency_level_component = self._constructor.create_component(
104                model_type=ConcurrencyLevelModel,
105                component_definition=concurrency_level_from_manifest,
106                config=config or {},
107            )
108            if not isinstance(concurrency_level_component, ConcurrencyLevel):
109                raise ValueError(
110                    f"Expected to generate a ConcurrencyLevel component, but received {concurrency_level_component.__class__}"
111                )
112
113            concurrency_level = concurrency_level_component.get_concurrency_level()
114            initial_number_of_partitions_to_generate = max(
115                concurrency_level // 2, 1
116            )  # Partition generation iterates using a range based on this value. If this is floored to zero, we end up in a deadlock during start-up.
117        else:
118            concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL
119            initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2
120
121        self._concurrent_source = ConcurrentSource.create(
122            num_workers=concurrency_level,
123            initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate,
124            logger=self.logger,
125            slice_logger=self._slice_logger,
126            message_repository=self.message_repository,
127        )
128
129    # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state.
130    @property
131    def is_partially_declarative(self) -> bool:
132        """This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams."""
133        return False
134
135    def read(
136        self,
137        logger: logging.Logger,
138        config: Mapping[str, Any],
139        catalog: ConfiguredAirbyteCatalog,
140        state: Optional[List[AirbyteStateMessage]] = None,
141    ) -> Iterator[AirbyteMessage]:
142        concurrent_streams, _ = self._group_streams(config=config)
143
144        # ConcurrentReadProcessor pops streams that are finished being read, so before syncing, the names of
145        # the concurrent streams must be saved so that they can be removed from the catalog before starting
146        # the synchronous streams.
147        if len(concurrent_streams) > 0:
148            concurrent_stream_names = set(
149                [concurrent_stream.name for concurrent_stream in concurrent_streams]
150            )
151
152            selected_concurrent_streams = self._select_streams(
153                streams=concurrent_streams, configured_catalog=catalog
154            )
155            # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
156            # This is also evident in concurrent_source_adapter.py, so fixing it is left out of scope for now.
157            if selected_concurrent_streams:
158                yield from self._concurrent_source.read(selected_concurrent_streams)
159
160            # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the
161            # existing AbstractSource.read() implementation iterates over the catalog when syncing streams, many
162            # of which were already synced using the Concurrent CDK.
163            filtered_catalog = self._remove_concurrent_streams_from_catalog(
164                catalog=catalog, concurrent_stream_names=concurrent_stream_names
165            )
166        else:
167            filtered_catalog = catalog
168
169        # There is no need to run read for synchronous streams if none exist.
170        if not filtered_catalog.streams:
171            return
172
173        yield from super().read(logger, config, filtered_catalog, state)
174
175    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
176        concurrent_streams, synchronous_streams = self._group_streams(config=config)
177        return AirbyteCatalog(
178            streams=[
179                stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
180            ]
181        )
182
183    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
184        """
185        The `streams` method is used as part of the AbstractSource in the following cases:
186        * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams
187        * ConcurrentDeclarativeSource.read -> AbstractSource.read -> streams (note that we filter the catalog to exclude concurrent streams, so not all of the streams returned by `streams` are actually read)
188        Note that `super().streams(config)` is also called when splitting the streams between concurrent and synchronous in `_group_streams`.
189
190        In both cases, we assume that calling the DeclarativeStream is fine because the result is the same regardless of whether it is a DeclarativeStream or a DefaultStream (concurrent). This should be removed once we have moved away from the code paths mentioned above.
191        """
192        return super().streams(config)
193
194    def _group_streams(
195        self, config: Mapping[str, Any]
196    ) -> Tuple[List[AbstractStream], List[Stream]]:
197        concurrent_streams: List[AbstractStream] = []
198        synchronous_streams: List[Stream] = []
199
200        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
201        # and this is validated during the initialization of the source.
202        streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs(
203            self._source_config, config
204        )
205
206        name_to_stream_mapping = {stream["name"]: stream for stream in streams}
207
208        for declarative_stream in self.streams(config=config):
209            # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
210            # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
211            # so we need to treat them as synchronous
212
213            supports_file_transfer = (
214                isinstance(declarative_stream, DeclarativeStream)
215                and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
216            )
217
218            if (
219                isinstance(declarative_stream, DeclarativeStream)
220                and name_to_stream_mapping[declarative_stream.name]["type"]
221                == "StateDelegatingStream"
222            ):
223                stream_state = self._connector_state_manager.get_stream_state(
224                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
225                )
226
227                name_to_stream_mapping[declarative_stream.name] = (
228                    name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
229                    if stream_state
230                    else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
231                )
232
233            if isinstance(declarative_stream, DeclarativeStream) and (
234                name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
235                == "SimpleRetriever"
236                or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
237                == "AsyncRetriever"
238            ):
239                incremental_sync_component_definition = name_to_stream_mapping[
240                    declarative_stream.name
241                ].get("incremental_sync")
242
243                partition_router_component_definition = (
244                    name_to_stream_mapping[declarative_stream.name]
245                    .get("retriever", {})
246                    .get("partition_router")
247                )
248                is_without_partition_router_or_cursor = not bool(
249                    incremental_sync_component_definition
250                ) and not bool(partition_router_component_definition)
251
252                is_substream_without_incremental = (
253                    partition_router_component_definition
254                    and not incremental_sync_component_definition
255                )
256
257                if self._is_concurrent_cursor_incremental_without_partition_routing(
258                    declarative_stream, incremental_sync_component_definition
259                ):
260                    stream_state = self._connector_state_manager.get_stream_state(
261                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
262                    )
263                    stream_state = self._migrate_state(declarative_stream, stream_state)
264
265                    retriever = self._get_retriever(declarative_stream, stream_state)
266
267                    if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance(
268                        declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter
269                    ):
270                        cursor = declarative_stream.retriever.stream_slicer.stream_slicer
271
272                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
273                            # This should never happen since we instantiate ConcurrentCursor in
274                            # model_to_component_factory.py
275                            raise ValueError(
276                                f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor or ConcurrentPerPartitionCursor, but received {cursor.__class__}"
277                            )
278
279                        partition_generator = StreamSlicerPartitionGenerator(
280                            partition_factory=DeclarativePartitionFactory(
281                                declarative_stream.name,
282                                declarative_stream.get_json_schema(),
283                                retriever,
284                                self.message_repository,
285                            ),
286                            stream_slicer=declarative_stream.retriever.stream_slicer,
287                        )
288                    else:
289                        if (
290                            incremental_sync_component_definition
291                            and incremental_sync_component_definition.get("type")
292                            == IncrementingCountCursorModel.__name__
293                        ):
294                            cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
295                                model_type=IncrementingCountCursorModel,
296                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
297                                stream_name=declarative_stream.name,
298                                stream_namespace=declarative_stream.namespace,
299                                config=config or {},
300                            )
301                        else:
302                            cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
303                                model_type=DatetimeBasedCursorModel,
304                                component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
305                                stream_name=declarative_stream.name,
306                                stream_namespace=declarative_stream.namespace,
307                                config=config or {},
308                                stream_state_migrations=declarative_stream.state_migrations,
309                            )
310                        partition_generator = StreamSlicerPartitionGenerator(
311                            partition_factory=DeclarativePartitionFactory(
312                                declarative_stream.name,
313                                declarative_stream.get_json_schema(),
314                                retriever,
315                                self.message_repository,
316                            ),
317                            stream_slicer=cursor,
318                        )
319
320                    concurrent_streams.append(
321                        DefaultStream(
322                            partition_generator=partition_generator,
323                            name=declarative_stream.name,
324                            json_schema=declarative_stream.get_json_schema(),
325                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
326                            cursor_field=cursor.cursor_field.cursor_field_key
327                            if hasattr(cursor, "cursor_field")
328                            and hasattr(
329                                cursor.cursor_field, "cursor_field_key"
330                            )  # FIXME this will need to be updated once we do the per partition
331                            else None,
332                            logger=self.logger,
333                            cursor=cursor,
334                            supports_file_transfer=supports_file_transfer,
335                        )
336                    )
337                elif (
338                    is_substream_without_incremental or is_without_partition_router_or_cursor
339                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
340                    partition_generator = StreamSlicerPartitionGenerator(
341                        DeclarativePartitionFactory(
342                            declarative_stream.name,
343                            declarative_stream.get_json_schema(),
344                            declarative_stream.retriever,
345                            self.message_repository,
346                        ),
347                        declarative_stream.retriever.stream_slicer,
348                    )
349
350                    final_state_cursor = FinalStateCursor(
351                        stream_name=declarative_stream.name,
352                        stream_namespace=declarative_stream.namespace,
353                        message_repository=self.message_repository,
354                    )
355
356                    concurrent_streams.append(
357                        DefaultStream(
358                            partition_generator=partition_generator,
359                            name=declarative_stream.name,
360                            json_schema=declarative_stream.get_json_schema(),
361                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
362                            cursor_field=None,
363                            logger=self.logger,
364                            cursor=final_state_cursor,
365                            supports_file_transfer=supports_file_transfer,
366                        )
367                    )
368                elif (
369                    incremental_sync_component_definition
370                    and incremental_sync_component_definition.get("type", "")
371                    == DatetimeBasedCursorModel.__name__
372                    and hasattr(declarative_stream.retriever, "stream_slicer")
373                    and isinstance(
374                        declarative_stream.retriever.stream_slicer,
375                        (GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
376                    )
377                ):
378                    stream_state = self._connector_state_manager.get_stream_state(
379                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
380                    )
381                    stream_state = self._migrate_state(declarative_stream, stream_state)
382
383                    partition_router = declarative_stream.retriever.stream_slicer._partition_router
384
385                    perpartition_cursor = (
386                        self._constructor.create_concurrent_cursor_from_perpartition_cursor(
387                            state_manager=self._connector_state_manager,
388                            model_type=DatetimeBasedCursorModel,
389                            component_definition=incremental_sync_component_definition,
390                            stream_name=declarative_stream.name,
391                            stream_namespace=declarative_stream.namespace,
392                            config=config or {},
393                            stream_state=stream_state,
394                            partition_router=partition_router,
395                        )
396                    )
397
398                    retriever = self._get_retriever(declarative_stream, stream_state)
399
400                    partition_generator = StreamSlicerPartitionGenerator(
401                        DeclarativePartitionFactory(
402                            declarative_stream.name,
403                            declarative_stream.get_json_schema(),
404                            retriever,
405                            self.message_repository,
406                        ),
407                        perpartition_cursor,
408                    )
409
410                    concurrent_streams.append(
411                        DefaultStream(
412                            partition_generator=partition_generator,
413                            name=declarative_stream.name,
414                            json_schema=declarative_stream.get_json_schema(),
415                            primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
416                            cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
417                            logger=self.logger,
418                            cursor=perpartition_cursor,
419                            supports_file_transfer=supports_file_transfer,
420                        )
421                    )
422                else:
423                    synchronous_streams.append(declarative_stream)
424            # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state.
425            # The condition below ensures that concurrent support is not lost for sources that already supported
426            # it before migration but are now only partially migrated to the declarative implementation (e.g., Stripe).
427            elif (
428                isinstance(declarative_stream, AbstractStreamFacade)
429                and self.is_partially_declarative
430            ):
431                concurrent_streams.append(declarative_stream.get_underlying_stream())
432            else:
433                synchronous_streams.append(declarative_stream)
434
435        return concurrent_streams, synchronous_streams
436
437    def _is_concurrent_cursor_incremental_without_partition_routing(
438        self,
439        declarative_stream: DeclarativeStream,
440        incremental_sync_component_definition: Mapping[str, Any] | None,
441    ) -> bool:
442        return (
443            incremental_sync_component_definition is not None
444            and bool(incremental_sync_component_definition)
445            and (
446                incremental_sync_component_definition.get("type", "")
447                in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
448            )
449            and hasattr(declarative_stream.retriever, "stream_slicer")
450            and (
451                isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
452                # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
453                # add an isinstance check here if we want to create a Declarative IncrementingCountCursor
454                # or isinstance(
455                #     declarative_stream.retriever.stream_slicer, IncrementingCountCursor
456                # )
457                or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
458            )
459        )
460
461    @staticmethod
462    def _get_retriever(
463        declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
464    ) -> Retriever:
465        if declarative_stream and isinstance(declarative_stream.retriever, SimpleRetriever):
466            # We zero it out here, but since this is a cursor reference, the state is still properly
467            # instantiated for the other components that reference it
468            declarative_stream.retriever.cursor = None
469        return declarative_stream.retriever
470
471    @staticmethod
472    def _select_streams(
473        streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
474    ) -> List[AbstractStream]:
475        stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
476        abstract_streams: List[AbstractStream] = []
477        for configured_stream in configured_catalog.streams:
478            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
479            if stream_instance:
480                abstract_streams.append(stream_instance)
481
482        return abstract_streams
483
484    @staticmethod
485    def _remove_concurrent_streams_from_catalog(
486        catalog: ConfiguredAirbyteCatalog,
487        concurrent_stream_names: set[str],
488    ) -> ConfiguredAirbyteCatalog:
489        return ConfiguredAirbyteCatalog(
490            streams=[
491                stream
492                for stream in catalog.streams
493                if stream.stream.name not in concurrent_stream_names
494            ]
495        )
496
497    @staticmethod
498    def _migrate_state(
499        declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any]
500    ) -> MutableMapping[str, Any]:
501        for state_migration in declarative_stream.state_migrations:
502            if state_migration.should_migrate(stream_state):
503                # The state variable is expected to be mutable but the migrate method returns an immutable mapping.
504                stream_state = dict(state_migration.migrate(stream_state))
505
506        return stream_state
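
The constructor above looks up a `concurrency_level` entry in the manifest via `self._source_config.get("concurrency_level")`. Below is a hedged sketch of what that fragment might look like once the manifest has been parsed into a Python mapping; the field names follow the ConcurrencyLevel declarative component, and the values are placeholders rather than anything taken from this module.

    # Hypothetical manifest fragment (already parsed into a dict) that would drive the
    # num_workers and initial partition count computed in __init__ above.
    manifest_fragment = {
        "concurrency_level": {
            "type": "ConcurrencyLevel",
            # default_concurrency may be an int or an interpolated string evaluated against config
            "default_concurrency": "{{ config.get('num_workers', 10) }}",
            "max_concurrency": 25,
        }
    }
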
class ConcurrentDeclarativeSource(airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource, typing.Generic[~TState]):

Declarative source defined by a manifest of low-code components that define source connector behavior
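
A minimal usage sketch, assuming a low-code manifest already available on disk and an empty incoming state; the file name, config values, and the use of PyYAML are placeholders for illustration, not part of the source above.

    import logging
    from pathlib import Path

    import yaml

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )

    manifest = yaml.safe_load(Path("manifest.yaml").read_text())  # hypothetical manifest file
    config = {"api_key": "..."}  # hypothetical connector config
    logger = logging.getLogger("airbyte")

    source = ConcurrentDeclarativeSource(
        catalog=None,   # no configured catalog is needed for discover
        config=config,
        state=[],       # no prior state; normally a List[AirbyteStateMessage]
        source_config=manifest,
    )

    for stream in source.discover(logger, config).streams:
        print(stream.name)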

is_partially_declarative: bool

This flag is used to avoid unexpectedly processing AbstractStreamFacade streams as concurrent streams.
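
A sketch of how a hypothetical, partially migrated connector might opt in; the subclass name is illustrative.

    class MyPartiallyDeclarativeSource(ConcurrentDeclarativeSource):
        @property
        def is_partially_declarative(self) -> bool:
            # With this flag enabled, _group_streams() adds the underlying stream of each
            # AbstractStreamFacade to the concurrent group instead of the synchronous one.
            return True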

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:

Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
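
A hedged sketch of how the returned catalog is commonly turned into a ConfiguredAirbyteCatalog and passed back into read(); the chosen sync modes are placeholders, and `source`, `logger`, and `config` are assumed to exist as in the earlier instantiation sketch.

    from airbyte_cdk.models import (
        ConfiguredAirbyteCatalog,
        ConfiguredAirbyteStream,
        DestinationSyncMode,
        SyncMode,
    )

    catalog = source.discover(logger, config)
    configured_catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=stream,
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.overwrite,
            )
            for stream in catalog.streams
        ]
    )

    # read() yields AirbyteMessage objects: concurrent streams are synced first, then any
    # remaining synchronous streams in the filtered catalog.
    for message in source.read(logger, config, configured_catalog, state=None):
        print(message.type)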