airbyte_cdk.sources.streams.availability_strategy
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import typing
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional, Tuple

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream, StreamData

if typing.TYPE_CHECKING:
    from airbyte_cdk.sources import Source


class AvailabilityStrategy(ABC):
    """
    Abstract interface for deciding whether a stream is available.

    Subclasses implement `check_availability`; two static helpers are provided
    for fetching the first slice of a stream and the first record of a slice.
    """

    @abstractmethod
    def check_availability(
        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Report whether the given stream is available.

        :param stream: stream to check
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). A true boolean means the stream is
          available and no str is required. Otherwise the stream is unavailable
          and the str should describe what went wrong and how to resolve the
          unavailability, if possible.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first slice produced by the stream's `stream_slices` for a full-refresh sync.

        :param stream: stream whose slices are requested
        :raises StopIteration: if `stream_slices` produces no slice at all
        :return: the first stream slice (`None` is a valid stream slice)
        """
        # stream_slices() may return a plain iterable (e.g. a list or tuple) rather than
        # an iterator, so the result goes through iter() before next() is applied.
        return next(
            iter(
                stream.stream_slices(
                    cursor_field=stream.cursor_field,  # type: ignore[arg-type]
                    sync_mode=SyncMode.full_refresh,
                )
            )
        )

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record read from one slice of a stream.

        `stream.exit_on_rate_limit` is set to True for the duration of the read and
        put back to its previous value afterwards, whether or not the read succeeds.

        :param stream: stream instance from which to read records
        :param stream_slice: stream_slice parameters for slicing the stream
        :raises StopIteration: if `read_records` produces no record for the slice
        :return: StreamData containing the first record in the slice
        """
        # Remember the current setting so the finally clause can reinstate it.
        previous_setting = stream.exit_on_rate_limit

        try:
            stream.exit_on_rate_limit = True

            # read_records() may return a plain iterable (e.g. a list or tuple) rather
            # than an iterator, hence the explicit iter().
            record_iterator = iter(
                stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
            )
            first_record = next(record_iterator)
        finally:
            stream.exit_on_rate_limit = previous_setting

        return first_record
class
AvailabilityStrategy(abc.ABC):
class AvailabilityStrategy(ABC):
    """
    Abstract interface for deciding whether a stream is available.

    Subclasses implement `check_availability`; two static helpers are provided
    for fetching the first slice of a stream and the first record of a slice.
    """

    @abstractmethod
    def check_availability(
        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Report whether the given stream is available.

        :param stream: stream to check
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). A true boolean means the stream is
          available and no str is required. Otherwise the stream is unavailable
          and the str should describe what went wrong and how to resolve the
          unavailability, if possible.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first slice produced by the stream's `stream_slices` for a full-refresh sync.

        :param stream: stream whose slices are requested
        :raises StopIteration: if `stream_slices` produces no slice at all
        :return: the first stream slice (`None` is a valid stream slice)
        """
        # stream_slices() may return a plain iterable (e.g. a list or tuple) rather than
        # an iterator, so the result goes through iter() before next() is applied.
        return next(
            iter(
                stream.stream_slices(
                    cursor_field=stream.cursor_field,  # type: ignore[arg-type]
                    sync_mode=SyncMode.full_refresh,
                )
            )
        )

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record read from one slice of a stream.

        `stream.exit_on_rate_limit` is set to True for the duration of the read and
        put back to its previous value afterwards, whether or not the read succeeds.

        :param stream: stream instance from which to read records
        :param stream_slice: stream_slice parameters for slicing the stream
        :raises StopIteration: if `read_records` produces no record for the slice
        :return: StreamData containing the first record in the slice
        """
        # Remember the current setting so the finally clause can reinstate it.
        previous_setting = stream.exit_on_rate_limit

        try:
            stream.exit_on_rate_limit = True

            # read_records() may return a plain iterable (e.g. a list or tuple) rather
            # than an iterator, hence the explicit iter().
            record_iterator = iter(
                stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
            )
            first_record = next(record_iterator)
        finally:
            stream.exit_on_rate_limit = previous_setting

        return first_record
Abstract base class for checking stream availability.
@abstractmethod
def
check_availability( self, stream: airbyte_cdk.Stream, logger: logging.Logger, source: Optional[airbyte_cdk.Source] = None) -> Tuple[bool, Optional[str]]:
@abstractmethod
def check_availability(
    self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
    """
    Report whether the given stream is available.

    :param stream: stream to check
    :param logger: source logger
    :param source: (optional) source
    :return: A tuple of (boolean, str). A true boolean means the stream is
      available and no str is required. Otherwise the stream is unavailable
      and the str should describe what went wrong and how to resolve the
      unavailability, if possible.
    """
Checks stream availability.
Parameters
- stream: stream
- logger: source logger
- source: (optional) source
Returns
A tuple of (boolean, str). If boolean is true, then the stream is available, and no str is required. Otherwise, the stream is unavailable for some reason and the str should describe what went wrong and how to resolve the unavailability, if possible.
@staticmethod
def
get_first_stream_slice( stream: airbyte_cdk.Stream) -> Optional[Mapping[str, Any]]:
@staticmethod
def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
    """
    Return the first slice produced by the stream's `stream_slices` for a full-refresh sync.

    :param stream: stream whose slices are requested
    :raises StopIteration: if `stream_slices` produces no slice at all
    :return: the first stream slice (`None` is a valid stream slice)
    """
    # stream_slices() may return a plain iterable (e.g. a list or tuple) rather than
    # an iterator, so the result goes through iter() before next() is applied.
    return next(
        iter(
            stream.stream_slices(
                cursor_field=stream.cursor_field,  # type: ignore[arg-type]
                sync_mode=SyncMode.full_refresh,
            )
        )
    )
Gets the first stream_slice from a given stream's stream_slices.
Parameters
- stream: stream
Raises
- StopIteration: if there is no first slice to return (the stream_slices generator is empty)
Returns
first stream slice from 'stream_slices' generator (`None` is a valid stream slice)
@staticmethod
def
get_first_record_for_slice( stream: airbyte_cdk.Stream, stream_slice: Optional[Mapping[str, Any]]) -> Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]:
@staticmethod
def get_first_record_for_slice(
    stream: Stream, stream_slice: Optional[Mapping[str, Any]]
) -> StreamData:
    """
    Return the first record read from one slice of a stream.

    `stream.exit_on_rate_limit` is set to True for the duration of the read and
    put back to its previous value afterwards, whether or not the read succeeds.

    :param stream: stream instance from which to read records
    :param stream_slice: stream_slice parameters for slicing the stream
    :raises StopIteration: if `read_records` produces no record for the slice
    :return: StreamData containing the first record in the slice
    """
    # Remember the current setting so the finally clause can reinstate it.
    previous_setting = stream.exit_on_rate_limit

    try:
        stream.exit_on_rate_limit = True

        # read_records() may return a plain iterable (e.g. a list or tuple) rather
        # than an iterator, hence the explicit iter().
        record_iterator = iter(
            stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
        )
        first_record = next(record_iterator)
    finally:
        stream.exit_on_rate_limit = previous_setting

    return first_record
Gets the first record for a stream_slice of a stream.
Parameters
- stream: stream instance from which to read records
- stream_slice: stream_slice parameters for slicing the stream
Raises
- StopIteration: if there is no first record to return (the read_records generator is empty)
Returns
StreamData containing the first record in the slice