airbyte_cdk.sources.streams.concurrent.abstract_stream

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from abc import ABC, abstractmethod
  6from typing import Any, Iterable, Mapping, Optional
  7
  8from typing_extensions import deprecated
  9
 10from airbyte_cdk.models import AirbyteStream
 11from airbyte_cdk.sources.source import ExperimentalClassWarning
 12from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
 13from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
 14from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 15
 16
 17class AbstractStream(ABC):
 18    """
 19    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
 20    This interface is not yet stable and may change in the future. Use at your own risk.
 21
 22    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
 23    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.
 24
 25    High level, the changes we are targeting are:
 26    - Removing superfluous or leaky parameters from the methods' interfaces
 27    - Using composition instead of inheritance to add new capabilities
 28
 29    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
 30    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.
 31
 32    Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
 33    - Only full refresh is supported. This will be addressed in the future.
 34    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
 35    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
 36    - The Stream's behavior cannot depend on a namespace
 37    - TypeTransformer is not supported. This will be addressed in the future.
 38    - Nested cursor and primary keys are not supported
 39    """
 40
 41    @abstractmethod
 42    def generate_partitions(self) -> Iterable[Partition]:
 43        """
 44        Generates the partitions that will be read by this stream.
 45        :return: An iterable of partitions.
 46        """
 47
 48    @property
 49    @abstractmethod
 50    def name(self) -> str:
 51        """
 52        :return: The stream name
 53        """
 54
 55    @property
 56    @abstractmethod
 57    def cursor_field(self) -> Optional[str]:
 58        """
 59        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
 60        :return: The name of the field used as a cursor. Nested cursor fields are not supported.
 61        """
 62
 63    @abstractmethod
 64    def get_json_schema(self) -> Mapping[str, Any]:
 65        """
 66        :return: A dict of the JSON schema representing this stream.
 67        """
 68
 69    @abstractmethod
 70    def as_airbyte_stream(self) -> AirbyteStream:
 71        """
 72        :return: The AirbyteStream representation of this stream.
 73        """
 74
 75    @abstractmethod
 76    def log_stream_sync_configuration(self) -> None:
 77        """
 78        Logs the stream's configuration for debugging purposes.
 79        """
 80
 81    @property
 82    @abstractmethod
 83    def cursor(self) -> Cursor:
 84        """
 85        :return: The cursor associated with this stream.
 86        """
 87
 88    @property
 89    def block_simultaneous_read(self) -> str:
 90        """
 91        Override to return a non-empty group name if this stream should block simultaneous reads.
 92        When a non-empty string is returned, prevents starting partition generation for this stream if:
 93        - Another stream with the same group name is already active
 94        - Any of its parent streams are in an active group
 95
 96        This allows grouping multiple streams that share the same resource (e.g., API endpoint or session)
 97        to prevent them from running concurrently, even if they don't have a parent-child relationship.
 98
 99        :return: Group name for blocking (non-empty string), or "" to allow concurrent reading
100        """
101        return ""  # Default: allow concurrent reading
102
103    @abstractmethod
104    def check_availability(self) -> StreamAvailability:
105        """
106        :return: If the stream is available and if not, why
107        """
class AbstractStream(abc.ABC):
 18class AbstractStream(ABC):
 19    """
 20    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
 21    This interface is not yet stable and may change in the future. Use at your own risk.
 22
 23    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
 24    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.
 25
 26    High level, the changes we are targeting are:
 27    - Removing superfluous or leaky parameters from the methods' interfaces
 28    - Using composition instead of inheritance to add new capabilities
 29
 30    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
 31    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.
 32
 33    Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
 34    - Only full refresh is supported. This will be addressed in the future.
 35    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
 36    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
 37    - The Stream's behavior cannot depend on a namespace
 38    - TypeTransformer is not supported. This will be addressed in the future.
 39    - Nested cursor and primary keys are not supported
 40    """
 41
 42    @abstractmethod
 43    def generate_partitions(self) -> Iterable[Partition]:
 44        """
 45        Generates the partitions that will be read by this stream.
 46        :return: An iterable of partitions.
 47        """
 48
 49    @property
 50    @abstractmethod
 51    def name(self) -> str:
 52        """
 53        :return: The stream name
 54        """
 55
 56    @property
 57    @abstractmethod
 58    def cursor_field(self) -> Optional[str]:
 59        """
 60        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
 61        :return: The name of the field used as a cursor. Nested cursor fields are not supported.
 62        """
 63
 64    @abstractmethod
 65    def get_json_schema(self) -> Mapping[str, Any]:
 66        """
 67        :return: A dict of the JSON schema representing this stream.
 68        """
 69
 70    @abstractmethod
 71    def as_airbyte_stream(self) -> AirbyteStream:
 72        """
 73        :return: The AirbyteStream representation of this stream.
 74        """
 75
 76    @abstractmethod
 77    def log_stream_sync_configuration(self) -> None:
 78        """
 79        Logs the stream's configuration for debugging purposes.
 80        """
 81
 82    @property
 83    @abstractmethod
 84    def cursor(self) -> Cursor:
 85        """
 86        :return: The cursor associated with this stream.
 87        """
 88
 89    @property
 90    def block_simultaneous_read(self) -> str:
 91        """
 92        Override to return a non-empty group name if this stream should block simultaneous reads.
 93        When a non-empty string is returned, prevents starting partition generation for this stream if:
 94        - Another stream with the same group name is already active
 95        - Any of its parent streams are in an active group
 96
 97        This allows grouping multiple streams that share the same resource (e.g., API endpoint or session)
 98        to prevent them from running concurrently, even if they don't have a parent-child relationship.
 99
100        :return: Group name for blocking (non-empty string), or "" to allow concurrent reading
101        """
102        return ""  # Default: allow concurrent reading
103
104    @abstractmethod
105    def check_availability(self) -> StreamAvailability:
106        """
107        :return: If the stream is available and if not, why
108        """

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

