airbyte_cdk.sources.streams.concurrent.adapters

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import copy
  6import json
  7import logging
  8from functools import lru_cache
  9from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
 10
 11from typing_extensions import deprecated
 12
 13from airbyte_cdk.models import (
 14    AirbyteLogMessage,
 15    AirbyteMessage,
 16    AirbyteStream,
 17    ConfiguredAirbyteStream,
 18    Level,
 19    SyncMode,
 20    Type,
 21)
 22from airbyte_cdk.sources import AbstractSource, Source
 23from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 24from airbyte_cdk.sources.message import MessageRepository
 25from airbyte_cdk.sources.source import ExperimentalClassWarning
 26from airbyte_cdk.sources.streams import Stream
 27from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 28from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField, FinalStateCursor
 29from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 30from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
 31from airbyte_cdk.sources.streams.concurrent.helpers import (
 32    get_cursor_field_from_stream,
 33    get_primary_key_from_stream,
 34)
 35from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 36from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 37from airbyte_cdk.sources.streams.core import StreamData
 38from airbyte_cdk.sources.types import Record
 39from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
 40from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 41from airbyte_cdk.utils.slice_hasher import SliceHasher
 42
 43"""
 44This module contains adapters to help enabling concurrency on Stream objects without needing to migrate to AbstractStream
 45"""
 46
 47
 48@deprecated(
 49    "This class is experimental. Use at your own risk.",
 50    category=ExperimentalClassWarning,
 51)
 52class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
 53    """
 54    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.
 55
 56    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
 57    The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
 58    """
 59
 60    @classmethod
 61    def create_from_stream(
 62        cls,
 63        stream: Stream,
 64        source: AbstractSource,
 65        logger: logging.Logger,
 66        state: Optional[MutableMapping[str, Any]],
 67        cursor: Cursor,
 68    ) -> Stream:
 69        """
 70        Create a ConcurrentStream from a Stream object.
 71        :param source: The source
 72        :param stream: The stream
 73        :param max_workers: The maximum number of worker thread to use
 74        :return:
 75        """
 76        pk = get_primary_key_from_stream(stream.primary_key)
 77        cursor_field = get_cursor_field_from_stream(stream)
 78
 79        if not source.message_repository:
 80            raise ValueError(
 81                "A message repository is required to emit non-record messages. Please set the message repository on the source."
 82            )
 83
 84        message_repository = source.message_repository
 85        return StreamFacade(
 86            DefaultStream(
 87                partition_generator=StreamPartitionGenerator(
 88                    stream,
 89                    message_repository,
 90                    SyncMode.full_refresh
 91                    if isinstance(cursor, FinalStateCursor)
 92                    else SyncMode.incremental,
 93                    [cursor_field] if cursor_field is not None else None,
 94                    state,
 95                ),
 96                name=stream.name,
 97                namespace=stream.namespace,
 98                json_schema=stream.get_json_schema(),
 99                primary_key=pk,
100                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
101                logger=logger,
102                cursor=cursor,
103            ),
104            stream,
105            cursor,
106            slice_logger=source._slice_logger,
107            logger=logger,
108        )
109
110    @property
111    def state(self) -> MutableMapping[str, Any]:
112        raise NotImplementedError(
113            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
114        )
115
116    @state.setter
117    def state(self, value: Mapping[str, Any]) -> None:
118        if "state" in dir(self._legacy_stream):
119            self._legacy_stream.state = value  # type: ignore  # validating `state` is attribute of stream using `if` above
120
121    def __init__(
122        self,
123        stream: DefaultStream,
124        legacy_stream: Stream,
125        cursor: Cursor,
126        slice_logger: SliceLogger,
127        logger: logging.Logger,
128    ):
129        """
130        :param stream: The underlying AbstractStream
131        """
132        self._abstract_stream = stream
133        self._legacy_stream = legacy_stream
134        self._cursor = cursor
135        self._slice_logger = slice_logger
136        self._logger = logger
137
138    def read(
139        self,
140        configured_stream: ConfiguredAirbyteStream,
141        logger: logging.Logger,
142        slice_logger: SliceLogger,
143        stream_state: MutableMapping[str, Any],
144        state_manager: ConnectorStateManager,
145        internal_config: InternalConfig,
146    ) -> Iterable[StreamData]:
147        yield from self._read_records()
148
149    def read_records(
150        self,
151        sync_mode: SyncMode,
152        cursor_field: Optional[List[str]] = None,
153        stream_slice: Optional[Mapping[str, Any]] = None,
154        stream_state: Optional[Mapping[str, Any]] = None,
155    ) -> Iterable[StreamData]:
156        try:
157            yield from self._read_records()
158        except Exception as exc:
159            if hasattr(self._cursor, "state"):
160                state = str(self._cursor.state)
161            else:
162                # This shouldn't happen if the ConcurrentCursor was used
163                state = "unknown; no state attribute was available on the cursor"
164            yield AirbyteMessage(
165                type=Type.LOG,
166                log=AirbyteLogMessage(
167                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
168                ),
169            )
170            raise exc
171
172    def _read_records(self) -> Iterable[StreamData]:
173        for partition in self._abstract_stream.generate_partitions():
174            if self._slice_logger.should_log_slice_message(self._logger):
175                yield self._slice_logger.create_slice_log_message(partition.to_slice())
176            for record in partition.read():
177                yield record.data
178
179    @property
180    def name(self) -> str:
181        return self._abstract_stream.name
182
183    @property
184    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
185        # This method is not expected to be called directly. It is only implemented for backward compatibility with the old interface
186        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]]
187
188    @property
189    def cursor_field(self) -> Union[str, List[str]]:
190        if self._abstract_stream.cursor_field is None:
191            return []
192        else:
193            return self._abstract_stream.cursor_field
194
195    @property
196    def cursor(self) -> Optional[Cursor]:  # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
197        return self._cursor
198
199    @property
200    def block_simultaneous_read(self) -> str:
201        """Returns the blocking group name from the underlying stream"""
202        return self._abstract_stream.block_simultaneous_read
203
204    # FIXME the lru_cache seems to be mostly there because of typing issue
205    @lru_cache(maxsize=None)
206    def get_json_schema(self) -> Mapping[str, Any]:
207        return self._abstract_stream.get_json_schema()
208
209    @property
210    def supports_incremental(self) -> bool:
211        return self._legacy_stream.supports_incremental
212
213    def as_airbyte_stream(self) -> AirbyteStream:
214        return self._abstract_stream.as_airbyte_stream()
215
216    def log_stream_sync_configuration(self) -> None:
217        self._abstract_stream.log_stream_sync_configuration()
218
219    def get_underlying_stream(self) -> DefaultStream:
220        return self._abstract_stream
221
222
class SliceEncoder(json.JSONEncoder):
    """JSON encoder that serializes objects exposing a `__json_serializable__()` hook."""

    def default(self, obj: Any) -> Any:
        serializer = getattr(obj, "__json_serializable__", None)
        if serializer is not None:
            return serializer()

        # Let the base class default method raise the TypeError
        return super().default(obj)
