airbyte_cdk.sources.streams.concurrent.adapters

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import copy
  6import json
  7import logging
  8from functools import lru_cache
  9from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
 10
 11from typing_extensions import deprecated
 12
 13from airbyte_cdk.models import (
 14    AirbyteLogMessage,
 15    AirbyteMessage,
 16    AirbyteStream,
 17    ConfiguredAirbyteStream,
 18    Level,
 19    SyncMode,
 20    Type,
 21)
 22from airbyte_cdk.sources import AbstractSource, Source
 23from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 24from airbyte_cdk.sources.message import MessageRepository
 25from airbyte_cdk.sources.source import ExperimentalClassWarning
 26from airbyte_cdk.sources.streams import Stream
 27from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
 28from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 29from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
 30    AbstractAvailabilityStrategy,
 31    AlwaysAvailableAvailabilityStrategy,
 32)
 33from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
 34from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 35from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
 36from airbyte_cdk.sources.streams.concurrent.helpers import (
 37    get_cursor_field_from_stream,
 38    get_primary_key_from_stream,
 39)
 40from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 41from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 42from airbyte_cdk.sources.streams.core import StreamData
 43from airbyte_cdk.sources.types import Record
 44from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
 45from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 46from airbyte_cdk.utils.slice_hasher import SliceHasher
 47
 48"""
  49This module contains adapters that help enable concurrency on Stream objects without needing to migrate to AbstractStream
 50"""
 51
 52
 53@deprecated(
 54    "This class is experimental. Use at your own risk.",
 55    category=ExperimentalClassWarning,
 56)
 57class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
 58    """
 59    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.
 60
 61    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
 62    The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
 63    """
 64
 65    @classmethod
 66    def create_from_stream(
 67        cls,
 68        stream: Stream,
 69        source: AbstractSource,
 70        logger: logging.Logger,
 71        state: Optional[MutableMapping[str, Any]],
 72        cursor: Cursor,
 73    ) -> Stream:
 74        """
 75        Create a ConcurrentStream from a Stream object.
  76        :param source: The source that owns the stream
  77        :param stream: The stream to wrap
  78        :param cursor: The cursor to use for the stream
  79        :return: A Stream facade wrapping the given stream
 80        """
 81        pk = get_primary_key_from_stream(stream.primary_key)
 82        cursor_field = get_cursor_field_from_stream(stream)
 83
 84        if not source.message_repository:
 85            raise ValueError(
 86                "A message repository is required to emit non-record messages. Please set the message repository on the source."
 87            )
 88
 89        message_repository = source.message_repository
 90        return StreamFacade(
 91            DefaultStream(
 92                partition_generator=StreamPartitionGenerator(
 93                    stream,
 94                    message_repository,
 95                    SyncMode.full_refresh
 96                    if isinstance(cursor, FinalStateCursor)
 97                    else SyncMode.incremental,
 98                    [cursor_field] if cursor_field is not None else None,
 99                    state,
100                ),
101                name=stream.name,
102                namespace=stream.namespace,
103                json_schema=stream.get_json_schema(),
104                availability_strategy=AlwaysAvailableAvailabilityStrategy(),
105                primary_key=pk,
106                cursor_field=cursor_field,
107                logger=logger,
108                cursor=cursor,
109            ),
110            stream,
111            cursor,
112            slice_logger=source._slice_logger,
113            logger=logger,
114        )
115
116    @property
117    def state(self) -> MutableMapping[str, Any]:
118        raise NotImplementedError(
119            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
120        )
121
122    @state.setter
123    def state(self, value: Mapping[str, Any]) -> None:
124        if "state" in dir(self._legacy_stream):
125            self._legacy_stream.state = value  # type: ignore  # validating `state` is attribute of stream using `if` above
126
127    def __init__(
128        self,
129        stream: DefaultStream,
130        legacy_stream: Stream,
131        cursor: Cursor,
132        slice_logger: SliceLogger,
133        logger: logging.Logger,
134    ):
135        """
136        :param stream: The underlying AbstractStream
137        """
138        self._abstract_stream = stream
139        self._legacy_stream = legacy_stream
140        self._cursor = cursor
141        self._slice_logger = slice_logger
142        self._logger = logger
143
144    def read(
145        self,
146        configured_stream: ConfiguredAirbyteStream,
147        logger: logging.Logger,
148        slice_logger: SliceLogger,
149        stream_state: MutableMapping[str, Any],
150        state_manager: ConnectorStateManager,
151        internal_config: InternalConfig,
152    ) -> Iterable[StreamData]:
153        yield from self._read_records()
154
155    def read_records(
156        self,
157        sync_mode: SyncMode,
158        cursor_field: Optional[List[str]] = None,
159        stream_slice: Optional[Mapping[str, Any]] = None,
160        stream_state: Optional[Mapping[str, Any]] = None,
161    ) -> Iterable[StreamData]:
162        try:
163            yield from self._read_records()
164        except Exception as exc:
165            if hasattr(self._cursor, "state"):
166                state = str(self._cursor.state)
167            else:
168                # This shouldn't happen if the ConcurrentCursor was used
169                state = "unknown; no state attribute was available on the cursor"
170            yield AirbyteMessage(
171                type=Type.LOG,
172                log=AirbyteLogMessage(
173                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
174                ),
175            )
176            raise exc
177
178    def _read_records(self) -> Iterable[StreamData]:
179        for partition in self._abstract_stream.generate_partitions():
180            if self._slice_logger.should_log_slice_message(self._logger):
181                yield self._slice_logger.create_slice_log_message(partition.to_slice())
182            for record in partition.read():
183                yield record.data
184
185    @property
186    def name(self) -> str:
187        return self._abstract_stream.name
188
189    @property
190    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
191        # This method is not expected to be called directly. It is only implemented for backward compatibility with the old interface
192        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]]
193
194    @property
195    def cursor_field(self) -> Union[str, List[str]]:
196        if self._abstract_stream.cursor_field is None:
197            return []
198        else:
199            return self._abstract_stream.cursor_field
200
201    @property
 202    def cursor(self) -> Optional[Cursor]:  # type: ignore[override] # StreamFacade expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
203        return self._cursor
204
205    @lru_cache(maxsize=None)
206    def get_json_schema(self) -> Mapping[str, Any]:
207        return self._abstract_stream.get_json_schema()
208
209    @property
210    def supports_incremental(self) -> bool:
211        return self._legacy_stream.supports_incremental
212
213    def check_availability(
214        self, logger: logging.Logger, source: Optional["Source"] = None
215    ) -> Tuple[bool, Optional[str]]:
216        """
 217        Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters.
 218        :param logger: (ignored)
 219        :param source: (ignored)
 220        :return: A tuple of (is_available, message)
221        """
222        availability = self._abstract_stream.check_availability()
223        return availability.is_available(), availability.message()
224
225    def as_airbyte_stream(self) -> AirbyteStream:
226        return self._abstract_stream.as_airbyte_stream()
227
228    def log_stream_sync_configuration(self) -> None:
229        self._abstract_stream.log_stream_sync_configuration()
230
231    def get_underlying_stream(self) -> DefaultStream:
232        return self._abstract_stream
233
234
235class SliceEncoder(json.JSONEncoder):
236    def default(self, obj: Any) -> Any:
237        if hasattr(obj, "__json_serializable__"):
238            return obj.__json_serializable__()
239
240        # Let the base class default method raise the TypeError
241        return super().default(obj)
242
243
244class StreamPartition(Partition):
245    """
246    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface
247
248    StreamPartitions are instantiated from a Stream and a stream_slice.
249
250    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
251    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
252    """
253
254    def __init__(
255        self,
256        stream: Stream,
257        _slice: Optional[Mapping[str, Any]],
258        message_repository: MessageRepository,
259        sync_mode: SyncMode,
260        cursor_field: Optional[List[str]],
261        state: Optional[MutableMapping[str, Any]],
262    ):
263        """
264        :param stream: The stream to delegate to
265        :param _slice: The partition's stream_slice
266        :param message_repository: The message repository to use to emit non-record messages
267        """
268        self._stream = stream
269        self._slice = _slice
270        self._message_repository = message_repository
271        self._sync_mode = sync_mode
272        self._cursor_field = cursor_field
273        self._state = state
274        self._hash = SliceHasher.hash(self._stream.name, self._slice)
275
276    def read(self) -> Iterable[Record]:
277        """
278        Read messages from the stream.
279        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
280        Otherwise, the message will be emitted on the message repository.
281        """
282        try:
 283            # Using `stream_state=self._state` has a very different behavior than the current one, as today the state is updated slice
 284            #  by slice incrementally. We don't have this guarantee with the Concurrent CDK. For HttpStream, `stream_state` is passed to:
 285            #  * fetch_next_page
 286            #  * parse_response
 287            #  Neither is used for Stripe, so we should be good for the first iteration of the Concurrent CDK. However, Stripe still does
 288            #  `if not stream_state` to decide whether to call the Event stream.
289            for record_data in self._stream.read_records(
290                cursor_field=self._cursor_field,
291                sync_mode=SyncMode.full_refresh,
292                stream_slice=copy.deepcopy(self._slice),
293                stream_state=self._state,
294            ):
 295                # Note that we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
296                # For now, file-based connectors have their own stream facade
297                if isinstance(record_data, Mapping):
298                    data_to_return = dict(record_data)
299                    self._stream.transformer.transform(
300                        data_to_return, self._stream.get_json_schema()
301                    )
302                    yield Record(
303                        data=data_to_return,
304                        stream_name=self.stream_name(),
305                        associated_slice=self._slice,  # type: ignore [arg-type]
306                    )
307                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
308                    yield Record(
309                        data=record_data.record.data or {},
310                        stream_name=self.stream_name(),
311                        associated_slice=self._slice,  # type: ignore [arg-type]
312                    )
313                else:
314                    self._message_repository.emit_message(record_data)
315        except Exception as e:
316            display_message = self._stream.get_error_display_message(e)
317            if display_message:
318                raise ExceptionWithDisplayMessage(display_message) from e
319            else:
320                raise e
321
322    def to_slice(self) -> Optional[Mapping[str, Any]]:
323        return self._slice
324
325    def __hash__(self) -> int:
326        return self._hash
327
328    def stream_name(self) -> str:
329        return self._stream.name
330
331    def __repr__(self) -> str:
332        return f"StreamPartition({self._stream.name}, {self._slice})"
333
334
335class StreamPartitionGenerator(PartitionGenerator):
336    """
337    This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices
338
339    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
340    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
341    """
342
343    def __init__(
344        self,
345        stream: Stream,
346        message_repository: MessageRepository,
347        sync_mode: SyncMode,
348        cursor_field: Optional[List[str]],
349        state: Optional[MutableMapping[str, Any]],
350    ):
351        """
352        :param stream: The stream to delegate to
353        :param message_repository: The message repository to use to emit non-record messages
354        """
355        self.message_repository = message_repository
356        self._stream = stream
357        self._sync_mode = sync_mode
358        self._cursor_field = cursor_field
359        self._state = state
360
361    def generate(self) -> Iterable[Partition]:
362        for s in self._stream.stream_slices(
363            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
364        ):
365            yield StreamPartition(
366                self._stream,
367                copy.deepcopy(s),
368                self.message_repository,
369                self._sync_mode,
370                self._cursor_field,
371                self._state,
372            )
373
374
375@deprecated(
376    "Availability strategy has been soft deprecated. Do not use. Class is subject to removal",
377    category=ExperimentalClassWarning,
378)
379class AvailabilityStrategyFacade(AvailabilityStrategy):
380    def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
381        self._abstract_availability_strategy = abstract_availability_strategy
382
383    def check_availability(
384        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
385    ) -> Tuple[bool, Optional[str]]:
386        """
387        Checks stream availability.
388
 389        Note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.
390
391        :param stream: (unused)
392        :param logger: logger object to use
393        :param source: (unused)
 394        :return: A tuple of (boolean, str). If the boolean is True, the stream is available; otherwise, the str describes why it is not.
395        """
396        stream_availability = self._abstract_availability_strategy.check_availability(logger)
397        return stream_availability.is_available(), stream_availability.message()
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
class StreamFacade(airbyte_cdk.sources.streams.concurrent.abstract_stream_facade.AbstractStreamFacade[airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream], airbyte_cdk.sources.streams.core.Stream):

The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

All methods either delegate to the wrapped AbstractStream or provide a default implementation. The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.

StreamFacade( stream: airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream, legacy_stream: airbyte_cdk.Stream, cursor: airbyte_cdk.Cursor, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, logger: logging.Logger)
Parameters
  • stream: The underlying AbstractStream
@classmethod
def create_from_stream( cls, stream: airbyte_cdk.Stream, source: airbyte_cdk.AbstractSource, logger: logging.Logger, state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.Cursor) -> airbyte_cdk.Stream:

Create a ConcurrentStream from a Stream object.

Parameters
  • source: The source that owns the stream
  • stream: The stream to wrap
  • cursor: The cursor to use for the stream
Returns

A Stream facade wrapping the given stream
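
For illustration, here is a minimal, hypothetical sketch of wrapping a synchronous stream for a full-refresh read. The helper name wrap_for_full_refresh is made up, and the FinalStateCursor keyword arguments are assumed from the concurrent cursor module; in a real connector this wiring is usually handled by the concurrent source machinery rather than written by hand.

    import logging
    from typing import Any, MutableMapping, Optional

    from airbyte_cdk.sources import AbstractSource
    from airbyte_cdk.sources.streams import Stream
    from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
    from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

    def wrap_for_full_refresh(
        stream: Stream,
        source: AbstractSource,
        state: Optional[MutableMapping[str, Any]] = None,
    ) -> Stream:
        # A FinalStateCursor signals that there is no incremental state to
        # track, so create_from_stream selects SyncMode.full_refresh for the
        # partition generator.
        cursor = FinalStateCursor(
            stream_name=stream.name,
            stream_namespace=stream.namespace,
            message_repository=source.message_repository,
        )
        return StreamFacade.create_from_stream(
            stream, source, logging.getLogger("airbyte"), state, cursor
        )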
state: MutableMapping[str, Any]
def check_availability( self, logger: logging.Logger, source: Optional[airbyte_cdk.Source] = None) -> Tuple[bool, Optional[str]]:

Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters.

Parameters
  • logger: (ignored)
  • source: (ignored)
Returns

A tuple of (is_available, message)
def get_underlying_stream( self) -> airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream:

Return the underlying stream facade object.

class SliceEncoder(json.encoder.JSONEncoder):

Extensible JSON (https://json.org) encoder for Python data structures.

Supports the following objects and types by default:

+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+

To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).
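
As a small hedged sketch, any object encountered during serialization that exposes a __json_serializable__ method is encoded through that hook; the DateRange class below is hypothetical.

    import json

    from airbyte_cdk.sources.streams.concurrent.adapters import SliceEncoder

    class DateRange:
        # Hypothetical slice component implementing the hook SliceEncoder
        # looks for before falling back to JSONEncoder.default.
        def __init__(self, start: str, end: str):
            self.start, self.end = start, end

        def __json_serializable__(self):
            return {"start": self.start, "end": self.end}

    print(json.dumps({"range": DateRange("2023-01-01", "2023-01-31")}, cls=SliceEncoder))
    # {"range": {"start": "2023-01-01", "end": "2023-01-31"}}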

def default(self, obj: Any) -> Any:

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class StreamPartition(airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition):

This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

StreamPartitions are instantiated from a Stream and a stream_slice.

This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.

StreamPartition( stream: airbyte_cdk.Stream, _slice: Optional[Mapping[str, Any]], message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
Parameters
  • stream: The stream to delegate to
  • _slice: The partition's stream_slice
  • message_repository: The message repository to use to emit non-record messages
def read(self) -> Iterable[airbyte_cdk.Record]:

Read messages from the stream. If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. Otherwise, the message will be emitted on the message repository.
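
As a rough sketch of the contract (the stream, message_repository, and slice arguments are assumed to exist; the helper name is made up):

    from airbyte_cdk.models import SyncMode
    from airbyte_cdk.sources.streams.concurrent.adapters import StreamPartition

    def read_one_slice(stream, message_repository, _slice, state=None):
        # One partition wraps exactly one stream_slice; read() replays
        # Stream.read_records for that slice, yields normalized Record
        # objects, and routes non-record messages to the repository.
        partition = StreamPartition(
            stream, _slice, message_repository, SyncMode.full_refresh, None, state
        )
        return [record.data for record in partition.read()]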

def to_slice(self) -> Optional[Mapping[str, Any]]:

Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values.

Returns

A mapping representing a slice

def stream_name(self) -> str:

Returns the name of the stream that this partition is reading from.

Returns

The name of the stream.

class StreamPartitionGenerator(airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator):

This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.

StreamPartitionGenerator( stream: airbyte_cdk.Stream, message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
Parameters
  • stream: The stream to delegate to
  • message_repository: The message repository to use to emit non-record messages
message_repository
def generate( self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:

Generates partitions for a given sync mode.

Returns

An iterable of partitions
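
A short sketch under the same assumptions (an existing stream and message_repository; the "updated_at" cursor field is illustrative):

    from airbyte_cdk.models import SyncMode
    from airbyte_cdk.sources.streams.concurrent.adapters import StreamPartitionGenerator

    def list_partition_slices(stream, message_repository, state=None):
        # Each legacy stream_slice becomes one StreamPartition; slices are
        # deep-copied so partitions are safe to hand to worker threads.
        generator = StreamPartitionGenerator(
            stream, message_repository, SyncMode.incremental, ["updated_at"], state
        )
        return [partition.to_slice() for partition in generator.generate()]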

@deprecated('Availability strategy has been soft deprecated. Do not use. Class is subject to removal', category=ExperimentalClassWarning)
class AvailabilityStrategyFacade(airbyte_cdk.sources.streams.availability_strategy.AvailabilityStrategy):

Abstract base class for checking stream availability.

AvailabilityStrategyFacade( abstract_availability_strategy: airbyte_cdk.sources.streams.concurrent.availability_strategy.AbstractAvailabilityStrategy)
def check_availability( self, stream: airbyte_cdk.Stream, logger: logging.Logger, source: Optional[airbyte_cdk.Source] = None) -> Tuple[bool, Optional[str]]:

Checks stream availability.

Note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

Parameters
  • stream: (unused)
  • logger: logger object to use
  • source: (unused)
Returns

A tuple of (boolean, str). If the boolean is True, the stream is available; otherwise, the str describes why it is not.
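
Because the stream and source arguments are ignored, a minimal hedged sketch can pass a placeholder stream; constructing the facade emits the deprecation warning noted above.

    import logging

    from airbyte_cdk.sources.streams.concurrent.adapters import AvailabilityStrategyFacade
    from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
        AlwaysAvailableAvailabilityStrategy,
    )

    facade = AvailabilityStrategyFacade(AlwaysAvailableAvailabilityStrategy())
    # The stream argument is unused by the facade, so None stands in here even
    # though the signature annotates it as a Stream.
    is_available, message = facade.check_availability(None, logging.getLogger("airbyte"))
    assert is_available  # AlwaysAvailableAvailabilityStrategy always reports available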