High level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor and primary keys are not supported
@abstractmethod
def generate_partitions( self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:
42    @abstractmethod
43    def generate_partitions(self) -> Iterable[Partition]:
44        """
45        Generates the partitions that will be read by this stream.
46        :return: An iterable of partitions.
47        """

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.

name: str
49    @property
50    @abstractmethod
51    def name(self) -> str:
52        """
53        :return: The stream name
54        """
Returns

The stream name

cursor_field: Optional[str]
56    @property
57    @abstractmethod
58    def cursor_field(self) -> Optional[str]:
59        """
60        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
61        :return: The name of the field used as a cursor. Nested cursor fields are not supported.
62        """

Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
64    @abstractmethod
65    def get_json_schema(self) -> Mapping[str, Any]:
66        """
67        :return: A dict of the JSON schema representing this stream.
68        """
Returns

A dict of the JSON schema representing this stream.

@abstractmethod
def as_airbyte_stream( self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
70    @abstractmethod
71    def as_airbyte_stream(self) -> AirbyteStream:
72        """
73        :return: The AirbyteStream representation of this stream.
74        """
Returns

The AirbyteStream representation of this stream.

@abstractmethod
def log_stream_sync_configuration(self) -> None:
76    @abstractmethod
77    def log_stream_sync_configuration(self) -> None:
78        """
79        Logs the stream's configuration for debugging purposes.
80        """

Logs the stream's configuration for debugging purposes.

cursor: airbyte_cdk.Cursor
82    @property
83    @abstractmethod
84    def cursor(self) -> Cursor:
85        """
86        :return: The cursor associated with this stream.
87        """
Returns

The cursor associated with this stream.

block_simultaneous_read: str
 89    @property
 90    def block_simultaneous_read(self) -> str:
 91        """
 92        Override to return a non-empty group name if this stream should block simultaneous reads.
 93        When a non-empty string is returned, prevents starting partition generation for this stream if:
 94        - Another stream with the same group name is already active
 95        - Any of its parent streams are in an active group
 96
 97        This allows grouping multiple streams that share the same resource (e.g., API endpoint or session)
 98        to prevent them from running concurrently, even if they don't have a parent-child relationship.
 99
100        :return: Group name for blocking (non-empty string), or "" to allow concurrent reading
101        """
102        return ""  # Default: allow concurrent reading

Override to return a non-empty group name if this stream should block simultaneous reads. When a non-empty string is returned, prevents starting partition generation for this stream if:

  • Another stream with the same group name is already active
  • Any of its parent streams are in an active group

This allows grouping multiple streams that share the same resource (e.g., API endpoint or session) to prevent them from running concurrently, even if they don't have a parent-child relationship.

Returns

Group name for blocking (non-empty string), or "" to allow concurrent reading

@abstractmethod
def check_availability( self) -> airbyte_cdk.sources.streams.concurrent.availability_strategy.StreamAvailability:
104    @abstractmethod
105    def check_availability(self) -> StreamAvailability:
106        """
107        :return: If the stream is available and if not, why
108        """
Returns

If the stream is available and if not, why