airbyte_cdk.sources.concurrent_source.concurrent_source_adapter

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6from abc import ABC
  7from datetime import timedelta
  8from typing import Any, Callable, Iterator, List, Mapping, MutableMapping, Optional, Tuple
  9
 10from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
 11from airbyte_cdk.sources import AbstractSource
 12from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 13from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 14from airbyte_cdk.sources.streams import Stream
 15from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 16from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 17from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
 18from airbyte_cdk.sources.streams.concurrent.cursor import (
 19    ConcurrentCursor,
 20    Cursor,
 21    CursorField,
 22    CursorValueType,
 23    FinalStateCursor,
 24    GapType,
 25)
 26from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 27    AbstractStreamStateConverter,
 28)
 29
 30DEFAULT_LOOKBACK_SECONDS = 0
 31
 32
 33class ConcurrentSourceAdapter(AbstractSource, ABC):
 34    def __init__(self, concurrent_source: ConcurrentSource, **kwargs: Any) -> None:
 35        """
 36        ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.
 37
 38        The source's streams are still defined through the streams() method.
 39        Streams wrapped in a StreamFacade will be processed concurrently.
 40        Other streams will be processed sequentially as a later step.
 41        """
 42        self._concurrent_source = concurrent_source
 43        super().__init__(**kwargs)
 44
 45    def read(
 46        self,
 47        logger: logging.Logger,
 48        config: Mapping[str, Any],
 49        catalog: ConfiguredAirbyteCatalog,
 50        state: Optional[List[AirbyteStateMessage]] = None,
 51    ) -> Iterator[AirbyteMessage]:
 52        abstract_streams = self._select_abstract_streams(config, catalog)
 53        concurrent_stream_names = {stream.name for stream in abstract_streams}
 54        configured_catalog_for_regular_streams = ConfiguredAirbyteCatalog(
 55            streams=[
 56                stream
 57                for stream in catalog.streams
 58                if stream.stream.name not in concurrent_stream_names
 59            ]
 60        )
 61        if abstract_streams:
 62            yield from self._concurrent_source.read(abstract_streams)
 63        if configured_catalog_for_regular_streams.streams:
 64            yield from super().read(logger, config, configured_catalog_for_regular_streams, state)
 65
 66    def _select_abstract_streams(
 67        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog
 68    ) -> List[AbstractStream]:
 69        """
 70        Selects streams that can be processed concurrently and returns their abstract representations.
 71        """
 72        all_streams = self.streams(config)
 73        stream_name_to_instance: Mapping[str, Stream] = {s.name: s for s in all_streams}
 74        abstract_streams: List[AbstractStream] = []
 75        for configured_stream in configured_catalog.streams:
 76            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
 77            if not stream_instance:
 78                continue
 79
 80            if isinstance(stream_instance, AbstractStreamFacade):
 81                abstract_streams.append(stream_instance.get_underlying_stream())
 82        return abstract_streams
 83
 84    def convert_to_concurrent_stream(
 85        self,
 86        logger: logging.Logger,
 87        stream: Stream,
 88        state_manager: ConnectorStateManager,
 89        cursor: Optional[Cursor] = None,
 90    ) -> Stream:
 91        """
 92        Prepares a stream for concurrent processing by initializing or assigning a cursor,
 93        managing the stream's state, and returning an updated Stream instance.
 94        """
 95        state: MutableMapping[str, Any] = {}
 96
 97        if cursor:
 98            state = state_manager.get_stream_state(stream.name, stream.namespace)
 99
