airbyte_cdk.sources.streams.concurrent.default_stream
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from logging import Logger
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
    ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
    StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class DefaultStream(AbstractStream):
    """Default concrete implementation of ``AbstractStream`` for the Concurrent CDK.

    Composes a ``PartitionGenerator`` (produces the partitions to read) with a
    ``Cursor`` (tracks state) instead of inheriting those capabilities.
    """

    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
        primary_key: List[str],
        cursor_field: Optional[CursorField],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
        supports_file_transfer: bool = False,
        block_simultaneous_read: str = "",
    ) -> None:
        """
        :param partition_generator: Source of the partitions this stream reads.
        :param name: Stream name as exposed in the catalog.
        :param json_schema: The stream's JSON schema, or a zero-arg callable that
            lazily produces it (resolved in ``get_json_schema``).
        :param primary_key: Top-level primary-key field names (nested keys unsupported).
        :param cursor_field: Cursor field descriptor, or None for full-refresh-only streams.
        :param logger: Logger used for sync-configuration and availability messages.
        :param cursor: Concurrent cursor tracking this stream's state.
        :param namespace: Optional stream namespace.
        :param supports_file_transfer: Marks the catalog entry as file-based when True.
        :param block_simultaneous_read: Blocking group name; empty string means no blocking.
        """
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
        self._supports_file_transfer = supports_file_transfer
        self._block_simultaneous_read = block_simultaneous_read

    def generate_partitions(self) -> Iterable[Partition]:
        """Lazily yield the partitions that will be read by this stream."""
        yield from self._stream_partition_generator.generate()

    @property
    def name(self) -> str:
        """The stream's name."""
        return self._name

    @property
    def namespace(self) -> Optional[str]:
        """The stream's namespace, or None."""
        return self._namespace

    @property
    def cursor_field(self) -> Optional[str]:
        """The cursor field's key, or None when the stream has no cursor field."""
        return self._cursor_field.cursor_field_key if self._cursor_field else None

    def get_json_schema(self) -> Mapping[str, Any]:
        """Return the stream's JSON schema, invoking the factory if one was provided."""
        return self._json_schema() if callable(self._json_schema) else self._json_schema

    def as_airbyte_stream(self) -> AirbyteStream:
        """Build the ``AirbyteStream`` protocol message describing this stream."""
        stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
            is_file_based=self._supports_file_transfer,
        )

        if self._namespace:
            stream.namespace = self._namespace

        if self._cursor_field:
            # The cursor is source-defined unless the field may be overridden via the catalog.
            stream.source_defined_cursor = (
                not self._cursor_field.supports_catalog_defined_cursor_field
            )
            stream.is_resumable = True
            stream.supported_sync_modes.append(SyncMode.incremental)
            stream.default_cursor_field = [self._cursor_field.cursor_field_key]

        # Nested primary keys are unsupported: each key becomes a single-element path.
        if self._primary_key:
            stream.source_defined_primary_key = [[key] for key in self._primary_key]

        return stream

    def log_stream_sync_configuration(self) -> None:
        """Log this stream's sync configuration for debugging purposes."""
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @property
    def cursor(self) -> Cursor:
        """The concurrent cursor tracking this stream's state."""
        return self._cursor

    @property
    def block_simultaneous_read(self) -> str:
        """Returns the blocking group name for this stream, or empty string if no blocking"""
        return self._block_simultaneous_read

    @block_simultaneous_read.setter
    def block_simultaneous_read(self, value: str) -> None:
        self._block_simultaneous_read = value

    def get_partition_router(self) -> Optional[PartitionRouter]:
        """Return the partition router for this stream, or None if not available.

        Only reachable when partitions come from a ``StreamSlicerPartitionGenerator``
        whose slicer is a ``ConcurrentPerPartitionCursor``.
        NOTE(review): this reaches into private attributes (``_stream_slicer``,
        ``_partition_router``) of collaborators — consider exposing them publicly.
        """
        if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
            return None
        stream_slicer = self._stream_partition_generator._stream_slicer
        if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
            return None
        return stream_slicer._partition_router

    def check_availability(self) -> StreamAvailability:
        """
        Check stream availability by attempting to read the first record of the stream.
        """
        try:
            partition = next(iter(self.generate_partitions()))
        except StopIteration:
            # NOTE: The following comment was copied from legacy stuff and I don't know how relevant it is:
            # If stream_slices has no `next()` item (Note - this is different from stream_slices returning [None]!)
            # This can happen when a substream's `stream_slices` method does a `for record in parent_records: yield <something>`
            # without accounting for the case in which the parent stream is empty.
            return StreamAvailability.unavailable(
                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
            )
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )

        try:
            next(iter(partition.read()))
            return StreamAvailability.available()
        except StopIteration:
            # A connectable stream with zero records is still considered available.
            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
            return StreamAvailability.available()
        except AirbyteTracedException as error:
            return StreamAvailability.unavailable(
                error.message or error.internal_message or "<no error message>"
            )
