airbyte_cdk.sources.streams.availability_strategy

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import logging
 6import typing
 7from abc import ABC, abstractmethod
 8from typing import Any, Mapping, Optional, Tuple
 9
10from airbyte_cdk.models import SyncMode
11from airbyte_cdk.sources.streams.core import Stream, StreamData
12
13if typing.TYPE_CHECKING:
14    from airbyte_cdk.sources import Source
15
16
class AvailabilityStrategy(ABC):
    """
    Abstract base class describing how to check whether a stream is available.

    Subclasses implement `check_availability`. The two static helpers return the
    first slice of a stream and the first record of a given slice.
    """

    @abstractmethod
    def check_availability(
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional["Source"] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Check whether the given stream is available.

        :param stream: stream
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). When the boolean is true the stream is
          available and no str is required. When it is false the stream is
          unavailable, and the str should describe what went wrong and, if
          possible, how to resolve the unavailability.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first stream_slice produced by the stream's stream_slices.

        :param stream: stream
        :raises StopIteration: if stream_slices yields nothing, so there is no first slice
        :return: the first stream slice (`None` is a valid stream slice)
        """
        all_slices = stream.stream_slices(
            cursor_field=stream.cursor_field,  # type: ignore[arg-type]
            sync_mode=SyncMode.full_refresh,
        )
        # Some implementations return a plain iterable (list, tuple) rather than an
        # iterator, so go through iter() before asking for the first element.
        return next(iter(all_slices))

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record read from one stream_slice of a stream.

        The stream's exit_on_rate_limit attribute is set to True while reading and
        is put back to its previous value before this method returns or raises.

        :param stream: stream instance from which to read records
        :param stream_slice: stream_slice parameters for slicing the stream
        :raises StopIteration: if read_records yields nothing, so there is no first record
        :return: StreamData containing the first record in the slice
        """
        # Remember the current setting so the finally clause can restore it.
        previous_setting = stream.exit_on_rate_limit

        try:
            stream.exit_on_rate_limit = True

            records = stream.read_records(
                sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
            )
            # read_records() may return a list or tuple instead of an iterator,
            # hence the explicit iter() before next().
            return next(iter(records))
        finally:
            stream.exit_on_rate_limit = previous_setting
class AvailabilityStrategy(abc.ABC):
class AvailabilityStrategy(ABC):
    """
    Abstract base class describing how to check whether a stream is available.

    Subclasses implement `check_availability`. The two static helpers return the
    first slice of a stream and the first record of a given slice.
    """

    @abstractmethod
    def check_availability(
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional["Source"] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Check whether the given stream is available.

        :param stream: stream
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). When the boolean is true the stream is
          available and no str is required. When it is false the stream is
          unavailable, and the str should describe what went wrong and, if
          possible, how to resolve the unavailability.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first stream_slice produced by the stream's stream_slices.

        :param stream: stream
        :raises StopIteration: if stream_slices yields nothing, so there is no first slice
        :return: the first stream slice (`None` is a valid stream slice)
        """
        all_slices = stream.stream_slices(
            cursor_field=stream.cursor_field,  # type: ignore[arg-type]
            sync_mode=SyncMode.full_refresh,
        )
        # Some implementations return a plain iterable (list, tuple) rather than an
        # iterator, so go through iter() before asking for the first element.
        return next(iter(all_slices))

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record read from one stream_slice of a stream.

        The stream's exit_on_rate_limit attribute is set to True while reading and
        is put back to its previous value before this method returns or raises.

        :param stream: stream instance from which to read records
        :param stream_slice: stream_slice parameters for slicing the stream
        :raises StopIteration: if read_records yields nothing, so there is no first record
        :return: StreamData containing the first record in the slice
        """
        # Remember the current setting so the finally clause can restore it.
        previous_setting = stream.exit_on_rate_limit

        try:
            stream.exit_on_rate_limit = True

            records = stream.read_records(
                sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
            )
            # read_records() may return a list or tuple instead of an iterator,
            # hence the explicit iter() before next().
            return next(iter(records))
        finally:
            stream.exit_on_rate_limit = previous_setting

Abstract base class for checking stream availability.

@abstractmethod
def check_availability( self, stream: airbyte_cdk.Stream, logger: logging.Logger, source: Optional[airbyte_cdk.Source] = None) -> Tuple[bool, Optional[str]]:
23    @abstractmethod
24    def check_availability(
25        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
26    ) -> Tuple[bool, Optional[str]]:
27        """
28        Checks stream availability.
29
30        :param stream: stream
31        :param logger: source logger
32        :param source: (optional) source
33        :return: A tuple of (boolean, str). If boolean is true, then the stream
34          is available, and no str is required. Otherwise, the stream is unavailable
35          for some reason and the str should describe what went wrong and how to
36          resolve the unavailability, if possible.
37        """

Checks stream availability.

Parameters
  • stream: stream
  • logger: source logger
  • source: (optional) source
Returns

A tuple of (boolean, str). If the boolean is true, the stream is available and no str is required. Otherwise, the stream is unavailable, and the str should describe what went wrong and, if possible, how to resolve the unavailability.

@staticmethod
def get_first_stream_slice( stream: airbyte_cdk.Stream) -> Optional[Mapping[str, Any]]:
39    @staticmethod
40    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
41        """
42        Gets the first stream_slice from a given stream's stream_slices.
43        :param stream: stream
44        :raises StopIteration: if there is no first slice to return (the stream_slices generator is empty)
45        :return: first stream slice from 'stream_slices' generator (`None` is a valid stream slice)
46        """
47        # We wrap the return output of stream_slices() because some implementations return types that are iterable,
48        # but not iterators such as lists or tuples
49        slices = iter(
50            stream.stream_slices(
51                cursor_field=stream.cursor_field,  # type: ignore[arg-type]
52                sync_mode=SyncMode.full_refresh,
53            )
54        )
55        return next(slices)

Gets the first stream_slice from a given stream's stream_slices.

Parameters
  • stream: stream
Raises
  • StopIteration: if there is no first slice to return (the stream_slices generator is empty)
Returns

The first stream slice from the 'stream_slices' generator (None is a valid stream slice).

@staticmethod
def get_first_record_for_slice( stream: airbyte_cdk.Stream, stream_slice: Optional[Mapping[str, Any]]) -> Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]:
57    @staticmethod
58    def get_first_record_for_slice(
59        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
60    ) -> StreamData:
61        """
62        Gets the first record for a stream_slice of a stream.
63
64        :param stream: stream instance from which to read records
65        :param stream_slice: stream_slice parameters for slicing the stream
66        :raises StopIteration: if there is no first record to return (the read_records generator is empty)
67        :return: StreamData containing the first record in the slice
68        """
69        # Store the original value of exit_on_rate_limit
70        original_exit_on_rate_limit = stream.exit_on_rate_limit
71
72        try:
73            # Ensure exit_on_rate_limit is safely set to True if possible
74            stream.exit_on_rate_limit = True
75
76            # We wrap the return output of read_records() because some implementations return types that are iterable,
77            # but not iterators such as lists or tuples
78            records_for_slice = iter(
79                stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
80            )
81
82            return next(records_for_slice)
83        finally:
84            # Restore the original exit_on_rate_limit value
85            stream.exit_on_rate_limit = original_exit_on_rate_limit

Gets the first record for a stream_slice of a stream.

Parameters
  • stream: stream instance from which to read records
  • stream_slice: stream_slice parameters for slicing the stream
Raises
  • StopIteration: if there is no first record to return (the read_records generator is empty)
Returns

StreamData containing the first record in the slice