airbyte_cdk.sources.streams.availability_strategy

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import logging
 6import typing
 7from abc import ABC, abstractmethod
 8from typing import Any, Mapping, Optional, Tuple
 9
10from airbyte_cdk.models import SyncMode
11from airbyte_cdk.sources.streams.core import Stream, StreamData
12
13if typing.TYPE_CHECKING:
14    from airbyte_cdk.sources import Source
15
16
17# FIXME this
class AvailabilityStrategy(ABC):
    """
    Interface for strategies that decide whether a stream is available.

    Subclasses implement :meth:`check_availability`. The static helpers return
    the first slice of a stream and the first record of a given slice.
    """

    @abstractmethod
    def check_availability(
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional["Source"] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Report whether the given stream is available.

        :param stream: stream to check
        :param logger: source logger
        :param source: (optional) source the stream belongs to
        :return: A ``(available, reason)`` tuple. When ``available`` is true no
          reason is required. Otherwise ``reason`` should describe what went
          wrong and, if possible, how to resolve the unavailability.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first element produced by the stream's ``stream_slices``.

        Slices are requested with the stream's own cursor field and
        ``SyncMode.full_refresh``.

        :param stream: stream whose slices are inspected
        :raises StopIteration: if ``stream_slices`` yields nothing
        :return: the first stream slice (``None`` is a valid stream slice)
        """
        all_slices = stream.stream_slices(
            cursor_field=stream.cursor_field,  # type: ignore[arg-type]
            sync_mode=SyncMode.full_refresh,
        )
        # stream_slices() may hand back a plain iterable (list, tuple, ...) rather
        # than an iterator, so go through iter() before asking for the first item.
        return next(iter(all_slices))

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record read from one slice of a stream.

        While reading, ``stream.exit_on_rate_limit`` is set to ``True``; the value
        it had before the call is put back afterwards, whether or not the read
        succeeds.

        :param stream: stream instance to read from
        :param stream_slice: slice passed to ``read_records``
        :raises StopIteration: if ``read_records`` yields nothing for the slice
        :return: the first record of the slice
        """
        # Remember the caller's setting so it can be reinstated on the way out.
        previous_setting = stream.exit_on_rate_limit

        try:
            stream.exit_on_rate_limit = True

            # read_records() may hand back a plain iterable (list, tuple, ...)
            # rather than an iterator, hence the explicit iter().
            records = stream.read_records(
                sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
            )
            return next(iter(records))
        finally:
            stream.exit_on_rate_limit = previous_setting
class AvailabilityStrategy(abc.ABC):
class AvailabilityStrategy(ABC):
    """
    Interface for strategies that decide whether a stream is available.

    Subclasses implement :meth:`check_availability`. The static helpers return
    the first slice of a stream and the first record of a given slice.
    """

    @abstractmethod
    def check_availability(
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional["Source"] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Report whether the given stream is available.

        :param stream: stream to check
        :param logger: source logger
        :param source: (optional) source the stream belongs to
        :return: A ``(available, reason)`` tuple. When ``available`` is true no
          reason is required. Otherwise ``reason`` should describe what went
          wrong and, if possible, how to resolve the unavailability.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first element produced by the stream's ``stream_slices``.

        Slices are requested with the stream's own cursor field and
        ``SyncMode.full_refresh``.

        :param stream: stream whose slices are inspected
        :raises StopIteration: if ``stream_slices`` yields nothing
        :return: the first stream slice (``None`` is a valid stream slice)
        """
        all_slices = stream.stream_slices(
            cursor_field=stream.cursor_field,  # type: ignore[arg-type]
            sync_mode=SyncMode.full_refresh,
        )
        # stream_slices() may hand back a plain iterable (list, tuple, ...) rather
        # than an iterator, so go through iter() before asking for the first item.
        return next(iter(all_slices))

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record read from one slice of a stream.

        While reading, ``stream.exit_on_rate_limit`` is set to ``True``; the value
        it had before the call is put back afterwards, whether or not the read
        succeeds.

        :param stream: stream instance to read from
        :param stream_slice: slice passed to ``read_records``
        :raises StopIteration: if ``read_records`` yields nothing for the slice
        :return: the first record of the slice
        """
        # Remember the caller's setting so it can be reinstated on the way out.
        previous_setting = stream.exit_on_rate_limit

        try:
            stream.exit_on_rate_limit = True

            # read_records() may hand back a plain iterable (list, tuple, ...)
            # rather than an iterator, hence the explicit iter().
            records = stream.read_records(
                sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
            )
            return next(iter(records))
        finally:
            stream.exit_on_rate_limit = previous_setting

Abstract base class for checking stream availability.

@abstractmethod
def check_availability( self, stream: airbyte_cdk.Stream, logger: logging.Logger, source: Optional[airbyte_cdk.Source] = None) -> Tuple[bool, Optional[str]]:
24    @abstractmethod
25    def check_availability(
26        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
27    ) -> Tuple[bool, Optional[str]]:
28        """
29        Checks stream availability.
30
31        :param stream: stream
32        :param logger: source logger
33        :param source: (optional) source
34        :return: A tuple of (boolean, str). If boolean is true, then the stream
35          is available, and no str is required. Otherwise, the stream is unavailable
36          for some reason and the str should describe what went wrong and how to
37          resolve the unavailability, if possible.
38        """

Checks stream availability.

Parameters
  • stream: stream
  • logger: source logger
  • source: (optional) source
Returns

A tuple of (bool, Optional[str]). If the boolean is true, the stream is available and no string is required. Otherwise, the stream is unavailable for some reason, and the string should describe what went wrong and how to resolve the unavailability, if possible.

@staticmethod
def get_first_stream_slice( stream: airbyte_cdk.Stream) -> Optional[Mapping[str, Any]]:
40    @staticmethod
41    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
42        """
43        Gets the first stream_slice from a given stream's stream_slices.
44        :param stream: stream
45        :raises StopIteration: if there is no first slice to return (the stream_slices generator is empty)
46        :return: first stream slice from 'stream_slices' generator (`None` is a valid stream slice)
47        """
48        # We wrap the return output of stream_slices() because some implementations return types that are iterable,
49        # but not iterators such as lists or tuples
50        slices = iter(
51            stream.stream_slices(
52                cursor_field=stream.cursor_field,  # type: ignore[arg-type]
53                sync_mode=SyncMode.full_refresh,
54            )
55        )
56        return next(slices)

Gets the first stream_slice from a given stream's stream_slices.

Parameters
  • stream: stream
Raises
  • StopIteration: if there is no first slice to return (the stream_slices generator is empty)
Returns

The first stream slice from the 'stream_slices' generator (None is a valid stream slice).

@staticmethod
def get_first_record_for_slice( stream: airbyte_cdk.Stream, stream_slice: Optional[Mapping[str, Any]]) -> Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]:
58    @staticmethod
59    def get_first_record_for_slice(
60        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
61    ) -> StreamData:
62        """
63        Gets the first record for a stream_slice of a stream.
64
65        :param stream: stream instance from which to read records
66        :param stream_slice: stream_slice parameters for slicing the stream
67        :raises StopIteration: if there is no first record to return (the read_records generator is empty)
68        :return: StreamData containing the first record in the slice
69        """
70        # Store the original value of exit_on_rate_limit
71        original_exit_on_rate_limit = stream.exit_on_rate_limit
72
73        try:
74            # Ensure exit_on_rate_limit is safely set to True if possible
75            stream.exit_on_rate_limit = True
76
77            # We wrap the return output of read_records() because some implementations return types that are iterable,
78            # but not iterators such as lists or tuples
79            records_for_slice = iter(
80                stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
81            )
82
83            return next(records_for_slice)
84        finally:
85            # Restore the original exit_on_rate_limit value
86            stream.exit_on_rate_limit = original_exit_on_rate_limit

Gets the first record for a stream_slice of a stream.

Parameters
  • stream: stream instance from which to read records
  • stream_slice: stream_slice parameters for slicing the stream
Raises
  • StopIteration: if there is no first record to return (the read_records generator is empty)
Returns

StreamData containing the first record in the slice