230
231
class StreamPartition(Partition):
    """
    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

    StreamPartitions are instantiated from a Stream and a stream_slice.

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param _slice: The partition's stream_slice
        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: Sync mode passed through to the underlying stream's slice generation
        :param cursor_field: Cursor field passed through to the underlying stream
        :param state: Stream state passed through to the underlying stream
        """
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        # Precompute the hash so __hash__ stays cheap and stable even if the slice
        # mapping is mutated later.
        self._hash = SliceHasher.hash(self._stream.name, self._slice)

    def read(self) -> Iterable[Record]:
        """
        Read messages from the stream.
        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
        Otherwise, the message will be emitted on the message repository.
        """
        try:
            # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice
            #  by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to:
            #  * fetch_next_page
            #  * parse_response
            #  Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still does
            #  `if not stream_state` to know if it calls the Event stream or not
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                # Deep-copy the slice so the underlying stream cannot mutate this partition's slice.
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
                # For now, file-based connectors have their own stream facade
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    # transform() mutates data_to_return in place against the stream's schema.
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(
                        data=data_to_return,
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                    yield Record(
                        data=record_data.record.data or {},
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                else:
                    # Non-record messages (logs, traces, ...) go through the message repository.
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                # Bare `raise` re-raises the active exception with its traceback intact
                # (idiomatic form of the previous `raise e`).
                raise

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        return self._slice

    def __hash__(self) -> int:
        return self._hash

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"StreamPartition({self._stream.name}, {self._slice})"
321
322
class StreamPartitionGenerator(PartitionGenerator):
    """
    Adapter between the new PartitionGenerator interface and Stream.stream_slices.

    It can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: Sync mode forwarded to the stream's slice generation
        :param cursor_field: Cursor field forwarded to the stream
        :param state: Stream state forwarded to the stream
        """
        self.message_repository = message_repository
        self._stream = stream
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def generate(self) -> Iterable[Partition]:
        """Yield one StreamPartition per slice produced by the wrapped stream."""
        slices = self._stream.stream_slices(
            sync_mode=self._sync_mode,
            cursor_field=self._cursor_field,
            stream_state=self._state,
        )
        for stream_slice in slices:
            # Deep-copy so the partition owns its slice independently of the stream.
            yield StreamPartition(
                self._stream,
                copy.deepcopy(stream_slice),
                self.message_repository,
                self._sync_mode,
                self._cursor_field,
                self._state,
            )
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
class StreamFacade(airbyte_cdk.sources.streams.concurrent.abstract_stream_facade.AbstractStreamFacade[airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream], airbyte_cdk.sources.streams.core.Stream):
 49@deprecated(
 50    "This class is experimental. Use at your own risk.",
 51    category=ExperimentalClassWarning,
 52)
 53class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
 54    """
 55    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.
 56
 57    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
 58    The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
 59    """
 60
 61    @classmethod
 62    def create_from_stream(
 63        cls,
 64        stream: Stream,
 65        source: AbstractSource,
 66        logger: logging.Logger,
 67        state: Optional[MutableMapping[str, Any]],
 68        cursor: Cursor,
 69    ) -> Stream:
 70        """
 71        Create a ConcurrentStream from a Stream object.
 72        :param source: The source
 73        :param stream: The stream
 74        :param max_workers: The maximum number of worker thread to use
 75        :return:
 76        """
 77        pk = get_primary_key_from_stream(stream.primary_key)
 78        cursor_field = get_cursor_field_from_stream(stream)
 79
 80        if not source.message_repository:
 81            raise ValueError(
 82                "A message repository is required to emit non-record messages. Please set the message repository on the source."
 83            )
 84
 85        message_repository = source.message_repository
 86        return StreamFacade(
 87            DefaultStream(
 88                partition_generator=StreamPartitionGenerator(
 89                    stream,
 90                    message_repository,
 91                    SyncMode.full_refresh
 92                    if isinstance(cursor, FinalStateCursor)
 93                    else SyncMode.incremental,
 94                    [cursor_field] if cursor_field is not None else None,
 95                    state,
 96                ),
 97                name=stream.name,
 98                namespace=stream.namespace,
 99                json_schema=stream.get_json_schema(),
100                primary_key=pk,
101                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
102                logger=logger,
103                cursor=cursor,
104            ),
105            stream,
106            cursor,
107            slice_logger=source._slice_logger,
108            logger=logger,
109        )
110
111    @property
112    def state(self) -> MutableMapping[str, Any]:
113        raise NotImplementedError(
114            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
115        )
116
117    @state.setter
118    def state(self, value: Mapping[str, Any]) -> None:
119        if "state" in dir(self._legacy_stream):
120            self._legacy_stream.state = value  # type: ignore  # validating `state` is attribute of stream using `if` above
121
122    def __init__(
123        self,
124        stream: DefaultStream,
125        legacy_stream: Stream,
126        cursor: Cursor,
127        slice_logger: SliceLogger,
128        logger: logging.Logger,
129    ):
130        """
131        :param stream: The underlying AbstractStream
132        """
133        self._abstract_stream = stream
134        self._legacy_stream = legacy_stream
135        self._cursor = cursor
136        self._slice_logger = slice_logger
137        self._logger = logger
138
139    def read(
140        self,
141        configured_stream: ConfiguredAirbyteStream,
142        logger: logging.Logger,
143        slice_logger: SliceLogger,
144        stream_state: MutableMapping[str, Any],
145        state_manager: ConnectorStateManager,
146        internal_config: InternalConfig,
147    ) -> Iterable[StreamData]:
148        yield from self._read_records()
149
150    def read_records(
151        self,
152        sync_mode: SyncMode,
153        cursor_field: Optional[List[str]] = None,
154        stream_slice: Optional[Mapping[str, Any]] = None,
155        stream_state: Optional[Mapping[str, Any]] = None,
156    ) -> Iterable[StreamData]:
157        try:
158            yield from self._read_records()
159        except Exception as exc:
160            if hasattr(self._cursor, "state"):
161                state = str(self._cursor.state)
162            else:
163                # This shouldn't happen if the ConcurrentCursor was used
164                state = "unknown; no state attribute was available on the cursor"
165            yield AirbyteMessage(
166                type=Type.LOG,
167                log=AirbyteLogMessage(
168                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
169                ),
170            )
171            raise exc
172
173    def _read_records(self) -> Iterable[StreamData]:
174        for partition in self._abstract_stream.generate_partitions():
175            if self._slice_logger.should_log_slice_message(self._logger):
176                yield self._slice_logger.create_slice_log_message(partition.to_slice())
177            for record in partition.read():
178                yield record.data
179
180    @property
181    def name(self) -> str:
182        return self._abstract_stream.name
183
184    @property
185    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
186        # This method is not expected to be called directly. It is only implemented for backward compatibility with the old interface
187        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]]
188
189    @property
190    def cursor_field(self) -> Union[str, List[str]]:
191        if self._abstract_stream.cursor_field is None:
192            return []
193        else:
194            return self._abstract_stream.cursor_field
195
196    @property
196    def cursor(self) -> Optional[Cursor]:  # type: ignore[override] # StreamFacade expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
198        return self._cursor
199
200    @property
201    def block_simultaneous_read(self) -> str:
202        """Returns the blocking group name from the underlying stream"""
203        return self._abstract_stream.block_simultaneous_read
204
205    # FIXME the lru_cache seems to be mostly there because of typing issue
206    @lru_cache(maxsize=None)
207    def get_json_schema(self) -> Mapping[str, Any]:
208        return self._abstract_stream.get_json_schema()
209
210    @property
211    def supports_incremental(self) -> bool:
212        return self._legacy_stream.supports_incremental
213
214    def as_airbyte_stream(self) -> AirbyteStream:
215        return self._abstract_stream.as_airbyte_stream()
216
217    def log_stream_sync_configuration(self) -> None:
218        self._abstract_stream.log_stream_sync_configuration()
219
220    def get_underlying_stream(self) -> DefaultStream:
221        return self._abstract_stream

