airbyte_cdk.sources.streams.availability_strategy
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
import typing
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional, Tuple

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream, StreamData

if typing.TYPE_CHECKING:
    from airbyte_cdk.sources import Source


# FIXME this
class AvailabilityStrategy(ABC):
    """
    Abstract base class for stream availability checks.

    Subclasses implement ``check_availability``; the static helpers below fetch
    the first slice and the first record of a stream.
    """

    @abstractmethod
    def check_availability(
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional["Source"] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Determine whether the given stream is available.

        :param stream: stream to check
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). When the boolean is true the stream
          is available and no str is required. Otherwise the stream is unavailable
          and the str should describe what went wrong and, if possible, how to
          resolve the unavailability.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first slice produced by ``stream.stream_slices``.

        Slices are requested in full-refresh mode with the stream's own cursor field.

        :param stream: stream whose slices are inspected
        :raises StopIteration: if ``stream_slices`` produces nothing
        :return: the first stream slice (`None` is a valid stream slice)
        """
        slices_iterable = stream.stream_slices(
            cursor_field=stream.cursor_field,  # type: ignore[arg-type]
            sync_mode=SyncMode.full_refresh,
        )
        # stream_slices() may return a list or tuple rather than an iterator,
        # so normalise with iter() before asking for the first element.
        slice_iterator = iter(slices_iterable)
        return next(slice_iterator)

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record read from one slice of a stream.

        ``stream.exit_on_rate_limit`` is set to True for the duration of the read
        and put back to its previous value afterwards, whether or not the read succeeds.

        :param stream: stream instance from which to read records
        :param stream_slice: stream_slice parameters for slicing the stream
        :raises StopIteration: if ``read_records`` produces nothing for the slice
        :return: StreamData containing the first record in the slice
        """
        # Remember the current setting so it can be restored in the finally clause.
        saved_exit_on_rate_limit = stream.exit_on_rate_limit

        try:
            stream.exit_on_rate_limit = True

            records = stream.read_records(
                sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
            )
            # read_records() may return a list or tuple rather than an iterator,
            # so wrap it with iter() before taking the first element.
            first_record = next(iter(records))
        finally:
            stream.exit_on_rate_limit = saved_exit_on_rate_limit

        return first_record
class AvailabilityStrategy(abc.ABC):
class AvailabilityStrategy(ABC):
    """
    Abstract base class for stream availability checks.

    Subclasses implement ``check_availability``; the static helpers below fetch
    the first slice and the first record of a stream.
    """

    @abstractmethod
    def check_availability(
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional["Source"] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Determine whether the given stream is available.

        :param stream: stream to check
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). When the boolean is true the stream
          is available and no str is required. Otherwise the stream is unavailable
          and the str should describe what went wrong and, if possible, how to
          resolve the unavailability.
        """

    @staticmethod
    def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Return the first slice produced by ``stream.stream_slices``.

        Slices are requested in full-refresh mode with the stream's own cursor field.

        :param stream: stream whose slices are inspected
        :raises StopIteration: if ``stream_slices`` produces nothing
        :return: the first stream slice (`None` is a valid stream slice)
        """
        slices_iterable = stream.stream_slices(
            cursor_field=stream.cursor_field,  # type: ignore[arg-type]
            sync_mode=SyncMode.full_refresh,
        )
        # stream_slices() may return a list or tuple rather than an iterator,
        # so normalise with iter() before asking for the first element.
        slice_iterator = iter(slices_iterable)
        return next(slice_iterator)

    @staticmethod
    def get_first_record_for_slice(
        stream: Stream, stream_slice: Optional[Mapping[str, Any]]
    ) -> StreamData:
        """
        Return the first record read from one slice of a stream.

        ``stream.exit_on_rate_limit`` is set to True for the duration of the read
        and put back to its previous value afterwards, whether or not the read succeeds.

        :param stream: stream instance from which to read records
        :param stream_slice: stream_slice parameters for slicing the stream
        :raises StopIteration: if ``read_records`` produces nothing for the slice
        :return: StreamData containing the first record in the slice
        """
        # Remember the current setting so it can be restored in the finally clause.
        saved_exit_on_rate_limit = stream.exit_on_rate_limit

        try:
            stream.exit_on_rate_limit = True

            records = stream.read_records(
                sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
            )
            # read_records() may return a list or tuple rather than an iterator,
            # so wrap it with iter() before taking the first element.
            first_record = next(iter(records))
        finally:
            stream.exit_on_rate_limit = saved_exit_on_rate_limit

        return first_record