100            stream.cursor = cursor  # type: ignore[assignment]  # cursor is of type ConcurrentCursor, which inherits from Cursor
101            if hasattr(stream, "parent"):
102                stream.parent.cursor = cursor
103        else:
104            cursor = FinalStateCursor(
105                stream_name=stream.name,
106                stream_namespace=stream.namespace,
107                message_repository=self.message_repository,  # type: ignore[arg-type]  # _default_message_repository will be returned in the worst case
108            )
109        return StreamFacade.create_from_stream(stream, self, logger, state, cursor)
110
111    def initialize_cursor(
112        self,
113        stream: Stream,
114        state_manager: ConnectorStateManager,
115        converter: AbstractStreamStateConverter,
116        slice_boundary_fields: Optional[Tuple[str, str]],
117        start: Optional[CursorValueType],
118        end_provider: Callable[[], CursorValueType],
119        lookback_window: Optional[GapType] = None,
120        slice_range: Optional[GapType] = None,
121    ) -> Optional[ConcurrentCursor]:
122        lookback_window = lookback_window or timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)
123
124        cursor_field_name = stream.cursor_field
125
126        if cursor_field_name:
127            if not isinstance(cursor_field_name, str):
128                raise ValueError(
129                    f"Cursor field type must be a string, but received {type(cursor_field_name).__name__}."
130                )
131
132            return ConcurrentCursor(
133                stream.name,
134                stream.namespace,
135                state_manager.get_stream_state(stream.name, stream.namespace),
136                self.message_repository,  # type: ignore[arg-type]  # _default_message_repository will be returned in the worst case
137                state_manager,
138                converter,
139                CursorField(cursor_field_name),
140                slice_boundary_fields,
141                start,
142                end_provider,
143                lookback_window,
144                slice_range,
145            )
146
147        return None
DEFAULT_LOOKBACK_SECONDS = 0
 34class ConcurrentSourceAdapter(AbstractSource, ABC):
 35    def __init__(self, concurrent_source: ConcurrentSource, **kwargs: Any) -> None:
 36        """
 37        ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.
 38
 39        The source's streams are still defined through the streams() method.
 40        Streams wrapped in a StreamFacade will be processed concurrently.
 41        Other streams will be processed sequentially as a later step.
 42        """
 43        self._concurrent_source = concurrent_source
 44        super().__init__(**kwargs)
 45
 46    def read(
 47        self,
 48        logger: logging.Logger,
 49        config: Mapping[str, Any],
 50        catalog: ConfiguredAirbyteCatalog,
 51        state: Optional[List[AirbyteStateMessage]] = None,
 52    ) -> Iterator[AirbyteMessage]:
 53        abstract_streams = self._select_abstract_streams(config, catalog)
 54        concurrent_stream_names = {stream.name for stream in abstract_streams}
 55        configured_catalog_for_regular_streams = ConfiguredAirbyteCatalog(
 56            streams=[
 57                stream
 58                for stream in catalog.streams
 59                if stream.stream.name not in concurrent_stream_names
 60            ]
 61        )
 62        if abstract_streams:
 63            yield from self._concurrent_source.read(abstract_streams)
 64        if configured_catalog_for_regular_streams.streams:
 65            yield from super().read(logger, config, configured_catalog_for_regular_streams, state)
 66
 67    def _select_abstract_streams(
 68        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog
 69    ) -> List[AbstractStream]:
 70        """
 71        Selects streams that can be processed concurrently and returns their abstract representations.
 72        """
 73        all_streams = self.streams(config)
 74        stream_name_to_instance: Mapping[str, Stream] = {s.name: s for s in all_streams}
 75        abstract_streams: List[AbstractStream] = []
 76        for configured_stream in configured_catalog.streams:
 77            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
 78            if not stream_instance:
 79                continue
 80
 81            if isinstance(stream_instance, AbstractStreamFacade):
 82                abstract_streams.append(stream_instance.get_underlying_stream())
 83        return abstract_streams
 84
 85    def convert_to_concurrent_stream(
 86        self,
 87        logger: logging.Logger,
 88        stream: Stream,
 89        state_manager: ConnectorStateManager,
 90        cursor: Optional[Cursor] = None,
 91    ) -> Stream:
 92        """
 93        Prepares a stream for concurrent processing by initializing or assigning a cursor,
 94        managing the stream's state, and returning an updated Stream instance.
 95        """
 96        state: MutableMapping[str, Any] = {}
 97
 98        if cursor:
 99            state = state_manager.get_stream_state(stream.name, stream.namespace)
100
101            stream.cursor = cursor  # type: ignore[assignment]  # cursor is of type ConcurrentCursor, which inherits from Cursor
102            if hasattr(stream, "parent"):
103                stream.parent.cursor = cursor
104        else:
105            cursor = FinalStateCursor(
106                stream_name=stream.name,
107                stream_namespace=stream.namespace,
108                message_repository=self.message_repository,  # type: ignore[arg-type]  # _default_message_repository will be returned in the worst case
109            )
110        return StreamFacade.create_from_stream(stream, self, logger, state, cursor)
111
112    def initialize_cursor(
113        self,
114        stream: Stream,
115        state_manager: ConnectorStateManager,
116        converter: AbstractStreamStateConverter,
117        slice_boundary_fields: Optional[Tuple[str, str]],
118        start: Optional[CursorValueType],
119        end_provider: Callable[[], CursorValueType],
120        lookback_window: Optional[GapType] = None,
121        slice_range: Optional[GapType] = None,
122    ) -> Optional[ConcurrentCursor]:
123        lookback_window = lookback_window or timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)
124
125        cursor_field_name = stream.cursor_field
126
127        if cursor_field_name:
128            if not isinstance(cursor_field_name, str):
129                raise ValueError(
130                    f"Cursor field type must be a string, but received {type(cursor_field_name).__name__}."
131                )
132
133            return ConcurrentCursor(
134                stream.name,
135                stream.namespace,
136                state_manager.get_stream_state(stream.name, stream.namespace),
137                self.message_repository,  # type: ignore[arg-type]  # _default_message_repository will be returned in the worst case
138                state_manager,
139                converter,
140                CursorField(cursor_field_name),
141                slice_boundary_fields,
142                start,
143                end_provider,
144                lookback_window,
145                slice_range,
146            )
147
148        return None

Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.

