airbyte_cdk.sources.streams.concurrent.default_stream
```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from logging import Logger
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class DefaultStream(AbstractStream):
    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
        primary_key: List[str],
        cursor_field: Optional[str],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
        supports_file_transfer: bool = False,
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
        self._supports_file_transfer = supports_file_transfer

    def generate_partitions(self) -> Iterable[Partition]:
        yield from self._stream_partition_generator.generate()

    @property
    def name(self) -> str:
        return self._name

    @property
    def namespace(self) -> Optional[str]:
        return self._namespace

    @property
    def cursor_field(self) -> Optional[str]:
        return self._cursor_field

    def get_json_schema(self) -> Mapping[str, Any]:
        return self._json_schema() if callable(self._json_schema) else self._json_schema

    def as_airbyte_stream(self) -> AirbyteStream:
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
            is_file_based=self._supports_file_transfer,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            stream.source_defined_cursor = True
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field]

        keys = self._primary_key
        if keys and len(keys) > 0:
            stream.source_defined_primary_key = [[key] for key in keys]

        return stream

    def log_stream_sync_configuration(self) -> None:
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @property
    def cursor(self) -> Cursor:
        return self._cursor

    def check_availability(self) -> StreamAvailability:
        """
        Check stream availability by attempting to read the first record of the stream.
        """
        try:
            partition = next(iter(self.generate_partitions()))
        except StopIteration:
            # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is:
            # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
            # without accounting for the case in which the parent stream is empty.
            return StreamAvailability.unavailable(
                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
            )
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

        try:
            next(iter(partition.read()))
            return StreamAvailability.available()
        except StopIteration:
            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
            return StreamAvailability.available()
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )
```
```python
class DefaultStream(AbstractStream):
```
AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.
Why create a new interface instead of adding concurrency capabilities to the existing Stream? We have learned a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve it.
At a high level, the changes we are targeting are:
- Removing superfluous or leaky parameters from the methods' interfaces
- Using composition instead of inheritance to add new capabilities
To allow us to iterate quickly while ensuring backwards compatibility, we are creating a new interface along with a facade object that bridges the old and new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An official example will be available shortly; in the meantime, the sketch below shows what construction can look like.
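This sketch is illustrative only and follows the constructor shown in the source above. `MyPartitionGenerator` and `my_cursor` are hypothetical placeholders for whatever PartitionGenerator and Cursor implementations a connector provides.

```python
import logging

from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream

# MyPartitionGenerator and my_cursor are hypothetical placeholders; any
# PartitionGenerator / Cursor implementation from your connector works here.
stream = DefaultStream(
    partition_generator=MyPartitionGenerator(),
    name="users",
    json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
    primary_key=["id"],
    cursor_field="updated_at",
    logger=logging.getLogger("airbyte"),
    cursor=my_cursor,
)
```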
Sources that implement this interface are currently subject to the following restrictions. Most of them, though not all, will be lifted as we iterate on the design.
- Only full refresh is supported. This will be addressed in the future.
- The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field (see the sketch after this list).
- Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
- The Stream's behavior cannot depend on a namespace.
- TypeTransformer is not supported. This will be addressed in the future.
- Nested cursor and primary keys are not supported.
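As referenced in the cursor-field restriction above, here is a hedged sketch of wiring a user-configured cursor field through at stream-construction time. `build_partition_generator`, `build_cursor`, and `load_schema` are hypothetical connector-specific helpers, not CDK APIs.

```python
import logging
from typing import Any, List, Mapping

from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream

def streams(config: Mapping[str, Any]) -> List[DefaultStream]:
    # Hypothetical: the user picks the cursor field in the connector config.
    cursor_field = config.get("cursor_field")
    return [
        DefaultStream(
            partition_generator=build_partition_generator(config),  # hypothetical helper
            name="users",
            json_schema=load_schema("users"),  # hypothetical helper
            primary_key=["id"],
            cursor_field=cursor_field,
            logger=logging.getLogger("airbyte"),
            cursor=build_cursor(config),  # hypothetical helper
        )
    ]
```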
```python
def __init__(
    self,
    partition_generator: PartitionGenerator,
    name: str,
    json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
    primary_key: List[str],
    cursor_field: Optional[str],
    logger: Logger,
    cursor: Cursor,
    namespace: Optional[str] = None,
    supports_file_transfer: bool = False,
) -> None:
    self._stream_partition_generator = partition_generator
    self._name = name
    self._json_schema = json_schema
    self._primary_key = primary_key
    self._cursor_field = cursor_field
    self._logger = logger
    self._cursor = cursor
    self._namespace = namespace
    self._supports_file_transfer = supports_file_transfer
```
```python
def generate_partitions(self) -> Iterable[Partition]:
    yield from self._stream_partition_generator.generate()
```
Generates the partitions that will be read by this stream.
Returns
An iterable of partitions.
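As a hedged sketch, a minimal PartitionGenerator might precompute its slices and yield one Partition per slice. `DatePartition` is a hypothetical Partition subclass; the exact set of abstract methods Partition requires may differ across CDK versions.

```python
from typing import Iterable, List

from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import (
    PartitionGenerator,
)

class DateRangePartitionGenerator(PartitionGenerator):
    """Yields one partition per date so the dates can be read concurrently."""

    def __init__(self, dates: List[str]) -> None:
        self._dates = dates

    def generate(self) -> Iterable[Partition]:
        for date in self._dates:
            # DatePartition is a hypothetical Partition subclass whose read()
            # fetches the records for this one date.
            yield DatePartition(date)
```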
Override to return the default cursor field used by this stream, e.g., an API entity might always use `created_at` as the cursor field.
Returns
The name of the field used as a cursor. Nested cursor fields are not supported.
```python
def get_json_schema(self) -> Mapping[str, Any]:
    return self._json_schema() if callable(self._json_schema) else self._json_schema
```
Returns
A dict of the JSON schema representing this stream.
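A standalone illustration of the dispatch `get_json_schema` performs: a plain mapping is returned as-is, while a callable is invoked, which lets connectors defer expensive schema loading until it is actually needed.

```python
from typing import Any, Callable, Mapping, Union

def resolve_schema(
    json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
) -> Mapping[str, Any]:
    # Mirrors DefaultStream.get_json_schema: invoke a callable, pass a mapping through.
    return json_schema() if callable(json_schema) else json_schema

assert resolve_schema({"type": "object"}) == {"type": "object"}
assert resolve_schema(lambda: {"type": "object"}) == {"type": "object"}
```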
```python
def as_airbyte_stream(self) -> AirbyteStream:
    stream = AirbyteStream(
        name=self.name,
        json_schema=dict(self.get_json_schema()),
        supported_sync_modes=[SyncMode.full_refresh],
        is_resumable=False,
        is_file_based=self._supports_file_transfer,
    )

    if self._namespace:
        stream.namespace = self._namespace

    if self._cursor_field:
        stream.source_defined_cursor = True
        stream.is_resumable = True
        stream.supported_sync_modes.append(SyncMode.incremental)
        stream.default_cursor_field = [self._cursor_field]

    keys = self._primary_key
    if keys and len(keys) > 0:
        stream.source_defined_primary_key = [[key] for key in keys]

    return stream
```
Returns
An AirbyteStream object describing this stream, as it would appear in a catalog.
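Following the logic above, a stream constructed with `cursor_field="updated_at"` and `primary_key=["id"]` (the assumed inputs from the earlier sketch) would produce a catalog entry like this:

```python
from airbyte_cdk.models import SyncMode

# Assumes `stream` was built with cursor_field="updated_at" and primary_key=["id"].
airbyte_stream = stream.as_airbyte_stream()

assert airbyte_stream.supported_sync_modes == [SyncMode.full_refresh, SyncMode.incremental]
assert airbyte_stream.is_resumable is True
assert airbyte_stream.default_cursor_field == ["updated_at"]
assert airbyte_stream.source_defined_primary_key == [["id"]]
```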
```python
def log_stream_sync_configuration(self) -> None:
    self._logger.debug(
        f"Syncing stream instance: {self.name}",
        extra={
            "primary_key": self._primary_key,
            "cursor_field": self.cursor_field,
        },
    )
```
Logs the stream's configuration for debugging purposes.
```python
def check_availability(self) -> StreamAvailability:
    """
    Check stream availability by attempting to read the first record of the stream.
    """
    try:
        partition = next(iter(self.generate_partitions()))
    except StopIteration:
        # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is:
        # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
        # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
        # without accounting for the case in which the parent stream is empty.
        return StreamAvailability.unavailable(
            f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
        )
    except AirbyteTracedException as error:
        return StreamAvailability.unavailable(
            error.message or error.internal_message or "<no error message>"
        )

    try:
        next(iter(partition.read()))
        return StreamAvailability.available()
    except StopIteration:
        self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
        return StreamAvailability.available()
    except AirbyteTracedException as error:
        return StreamAvailability.unavailable(
            error.message or error.internal_message or "<no error message>"
        )
```
Check stream availability by attempting to read the first record of the stream.
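A hedged usage sketch, e.g. to fail fast before starting a sync. The factory methods `StreamAvailability.available()` / `.unavailable(...)` appear in the source above, but the accessors for inspecting the result (`is_available` and `message` below) are assumptions that may vary by CDK version.

```python
# `stream` is a DefaultStream instance; `is_available` / `message` are assumed
# accessors on StreamAvailability and may differ in your CDK version.
availability = stream.check_availability()
if not availability.is_available:
    raise RuntimeError(f"Stream {stream.name} is unavailable: {availability.message}")
```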