The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

All methods either delegate to the wrapped AbstractStream or provide a default implementation. The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.

StreamFacade( stream: airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream, legacy_stream: airbyte_cdk.Stream, cursor: airbyte_cdk.Cursor, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, logger: logging.Logger)
122    def __init__(
123        self,
124        stream: DefaultStream,
125        legacy_stream: Stream,
126        cursor: Cursor,
127        slice_logger: SliceLogger,
128        logger: logging.Logger,
129    ):
130        """
131        :param stream: The underlying AbstractStream
132        """
133        self._abstract_stream = stream
134        self._legacy_stream = legacy_stream
135        self._cursor = cursor
136        self._slice_logger = slice_logger
137        self._logger = logger
Parameters
  • stream: The underlying AbstractStream
@classmethod
def create_from_stream( cls, stream: airbyte_cdk.Stream, source: airbyte_cdk.AbstractSource, logger: logging.Logger, state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.Cursor) -> airbyte_cdk.Stream:
 61    @classmethod
 62    def create_from_stream(
 63        cls,
 64        stream: Stream,
 65        source: AbstractSource,
 66        logger: logging.Logger,
 67        state: Optional[MutableMapping[str, Any]],
 68        cursor: Cursor,
 69    ) -> Stream:
 70        """
 71        Create a ConcurrentStream from a Stream object.
 72        :param source: The source
 73        :param stream: The stream
 74        :param max_workers: The maximum number of worker thread to use
 75        :return:
 76        """
 77        pk = get_primary_key_from_stream(stream.primary_key)
 78        cursor_field = get_cursor_field_from_stream(stream)
 79
 80        if not source.message_repository:
 81            raise ValueError(
 82                "A message repository is required to emit non-record messages. Please set the message repository on the source."
 83            )
 84
 85        message_repository = source.message_repository
 86        return StreamFacade(
 87            DefaultStream(
 88                partition_generator=StreamPartitionGenerator(
 89                    stream,
 90                    message_repository,
 91                    SyncMode.full_refresh
 92                    if isinstance(cursor, FinalStateCursor)
 93                    else SyncMode.incremental,
 94                    [cursor_field] if cursor_field is not None else None,
 95                    state,
 96                ),
 97                name=stream.name,
 98                namespace=stream.namespace,
 99                json_schema=stream.get_json_schema(),
100                primary_key=pk,
101                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
102                logger=logger,
103                cursor=cursor,
104            ),
105            stream,
106            cursor,
107            slice_logger=source._slice_logger,
108            logger=logger,
109        )

