airbyte_cdk.sources.streams.concurrent.abstract_stream
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, Optional

from typing_extensions import deprecated

from airbyte_cdk.models import AirbyteStream
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class AbstractStream(ABC):
    """
    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
    This interface is not yet stable and may change in the future. Use at your own risk.

    Why create a new interface instead of adding concurrency capabilities the existing Stream?
    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

    High level, the changes we are targeting are:
    - Removing superfluous or leaky parameters from the methods' interfaces
    - Using composition instead of inheritance to add new capabilities

    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly

    Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
    - Only full refresh is supported. This will be addressed in the future.
    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
    - The Stream's behavior cannot depend on a namespace
    - TypeTransformer is not supported. This will be addressed in the future.
    - Nested cursor and primary keys are not supported
    """

    @abstractmethod
    def generate_partitions(self) -> Iterable[Partition]:
        """
        Generates the partitions that will be read by this stream.
        :return: An iterable of partitions.
        """

    @property
    @abstractmethod
    def name(self) -> str:
        """
        :return: The stream name
        """

    @property
    @abstractmethod
    def cursor_field(self) -> Optional[str]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. Nested cursor fields are not supported.
        """

    @abstractmethod
    def check_availability(self) -> StreamAvailability:
        """
        :return: The stream's availability
        """

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.
        """

    @abstractmethod
    def as_airbyte_stream(self) -> AirbyteStream:
        """
        :return: A dict of the JSON schema representing this stream.
        """

    @abstractmethod
    def log_stream_sync_configuration(self) -> None:
        """
        Logs the stream's configuration for debugging purposes.
        """

    @property
    @abstractmethod
    def cursor(self) -> Cursor:
        """
        :return: The cursor associated with this stream.
        """
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AbstractStream(ABC):
AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.
Why create a new interface instead of adding concurrency capabilities to the existing Stream? We have learned a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve it.
At a high level, the changes we are targeting are:
- Removing superfluous or leaky parameters from the methods' interfaces
- Using composition instead of inheritance to add new capabilities
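To illustrate the composition point, the sketch below builds a stream from an injected cursor-like object instead of overriding cursor hooks in a subclass, so swapping cursor behavior means passing a different object. `RecordObserver`, `CountingCursor`, `MaxValueCursor`, and `SketchStream` are hypothetical stand-ins written for this example; they are not CDK classes, and the CDK's real `Cursor` interface has a different, richer set of methods.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Mapping, Protocol


class RecordObserver(Protocol):
    """Hypothetical minimal cursor-like capability: observe each emitted record."""

    def observe(self, record: Mapping[str, Any]) -> None: ...


@dataclass
class CountingCursor:
    """Hypothetical cursor that only counts the records it has seen."""

    seen: int = 0

    def observe(self, record: Mapping[str, Any]) -> None:
        self.seen += 1


@dataclass
class MaxValueCursor:
    """Hypothetical cursor tracking the largest value of one top-level field."""

    cursor_field: str
    max_value: Any = None

    def observe(self, record: Mapping[str, Any]) -> None:
        value = record.get(self.cursor_field)
        if value is not None and (self.max_value is None or value > self.max_value):
            self.max_value = value


@dataclass
class SketchStream:
    """The stream delegates cursor behavior to whichever object it is given."""

    name: str
    records: List[Dict[str, Any]]
    cursor: RecordObserver

    def read(self) -> Iterable[Dict[str, Any]]:
        for record in self.records:
            self.cursor.observe(record)  # capability supplied by composition
            yield record


rows = [{"id": 1, "created_at": "2024-01-02"}, {"id": 2, "created_at": "2024-01-05"}]
by_date = SketchStream("users", rows, MaxValueCursor("created_at"))
counted = SketchStream("users", rows, CountingCursor())
list(by_date.read())
list(counted.read())
print(by_date.cursor.max_value, counted.cursor.seen)  # 2024-01-05 2
```

Neither stream class changes when the cursor changes; only the object passed at construction time differs.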
To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.
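The facade idea can be sketched as an adapter that exposes a legacy-style `read_records` signature while delegating to a new-style stream that only knows how to generate partitions. The legacy parameters below (`sync_mode`, `cursor_field`, `stream_slice`, `stream_state`) are modeled loosely on the original `Stream.read_records`; `ListPartition`, `NewStyleStream`, and `LegacyFacade` are hypothetical names, and this is not the CDK's own facade implementation.

```python
from typing import Any, Iterable, List, Mapping, Optional


class ListPartition:
    """Hypothetical partition backed by an in-memory list of records."""

    def __init__(self, records: List[Mapping[str, Any]]) -> None:
        self._records = records

    def read(self) -> Iterable[Mapping[str, Any]]:
        return iter(self._records)


class NewStyleStream:
    """Hypothetical new-style stream: no sync_mode/cursor_field/slice/state parameters."""

    def __init__(self, pages: List[List[Mapping[str, Any]]]) -> None:
        self._pages = pages

    def generate_partitions(self) -> Iterable[ListPartition]:
        for page in self._pages:
            yield ListPartition(page)


class LegacyFacade:
    """Hypothetical adapter presenting the old read_records shape."""

    def __init__(self, stream: NewStyleStream) -> None:
        self._stream = stream

    def read_records(
        self,
        sync_mode: str,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # The leaky parameters are accepted for compatibility but are not
        # forwarded: the new interface has no place for them.
        if sync_mode != "full_refresh":
            raise ValueError("only full_refresh is supported in this sketch")
        for partition in self._stream.generate_partitions():
            yield from partition.read()


facade = LegacyFacade(NewStyleStream([[{"id": 1}], [{"id": 2}, {"id": 3}]]))
print([r["id"] for r in facade.read_records("full_refresh")])  # [1, 2, 3]
```

Because `read_records` is a generator, the `ValueError` for unsupported modes is raised when iteration starts, not when the method is called.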
Sources that implement this interface currently have the following restrictions. Most, but not all, of them will be lifted as we iterate on the design:
- Only full refresh is supported. This will be addressed in the future.
- The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
- Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
- The Stream's behavior cannot depend on a namespace.
- TypeTransformer is not supported. This will be addressed in the future.
- Nested cursor fields and nested primary keys are not supported.
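The cursor_field restriction suggests passing a user-configured cursor field into each stream when the connector's main method builds it. A minimal sketch, assuming a hypothetical config key `cursor_field` holding a path list (real connectors define their own spec) and a hypothetical `OrdersStream`; it also rejects multi-key paths, since nested cursor fields are not supported.

```python
from typing import Any, List, Mapping, Optional


class OrdersStream:
    """Hypothetical stream that is told its cursor field at construction time."""

    DEFAULT_CURSOR_FIELD = "created_at"

    def __init__(self, cursor_field: Optional[str] = None) -> None:
        self._cursor_field = cursor_field or self.DEFAULT_CURSOR_FIELD

    @property
    def name(self) -> str:
        return "orders"

    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field


def build_streams(config: Mapping[str, Any]) -> List[OrdersStream]:
    """Hypothetical helper called from the connector's main method."""
    configured: Optional[List[str]] = config.get("cursor_field")
    if configured and len(configured) > 1:
        # A path longer than one key would be a nested cursor field.
        raise ValueError(f"nested cursor field not supported: {configured}")
    return [OrdersStream(cursor_field=configured[0] if configured else None)]


print(build_streams({})[0].cursor_field)  # created_at
print(build_streams({"cursor_field": ["updated_at"]})[0].cursor_field)  # updated_at
```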
@abstractmethod
def generate_partitions(self) -> Iterable[Partition]:
Generates the partitions that will be read by this stream.
Returns
An iterable of partitions.
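Partitions are the units of work that can be read in parallel. The sketch below shows the general idea with the standard library's `concurrent.futures.ThreadPoolExecutor`: the date range is split into windows, one partition per window, and each partition is read on a worker thread. `DateRangePartition` and the free function `generate_partitions` are hypothetical stand-ins; the CDK's own scheduling of partitions is not shown here.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta
from typing import Any, Dict, Iterable, List


class DateRangePartition:
    """Hypothetical partition covering the half-open range [start, end)."""

    def __init__(self, start: date, end: date) -> None:
        self.start, self.end = start, end

    def read(self) -> List[Dict[str, Any]]:
        # Stand-in for an API call scoped to this partition's date range:
        # one record per day in the window.
        days = (self.end - self.start).days
        return [{"day": (self.start + timedelta(days=i)).isoformat()} for i in range(days)]


def generate_partitions(start: date, end: date, step_days: int) -> Iterable[DateRangePartition]:
    """Split [start, end) into consecutive windows of at most step_days days."""
    window_start = start
    while window_start < end:
        window_end = min(window_start + timedelta(days=step_days), end)
        yield DateRangePartition(window_start, window_end)
        window_start = window_end


partitions = list(generate_partitions(date(2024, 1, 1), date(2024, 1, 8), step_days=3))
with ThreadPoolExecutor(max_workers=3) as pool:
    # map() yields results in partition order even though reads run concurrently.
    results = list(pool.map(lambda p: p.read(), partitions))
records = [record for batch in results for record in batch]
print(len(partitions), len(records))  # 3 7
```

Keeping each partition independent (no shared mutable state between `read` calls) is what makes it safe to hand them to separate threads.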
@property
@abstractmethod
def cursor_field(self) -> Optional[str]:
Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as its cursor field.
Returns
The name of the field used as a cursor. Nested cursor fields are not supported.
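Because the cursor field is a single top-level name, reading it from a record is a plain key lookup. The sketch below finds the most recent cursor value across records; `most_recent_cursor_value` is a hypothetical helper, and it assumes cursor values are mutually comparable (here ISO-8601 date strings, which sort lexicographically in date order).

```python
from typing import Any, Iterable, Mapping, Optional


def most_recent_cursor_value(
    records: Iterable[Mapping[str, Any]], cursor_field: Optional[str]
) -> Optional[Any]:
    """Return the largest value of a top-level cursor field, or None."""
    if cursor_field is None:
        # A stream without a cursor field has nothing to track.
        return None
    values = [r[cursor_field] for r in records if r.get(cursor_field) is not None]
    return max(values, default=None)


records = [
    {"id": 1, "created_at": "2024-03-01"},
    {"id": 2, "created_at": "2024-03-09"},
    {"id": 3, "meta": {"created_at": "2024-12-31"}},  # nested value is not looked up
]
print(most_recent_cursor_value(records, "created_at"))  # 2024-03-09
```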
@abstractmethod
def check_availability(self) -> StreamAvailability:
Returns
The stream's availability
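An availability result typically needs to carry both a yes/no answer and, when the stream is unavailable, a reason that can be surfaced to the user. The sketch below models that with hypothetical `Availability`, `Available`, and `Unavailable` classes and a hypothetical probe returning an HTTP-like status code; these names and the `is_available`/`message` methods are assumptions for illustration, not imports from `airbyte_cdk`.

```python
from abc import ABC, abstractmethod
from typing import Callable, Optional


class Availability(ABC):
    """Hypothetical availability result: a boolean plus an optional reason."""

    @abstractmethod
    def is_available(self) -> bool: ...

    @abstractmethod
    def message(self) -> Optional[str]: ...


class Available(Availability):
    def is_available(self) -> bool:
        return True

    def message(self) -> Optional[str]:
        return None


class Unavailable(Availability):
    def __init__(self, reason: str) -> None:
        self._reason = reason

    def is_available(self) -> bool:
        return False

    def message(self) -> Optional[str]:
        return self._reason


def check_availability(probe: Callable[[], int]) -> Availability:
    """Hypothetical check: map a probe's status code to an availability result."""
    status = probe()
    if status == 403:
        return Unavailable("The credentials lack permission to read this stream.")
    if status >= 400:
        return Unavailable(f"Unexpected status {status} while probing the stream.")
    return Available()


result = check_availability(lambda: 403)
print(result.is_available(), result.message())
```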
@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
Returns
A dict of the JSON schema representing this stream.
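The schema is returned as a plain mapping. The sketch below shows one possible shape for a hypothetical `orders` stream, plus a hypothetical `declares_top_level_field` check that the cursor field is a direct property of the schema, which fits the restriction that nested cursor fields are not supported. The field names and the draft-07 `$schema` URI are illustrative assumptions.

```python
from typing import Any, Mapping


def get_json_schema() -> Mapping[str, Any]:
    """Hypothetical schema for an 'orders' stream."""
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "created_at": {"type": "string", "format": "date-time"},
            "amount": {"type": ["null", "number"]},
        },
        "required": ["id"],
    }


def declares_top_level_field(schema: Mapping[str, Any], field_name: str) -> bool:
    """True if field_name is a direct (non-nested) property of the schema."""
    return field_name in schema.get("properties", {})


schema = get_json_schema()
print(declares_top_level_field(schema, "created_at"))  # True
```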
@abstractmethod
def as_airbyte_stream(self) -> AirbyteStream:
Returns
The stream as an AirbyteStream, the catalog entry describing this stream.
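In the Airbyte protocol, an `AirbyteStream` is a catalog entry carrying the stream name, its JSON schema, the sync modes it supports, and a default cursor field expressed as a list of path components. The sketch below uses a hypothetical `CatalogEntry` dataclass as a stand-in for that model (it is not the real `airbyte_cdk.models.AirbyteStream`) and a hypothetical `as_catalog_entry` helper that advertises only full refresh, in line with the restrictions listed for this interface.

```python
from dataclasses import dataclass, field
from typing import Any, List, Mapping, Optional


@dataclass
class CatalogEntry:
    """Hypothetical stand-in for the protocol's AirbyteStream model."""

    name: str
    json_schema: Mapping[str, Any]
    supported_sync_modes: List[str]
    default_cursor_field: List[str] = field(default_factory=list)


def as_catalog_entry(
    name: str, json_schema: Mapping[str, Any], cursor_field: Optional[str]
) -> CatalogEntry:
    return CatalogEntry(
        name=name,
        json_schema=json_schema,
        # Only full refresh is advertised in this sketch.
        supported_sync_modes=["full_refresh"],
        # A single top-level cursor field becomes a one-element path list.
        default_cursor_field=[cursor_field] if cursor_field else [],
    )


entry = as_catalog_entry("orders", {"type": "object", "properties": {}}, "created_at")
print(entry.supported_sync_modes, entry.default_cursor_field)  # ['full_refresh'] ['created_at']
```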