airbyte_cdk.sources.streams.concurrent.availability_strategy

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import logging
 6from abc import ABC, abstractmethod
 7from typing import Optional
 8
 9from typing_extensions import deprecated
10
11from airbyte_cdk.sources.source import ExperimentalClassWarning
12
13
class StreamAvailability(ABC):
    """
    Result of a stream availability check.

    Concrete subclasses report whether the stream is available and, when it is not, why.
    """

    @abstractmethod
    def is_available(self) -> bool:
        """
        Report whether the stream is available.

        :return: True when the stream is available, False otherwise
        """

    @abstractmethod
    def message(self) -> Optional[str]:
        """
        Explain why the stream is not available.

        :return: A message describing why the stream is not available, or None when the stream is available
        """
26
27
class StreamAvailable(StreamAvailability):
    """Availability result for a stream that is available."""

    def message(self) -> Optional[str]:
        """
        :return: Always None, since there is no unavailability reason to report
        """
        return None

    def is_available(self) -> bool:
        """
        :return: Always True
        """
        return True
34
35
class StreamUnavailable(StreamAvailability):
    """Availability result for a stream that is not available, carrying the reason."""

    def __init__(self, message: str):
        """
        :param message: description of why the stream is not available
        """
        self._message = message

    def message(self) -> Optional[str]:
        """
        :return: The message that was passed to the constructor
        """
        return self._message

    def is_available(self) -> bool:
        """
        :return: Always False
        """
        return False
45
46
# Shared singleton instance of StreamAvailable, reused to avoid the overhead of creating new dummy objects
STREAM_AVAILABLE = StreamAvailable()
49
50
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AbstractAvailabilityStrategy(ABC):
    """
    Experimental interface for checking stream availability, developed as part of the Concurrent CDK.
    The interface is not yet stable and may change in the future. Use at your own risk.

    Why a new interface rather than the existing AvailabilityStrategy?
    The existing AvailabilityStrategy is tightly coupled with Stream and Source, which leads to circular
    dependencies and makes it difficult to move away from the Stream interface to AbstractStream.
    """

    @abstractmethod
    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Determine whether the stream is available.

        :param logger: logger object to use
        :return: A StreamAvailability object describing the stream's availability
        """
72
73
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy):
    """
    An availability strategy that always indicates a stream is available.

    This strategy is used to avoid breaking changes and serves as a soft
    deprecation of the availability strategy, allowing a smoother transition
    without disrupting existing functionality.
    """

    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Report the stream as available without performing any check.

        :param logger: logger object to use (not used by this strategy)
        :return: The shared STREAM_AVAILABLE instance, whose is_available() returns True
        """
        # Reuse the module-level singleton instead of allocating a new StreamAvailable on every call.
        return STREAM_AVAILABLE
class StreamAvailability(abc.ABC):
15class StreamAvailability(ABC):
16    @abstractmethod
17    def is_available(self) -> bool:
18        """
19        :return: True if the stream is available. False if the stream is not
20        """
21
22    @abstractmethod
23    def message(self) -> Optional[str]:
24        """
25        :return: A message describing why the stream is not available. If the stream is available, this should return None.
26        """

Abstract base class for the result of a stream availability check. Subclasses implement is_available() and message().

@abstractmethod
def is_available(self) -> bool:
16    @abstractmethod
17    def is_available(self) -> bool:
18        """
19        :return: True if the stream is available. False if the stream is not
20        """
Returns

True if the stream is available. False if the stream is not

@abstractmethod
def message(self) -> Optional[str]:
22    @abstractmethod
23    def message(self) -> Optional[str]:
24        """
25        :return: A message describing why the stream is not available. If the stream is available, this should return None.
26        """
Returns

A message describing why the stream is not available. If the stream is available, this should return None.

class StreamAvailable(StreamAvailability):
29class StreamAvailable(StreamAvailability):
30    def is_available(self) -> bool:
31        return True
32
33    def message(self) -> Optional[str]:
34        return None

Availability result for a stream that is available: is_available() returns True and message() returns None.

def is_available(self) -> bool:
30    def is_available(self) -> bool:
31        return True
Returns

True if the stream is available. False if the stream is not

def message(self) -> Optional[str]:
33    def message(self) -> Optional[str]:
34        return None
Returns

A message describing why the stream is not available. If the stream is available, this should return None.

class StreamUnavailable(StreamAvailability):
37class StreamUnavailable(StreamAvailability):
38    def __init__(self, message: str):
39        self._message = message
40
41    def is_available(self) -> bool:
42        return False
43
44    def message(self) -> Optional[str]:
45        return self._message

Availability result for a stream that is not available: is_available() returns False and message() returns the message passed to the constructor.

StreamUnavailable(message: str)
38    def __init__(self, message: str):
39        self._message = message
def is_available(self) -> bool:
41    def is_available(self) -> bool:
42        return False
Returns

True if the stream is available. False if the stream is not

def message(self) -> Optional[str]:
44    def message(self) -> Optional[str]:
45        return self._message
Returns

A message describing why the stream is not available. If the stream is available, this should return None.

STREAM_AVAILABLE = <StreamAvailable object>
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
class AbstractAvailabilityStrategy(abc.ABC):
52@deprecated(
53    "This class is experimental. Use at your own risk.",
54    category=ExperimentalClassWarning,
55)
56class AbstractAvailabilityStrategy(ABC):
57    """
58    AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK.
59    This interface is not yet stable and may change in the future. Use at your own risk.
60
61    Why create a new interface instead of using the existing AvailabilityStrategy?
62    The existing AvailabilityStrategy is tightly coupled with Stream and Source, which yields to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream.
63    """
64
65    @abstractmethod
66    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
67        """
68        Checks stream availability.
69
70        :param logger: logger object to use
71        :return: A StreamAvailability object describing the stream's availability
72        """

AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of using the existing AvailabilityStrategy? The existing AvailabilityStrategy is tightly coupled with Stream and Source, which leads to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream.

@abstractmethod
def check_availability( self, logger: logging.Logger) -> StreamAvailability:
65    @abstractmethod
66    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
67        """
68        Checks stream availability.
69
70        :param logger: logger object to use
71        :return: A StreamAvailability object describing the stream's availability
72        """

Checks stream availability.

Parameters
  • logger: logger object to use
Returns

A StreamAvailability object describing the stream's availability

@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy):
75@deprecated(
76    "This class is experimental. Use at your own risk.",
77    category=ExperimentalClassWarning,
78)
79class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy):
80    """
81    An availability strategy that always indicates a stream is available.
82
83    This strategy is used to avoid breaking changes and serves as a soft
84    deprecation of the availability strategy, allowing a smoother transition
85    without disrupting existing functionality.
86    """
87
88    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
89        """
90        Checks stream availability.
91
92        :param logger: logger object to use
93        :return: A StreamAvailability object describing the stream's availability
94        """
95        return StreamAvailable()

An availability strategy that always indicates a stream is available.

This strategy is used to avoid breaking changes and serves as a soft deprecation of the availability strategy, allowing a smoother transition without disrupting existing functionality.

def check_availability( self, logger: logging.Logger) -> StreamAvailability:
88    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
89        """
90        Checks stream availability.
91
92        :param logger: logger object to use
93        :return: A StreamAvailability object describing the stream's availability
94        """
95        return StreamAvailable()

Checks stream availability.

Parameters
  • logger: logger object to use
Returns

A StreamAvailability object describing the stream's availability