
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  5import copy
  6import json
  7import logging
  8from functools import lru_cache
  9from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
 11from typing_extensions import deprecated
 13from airbyte_cdk.models import (
 14    AirbyteLogMessage,
 15    AirbyteMessage,
 16    AirbyteStream,
 17    ConfiguredAirbyteStream,
 18    Level,
 19    SyncMode,
 20    Type,
 22from airbyte_cdk.sources import AbstractSource, Source
 23from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 24from airbyte_cdk.sources.message import MessageRepository
 25from airbyte_cdk.sources.source import ExperimentalClassWarning
 26from airbyte_cdk.sources.streams import Stream
 27from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
 28from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 29from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
 30    AbstractAvailabilityStrategy,
 31    AlwaysAvailableAvailabilityStrategy,
 33from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
 34from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 35from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
 36from airbyte_cdk.sources.streams.concurrent.helpers import (
 37    get_cursor_field_from_stream,
 38    get_primary_key_from_stream,
 40from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 41from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 42from airbyte_cdk.sources.streams.core import StreamData
 43from airbyte_cdk.sources.types import Record
 44from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
 45from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 46from airbyte_cdk.utils.slice_hasher import SliceHasher
 49This module contains adapters to help enabling concurrency on Stream objects without needing to migrate to AbstractStream
 54    "This class is experimental. Use at your own risk.",
 55    category=ExperimentalClassWarning,
 57class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
 58    """
 59    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.
 61    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
 62    The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
 63    """
 65    @classmethod
 66    def create_from_stream(
 67        cls,
 68        stream: Stream,
 69        source: AbstractSource,
 70        logger: logging.Logger,
 71        state: Optional[MutableMapping[str, Any]],
 72        cursor: Cursor,
 73    ) -> Stream:
 74        """
 75        Create a ConcurrentStream from a Stream object.
 76        :param source: The source
 77        :param stream: The stream
 78        :param max_workers: The maximum number of worker thread to use
 79        :return:
 80        """
 81        pk = get_primary_key_from_stream(stream.primary_key)
 82        cursor_field = get_cursor_field_from_stream(stream)
 84        if not source.message_repository:
 85            raise ValueError(
 86                "A message repository is required to emit non-record messages. Please set the message repository on the source."
 87            )
 89        message_repository = source.message_repository
 90        return StreamFacade(
 91            DefaultStream(
 92                partition_generator=StreamPartitionGenerator(
 93                    stream,
 94                    message_repository,
 95                    SyncMode.full_refresh
 96                    if isinstance(cursor, FinalStateCursor)
 97                    else SyncMode.incremental,
 98                    [cursor_field] if cursor_field is not None else None,
 99                    state,
100                ),
101      ,
102                namespace=stream.namespace,
103                json_schema=stream.get_json_schema(),
104                availability_strategy=AlwaysAvailableAvailabilityStrategy(),
105                primary_key=pk,
106                cursor_field=cursor_field,
107                logger=logger,
108                cursor=cursor,
109            ),
110            stream,
111            cursor,
112            slice_logger=source._slice_logger,
113            logger=logger,
114        )
116    @property
117    def state(self) -> MutableMapping[str, Any]:
118        raise NotImplementedError(
119            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
120        )
122    @state.setter
123    def state(self, value: Mapping[str, Any]) -> None:
124        if "state" in dir(self._legacy_stream):
125            self._legacy_stream.state = value  # type: ignore  # validating `state` is attribute of stream using `if` above
127    def __init__(
128        self,
129        stream: DefaultStream,
130        legacy_stream: Stream,
131        cursor: Cursor,
132        slice_logger: SliceLogger,
133        logger: logging.Logger,
134    ):
135        """
136        :param stream: The underlying AbstractStream
137        """
138        self._abstract_stream = stream
139        self._legacy_stream = legacy_stream
140        self._cursor = cursor
141        self._slice_logger = slice_logger
142        self._logger = logger
144    def read(
145        self,
146        configured_stream: ConfiguredAirbyteStream,
147        logger: logging.Logger,
148        slice_logger: SliceLogger,
149        stream_state: MutableMapping[str, Any],
150        state_manager: ConnectorStateManager,
151        internal_config: InternalConfig,
152    ) -> Iterable[StreamData]:
153        yield from self._read_records()
155    def read_records(
156        self,
157        sync_mode: SyncMode,
158        cursor_field: Optional[List[str]] = None,
159        stream_slice: Optional[Mapping[str, Any]] = None,
160        stream_state: Optional[Mapping[str, Any]] = None,
161    ) -> Iterable[StreamData]:
162        try:
163            yield from self._read_records()
164        except Exception as exc:
165            if hasattr(self._cursor, "state"):
166                state = str(self._cursor.state)
167            else:
168                # This shouldn't happen if the ConcurrentCursor was used
169                state = "unknown; no state attribute was available on the cursor"
170            yield AirbyteMessage(
171                type=Type.LOG,
172                log=AirbyteLogMessage(
173                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
174                ),
175            )
176            raise exc
178    def _read_records(self) -> Iterable[StreamData]:
179        for partition in self._abstract_stream.generate_partitions():
180            if self._slice_logger.should_log_slice_message(self._logger):
181                yield self._slice_logger.create_slice_log_message(partition.to_slice())
182            for record in
183                yield
185    @property
186    def name(self) -> str:
187        return
189    @property
190    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
191        # This method is not expected to be called directly. It is only implemented for backward compatibility with the old interface
192        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]]
194    @property
195    def cursor_field(self) -> Union[str, List[str]]:
196        if self._abstract_stream.cursor_field is None:
197            return []
198        else:
199            return self._abstract_stream.cursor_field
201    @property
202    def cursor(self) -> Optional[Cursor]:  # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
203        return self._cursor
205    @lru_cache(maxsize=None)
206    def get_json_schema(self) -> Mapping[str, Any]:
207        return self._abstract_stream.get_json_schema()
209    @property
210    def supports_incremental(self) -> bool:
211        return self._legacy_stream.supports_incremental
213    def check_availability(
214        self, logger: logging.Logger, source: Optional["Source"] = None
215    ) -> Tuple[bool, Optional[str]]:
216        """
217        Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters
218        :param logger: (ignored)
219        :param source:  (ignored)
220        :return:
221        """
222        availability = self._abstract_stream.check_availability()
223        return availability.is_available(), availability.message()
225    def as_airbyte_stream(self) -> AirbyteStream:
226        return self._abstract_stream.as_airbyte_stream()
228    def log_stream_sync_configuration(self) -> None:
229        self._abstract_stream.log_stream_sync_configuration()
231    def get_underlying_stream(self) -> DefaultStream:
232        return self._abstract_stream
class SliceEncoder(json.JSONEncoder):
    """
    JSON encoder that honours an object's own ``__json_serializable__`` hook.

    Objects exposing that hook are encoded as whatever it returns; everything else is
    handed to :class:`json.JSONEncoder`, which raises ``TypeError`` for unsupported types.
    """

    def default(self, obj: Any) -> Any:
        if not hasattr(obj, "__json_serializable__"):
            # Let the base class default method raise the TypeError
            return super().default(obj)
        return obj.__json_serializable__()
class StreamPartition(Partition):
    """
    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

    StreamPartitions are instantiated from a Stream and a stream_slice.

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param _slice: The partition's stream_slice
        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: Stored sync mode (note: ``read`` always calls ``read_records`` with full refresh)
        :param cursor_field: Cursor field forwarded to ``read_records``
        :param state: Stream state forwarded to ``read_records``
        """
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        # Computed once from the stream name and slice; returned by __hash__.
        self._hash = SliceHasher.hash(self._stream.name, self._slice)

    def read(self) -> Iterable[Record]:
        """
        Read messages from the stream.
        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
        Otherwise, the message will be emitted on the message repository.

        :raises ExceptionWithDisplayMessage: If reading fails and the stream provides a display message for the error
        """
        try:
            # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice
            #  by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to:
            #  * fetch_next_page
            #  * parse_response
            #  Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still do
            #  `if not stream_state` to know if it calls the Event stream or not
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                # The stream receives a copy so it cannot mutate this partition's slice.
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
                # For now, file-based connectors have their own stream facade
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(
                        data=data_to_return,
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                    yield Record(
                        data=record_data.record.data or {},
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        """The stream slice this partition reads."""
        return self._slice

    def __hash__(self) -> int:
        return self._hash

    def stream_name(self) -> str:
        """The name of the wrapped stream."""
        return self._stream.name

    def __repr__(self) -> str:
        return f"StreamPartition({self._stream.name}, {self._slice})"
class StreamPartitionGenerator(PartitionGenerator):
    """
    Adapter that turns a legacy Stream's ``stream_slices`` into a PartitionGenerator.

    Every slice produced by the stream becomes one StreamPartition. This lets existing
    connectors run concurrently without being rewritten as an AbstractStream. Migrating
    the connectors themselves would be preferable long-term, but the tooling, and the
    justification for the effort, are not there yet.
    """

    def __init__(
        self,
        stream: Stream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream whose slices become partitions
        :param message_repository: Repository the partitions use to emit non-record messages
        :param sync_mode: Sync mode passed to ``stream_slices`` and to every partition
        :param cursor_field: Cursor field passed to ``stream_slices`` and to every partition
        :param state: Stream state passed to ``stream_slices`` and to every partition
        """
        self._stream = stream
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        # Note: exposed as a public attribute (no leading underscore).
        self.message_repository = message_repository

    def generate(self) -> Iterable[Partition]:
        """Yield one StreamPartition per slice returned by the wrapped stream."""
        slices = self._stream.stream_slices(
            sync_mode=self._sync_mode,
            cursor_field=self._cursor_field,
            stream_state=self._state,
        )
        for stream_slice in slices:
            # Each partition receives its own deep copy of the slice.
            yield StreamPartition(
                stream=self._stream,
                _slice=copy.deepcopy(stream_slice),
                message_repository=self.message_repository,
                sync_mode=self._sync_mode,
                cursor_field=self._cursor_field,
                state=self._state,
            )
@deprecated(
    "Availability strategy has been soft deprecated. Do not use. Class is subject to removal",
    category=ExperimentalClassWarning,
)
class AvailabilityStrategyFacade(AvailabilityStrategy):
    """
    Expose an AbstractAvailabilityStrategy through the legacy AvailabilityStrategy interface.
    """

    def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
        """
        :param abstract_availability_strategy: The strategy every availability check is delegated to
        """
        self._abstract_availability_strategy = abstract_availability_strategy

    def check_availability(
        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Checks stream availability.

        Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

        :param stream: (unused)
        :param logger: logger object passed to the underlying strategy
        :param source: (unused)
        :return: A tuple ``(is_available, message)``: whether the stream is available according
            to the underlying strategy, and that strategy's message (which may be None)
        """
        stream_availability = self._abstract_availability_strategy.check_availability(logger)
        return stream_availability.is_available(), stream_availability.message()
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
    """
    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
    The default implementations define restrictions imposed on Streams migrated to the new interface.
    For instance, only source-defined cursors are supported.
    """

    @classmethod
    def create_from_stream(
        cls,
        stream: Stream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: Cursor,
    ) -> Stream:
        """
        Create a StreamFacade that runs a legacy Stream through a concurrent DefaultStream.

        :param stream: The legacy stream to wrap
        :param source: The source owning the stream; it must have a message repository
        :param logger: Logger given to both the DefaultStream and the facade
        :param state: Stream state forwarded to the partition generator
        :param cursor: Concurrent cursor; a FinalStateCursor selects full-refresh slicing,
            any other cursor selects incremental slicing
        :return: A StreamFacade wrapping the newly built DefaultStream
        :raises ValueError: If the source has no message repository
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        # Slices are requested in full-refresh mode only when the cursor is a FinalStateCursor.
        sync_mode = (
            SyncMode.full_refresh if isinstance(cursor, FinalStateCursor) else SyncMode.incremental
        )
        return StreamFacade(
            DefaultStream(
                partition_generator=StreamPartitionGenerator(
                    stream,
                    message_repository,
                    sync_mode,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                ),
                name=stream.name,
                namespace=stream.namespace,
                json_schema=stream.get_json_schema(),
                availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                primary_key=pk,
                cursor_field=cursor_field,
                logger=logger,
                cursor=cursor,
            ),
            stream,
            cursor,
            slice_logger=source._slice_logger,
            logger=logger,
        )

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Reading state through the facade is unsupported; always raises NotImplementedError."""
        raise NotImplementedError(
            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
        )

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        """Forward the state to the legacy stream when it exposes a ``state`` attribute."""
        # dir() is used instead of hasattr(): dir() does not invoke a property getter.
        if "state" in dir(self._legacy_stream):
            self._legacy_stream.state = value  # type: ignore  # validating `state` is attribute of stream using `if` above

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: Stream,
        cursor: Cursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        :param legacy_stream: The original Stream, used for ``supports_incremental`` and the state setter
        :param cursor: The concurrent cursor exposed through ``cursor``
        :param slice_logger: Decides whether and how slice log messages are emitted
        :param logger: Logger passed to the slice logger
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        """
        Read all records of the wrapped stream.

        All parameters are ignored; the slice logger and logger given at construction are used instead.
        """
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        """
        Read all records of the wrapped stream; the arguments are ignored.

        If reading fails, an ERROR log message containing the cursor state is yielded
        before the original exception is re-raised.
        """
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        """
        Walk the wrapped stream's partitions sequentially, yielding an optional slice log
        message per partition followed by the data of each record in that partition.
        """
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data

    @property
    def name(self) -> str:
        """The name of the wrapped stream."""
        return self._abstract_stream.name

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        # This method is not expected to be called directly. It is only implemented for backward compatibility with the old interface
        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]]

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """The wrapped stream's cursor field, or ``[]`` when it has none."""
        if self._abstract_stream.cursor_field is None:
            return []
        return self._abstract_stream.cursor_field

    @property
    def cursor(self) -> Optional[Cursor]:  # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
        """The concurrent cursor supplied at construction."""
        return self._cursor

    # NOTE(review): lru_cache on a method keys on `self` and keeps every facade alive for the
    # cache's lifetime (ruff B019); presumably acceptable if facades are few — confirm.
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        """The wrapped stream's JSON schema (memoized per facade)."""
        return self._abstract_stream.get_json_schema()

    @property
    def supports_incremental(self) -> bool:
        """Delegates to the legacy stream."""
        return self._legacy_stream.supports_incremental

    def check_availability(
        self, logger: logging.Logger, source: Optional["Source"] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters
        :param logger: (ignored)
        :param source:  (ignored)
        :return: ``(is_available, message)`` as reported by the underlying stream
        """
        availability = self._abstract_stream.check_availability()
        return availability.is_available(), availability.message()

    def as_airbyte_stream(self) -> AirbyteStream:
        """Delegates to the underlying stream."""
        return self._abstract_stream.as_airbyte_stream()

    def log_stream_sync_configuration(self) -> None:
        """Delegates to the underlying stream."""
        self._abstract_stream.log_stream_sync_configuration()

    def get_underlying_stream(self) -> DefaultStream:
        """Return the DefaultStream this facade wraps."""
        return self._abstract_stream

The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

All methods either delegate to the wrapped AbstractStream or provide a default implementation. The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.

def __init__(
    self,
    stream: DefaultStream,
    legacy_stream: Stream,
    cursor: Cursor,
    slice_logger: SliceLogger,
    logger: logging.Logger,
):
    """
    Keep references to the wrapped stream and the collaborators used while reading.

    :param stream: The underlying AbstractStream
    :param legacy_stream: The original Stream the facade was built from
    :param cursor: The concurrent cursor
    :param slice_logger: Decides whether and how slice log messages are emitted
    :param logger: Logger handed to the slice logger
    """
    self._abstract_stream, self._legacy_stream, self._cursor = stream, legacy_stream, cursor
    self._slice_logger, self._logger = slice_logger, logger
  • stream: The underlying AbstractStream
@classmethod
def create_from_stream(
    cls,
    stream: Stream,
    source: AbstractSource,
    logger: logging.Logger,
    state: Optional[MutableMapping[str, Any]],
    cursor: Cursor,
) -> Stream:
    """
    Create a StreamFacade that runs a legacy Stream through a concurrent DefaultStream.

    :param stream: The legacy stream to wrap
    :param source: The source owning the stream; it must have a message repository
    :param logger: Logger given to both the DefaultStream and the facade
    :param state: Stream state forwarded to the partition generator
    :param cursor: Concurrent cursor; a FinalStateCursor selects full-refresh slicing,
        any other cursor selects incremental slicing
    :return: A StreamFacade wrapping the newly built DefaultStream
    :raises ValueError: If the source has no message repository
    """
    pk = get_primary_key_from_stream(stream.primary_key)
    cursor_field = get_cursor_field_from_stream(stream)

    if not source.message_repository:
        raise ValueError(
            "A message repository is required to emit non-record messages. Please set the message repository on the source."
        )

    message_repository = source.message_repository
    # Slices are requested in full-refresh mode only when the cursor is a FinalStateCursor.
    sync_mode = (
        SyncMode.full_refresh if isinstance(cursor, FinalStateCursor) else SyncMode.incremental
    )
    return StreamFacade(
        DefaultStream(
            partition_generator=StreamPartitionGenerator(
                stream,
                message_repository,
                sync_mode,
                [cursor_field] if cursor_field is not None else None,
                state,
            ),
            name=stream.name,
            namespace=stream.namespace,
            json_schema=stream.get_json_schema(),
            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
            primary_key=pk,
            cursor_field=cursor_field,
            logger=logger,
            cursor=cursor,
        ),
        stream,
        cursor,
        slice_logger=source._slice_logger,
        logger=logger,
    )

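Create a StreamFacade that runs a legacy Stream object through a concurrent DefaultStream.

  • stream: The legacy stream to wrap
  • source: The source owning the stream; it must have a message repository, otherwise a ValueError is raised
  • logger: The logger given to both the DefaultStream and the facade
  • state: The stream state forwarded to the partition generator
  • cursor: The concurrent cursor; a FinalStateCursor selects full-refresh slicing, any other cursor selects incremental slicing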
@property
def state(self) -> MutableMapping[str, Any]:
    """Reading state through the facade is unsupported; this always raises NotImplementedError."""
    message = "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
    raise NotImplementedError(message)
def check_availability(
    self, logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
    """
    Ask the underlying AbstractStream whether it is available.

    Both parameters exist only for interface compatibility and are ignored.

    :param logger: (ignored)
    :param source: (ignored)
    :return: ``(is_available, message)`` as reported by the underlying stream
    """
    result = self._abstract_stream.check_availability()
    is_available = result.is_available()
    message = result.message()
    return is_available, message

Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters

  • logger: (ignored)
  • source: (ignored)
def get_underlying_stream(self) -> DefaultStream:
    """Return the DefaultStream that the facade delegates to."""
    underlying = self._abstract_stream
    return underlying

Return the underlying DefaultStream that this facade wraps.

class SliceEncoder(json.JSONEncoder):
    """Encode objects through their ``__json_serializable__`` hook when they provide one."""

    def default(self, obj: Any) -> Any:
        can_self_serialize = hasattr(obj, "__json_serializable__")
        if can_self_serialize:
            return obj.__json_serializable__()
        # Let the base class default method raise the TypeError
        return super().default(obj)

Extensible JSON encoder for Python data structures.

Supports the following objects and types by default:

+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+

To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).

def default(self, obj: Any) -> Any:
    """Serialize *obj* via its ``__json_serializable__`` hook, else defer to the base encoder."""
    if not hasattr(obj, "__json_serializable__"):
        # Let the base class default method raise the TypeError
        return super().default(obj)
    return obj.__json_serializable__()

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    """Serialize any iterable as a JSON array; defer everything else to the base class."""
    try:
        iterable = iter(o)
    except TypeError:
        # Not iterable: fall through to the base implementation below.
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
class StreamPartition(Partition):
    """
    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

    StreamPartitions are instantiated from a Stream and a stream_slice.

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param _slice: The partition's stream_slice
        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: The sync mode this partition was generated for
        :param cursor_field: The cursor field forwarded to ``Stream.read_records``
        :param state: The stream state forwarded to ``Stream.read_records``
        """
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        # The partition hash is derived from the stream name and the slice content.
        self._hash = SliceHasher.hash(self._stream.name, self._slice)

    def read(self) -> Iterable[Record]:
        """
        Read messages from the stream.
        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
        Otherwise, the message will be emitted on the message repository.

        :raises ExceptionWithDisplayMessage: if the stream provides a display message for the error
        """
        try:
            # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice
            #  by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to:
            #  * fetch_next_page
            #  * parse_response
            #  Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still do
            #  `if not stream_state` to know if it calls the Event stream or not
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
                # For now, file-based connectors have their own stream facade
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(
                        data=data_to_return,
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                    yield Record(
                        data=record_data.record.data or {},
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        """Return the stream slice this partition reads (may be ``None``)."""
        return self._slice

    def __hash__(self) -> int:
        return self._hash

    def stream_name(self) -> str:
        """Return the name of the stream this partition reads from."""
        return self._stream.name

    def __repr__(self) -> str:
        return f"StreamPartition({self._stream.name}, {self._slice})"

This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

StreamPartitions are instantiated from a Stream and a stream_slice.

This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.

StreamPartition( stream: airbyte_cdk.Stream, _slice: Optional[Mapping[str, Any]], message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
def __init__(
    self,
    stream: Stream,
    _slice: Optional[Mapping[str, Any]],
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
):
    """
    :param stream: The stream to delegate to
    :param _slice: The partition's stream_slice
    :param message_repository: The message repository to use to emit non-record messages
    :param sync_mode: The sync mode this partition was generated for
    :param cursor_field: The cursor field forwarded to ``Stream.read_records``
    :param state: The stream state forwarded to ``Stream.read_records``
    """
    self._stream = stream
    self._slice = _slice
    self._message_repository = message_repository
    self._sync_mode = sync_mode
    self._cursor_field = cursor_field
    self._state = state
    # The partition hash is derived from the stream name and the slice content.
    self._hash = SliceHasher.hash(self._stream.name, self._slice)
  • stream: The stream to delegate to
  • _slice: The partition's stream_slice
  • message_repository: The message repository to use to emit non-record messages
def read(self) -> Iterable[Record]:
    """
    Read messages from the stream.
    If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
    Otherwise, the message will be emitted on the message repository.

    :raises ExceptionWithDisplayMessage: if the stream provides a display message for the error
    """
    try:
        # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice
        #  by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to:
        #  * fetch_next_page
        #  * parse_response
        #  Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still do
        #  `if not stream_state` to know if it calls the Event stream or not
        for record_data in self._stream.read_records(
            cursor_field=self._cursor_field,
            sync_mode=SyncMode.full_refresh,
            stream_slice=copy.deepcopy(self._slice),
            stream_state=self._state,
        ):
            # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
            # For now, file-based connectors have their own stream facade
            if isinstance(record_data, Mapping):
                data_to_return = dict(record_data)
                self._stream.transformer.transform(
                    data_to_return, self._stream.get_json_schema()
                )
                yield Record(
                    data=data_to_return,
                    stream_name=self.stream_name(),
                    associated_slice=self._slice,  # type: ignore [arg-type]
                )
            elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                yield Record(
                    data=record_data.record.data or {},
                    stream_name=self.stream_name(),
                    associated_slice=self._slice,  # type: ignore [arg-type]
                )
            else:
                self._message_repository.emit_message(record_data)
    except Exception as e:
        display_message = self._stream.get_error_display_message(e)
        if display_message:
            raise ExceptionWithDisplayMessage(display_message) from e
        else:
            raise e

Read messages from the stream. If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. Otherwise, the message will be emitted on the message repository.

def to_slice(self) -> Optional[Mapping[str, Any]]:
    """Return the stream slice this partition reads (may be ``None``).

    :return: The slice stored on ``self._slice``.
    """
    return self._slice

Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been nicer to type this as Mapping[str, Comparable] to simplify typing, but some slices contain nested values (for example, a slice whose value is itself a mapping).


A mapping representing a slice

def stream_name(self) -> str:
    """Return the name of the stream this partition reads from.

    :return: ``self._stream.name``.
    """
    return self._stream.name

Returns the name of the stream that this partition is reading from.


The name of the stream.

class StreamPartitionGenerator(PartitionGenerator):
    """
    This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: The sync mode forwarded to ``Stream.stream_slices`` and to each partition
        :param cursor_field: The cursor field forwarded to ``Stream.stream_slices`` and to each partition
        :param state: The stream state forwarded to ``Stream.stream_slices`` and to each partition
        """
        self.message_repository = message_repository
        self._stream = stream
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def generate(self) -> Iterable[Partition]:
        """
        Yield one StreamPartition per slice returned by ``Stream.stream_slices``.

        Each slice is deep-copied before it is handed to its partition.
        """
        for s in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            yield StreamPartition(
                self._stream,
                copy.deepcopy(s),
                self.message_repository,
                self._sync_mode,
                self._cursor_field,
                self._state,
            )

This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.

StreamPartitionGenerator( stream: airbyte_cdk.Stream, message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
def __init__(
    self,
    stream: Stream,
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
):
    """
    :param stream: The stream to delegate to
    :param message_repository: The message repository to use to emit non-record messages
    :param sync_mode: The sync mode forwarded to ``Stream.stream_slices`` and to each partition
    :param cursor_field: The cursor field forwarded to ``Stream.stream_slices`` and to each partition
    :param state: The stream state forwarded to ``Stream.stream_slices`` and to each partition
    """
    self.message_repository = message_repository
    self._stream = stream
    self._sync_mode = sync_mode
    self._cursor_field = cursor_field
    self._state = state
  • stream: The stream to delegate to
  • message_repository: The message repository to use to emit non-record messages
def generate(self) -> Iterable[Partition]:
    """
    Yield one StreamPartition per slice returned by ``Stream.stream_slices``.

    Each slice is deep-copied before it is handed to its partition.
    """
    for s in self._stream.stream_slices(
        sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
    ):
        yield StreamPartition(
            self._stream,
            copy.deepcopy(s),
            self.message_repository,
            self._sync_mode,
            self._cursor_field,
            self._state,
        )

Generates partitions for a given sync mode.


An iterable of partitions

@deprecated(
    "Availability strategy has been soft deprecated. Do not use. Class is subject to removal",
    category=ExperimentalClassWarning,
)
class AvailabilityStrategyFacade(AvailabilityStrategy):
    """Expose an ``AbstractAvailabilityStrategy`` through the ``AvailabilityStrategy`` interface."""

    def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
        """:param abstract_availability_strategy: The strategy every availability check is delegated to."""
        self._abstract_availability_strategy = abstract_availability_strategy

    def check_availability(
        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Checks stream availability.

        Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

        :param stream: (unused)
        :param logger: logger object to use
        :param source: (unused)
        :return: A tuple of (is_available, message), taken from the underlying strategy's
            ``is_available()`` and ``message()`` results.
        """
        stream_availability = self._abstract_availability_strategy.check_availability(logger)
        return stream_availability.is_available(), stream_availability.message()

Deprecated facade that adapts an AbstractAvailabilityStrategy to the AvailabilityStrategy interface.

AvailabilityStrategyFacade( abstract_availability_strategy: airbyte_cdk.sources.streams.concurrent.availability_strategy.AbstractAvailabilityStrategy)
def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
    """:param abstract_availability_strategy: The strategy every availability check is delegated to."""
    self._abstract_availability_strategy = abstract_availability_strategy
def check_availability(
    self, stream: "Stream", logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
    """
    Checks stream availability.

    Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

    :param stream: (unused)
    :param logger: logger object to use
    :param source: (unused)
    :return: A tuple of (is_available, message), taken from the underlying strategy's
        ``is_available()`` and ``message()`` results.
    """
    stream_availability = self._abstract_availability_strategy.check_availability(logger)
    return stream_availability.is_available(), stream_availability.message()

Checks stream availability.

Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

  • stream: (unused)
  • logger: logger object to use
  • source: (unused)

A tuple of (is_available, message), taken from the underlying strategy's is_available() and message() results.