airbyte_cdk.sources.streams.concurrent.abstract_stream_facade

 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2
 3from abc import ABC, abstractmethod
 4from typing import Generic, Optional, TypeVar
 5
 6from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
 7
 8StreamType = TypeVar("StreamType")
 9
10
class AbstractStreamFacade(Generic[StreamType], ABC):
    """
    Abstract base for facade objects that wrap an underlying stream of type ``StreamType``.

    Subclasses must implement ``get_underlying_stream``. The cursor flag and the
    error display message lookup have default implementations here.
    """

    @abstractmethod
    def get_underlying_stream(self) -> StreamType:
        """
        Return the stream object that this facade wraps.
        """

    @property
    def source_defined_cursor(self) -> bool:
        """
        Always ``True``: streams must be aware of their cursor at instantiation time.
        """
        return True

    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        """
        Look up the user-friendly display message that corresponds to an exception.

        This will be called when encountering an exception while reading records from the stream,
        and the result is used to build the AirbyteTraceMessage.

        :param exception: The exception that was raised
        :return: The exception's ``display_message`` when it is an ExceptionWithDisplayMessage,
            otherwise ``None``
        """
        # Only exceptions that explicitly carry a display message have something to show the user.
        if not isinstance(exception, ExceptionWithDisplayMessage):
            return None
        return exception.display_message
class AbstractStreamFacade(typing.Generic[~StreamType], abc.ABC):
class AbstractStreamFacade(Generic[StreamType], ABC):
    """
    Abstract base for facade objects that wrap an underlying stream of type ``StreamType``.

    Subclasses must implement ``get_underlying_stream``. The cursor flag and the
    error display message lookup have default implementations here.
    """

    @abstractmethod
    def get_underlying_stream(self) -> StreamType:
        """
        Return the stream object that this facade wraps.
        """

    @property
    def source_defined_cursor(self) -> bool:
        """
        Always ``True``: streams must be aware of their cursor at instantiation time.
        """
        return True

    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
        """
        Look up the user-friendly display message that corresponds to an exception.

        This will be called when encountering an exception while reading records from the stream,
        and the result is used to build the AirbyteTraceMessage.

        :param exception: The exception that was raised
        :return: The exception's ``display_message`` when it is an ExceptionWithDisplayMessage,
            otherwise ``None``
        """
        # Only exceptions that explicitly carry a display message have something to show the user.
        if not isinstance(exception, ExceptionWithDisplayMessage):
            return None
        return exception.display_message

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

@abstractmethod
def get_underlying_stream(self) -> ~StreamType:
13    @abstractmethod
14    def get_underlying_stream(self) -> StreamType:
15        """
16        Return the underlying stream facade object.
17        """
18        ...

Return the underlying stream object wrapped by this facade.

source_defined_cursor: bool
20    @property
21    def source_defined_cursor(self) -> bool:
22        # Streams must be aware of their cursor at instantiation time
23        return True
def get_error_display_message(self, exception: BaseException) -> Optional[str]:
25    def get_error_display_message(self, exception: BaseException) -> Optional[str]:
26        """
27        Retrieves the user-friendly display message that corresponds to an exception.
28        This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.
29
30        A display message will be returned if the exception is an instance of ExceptionWithDisplayMessage.
31
32        :param exception: The exception that was raised
33        :return: A user-friendly message that indicates the cause of the error
34        """
35        if isinstance(exception, ExceptionWithDisplayMessage):
36            return exception.display_message
37        else:
38            return None

Retrieves the user-friendly display message that corresponds to an exception. This will be called when encountering an exception while reading records from the stream, and used to build the AirbyteTraceMessage.

A display message will be returned if the exception is an instance of ExceptionWithDisplayMessage.

Parameters
  • exception: The exception that was raised
Returns

A user-friendly message that indicates the cause of the error