airbyte_cdk.sources.streams.concurrent.adapters

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import json
import logging
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteStream,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource, Source
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils.slice_hasher import SliceHasher

 43"""
 44This module contains adapters to help enabling concurrency on Stream objects without needing to migrate to AbstractStream
 45"""
 46
 47
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
    """
    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
    The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
    """

    @classmethod
    def create_from_stream(
        cls,
        stream: Stream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: Cursor,
    ) -> Stream:
        """
        Create a ConcurrentStream from a Stream object.
        :param stream: The stream to wrap
        :param source: The source that owns the stream
        :param logger: The logger to use
        :param state: The stream's state, if any
        :param cursor: The cursor to use for this stream
        :return: A StreamFacade wrapping a concurrent DefaultStream
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return StreamFacade(
            DefaultStream(
                partition_generator=StreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                ),
                name=stream.name,
                namespace=stream.namespace,
                json_schema=stream.get_json_schema(),
                primary_key=pk,
                cursor_field=cursor_field,
                logger=logger,
                cursor=cursor,
            ),
            stream,
            cursor,
            slice_logger=source._slice_logger,
            logger=logger,
        )

    @property
    def state(self) -> MutableMapping[str, Any]:
        raise NotImplementedError(
            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
        )

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        if "state" in dir(self._legacy_stream):
            self._legacy_stream.state = value  # type: ignore  # validating `state` is attribute of stream using `if` above

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: Stream,
        cursor: Cursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        # This method is not expected to be called directly. It is only implemented for backward compatibility with the old interface
        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]]

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def cursor(self) -> Optional[Cursor]:  # type: ignore[override] # StreamFacade expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
        return self._cursor

    # FIXME the lru_cache seems to be mostly there because of a typing issue
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    def as_airbyte_stream(self) -> AirbyteStream:
        return self._abstract_stream.as_airbyte_stream()

    def log_stream_sync_configuration(self) -> None:
        self._abstract_stream.log_stream_sync_configuration()

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream

class SliceEncoder(json.JSONEncoder):
    def default(self, obj: Any) -> Any:
        if hasattr(obj, "__json_serializable__"):
            return obj.__json_serializable__()

        # Let the base class default method raise the TypeError
        return super().default(obj)


class StreamPartition(Partition):
    """
    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

    StreamPartitions are instantiated from a Stream and a stream_slice.

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long run, it would be preferable to update the connectors, but we don't have the tooling or the need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param _slice: The partition's stream_slice
        :param message_repository: The message repository to use to emit non-record messages
        """
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._hash = SliceHasher.hash(self._stream.name, self._slice)

    def read(self) -> Iterable[Record]:
        """
        Read messages from the stream.
        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
        Otherwise, the message will be emitted on the message repository.
        """
        try:
            # Using `stream_state=self._state` has a very different behavior than the current one: today, the state is updated slice
            #  by slice incrementally. We don't have this guarantee with the Concurrent CDK. For HttpStream, `stream_state` is passed to:
            #  * fetch_next_page
            #  * parse_response
            #  Neither is used for Stripe, so we should be good for the first iteration of the Concurrent CDK. However, Stripe still does
            #  `if not stream_state` to decide whether to call the Events stream or not
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                # Note that we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
                # For now, file-based connectors have their own stream facade
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(
                        data=data_to_return,
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                    yield Record(
                        data=record_data.record.data or {},
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        return self._slice

    def __hash__(self) -> int:
        return self._hash

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"StreamPartition({self._stream.name}, {self._slice})"


class StreamPartitionGenerator(PartitionGenerator):
    """
    This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long run, it would be preferable to update the connectors, but we don't have the tooling or the need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param message_repository: The message repository to use to emit non-record messages
        """
        self.message_repository = message_repository
        self._stream = stream
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def generate(self) -> Iterable[Partition]:
        for s in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            yield StreamPartition(
                self._stream,
                copy.deepcopy(s),
                self.message_repository,
                self._sync_mode,
                self._cursor_field,
                self._state,
            )

@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
class StreamFacade(airbyte_cdk.sources.streams.concurrent.abstract_stream_facade.AbstractStreamFacade[airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream], airbyte_cdk.sources.streams.core.Stream):

The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

All methods either delegate to the wrapped AbstractStream or provide a default implementation. The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
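
For orientation, here is a minimal usage sketch. It is hedged: `MyLegacyStream` and `source` are hypothetical stand-ins for a real Stream subclass and an AbstractSource with its message repository set, and the FinalStateCursor arguments reflect recent CDK versions.

import logging

from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

logger = logging.getLogger("airbyte")
legacy_stream = MyLegacyStream()  # hypothetical legacy Stream subclass

# A FinalStateCursor produces a full-refresh facade; pass a ConcurrentCursor
# instead to get incremental partition generation.
cursor = FinalStateCursor(
    stream_name=legacy_stream.name,
    stream_namespace=legacy_stream.namespace,
    message_repository=source.message_repository,  # `source` is the AbstractSource
)

facade = StreamFacade.create_from_stream(
    stream=legacy_stream,
    source=source,
    logger=logger,
    state=None,  # no prior state on a first sync
    cursor=cursor,
)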

StreamFacade( stream: airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream, legacy_stream: airbyte_cdk.Stream, cursor: airbyte_cdk.Cursor, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, logger: logging.Logger)
Parameters
  • stream: The underlying AbstractStream
@classmethod
def create_from_stream( cls, stream: airbyte_cdk.Stream, source: airbyte_cdk.AbstractSource, logger: logging.Logger, state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.Cursor) -> airbyte_cdk.Stream:

Create a ConcurrentStream from a Stream object.

Parameters
  • stream: The stream to wrap
  • source: The source that owns the stream
  • logger: The logger to use
  • state: The stream's state, if any
  • cursor: The cursor to use for this stream
Returns

A StreamFacade wrapping a concurrent DefaultStream
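
The sync mode of the generated partitions is driven entirely by the cursor type, mirroring the dispatch inside the factory (a short sketch, reusing `cursor` from the earlier example):

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

# A FinalStateCursor means there is no real incremental state to track,
# so partitions are generated in full-refresh mode.
sync_mode = (
    SyncMode.full_refresh if isinstance(cursor, FinalStateCursor) else SyncMode.incremental
)
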
state: MutableMapping[str, Any]

Not supported through the facade: reading `state` raises NotImplementedError, since state is tracked by the concurrent cursor instead. Assigning to `state` delegates to the wrapped legacy stream when that stream exposes a `state` attribute.
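
Continuing the earlier sketch, the state contract looks like this in practice:

# Writes are forwarded to the wrapped legacy stream (when it defines `state`);
# reads are intentionally unsupported on the facade.
facade.state = {"updated_at": "2023-01-01T00:00:00Z"}

try:
    _ = facade.state
except NotImplementedError:
    pass  # expected: state is managed by the concurrent cursor instead
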
def get_underlying_stream( self) -> airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream:

Return the underlying stream that this facade wraps.

class SliceEncoder(json.encoder.JSONEncoder):

Extends json.JSONEncoder so that any object exposing a `__json_serializable__` method is serialized through that hook; all other types fall back to the base encoder (which raises TypeError for unknown objects).

Extensible JSON https://json.org encoder for Python data structures.

Supports the following objects and types by default:

+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+

To extend this to recognize other objects, subclass and implement a .default() method that returns a serializable object for o if possible; otherwise it should call the superclass implementation (to raise TypeError).
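
SliceEncoder applies exactly this pattern: its default() serializes any object that exposes a __json_serializable__ method. A self-contained sketch (DateBoundary is hypothetical):

import json
from datetime import date

from airbyte_cdk.sources.streams.concurrent.adapters import SliceEncoder


class DateBoundary:  # hypothetical slice component opting into JSON serialization
    def __init__(self, day: date) -> None:
        self._day = day

    def __json_serializable__(self) -> str:
        return self._day.isoformat()


# json.dumps would normally raise TypeError on DateBoundary; SliceEncoder
# routes it through __json_serializable__ instead.
print(json.dumps({"start": DateBoundary(date(2023, 1, 1))}, cls=SliceEncoder))
# -> {"start": "2023-01-01"}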

def default(self, obj: Any) -> Any:

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class StreamPartition(airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition):

This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

StreamPartitions are instantiated from a Stream and a stream_slice.

This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long run, it would be preferable to update the connectors, but we don't have the tooling or the need to justify the effort at this time.

StreamPartition( stream: airbyte_cdk.Stream, _slice: Optional[Mapping[str, Any]], message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
Parameters
  • stream: The stream to delegate to
  • _slice: The partition's stream_slice
  • message_repository: The message repository to use to emit non-record messages
def read(self) -> Iterable[airbyte_cdk.Record]:

Read messages from the stream. If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. Otherwise, the message will be emitted on the message repository.
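
A minimal consumption sketch (assuming the hypothetical `legacy_stream` and a `message_repository` from the earlier examples): build one partition for a single slice and drain its records.

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.concurrent.adapters import StreamPartition

partition = StreamPartition(
    stream=legacy_stream,
    _slice={"start": "2023-01-01", "end": "2023-01-31"},
    message_repository=message_repository,
    sync_mode=SyncMode.full_refresh,
    cursor_field=None,
    state=None,
)

for record in partition.read():
    # Each yielded item is a Record; non-record messages were emitted on the
    # message repository rather than yielded here.
    print(record.data)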

def to_slice(self) -> Optional[Mapping[str, Any]]:

Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values (example)

Returns

A mapping representing a slice

def stream_name(self) -> str:

Returns the name of the stream that this partition is reading from.

Returns

The name of the stream.

class StreamPartitionGenerator(airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator):

This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long run, it would be preferable to update the connectors, but we don't have the tooling or the need to justify the effort at this time.

StreamPartitionGenerator( stream: airbyte_cdk.Stream, message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
Parameters
  • stream: The stream to delegate to
  • message_repository: The message repository to use to emit non-record messages
message_repository
def generate( self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:

Generates partitions for a given sync mode.

Returns

An iterable of partitions
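
Putting the pieces together, an end-to-end sketch (same hypothetical `legacy_stream` and `message_repository` as above): every slice produced by the legacy stream becomes one partition that the concurrent runtime can read independently.

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.concurrent.adapters import StreamPartitionGenerator

generator = StreamPartitionGenerator(
    stream=legacy_stream,
    message_repository=message_repository,
    sync_mode=SyncMode.incremental,
    cursor_field=["updated_at"],
    state={"updated_at": "2023-01-01T00:00:00Z"},
)

for partition in generator.generate():
    print(partition.stream_name(), partition.to_slice())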