airbyte_cdk.sources.streams.concurrent.abstract_stream

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import ABC, abstractmethod
 6from typing import Any, Iterable, Mapping, Optional
 7
 8from typing_extensions import deprecated
 9
10from airbyte_cdk.models import AirbyteStream
11from airbyte_cdk.sources.source import ExperimentalClassWarning
12from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
13from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
14from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
15
16
17@deprecated(
18    "This class is experimental. Use at your own risk.",
19    category=ExperimentalClassWarning,
20)
21class AbstractStream(ABC):
22    """
23    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
24    This interface is not yet stable and may change in the future. Use at your own risk.
25
26    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
27    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.
28
29    High level, the changes we are targeting are:
30    - Removing superfluous or leaky parameters from the methods' interfaces
31    - Using composition instead of inheritance to add new capabilities
32
33    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
34    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.
35
36    Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
37    - Only full refresh is supported. This will be addressed in the future.
38    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
39    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
40    - The Stream's behavior cannot depend on a namespace.
41    - TypeTransformer is not supported. This will be addressed in the future.
42    - Nested cursor and primary keys are not supported.
43    """
44
45    @abstractmethod
46    def generate_partitions(self) -> Iterable[Partition]:
47        """
48        Generates the partitions that will be read by this stream.
49        :return: An iterable of the Partition objects to be read.
50        """
51
52    @property
53    @abstractmethod
54    def name(self) -> str:
55        """
56        :return: The stream name, as a string.
57        """
58
59    @property
60    @abstractmethod
61    def cursor_field(self) -> Optional[str]:
62        """
63        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
64        :return: The name of the field used as a cursor, or None (presumably when the stream has no cursor field). Nested cursor fields are not supported.
65        """
66
67    @abstractmethod
68    def check_availability(self) -> StreamAvailability:
69        """
70        :return: The stream's availability, as a StreamAvailability.
71        """
72
73    @abstractmethod
74    def get_json_schema(self) -> Mapping[str, Any]:
75        """
76        :return: A mapping holding the JSON schema representing this stream.
77        """
78
79    @abstractmethod
80    def as_airbyte_stream(self) -> AirbyteStream:
81        """
82        :return: The AirbyteStream representation of this stream.
83        """
84
85    @abstractmethod
86    def log_stream_sync_configuration(self) -> None:
87        """
88        Logs the stream's sync configuration for debugging purposes; returns nothing.
89        """
90
91    @property
92    @abstractmethod
93    def cursor(self) -> Cursor:
94        """
95        :return: The Cursor associated with this stream.
96        """
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
class AbstractStream(abc.ABC):
18@deprecated(
19    "This class is experimental. Use at your own risk.",
20    category=ExperimentalClassWarning,
21)
22class AbstractStream(ABC):
23    """
24    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
25    This interface is not yet stable and may change in the future. Use at your own risk.
26
27    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
28    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.
29
30    High level, the changes we are targeting are:
31    - Removing superfluous or leaky parameters from the methods' interfaces
32    - Using composition instead of inheritance to add new capabilities
33
34    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
35    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.
36
37    Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
38    - Only full refresh is supported. This will be addressed in the future.
39    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
40    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
41    - The Stream's behavior cannot depend on a namespace.
42    - TypeTransformer is not supported. This will be addressed in the future.
43    - Nested cursor and primary keys are not supported.
44    """
45
46    @abstractmethod
47    def generate_partitions(self) -> Iterable[Partition]:
48        """
49        Generates the partitions that will be read by this stream.
50        :return: An iterable of the Partition objects to be read.
51        """
52
53    @property
54    @abstractmethod
55    def name(self) -> str:
56        """
57        :return: The stream name, as a string.
58        """
59
60    @property
61    @abstractmethod
62    def cursor_field(self) -> Optional[str]:
63        """
64        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
65        :return: The name of the field used as a cursor, or None (presumably when the stream has no cursor field). Nested cursor fields are not supported.
66        """
67
68    @abstractmethod
69    def check_availability(self) -> StreamAvailability:
70        """
71        :return: The stream's availability, as a StreamAvailability.
72        """
73
74    @abstractmethod
75    def get_json_schema(self) -> Mapping[str, Any]:
76        """
77        :return: A mapping holding the JSON schema representing this stream.
78        """
79
80    @abstractmethod
81    def as_airbyte_stream(self) -> AirbyteStream:
82        """
83        :return: The AirbyteStream representation of this stream.
84        """
85
86    @abstractmethod
87    def log_stream_sync_configuration(self) -> None:
88        """
89        Logs the stream's sync configuration for debugging purposes; returns nothing.
90        """
91
92    @property
93    @abstractmethod
94    def cursor(self) -> Cursor:
95        """
96        :return: The Cursor associated with this stream.
97        """

AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

High level, the changes we are targeting are:

  • Removing superfluous or leaky parameters from the methods' interfaces
  • Using composition instead of inheritance to add new capabilities

To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.

  • Only full refresh is supported. This will be addressed in the future.
  • The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
  • Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
  • The Stream's behavior cannot depend on a namespace
  • TypeTransformer is not supported. This will be addressed in the future.
  • Nested cursor and primary keys are not supported
@abstractmethod
def generate_partitions( self) -> Iterable[airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition]:
46    @abstractmethod
47    def generate_partitions(self) -> Iterable[Partition]:
48        """
49        Generates the partitions that will be read by this stream.
50        :return: An iterable of the Partition objects to be read.
51        """

Generates the partitions that will be read by this stream.

Returns

An iterable of partitions.

name: str
53    @property
54    @abstractmethod
55    def name(self) -> str:
56        """
57        :return: The stream name, as a string.
58        """
Returns

The stream name

cursor_field: Optional[str]
60    @property
61    @abstractmethod
62    def cursor_field(self) -> Optional[str]:
63        """
64        Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.
65        :return: The name of the field used as a cursor, or None (presumably when the stream has no cursor field). Nested cursor fields are not supported.
66        """

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. Nested cursor fields are not supported.

@abstractmethod
def check_availability( self) -> airbyte_cdk.sources.streams.concurrent.availability_strategy.StreamAvailability:
68    @abstractmethod
69    def check_availability(self) -> StreamAvailability:
70        """
71        :return: The stream's availability, as a StreamAvailability.
72        """
Returns

The stream's availability

@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
74    @abstractmethod
75    def get_json_schema(self) -> Mapping[str, Any]:
76        """
77        :return: A mapping holding the JSON schema representing this stream.
78        """
Returns

A dict of the JSON schema representing this stream.

@abstractmethod
def as_airbyte_stream( self) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStream:
80    @abstractmethod
81    def as_airbyte_stream(self) -> AirbyteStream:
82        """
83        :return: The AirbyteStream representation of this stream.
84        """
Returns

The AirbyteStream representation of this stream.

@abstractmethod
def log_stream_sync_configuration(self) -> None:
86    @abstractmethod
87    def log_stream_sync_configuration(self) -> None:
88        """
89        Logs the stream's sync configuration for debugging purposes; returns nothing.
90        """

Logs the stream's configuration for debugging purposes.

cursor: airbyte_cdk.Cursor
92    @property
93    @abstractmethod
94    def cursor(self) -> Cursor:
95        """
96        :return: The Cursor associated with this stream.
97        """
Returns

The cursor associated with this stream.