ConcurrentSourceAdapter( concurrent_source: airbyte_cdk.ConcurrentSource, **kwargs: Any)
35    def __init__(self, concurrent_source: ConcurrentSource, **kwargs: Any) -> None:
36        """
37        ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.
38
39        The source's streams are still defined through the streams() method.
40        Streams wrapped in a StreamFacade will be processed concurrently.
41        Other streams will be processed sequentially as a later step.
42        """
43        self._concurrent_source = concurrent_source
44        super().__init__(**kwargs)

ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.

The source's streams are still defined through the streams() method. Streams wrapped in a StreamFacade will be processed concurrently. Other streams will be processed sequentially as a later step.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
46    def read(
47        self,
48        logger: logging.Logger,
49        config: Mapping[str, Any],
50        catalog: ConfiguredAirbyteCatalog,
51        state: Optional[List[AirbyteStateMessage]] = None,
52    ) -> Iterator[AirbyteMessage]:
53        abstract_streams = self._select_abstract_streams(config, catalog)
54        concurrent_stream_names = {stream.name for stream in abstract_streams}
55        configured_catalog_for_regular_streams = ConfiguredAirbyteCatalog(
56            streams=[
57                stream
58                for stream in catalog.streams
59                if stream.stream.name not in concurrent_stream_names
60            ]
61        )
62        if abstract_streams:
63            yield from self._concurrent_source.read(abstract_streams)
64        if configured_catalog_for_regular_streams.streams:
65            yield from super().read(logger, config, configured_catalog_for_regular_streams, state)

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.

def convert_to_concurrent_stream( self, logger: logging.Logger, stream: airbyte_cdk.Stream, state_manager: airbyte_cdk.ConnectorStateManager, cursor: Optional[airbyte_cdk.Cursor] = None) -> airbyte_cdk.Stream:
 85    def convert_to_concurrent_stream(
 86        self,
 87        logger: logging.Logger,
 88        stream: Stream,
 89        state_manager: ConnectorStateManager,
 90        cursor: Optional[Cursor] = None,
 91    ) -> Stream:
 92        """
 93        Prepares a stream for concurrent processing by initializing or assigning a cursor,
 94        managing the stream's state, and returning an updated Stream instance.
 95        """
 96        state: MutableMapping[str, Any] = {}
 97
 98        if cursor:
 99            state = state_manager.get_stream_state(stream.name, stream.namespace)
100
101            stream.cursor = cursor  # type: ignore[assignment]  # cursor is of type ConcurrentCursor, which inherits from Cursor
102            if hasattr(stream, "parent"):
103                stream.parent.cursor = cursor
104        else:
105            cursor = FinalStateCursor(
106                stream_name=stream.name,
107                stream_namespace=stream.namespace,
108                message_repository=self.message_repository,  # type: ignore[arg-type]  # _default_message_repository will be returned in the worst case
109            )
110        return StreamFacade.create_from_stream(stream, self, logger, state, cursor)

Prepares a stream for concurrent processing by initializing or assigning a cursor, managing the stream's state, and returning an updated Stream instance.

112    def initialize_cursor(
113        self,
114        stream: Stream,
115        state_manager: ConnectorStateManager,
116        converter: AbstractStreamStateConverter,
117        slice_boundary_fields: Optional[Tuple[str, str]],
118        start: Optional[CursorValueType],
119        end_provider: Callable[[], CursorValueType],
120        lookback_window: Optional[GapType] = None,
121        slice_range: Optional[GapType] = None,
122    ) -> Optional[ConcurrentCursor]:
123        lookback_window = lookback_window or timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)
124
125        cursor_field_name = stream.cursor_field
126
127        if cursor_field_name:
128            if not isinstance(cursor_field_name, str):
129                raise ValueError(
130                    f"Cursor field type must be a string, but received {type(cursor_field_name).__name__}."
131                )
132
133            return ConcurrentCursor(
134                stream.name,
135                stream.namespace,
136                state_manager.get_stream_state(stream.name, stream.namespace),
137                self.message_repository,  # type: ignore[arg-type]  # _default_message_repository will be returned in the worst case
138                state_manager,
139                converter,
140                CursorField(cursor_field_name),
141                slice_boundary_fields,
142                start,
143                end_provider,
144                lookback_window,
145                slice_range,
146            )
147
148        return None