airbyte_cdk.sources.streams.concurrent.abstract_stream
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, Optional

from typing_extensions import deprecated

from airbyte_cdk.models import AirbyteStream
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition


class AbstractStream(ABC):
    """
    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
    This interface is not yet stable and may change in the future. Use at your own risk.

    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

    High level, the changes we are targeting are:
    - Removing superfluous or leaky parameters from the methods' interfaces
    - Using composition instead of inheritance to add new capabilities

    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

    Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
    - Only full refresh is supported. This will be addressed in the future.
    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
    - The Stream's behavior cannot depend on a namespace
    - TypeTransformer is not supported. This will be addressed in the future.
    - Nested cursor and primary keys are not supported
    """

    @abstractmethod
    def generate_partitions(self) -> Iterable[Partition]:
        """
        Generates the partitions that will be read by this stream.
        :return: An iterable of partitions.
        """

    @property
    @abstractmethod
    def name(self) -> str:
        """
        :return: The stream name
        """

    @property
    @abstractmethod
    def cursor_field(self) -> Optional[str]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. Nested cursor fields are not supported.
        """

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.
        """

    @abstractmethod
    def as_airbyte_stream(self) -> AirbyteStream:
        """
        :return: The AirbyteStream model describing this stream.
        """

    @abstractmethod
    def log_stream_sync_configuration(self) -> None:
        """
        Logs the stream's configuration for debugging purposes.
        """

    @property
    @abstractmethod
    def cursor(self) -> Cursor:
        """
        :return: The cursor associated with this stream.
        """

    @abstractmethod
    def check_availability(self) -> StreamAvailability:
        """
        :return: Whether the stream is available and, if not, the reason why.
        """
class AbstractStream(ABC):
    """
    AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK.
    This interface is not yet stable and may change in the future. Use at your own risk.

    Why create a new interface instead of adding concurrency capabilities to the existing Stream?
    We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.

    High level, the changes we are targeting are:
    - Removing superfluous or leaky parameters from the methods' interfaces
    - Using composition instead of inheritance to add new capabilities

    To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces.
    Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.

    Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
    - Only full refresh is supported. This will be addressed in the future.
    - The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
    - Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
    - The Stream's behavior cannot depend on a namespace
    - TypeTransformer is not supported. This will be addressed in the future.
    - Nested cursor and primary keys are not supported
    """

    @abstractmethod
    def generate_partitions(self) -> Iterable[Partition]:
        """
        Generates the partitions that will be read by this stream.
        :return: An iterable of partitions.
        """

    @property
    @abstractmethod
    def name(self) -> str:
        """
        :return: The stream name
        """

    @property
    @abstractmethod
    def cursor_field(self) -> Optional[str]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. Nested cursor fields are not supported.
        """

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.
        """

    @abstractmethod
    def as_airbyte_stream(self) -> AirbyteStream:
        """
        :return: The AirbyteStream model describing this stream.
        """

    @abstractmethod
    def log_stream_sync_configuration(self) -> None:
        """
        Logs the stream's configuration for debugging purposes.
        """

    @property
    @abstractmethod
    def cursor(self) -> Cursor:
        """
        :return: The cursor associated with this stream.
        """

    @abstractmethod
    def check_availability(self) -> StreamAvailability:
        """
        :return: Whether the stream is available and, if not, the reason why.
        """
AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.
Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.
High level, the changes we are targeting are:
- Removing superfluous or leaky parameters from the methods' interfaces
- Using composition instead of inheritance to add new capabilities
To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly.
Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
- Only full refresh is supported. This will be addressed in the future.
- The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
- Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
- The Stream's behavior cannot depend on a namespace
- TypeTransformer is not supported. This will be addressed in the future.
- Nested cursor and primary keys are not supported
@abstractmethod
def generate_partitions(self) -> Iterable[Partition]:
    """
    Produce the partitions that this stream will read.

    :return: An iterable of partitions.
    """
Generates the partitions that will be read by this stream.
Returns
An iterable of partitions.
@property
@abstractmethod
def cursor_field(self) -> Optional[str]:
    """
    Default cursor field for this stream; override to supply it
    (e.g. an API entity might always use created_at as its cursor field).

    :return: The name of the field used as a cursor. Nested cursor fields are not supported.
    """
Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. Nested cursor fields are not supported.
@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
    """
    :return: The JSON schema representing this stream, as a dict.
    """
Returns
A dict of the JSON schema representing this stream.
@abstractmethod
def as_airbyte_stream(self) -> AirbyteStream:
    """
    :return: The AirbyteStream model describing this stream.
    """
Returns
The AirbyteStream model describing this stream.
@abstractmethod
def log_stream_sync_configuration(self) -> None:
    """
    Log this stream's configuration to aid debugging.
    """
Logs the stream's configuration for debugging purposes.
@property
@abstractmethod
def cursor(self) -> Cursor:
    """
    :return: The Cursor object associated with this stream.
    """
Returns
The cursor associated with this stream.
@abstractmethod
def check_availability(self) -> StreamAvailability:
    """
    :return: Whether the stream is available and, when it is not, the reason why.
    """
Returns
Whether the stream is available and, if not, the reason why.