Abstract base class for checking stream availability.
@abstractmethod
def check_availability(self, stream: airbyte_cdk.Stream, logger: logging.Logger, source: Optional[airbyte_cdk.Source] = None) -> Tuple[bool, Optional[str]]:
@abstractmethod
def check_availability(
    self,
    stream: Stream,
    logger: logging.Logger,
    source: Optional["Source"] = None,
) -> Tuple[bool, Optional[str]]:
    """
    Determine whether the given stream is available.

    :param stream: stream to check
    :param logger: source logger
    :param source: (optional) source
    :return: A tuple of (boolean, str). When the boolean is true the stream
      is available and no str is required. Otherwise the stream is unavailable
      and the str should describe what went wrong and, if possible, how to
      resolve the unavailability.
    """
Checks stream availability.
Parameters
- stream: stream
- logger: source logger
- source: (optional) source
Returns
A tuple of (boolean, str). If boolean is true, then the stream is available, and no str is required. Otherwise, the stream is unavailable for some reason and the str should describe what went wrong and how to resolve the unavailability, if possible.
@staticmethod
def get_first_stream_slice(stream: airbyte_cdk.Stream) -> Optional[Mapping[str, Any]]:
@staticmethod
def get_first_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
    """
    Return the first slice produced by ``stream.stream_slices``.

    Slices are requested in full-refresh mode with the stream's own cursor field.

    :param stream: stream whose slices are inspected
    :raises StopIteration: if ``stream_slices`` produces nothing
    :return: the first stream slice (`None` is a valid stream slice)
    """
    slices_iterable = stream.stream_slices(
        cursor_field=stream.cursor_field,  # type: ignore[arg-type]
        sync_mode=SyncMode.full_refresh,
    )
    # stream_slices() may return a list or tuple rather than an iterator,
    # so normalise with iter() before asking for the first element.
    slice_iterator = iter(slices_iterable)
    return next(slice_iterator)
Gets the first stream_slice from a given stream's stream_slices.
Parameters
- stream: stream
Raises
- StopIteration: if there is no first slice to return (the stream_slices generator is empty)
Returns
first stream slice from 'stream_slices' generator (`None` is a valid stream slice)
@staticmethod
def get_first_record_for_slice(stream: airbyte_cdk.Stream, stream_slice: Optional[Mapping[str, Any]]) -> Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]:
@staticmethod
def get_first_record_for_slice(
    stream: Stream, stream_slice: Optional[Mapping[str, Any]]
) -> StreamData:
    """
    Return the first record read from one slice of a stream.

    ``stream.exit_on_rate_limit`` is set to True for the duration of the read
    and put back to its previous value afterwards, whether or not the read succeeds.

    :param stream: stream instance from which to read records
    :param stream_slice: stream_slice parameters for slicing the stream
    :raises StopIteration: if ``read_records`` produces nothing for the slice
    :return: StreamData containing the first record in the slice
    """
    # Remember the current setting so it can be restored in the finally clause.
    saved_exit_on_rate_limit = stream.exit_on_rate_limit

    try:
        stream.exit_on_rate_limit = True

        records = stream.read_records(
            sync_mode=SyncMode.full_refresh, stream_slice=stream_slice
        )
        # read_records() may return a list or tuple rather than an iterator,
        # so wrap it with iter() before taking the first element.
        first_record = next(iter(records))
    finally:
        stream.exit_on_rate_limit = saved_exit_on_rate_limit

    return first_record
Gets the first record for a stream_slice of a stream.
Parameters
- stream: stream instance from which to read records
- stream_slice: stream_slice parameters for slicing the stream
Raises
- StopIteration: if there is no first record to return (the read_records generator is empty)
Returns
StreamData containing the first record in the slice