class DefaultStream(AbstractStream):
    """Concrete ``AbstractStream`` backed by a partition generator and a concurrent cursor."""

    def __init__(
        self,
        partition_generator: PartitionGenerator,
        name: str,
        json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
        primary_key: List[str],
        cursor_field: Optional[CursorField],
        logger: Logger,
        cursor: Cursor,
        namespace: Optional[str] = None,
        supports_file_transfer: bool = False,
        block_simultaneous_read: str = "",
    ) -> None:
        self._stream_partition_generator = partition_generator
        self._name = name
        self._json_schema = json_schema
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._logger = logger
        self._cursor = cursor
        self._namespace = namespace
        self._supports_file_transfer = supports_file_transfer
        self._block_simultaneous_read = block_simultaneous_read

    def generate_partitions(self) -> Iterable[Partition]:
        """Yield the partitions produced by the underlying partition generator."""
        for generated_partition in self._stream_partition_generator.generate():
            yield generated_partition

    @property
    def name(self) -> str:
        """The stream's name."""
        return self._name

    @property
    def namespace(self) -> Optional[str]:
        """The stream's namespace, if any."""
        return self._namespace

    @property
    def cursor_field(self) -> Optional[str]:
        """The cursor field's key, or None when no cursor field is configured."""
        if not self._cursor_field:
            return None
        return self._cursor_field.cursor_field_key

    def get_json_schema(self) -> Mapping[str, Any]:
        """Resolve and return the stream's JSON schema."""
        schema_or_factory = self._json_schema
        if callable(schema_or_factory):
            return schema_or_factory()
        return schema_or_factory

    def as_airbyte_stream(self) -> AirbyteStream:
        """Describe this stream as an ``AirbyteStream`` protocol message."""
        airbyte_stream = AirbyteStream(
            name=self.name,
            json_schema=dict(self.get_json_schema()),
            supported_sync_modes=[SyncMode.full_refresh],
            is_resumable=False,
            is_file_based=self._supports_file_transfer,
        )

        if self._namespace:
            airbyte_stream.namespace = self._namespace

        if self._cursor_field:
            airbyte_stream.source_defined_cursor = (
                not self._cursor_field.supports_catalog_defined_cursor_field
            )
            airbyte_stream.is_resumable = True
            airbyte_stream.supported_sync_modes.append(SyncMode.incremental)
            airbyte_stream.default_cursor_field = [self._cursor_field.cursor_field_key]

        if self._primary_key:
            airbyte_stream.source_defined_primary_key = [
                [key_field] for key_field in self._primary_key
            ]

        return airbyte_stream

    def log_stream_sync_configuration(self) -> None:
        """Emit a debug log describing this stream instance's configuration."""
        self._logger.debug(
            f"Syncing stream instance: {self.name}",
            extra={
                "primary_key": self._primary_key,
                "cursor_field": self.cursor_field,
            },
        )

    @property
    def cursor(self) -> Cursor:
        """The concurrent cursor driving this stream's state."""
        return self._cursor

    @property
    def block_simultaneous_read(self) -> str:
        """Returns the blocking group name for this stream, or empty string if no blocking"""
        return self._block_simultaneous_read

    @block_simultaneous_read.setter
    def block_simultaneous_read(self, value: str) -> None:
        self._block_simultaneous_read = value

    def get_partition_router(self) -> PartitionRouter | None:
        """Return the partition router for this stream, or None if not available."""
        generator = self._stream_partition_generator
        if isinstance(generator, StreamSlicerPartitionGenerator):
            slicer = generator._stream_slicer
            if isinstance(slicer, ConcurrentPerPartitionCursor):
                return slicer._partition_router
        return None

    def check_availability(self) -> StreamAvailability:
        """
        Check stream availability by attempting to read the first record of the stream.
        """
        try:
            initial_partition = next(iter(self.generate_partitions()))
        except StopIteration:
            # No slices at all (distinct from stream_slices yielding [None]); this can happen
            # when a substream iterates over an empty parent stream.
            return StreamAvailability.unavailable(
                f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
            )
        except AirbyteTracedException as exc:
            return StreamAvailability.unavailable(
                exc.message or exc.internal_message or "<no error message>"
            )

        try:
            next(iter(initial_partition.read()))
        except StopIteration:
            # Zero records is still a successful connection.
            self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
        except AirbyteTracedException as exc:
            return StreamAvailability.unavailable(
                exc.message or exc.internal_message or "<no error message>"
            )
        return StreamAvailability.available()
AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.
Why create a new interface instead of adding concurrency capabilities to the existing Stream? We learnt a lot since the initial design of the Stream interface, and we wanted to take the opportunity to improve.
High level, the changes we are targeting are:
- Removing superfluous or leaky parameters from the methods' interfaces
- Using composition instead of inheritance to add new capabilities
To allow us to iterate fast while ensuring backwards compatibility, we are creating a new interface with a facade object that will bridge the old and the new interfaces. Source connectors that wish to leverage concurrency need to implement this new interface. An example will be available shortly
Current restrictions on sources that implement this interface. Not all of these restrictions will be lifted in the future, but most will as we iterate on the design.
- Only full refresh is supported. This will be addressed in the future.
- The read method does not accept a cursor_field. Streams must be internally aware of the cursor field to use. User-defined cursor fields can be implemented by modifying the connector's main method to instantiate the streams with the configured cursor field.
- Streams cannot return user-friendly messages by overriding Stream.get_error_display_message. This will be addressed in the future.
- The Stream's behavior cannot depend on a namespace
- TypeTransformer is not supported. This will be addressed in the future.
- Nested cursor and primary keys are not supported
def __init__(
    self,
    partition_generator: PartitionGenerator,
    name: str,
    json_schema: Union[Mapping[str, Any], Callable[[], Mapping[str, Any]]],
    primary_key: List[str],
    cursor_field: Optional[CursorField],
    logger: Logger,
    cursor: Cursor,
    namespace: Optional[str] = None,
    supports_file_transfer: bool = False,
    block_simultaneous_read: str = "",
) -> None:
    """
    :param partition_generator: Source of the partitions this stream reads.
    :param name: Stream name as exposed in the catalog.
    :param json_schema: The JSON schema, or a zero-arg callable producing it lazily.
    :param primary_key: Top-level primary-key field names (nested keys unsupported).
    :param cursor_field: Cursor field descriptor, or None for full-refresh-only streams.
    :param logger: Logger used for debug and availability messages.
    :param cursor: Concurrent cursor tracking this stream's state.
    :param namespace: Optional stream namespace.
    :param supports_file_transfer: Marks the catalog entry as file-based when True.
    :param block_simultaneous_read: Blocking group name; empty string means no blocking.
    """
    self._stream_partition_generator = partition_generator
    self._name = name
    self._json_schema = json_schema
    self._primary_key = primary_key
    self._cursor_field = cursor_field
    self._logger = logger
    self._cursor = cursor
    self._namespace = namespace
    self._supports_file_transfer = supports_file_transfer
    self._block_simultaneous_read = block_simultaneous_read
def generate_partitions(self) -> Iterable[Partition]:
    """Lazily produce the partitions to be read, delegating to the partition generator."""
    for generated_partition in self._stream_partition_generator.generate():
        yield generated_partition
Generates the partitions that will be read by this stream.
Returns
An iterable of partitions.
@property
def cursor_field(self) -> Optional[str]:
    """The cursor field's key, or None when the stream has no cursor field."""
    if not self._cursor_field:
        return None
    return self._cursor_field.cursor_field_key
