airbyte_cdk.sources.streams.concurrent.availability_strategy
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from abc import ABC, abstractmethod
from typing import Optional

from typing_extensions import deprecated

from airbyte_cdk.sources.source import ExperimentalClassWarning


class StreamAvailability(ABC):
    """Interface describing whether a stream is available and, if it is not, why."""

    @abstractmethod
    def is_available(self) -> bool:
        """
        :return: True if the stream is available. False if the stream is not
        """

    @abstractmethod
    def message(self) -> Optional[str]:
        """
        :return: A message describing why the stream is not available. If the stream is available, this should return None.
        """


class StreamAvailable(StreamAvailability):
    """StreamAvailability implementation for a stream that is available."""

    def is_available(self) -> bool:
        return True

    def message(self) -> Optional[str]:
        return None


class StreamUnavailable(StreamAvailability):
    """StreamAvailability implementation for a stream that is not available."""

    def __init__(self, message: str):
        """
        :param message: text returned by message() to explain the unavailability
        """
        self._message = message

    def is_available(self) -> bool:
        return False

    def message(self) -> Optional[str]:
        return self._message


# Singleton instances of StreamAvailability to avoid the overhead of creating new dummy objects
STREAM_AVAILABLE = StreamAvailable()


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class AbstractAvailabilityStrategy(ABC):
    """
    AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK.
    This interface is not yet stable and may change in the future. Use at your own risk.

    Why create a new interface instead of using the existing AvailabilityStrategy?
    The existing AvailabilityStrategy is tightly coupled with Stream and Source, which leads to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream.
    """

    @abstractmethod
    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Checks stream availability.

        :param logger: logger object to use
        :return: A StreamAvailability object describing the stream's availability
        """


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy):
    """
    An availability strategy that always indicates a stream is available.

    This strategy is used to avoid breaking changes and serves as a soft
    deprecation of the availability strategy, allowing a smoother transition
    without disrupting existing functionality.
    """

    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Checks stream availability.

        :param logger: logger object to use (not read by this implementation)
        :return: A StreamAvailability object describing the stream's availability
        """
        # Reuse the module-level singleton rather than allocating a new
        # stateless StreamAvailable on every call.
        return STREAM_AVAILABLE
class StreamAvailability(ABC):
    """Interface describing whether a stream is available and, if it is not, why."""

    @abstractmethod
    def is_available(self) -> bool:
        """
        Report whether the stream is available.

        :return: True if the stream is available. False if the stream is not
        """
        ...

    @abstractmethod
    def message(self) -> Optional[str]:
        """
        Explain why the stream is not available.

        :return: A message describing why the stream is not available. If the stream is available, this should return None.
        """
        ...
Helper class that provides a standard way to create an ABC using inheritance.
@abstractmethod
def is_available(self) -> bool:
    """
    Report whether the stream is available.

    :return: True if the stream is available. False if the stream is not
    """
    ...
Returns
True if the stream is available. False if the stream is not
@abstractmethod
def message(self) -> Optional[str]:
    """
    Explain why the stream is not available.

    :return: A message describing why the stream is not available. If the stream is available, this should return None.
    """
    ...
Returns
A message describing why the stream is not available. If the stream is available, this should return None.
class StreamAvailable(StreamAvailability):
    """StreamAvailability implementation for a stream that is available."""

    def is_available(self) -> bool:
        """Always report the stream as available."""
        return True

    def message(self) -> Optional[str]:
        """Return None, since there is no unavailability reason to report."""
        return None
Helper class that provides a standard way to create an ABC using inheritance.
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class AbstractAvailabilityStrategy(ABC):
    """
    Experimental availability-strategy interface developed as part of the Concurrent CDK.

    The interface is not yet stable and may change in the future. Use at your own risk.

    It exists alongside the older AvailabilityStrategy because that one is tightly
    coupled with Stream and Source, which leads to circular dependencies and makes it
    difficult to move away from the Stream interface to AbstractStream.
    """

    @abstractmethod
    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Check whether the stream is available.

        :param logger: logger object to use
        :return: A StreamAvailability object describing the stream's availability
        """
        ...
AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.
Why create a new interface instead of using the existing AvailabilityStrategy? The existing AvailabilityStrategy is tightly coupled with Stream and Source, which leads to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream.
@abstractmethod
def check_availability(self, logger: logging.Logger) -> StreamAvailability:
    """
    Check whether the stream is available.

    :param logger: logger object to use
    :return: A StreamAvailability object describing the stream's availability
    """
    ...
Checks stream availability.
Parameters
- logger: logger object to use
Returns
A StreamAvailability object describing the stream's availability
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy):
    """
    An availability strategy that always indicates a stream is available.

    This strategy is used to avoid breaking changes and serves as a soft
    deprecation of the availability strategy, allowing a smoother transition
    without disrupting existing functionality.
    """

    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Checks stream availability.

        :param logger: logger object to use (not read by this implementation)
        :return: A StreamAvailability object describing the stream's availability
        """
        # Reuse the module-level STREAM_AVAILABLE singleton rather than
        # allocating a new stateless StreamAvailable on every call.
        return STREAM_AVAILABLE
An availability strategy that always indicates a stream is available.
This strategy is used to avoid breaking changes and serves as a soft deprecation of the availability strategy, allowing a smoother transition without disrupting existing functionality.
def check_availability(self, logger: logging.Logger) -> StreamAvailability:
    """
    Checks stream availability.

    :param logger: logger object to use (not read by this implementation)
    :return: A StreamAvailability object describing the stream's availability
    """
    # Reuse the module-level STREAM_AVAILABLE singleton rather than
    # allocating a new stateless StreamAvailable on every call.
    return STREAM_AVAILABLE
Checks stream availability.
Parameters
- logger: logger object to use
Returns
A StreamAvailability object describing the stream's availability