airbyte_cdk.sources.streams.concurrent.abstract_stream

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import ABC, abstractmethod
 6from typing import Any, Iterable, Mapping, Optional
 7
 8from typing_extensions import deprecated
 9
10from airbyte_cdk.models import AirbyteStream
11from airbyte_cdk.sources.source import ExperimentalClassWarning
12from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
13from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
14from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
15
16
class AbstractStream(ABC):
    """
    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
    This interface is not yet stable and may change in the future. Use at your own risk.

    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
    We have learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

    At a high level, the changes we are targeting are:
    - Removing superfluous or leaky parameters from the methods' interfaces
    - Using composition instead of inheritance to add new capabilities

    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

    The following restrictions currently apply to sources that implement this interface. Not all of them will be lifted in the future, but most will as we iterate on the design.
    - Only full refresh is supported. This will be addressed in the future.
    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
    - The Stream's behavior cannot depend on a namespace.
    - TypeTransformer is not supported. This will be addressed in the future.
    - Nested cursor and primary keys are not supported.
    """

    @abstractmethod
    def generate_partitions(self) -> Iterable[Partition]:
        """
        Generates the partitions that will be read by this stream.

        :return: An iterable of partitions.
        """

    @property
    @abstractmethod
    def name(self) -> str:
        """
        :return: The stream name.
        """

    @property
    @abstractmethod
    def cursor_field(self) -> Optional[str]:
        """
        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

        :return: The name of the field used as a cursor, or None. Nested cursor fields are not supported.
        """

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A mapping holding the JSON schema representing this stream.
        """

    @abstractmethod
    def as_airbyte_stream(self) -> AirbyteStream:
        """
        :return: The AirbyteStream representation of this stream.
        """

    @abstractmethod
    def log_stream_sync_configuration(self) -> None:
        """
        Logs the stream's configuration for debugging purposes.
        """

    @property
    @abstractmethod
    def cursor(self) -> Cursor:
        """
        :return: The cursor associated with this stream.
        """

    @abstractmethod
    def check_availability(self) -> StreamAvailability:
        """
        :return: A StreamAvailability indicating whether the stream is available and, if not, why.
        """
class AbstractStream(ABC):
    """
    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
    This interface is not yet stable and may change in the future. Use at your own risk.

    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
    We have learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

    At a high level, the changes we are targeting are:
    - Removing superfluous or leaky parameters from the methods' interfaces
    - Using composition instead of inheritance to add new capabilities

    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

    The following restrictions currently apply to sources that implement this interface. Not all of them will be lifted in the future, but most will as we iterate on the design.
    - Only full refresh is supported. This will be addressed in the future.
    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
    - The Stream's behavior cannot depend on a namespace.
    - TypeTransformer is not supported. This will be addressed in the future.
    - Nested cursor and primary keys are not supported.
    """

    @abstractmethod
    def generate_partitions(self) -> Iterable[Partition]:
        """
        Generates the partitions that will be read by this stream.

        :return: An iterable of partitions.
        """

    @property
    @abstractmethod
    def name(self) -> str:
        """
        :return: The stream name.
        """

    @property
    @abstractmethod
    def cursor_field(self) -> Optional[str]:
        """
        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

        :return: The name of the field used as a cursor, or None. Nested cursor fields are not supported.
        """

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A mapping holding the JSON schema representing this stream.
        """

    @abstractmethod
    def as_airbyte_stream(self) -> AirbyteStream:
        """
        :return: The AirbyteStream representation of this stream.
        """

    @abstractmethod
    def log_stream_sync_configuration(self) -> None:
        """
        Logs the stream's configuration for debugging purposes.
        """

    @property
    @abstractmethod
    def cursor(self) -> Cursor:
        """
        :return: The cursor associated with this stream.
        """

    @abstractmethod
    def check_availability(self) -> StreamAvailability:
        """
        :return: A StreamAvailability indicating whether the stream is available and, if not, why.
        """

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We have learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

At a high level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

The following restrictions currently apply to sources that implement this interface. Not all of them will be lifted in the future, but most will as we iterate on the design.

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor and primary keys are not supported
@abstractmethod
def generate_partitions(self) -> Iterable[Partition]:
    """
    Produce the partitions this stream will read.

    :return: An iterable over the stream's partitions.
    """

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.

@property
@abstractmethod
def name(self) -> str:
    """
    Name of this stream.

    :return: The stream name.
    """
Returns

The stream name

@property
@abstractmethod
def cursor_field(self) -> Optional[str]:
    """
    Default cursor field of this stream. Override it when, for example, an API
    entity always uses created_at as its cursor field.

    :return: The cursor field name, or None. Nested cursor fields are not supported.
    """

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
    """
    Describe the shape of this stream's records.

    :return: The JSON schema of this stream, as a mapping.
    """
Returns

A dict of the JSON schema representing this stream.

@abstractmethod
def as_airbyte_stream(self) -> AirbyteStream:
    """
    Build the Airbyte protocol representation of this stream.

    :return: The AirbyteStream representation of this stream.
    """
Returns

The AirbyteStream representation of this stream.

@abstractmethod
def log_stream_sync_configuration(self) -> None:
    """
    Write this stream's configuration to the logs, to aid debugging.
    """

Logs the stream's configuration for debugging purposes.

@property
@abstractmethod
def cursor(self) -> Cursor:
    """
    Cursor used by this stream.

    :return: The Cursor associated with this stream.
    """
Returns

The cursor associated with this stream.

@abstractmethod
def check_availability(self) -> StreamAvailability:
    """
    Determine whether this stream is available.

    :return: A StreamAvailability stating whether the stream is available and, if not, the reason.
    """
Returns

A StreamAvailability indicating whether the stream is available and, if not, why.