Override to return the default cursor field used by this stream, e.g., an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. Nested cursor fields are not supported.
def get_json_schema(self) -> Mapping[str, Any]:
    """Return the stream's JSON schema, resolving it lazily when a factory was supplied."""
    schema_or_factory = self._json_schema
    if callable(schema_or_factory):
        return schema_or_factory()
    return schema_or_factory
Returns
A dict of the JSON schema representing this stream.
def as_airbyte_stream(self) -> AirbyteStream:
    """Build the ``AirbyteStream`` protocol message describing this stream.

    Full refresh is always supported; incremental is added only when a cursor
    field is configured.
    """
    stream = AirbyteStream(
        name=self.name,
        json_schema=dict(self.get_json_schema()),
        supported_sync_modes=[SyncMode.full_refresh],
        is_resumable=False,
        is_file_based=self._supports_file_transfer,
    )

    if self._namespace:
        stream.namespace = self._namespace

    if self._cursor_field:
        # The cursor is source-defined unless the field may be overridden via the catalog.
        stream.source_defined_cursor = (
            not self._cursor_field.supports_catalog_defined_cursor_field
        )
        stream.is_resumable = True
        stream.supported_sync_modes.append(SyncMode.incremental)
        stream.default_cursor_field = [self._cursor_field.cursor_field_key]

    # `if keys and len(keys) > 0` was redundant — list truthiness covers both checks.
    # Nested primary keys are unsupported: each key becomes a single-element path.
    if self._primary_key:
        stream.source_defined_primary_key = [[key] for key in self._primary_key]

    return stream
Returns
An AirbyteStream protocol object describing this stream, including its supported sync modes, cursor field, and primary key.
def log_stream_sync_configuration(self) -> None:
    """Emit a debug log describing how this stream instance is configured."""
    debug_context = {
        "primary_key": self._primary_key,
        "cursor_field": self.cursor_field,
    }
    self._logger.debug(f"Syncing stream instance: {self.name}", extra=debug_context)
Logs the stream's configuration for debugging purposes.
@property
def block_simultaneous_read(self) -> str:
    """Returns the blocking group name for this stream, or empty string if no blocking"""
    # Streams sharing the same non-empty group name are presumably not read
    # concurrently — TODO confirm against the scheduler that consumes this.
    return self._block_simultaneous_read
Returns the blocking group name for this stream, or empty string if no blocking
def get_partition_router(self) -> Optional[PartitionRouter]:
    """Return the partition router for this stream, or None if not available.

    The router is only reachable when partitions come from a
    ``StreamSlicerPartitionGenerator`` whose slicer is a ``ConcurrentPerPartitionCursor``.
    """
    # Annotation changed from `PartitionRouter | None` to Optional[...] for consistency
    # with the rest of the file; the `X | Y` form is evaluated at definition time and
    # requires Python 3.10+ without `from __future__ import annotations`.
    if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
        return None
    # NOTE(review): reaches into collaborators' private attributes (`_stream_slicer`,
    # `_partition_router`) — consider exposing these via public accessors.
    stream_slicer = self._stream_partition_generator._stream_slicer
    if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
        return None
    return stream_slicer._partition_router
Return the partition router for this stream, or None if not available.
def check_availability(self) -> StreamAvailability:
    """
    Check stream availability by attempting to read the first record of the stream.
    """
    try:
        first_partition = next(iter(self.generate_partitions()))
    except StopIteration:
        # Legacy note: stream_slices may legitimately yield nothing (distinct from
        # yielding [None]) — e.g. a substream iterating over an empty parent stream.
        return StreamAvailability.unavailable(
            f"Cannot attempt to connect to stream {self.name} - no stream slices were found"
        )
    except AirbyteTracedException as traced_error:
        return StreamAvailability.unavailable(
            traced_error.message or traced_error.internal_message or "<no error message>"
        )

    try:
        next(iter(first_partition.read()))
    except StopIteration:
        # A connectable stream with zero records is still considered available.
        self._logger.info(f"Successfully connected to stream {self.name}, but got 0 records.")
    except AirbyteTracedException as traced_error:
        return StreamAvailability.unavailable(
            traced_error.message or traced_error.internal_message or "<no error message>"
        )
    return StreamAvailability.available()
Check stream availability by attempting to read the first record of the stream.