Create a ConcurrentStream from a Stream object.

Parameters
  • source: The source
  • stream: The stream
  • max_workers: The maximum number of worker threads to use
Returns
state: MutableMapping[str, Any]
111    @property
112    def state(self) -> MutableMapping[str, Any]:
113        raise NotImplementedError(
114            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
115        )
block_simultaneous_read: str
200    @property
201    def block_simultaneous_read(self) -> str:
202        """Returns the blocking group name from the underlying stream"""
203        return self._abstract_stream.block_simultaneous_read

Returns the blocking group name from the underlying stream

def get_underlying_stream( self) -> airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream:
220    def get_underlying_stream(self) -> DefaultStream:
221        return self._abstract_stream

Return the underlying stream object that this facade wraps.

class SliceEncoder(json.encoder.JSONEncoder):
224class SliceEncoder(json.JSONEncoder):
225    def default(self, obj: Any) -> Any:
226        if hasattr(obj, "__json_serializable__"):
227            return obj.__json_serializable__()
228
229        # Let the base class default method raise the TypeError
230        return super().default(obj)

Extensible JSON https://json.org encoder for Python data structures.

Supports the following objects and types by default:

+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+

To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).

def default(self, obj: Any) -> Any:
225    def default(self, obj: Any) -> Any:
226        if hasattr(obj, "__json_serializable__"):
227            return obj.__json_serializable__()
228
229        # Let the base class default method raise the TypeError
230        return super().default(obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
233class StreamPartition(Partition):
234    """
235    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface
236
237    StreamPartitions are instantiated from a Stream and a stream_slice.
238
239    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
240    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
241    """
242
243    def __init__(
244        self,
245        stream: Stream,
246        _slice: Optional[Mapping[str, Any]],
247        message_repository: MessageRepository,
248        sync_mode: SyncMode,
249        cursor_field: Optional[List[str]],
250        state: Optional[MutableMapping[str, Any]],
251    ):
252        """
253        :param stream: The stream to delegate to
254        :param _slice: The partition's stream_slice
255        :param message_repository: The message repository to use to emit non-record messages
256        """
257        self._stream = stream
258        self._slice = _slice
259        self._message_repository = message_repository
260        self._sync_mode = sync_mode
261        self._cursor_field = cursor_field
262        self._state = state
263        self._hash = SliceHasher.hash(self._stream.name, self._slice)
264
265    def read(self) -> Iterable[Record]:
266        """
267        Read messages from the stream.
268        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
269        Otherwise, the message will be emitted on the message repository.
270        """
271        try:
272            # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice
273            #  by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to:
274            #  * fetch_next_page
275            #  * parse_response
276            #  Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still do
277            #  `if not stream_state` to know if it calls the Event stream or not
278            for record_data in self._stream.read_records(
279                cursor_field=self._cursor_field,
280                sync_mode=SyncMode.full_refresh,
281                stream_slice=copy.deepcopy(self._slice),
282                stream_state=self._state,
283            ):
284                # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
285                # For now, file-based connectors have their own stream facade
286                if isinstance(record_data, Mapping):
287                    data_to_return = dict(record_data)
288                    self._stream.transformer.transform(
289                        data_to_return, self._stream.get_json_schema()
290                    )
291                    yield Record(
292                        data=data_to_return,
293                        stream_name=self.stream_name(),
294                        associated_slice=self._slice,  # type: ignore [arg-type]
295                    )
296                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
297                    yield Record(
298                        data=record_data.record.data or {},
299                        stream_name=self.stream_name(),
300                        associated_slice=self._slice,  # type: ignore [arg-type]
301                    )
302                else:
303                    self._message_repository.emit_message(record_data)
304        except Exception as e:
305            display_message = self._stream.get_error_display_message(e)
306            if display_message:
307                raise ExceptionWithDisplayMessage(display_message) from e
308            else:
309                raise e
310
311    def to_slice(self) -> Optional[Mapping[str, Any]]:
312        return self._slice
313
314    def __hash__(self) -> int:
315        return self._hash
316
317    def stream_name(self) -> str:
318        return self._stream.name
319
320    def __repr__(self) -> str:
321        return f"StreamPartition({self._stream.name}, {self._slice})"

This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

StreamPartitions are instantiated from a Stream and a stream_slice.

This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.

StreamPartition( stream: airbyte_cdk.Stream, _slice: Optional[Mapping[str, Any]], message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
243    def __init__(
244        self,
245        stream: Stream,
246        _slice: Optional[Mapping[str, Any]],
247        message_repository: MessageRepository,
248        sync_mode: SyncMode,
249        cursor_field: Optional[List[str]],
250        state: Optional[MutableMapping[str, Any]],
251    ):
252        """
253        :param stream: The stream to delegate to
254        :param _slice: The partition's stream_slice
255        :param message_repository: The message repository to use to emit non-record messages
256        """
257        self._stream = stream
258        self._slice = _slice
259        self._message_repository = message_repository
260        self._sync_mode = sync_mode
261        self._cursor_field = cursor_field
262        self._state = state
263        self._hash = SliceHasher.hash(self._stream.name, self._slice)
Parameters
  • stream: The stream to delegate to
  • _slice: The partition's stream_slice
  • message_repository: The message repository to use to emit non-record messages
def read(self) -> Iterable[airbyte_cdk.Record]:
265    def read(self) -> Iterable[Record]:
266        """
267        Read messages from the stream.
268        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
269        Otherwise, the message will be emitted on the message repository.
270        """
271        try:
272            # using `stream_state=self._state` has a very different behavior than the current one as today the state is updated slice
273            #  by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to:
274            #  * fetch_next_page
275            #  * parse_response
276            #  Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still does
277            #  `if not stream_state` to know if it calls the Event stream or not
278            for record_data in self._stream.read_records(
279                cursor_field=self._cursor_field,
280                sync_mode=SyncMode.full_refresh,
281                stream_slice=copy.deepcopy(self._slice),  # deep copy so the stream cannot mutate this partition's slice
282                stream_state=self._state,
283            ):
284                # Note: we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
285                # For now, file-based connectors have their own stream facade
286                if isinstance(record_data, Mapping):
287                    data_to_return = dict(record_data)
                    # transform() mutates data_to_return in place against the stream's JSON schema
288                    self._stream.transformer.transform(
289                        data_to_return, self._stream.get_json_schema()
290                    )
291                    yield Record(
292                        data=data_to_return,
293                        stream_name=self.stream_name(),
294                        associated_slice=self._slice,  # type: ignore [arg-type]
295                    )
296                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
297                    yield Record(
298                        data=record_data.record.data or {},
299                        stream_name=self.stream_name(),
300                        associated_slice=self._slice,  # type: ignore [arg-type]
301                    )
302                else:
                    # non-record messages (e.g. logs) are emitted on the repository rather than yielded
303                    self._message_repository.emit_message(record_data)
304        except Exception as e:
            # If the stream provides a user-facing message for this error, wrap it so callers can surface it
305            display_message = self._stream.get_error_display_message(e)
306            if display_message:
307                raise ExceptionWithDisplayMessage(display_message) from e
308            else:
309                raise e

Read messages from the stream. If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. Otherwise, the message will be emitted on the message repository.

def to_slice(self) -> Optional[Mapping[str, Any]]:
311    def to_slice(self) -> Optional[Mapping[str, Any]]:
312        return self._slice  # the raw slice this partition wraps; may be None

Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type such as Mapping[str, Comparable] to simplify typing, but some slices can have nested values (for example, a slice whose value is itself a mapping).

Returns

A mapping representing a slice

def stream_name(self) -> str:
317    def stream_name(self) -> str:
318        return self._stream.name  # delegates to the wrapped stream's name

Returns the name of the stream that this partition is reading from.

Returns

The name of the stream.

324class StreamPartitionGenerator(PartitionGenerator):
325    """
326    This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices
327
328    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
329    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
330    """
331
332    def __init__(
333        self,
334        stream: Stream,
335        message_repository: MessageRepository,
336        sync_mode: SyncMode,
337        cursor_field: Optional[List[str]],
338        state: Optional[MutableMapping[str, Any]],
339    ):
340        """
341        :param stream: The stream to delegate to
342        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: The sync mode passed to the stream's stream_slices and propagated to each StreamPartition
        :param cursor_field: The cursor field passed to the stream's stream_slices and propagated to each StreamPartition
        :param state: The stream state passed to the stream's stream_slices and propagated to each StreamPartition
343        """
344        self.message_repository = message_repository
345        self._stream = stream
346        self._sync_mode = sync_mode
347        self._cursor_field = cursor_field
348        self._state = state
349
350    def generate(self) -> Iterable[Partition]:
        # One StreamPartition is yielded per slice produced by the underlying stream.
351        for s in self._stream.stream_slices(
352            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
353        ):
354            yield StreamPartition(
355                self._stream,
356                copy.deepcopy(s),  # deep-copied so partitions don't share mutable slice state
357                self.message_repository,
358                self._sync_mode,
359                self._cursor_field,
360                self._state,
361            )

This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.

StreamPartitionGenerator( stream: airbyte_cdk.Stream, message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
332    def __init__(
333        self,
334        stream: Stream,
335        message_repository: MessageRepository,
336        sync_mode: SyncMode,
337        cursor_field: Optional[List[str]],
338        state: Optional[MutableMapping[str, Any]],
339    ):
340        """
341        :param stream: The stream to delegate to
342        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: The sync mode passed to the stream's stream_slices and propagated to each StreamPartition
        :param cursor_field: The cursor field passed to the stream's stream_slices and propagated to each StreamPartition
        :param state: The stream state passed to the stream's stream_slices and propagated to each StreamPartition
343        """
344        self.message_repository = message_repository
345        self._stream = stream
346        self._sync_mode = sync_mode
347        self._cursor_field = cursor_field
348        self._state = state
Parameters
  • stream: The stream to delegate to
  • message_repository: The message repository to use to emit non-record messages
  • sync_mode: The sync mode passed to the stream's stream_slices and propagated to each StreamPartition
  • cursor_field: The cursor field passed to the stream's stream_slices and propagated to each StreamPartition
  • state: The stream state passed to the stream's stream_slices and propagated to each StreamPartition
message_repository
def generate( self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:
350    def generate(self) -> Iterable[Partition]:
        # One StreamPartition is yielded per slice produced by the underlying stream.
351        for s in self._stream.stream_slices(
352            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
353        ):
354            yield StreamPartition(
355                self._stream,
356                copy.deepcopy(s),  # deep-copied so partitions don't share mutable slice state
357                self.message_repository,
358                self._sync_mode,
359                self._cursor_field,
360                self._state,
361            )

Generates partitions for a given sync mode.

Returns

An iterable of partitions