airbyte_cdk.sources.concurrent_source.concurrent_source_adapter
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from abc import ABC
from datetime import timedelta
from typing import Any, Callable, Iterator, List, Mapping, MutableMapping, Optional, Tuple

from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import (
    ConcurrentCursor,
    Cursor,
    CursorField,
    CursorValueType,
    FinalStateCursor,
    GapType,
)
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
)

# Lookback (in seconds) used by initialize_cursor when the caller passes no lookback_window.
DEFAULT_LOOKBACK_SECONDS = 0


class ConcurrentSourceAdapter(AbstractSource, ABC):
    """
    AbstractSource that routes facade-wrapped streams to a ConcurrentSource.

    Streams are declared through ``streams()``. Configured streams whose instances are
    ``AbstractStreamFacade`` objects are read by the wrapped ``ConcurrentSource``; all
    remaining configured streams are then read through ``AbstractSource.read``.
    """

    def __init__(self, concurrent_source: ConcurrentSource, **kwargs: Any) -> None:
        """
        ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.

        The source's streams are still defined through the streams() method.
        Streams wrapped in a StreamFacade will be processed concurrently.
        Other streams will be processed sequentially as a later step.

        :param concurrent_source: engine used to read the facade-wrapped streams.
        :param kwargs: forwarded unchanged to ``AbstractSource.__init__``.
        """
        self._concurrent_source = concurrent_source
        super().__init__(**kwargs)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """
        Read all configured streams, concurrent ones first.

        :param logger: logger forwarded to ``AbstractSource.read`` for the sequential streams.
        :param config: connector configuration, used to instantiate streams.
        :param catalog: configured catalog of streams to read.
        :param state: incoming state; only forwarded to ``AbstractSource.read`` for the
            streams that are not read concurrently.
        :return: messages from the concurrent source, followed by messages from the
            sequential read of the remaining streams.
        """
        abstract_streams = self._select_abstract_streams(config, catalog)
        concurrent_stream_names = {stream.name for stream in abstract_streams}
        # Catalog restricted to streams the concurrent source does not handle.
        configured_catalog_for_regular_streams = ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )
        if abstract_streams:
            yield from self._concurrent_source.read(abstract_streams)
        if configured_catalog_for_regular_streams.streams:
            yield from super().read(logger, config, configured_catalog_for_regular_streams, state)

    def _select_abstract_streams(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        """
        Selects streams that can be processed concurrently and returns their abstract representations.

        A configured stream is selected when ``streams(config)`` yields an instance with the
        same name and that instance is an ``AbstractStreamFacade``; the facade's underlying
        stream is returned. Output order follows the configured catalog.
        """
        all_streams = self.streams(config)
        stream_name_to_instance: Mapping[str, Stream] = {s.name: s for s in all_streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if not stream_instance:
                continue

            if isinstance(stream_instance, AbstractStreamFacade):
                abstract_streams.append(stream_instance.get_underlying_stream())
        return abstract_streams

    def convert_to_concurrent_stream(
        self,
        logger: logging.Logger,
        stream: Stream,
        state_manager: ConnectorStateManager,
        cursor: Optional[Cursor] = None,
    ) -> Stream:
        """
        Wrap ``stream`` in a StreamFacade so it can be read concurrently.

        :param logger: logger handed to ``StreamFacade.create_from_stream``.
        :param stream: the synchronous stream to wrap.
        :param state_manager: source of the stream's saved state (read only when a cursor is given).
        :param cursor: cursor to attach; when omitted, a FinalStateCursor is created and
            the facade is built with an empty state.
        :return: the StreamFacade produced by ``StreamFacade.create_from_stream``.
        """
        state: MutableMapping[str, Any] = {}

        if cursor:
            state = state_manager.get_stream_state(stream.name, stream.namespace)

            stream.cursor = cursor  # type: ignore[assignment] # cursor is of type ConcurrentCursor, which inherits from Cursor
            # Sub-streams exposing a ``parent`` attribute share the same cursor with it.
            if hasattr(stream, "parent"):
                stream.parent.cursor = cursor
        else:
            cursor = FinalStateCursor(
                stream_name=stream.name,
                stream_namespace=stream.namespace,
                message_repository=self.message_repository,  # type: ignore[arg-type] # _default_message_repository will be returned in the worst case
            )
        return StreamFacade.create_from_stream(stream, self, logger, state, cursor)

    def initialize_cursor(
        self,
        stream: Stream,
        state_manager: ConnectorStateManager,
        converter: AbstractStreamStateConverter,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
    ) -> Optional[ConcurrentCursor]:
        """
        Build a ConcurrentCursor for ``stream`` when it declares a cursor field.

        :param stream: stream whose ``cursor_field``, ``name`` and ``namespace`` are used.
        :param state_manager: provides the stream's current state and is passed to the cursor.
        :param converter: state converter passed to the cursor.
        :param slice_boundary_fields: passed through to the cursor.
        :param start: passed through to the cursor.
        :param end_provider: passed through to the cursor.
        :param lookback_window: passed through to the cursor; when None, defaults to
            ``timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)``.
        :param slice_range: passed through to the cursor.
        :return: a ConcurrentCursor, or None when ``stream.cursor_field`` is falsy.
        :raises ValueError: if ``stream.cursor_field`` is set but is not a ``str``.
        """
        # Only substitute the default when no value was supplied. A falsy-but-explicit
        # gap (e.g. int 0 for an integer cursor) keeps the caller's type instead of
        # being replaced by a timedelta.
        if lookback_window is None:
            lookback_window = timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)

        cursor_field_name = stream.cursor_field
        if not cursor_field_name:
            return None

        if not isinstance(cursor_field_name, str):
            raise ValueError(
                f"Cursor field type must be a string, but received {type(cursor_field_name).__name__}."
            )

        return ConcurrentCursor(
            stream.name,
            stream.namespace,
            state_manager.get_stream_state(stream.name, stream.namespace),
            self.message_repository,  # type: ignore[arg-type] # _default_message_repository will be returned in the worst case
            state_manager,
            converter,
            CursorField(cursor_field_name),
            slice_boundary_fields,
            start,
            end_provider,
            lookback_window,
            slice_range,
        )
DEFAULT_LOOKBACK_SECONDS =
0
class
ConcurrentSourceAdapter(airbyte_cdk.connector.DefaultConnectorMixin, airbyte_cdk.sources.source.BaseSource[typing.Mapping[str, typing.Any], typing.List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], abc.ABC):
class ConcurrentSourceAdapter(AbstractSource, ABC):
    """
    AbstractSource that routes facade-wrapped streams to a ConcurrentSource.

    Streams are declared through ``streams()``. Configured streams whose instances are
    ``AbstractStreamFacade`` objects are read by the wrapped ``ConcurrentSource``; all
    remaining configured streams are then read through ``AbstractSource.read``.
    """

    def __init__(self, concurrent_source: ConcurrentSource, **kwargs: Any) -> None:
        """
        ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.

        The source's streams are still defined through the streams() method.
        Streams wrapped in a StreamFacade will be processed concurrently.
        Other streams will be processed sequentially as a later step.

        :param concurrent_source: engine used to read the facade-wrapped streams.
        :param kwargs: forwarded unchanged to ``AbstractSource.__init__``.
        """
        self._concurrent_source = concurrent_source
        super().__init__(**kwargs)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """
        Read all configured streams, concurrent ones first.

        :param logger: logger forwarded to ``AbstractSource.read`` for the sequential streams.
        :param config: connector configuration, used to instantiate streams.
        :param catalog: configured catalog of streams to read.
        :param state: incoming state; only forwarded to ``AbstractSource.read`` for the
            streams that are not read concurrently.
        :return: messages from the concurrent source, followed by messages from the
            sequential read of the remaining streams.
        """
        abstract_streams = self._select_abstract_streams(config, catalog)
        concurrent_stream_names = {stream.name for stream in abstract_streams}
        # Catalog restricted to streams the concurrent source does not handle.
        configured_catalog_for_regular_streams = ConfiguredAirbyteCatalog(
            streams=[
                stream
                for stream in catalog.streams
                if stream.stream.name not in concurrent_stream_names
            ]
        )
        if abstract_streams:
            yield from self._concurrent_source.read(abstract_streams)
        if configured_catalog_for_regular_streams.streams:
            yield from super().read(logger, config, configured_catalog_for_regular_streams, state)

    def _select_abstract_streams(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        """
        Selects streams that can be processed concurrently and returns their abstract representations.

        A configured stream is selected when ``streams(config)`` yields an instance with the
        same name and that instance is an ``AbstractStreamFacade``; the facade's underlying
        stream is returned. Output order follows the configured catalog.
        """
        all_streams = self.streams(config)
        stream_name_to_instance: Mapping[str, Stream] = {s.name: s for s in all_streams}
        abstract_streams: List[AbstractStream] = []
        for configured_stream in configured_catalog.streams:
            stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
            if not stream_instance:
                continue

            if isinstance(stream_instance, AbstractStreamFacade):
                abstract_streams.append(stream_instance.get_underlying_stream())
        return abstract_streams

    def convert_to_concurrent_stream(
        self,
        logger: logging.Logger,
        stream: Stream,
        state_manager: ConnectorStateManager,
        cursor: Optional[Cursor] = None,
    ) -> Stream:
        """
        Wrap ``stream`` in a StreamFacade so it can be read concurrently.

        :param logger: logger handed to ``StreamFacade.create_from_stream``.
        :param stream: the synchronous stream to wrap.
        :param state_manager: source of the stream's saved state (read only when a cursor is given).
        :param cursor: cursor to attach; when omitted, a FinalStateCursor is created and
            the facade is built with an empty state.
        :return: the StreamFacade produced by ``StreamFacade.create_from_stream``.
        """
        state: MutableMapping[str, Any] = {}

        if cursor:
            state = state_manager.get_stream_state(stream.name, stream.namespace)

            stream.cursor = cursor  # type: ignore[assignment] # cursor is of type ConcurrentCursor, which inherits from Cursor
            # Sub-streams exposing a ``parent`` attribute share the same cursor with it.
            if hasattr(stream, "parent"):
                stream.parent.cursor = cursor
        else:
            cursor = FinalStateCursor(
                stream_name=stream.name,
                stream_namespace=stream.namespace,
                message_repository=self.message_repository,  # type: ignore[arg-type] # _default_message_repository will be returned in the worst case
            )
        return StreamFacade.create_from_stream(stream, self, logger, state, cursor)

    def initialize_cursor(
        self,
        stream: Stream,
        state_manager: ConnectorStateManager,
        converter: AbstractStreamStateConverter,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
    ) -> Optional[ConcurrentCursor]:
        """
        Build a ConcurrentCursor for ``stream`` when it declares a cursor field.

        :param stream: stream whose ``cursor_field``, ``name`` and ``namespace`` are used.
        :param state_manager: provides the stream's current state and is passed to the cursor.
        :param converter: state converter passed to the cursor.
        :param slice_boundary_fields: passed through to the cursor.
        :param start: passed through to the cursor.
        :param end_provider: passed through to the cursor.
        :param lookback_window: passed through to the cursor; when None, defaults to
            ``timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)``.
        :param slice_range: passed through to the cursor.
        :return: a ConcurrentCursor, or None when ``stream.cursor_field`` is falsy.
        :raises ValueError: if ``stream.cursor_field`` is set but is not a ``str``.
        """
        # Only substitute the default when no value was supplied. A falsy-but-explicit
        # gap (e.g. int 0 for an integer cursor) keeps the caller's type instead of
        # being replaced by a timedelta.
        if lookback_window is None:
            lookback_window = timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)

        cursor_field_name = stream.cursor_field
        if not cursor_field_name:
            return None

        if not isinstance(cursor_field_name, str):
            raise ValueError(
                f"Cursor field type must be a string, but received {type(cursor_field_name).__name__}."
            )

        return ConcurrentCursor(
            stream.name,
            stream.namespace,
            state_manager.get_stream_state(stream.name, stream.namespace),
            self.message_repository,  # type: ignore[arg-type] # _default_message_repository will be returned in the worst case
            state_manager,
            converter,
            CursorField(cursor_field_name),
            slice_boundary_fields,
            start,
            end_provider,
            lookback_window,
            slice_range,
        )
Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification-compliant Source.
ConcurrentSourceAdapter( concurrent_source: airbyte_cdk.ConcurrentSource, **kwargs: Any)
def __init__(self, concurrent_source: ConcurrentSource, **kwargs: Any) -> None:
    """
    Wrap ``concurrent_source`` so it can be exposed as a regular AbstractSource.

    Streams are still declared through ``streams()``. Those wrapped in a StreamFacade
    are read by ``concurrent_source``; all other streams are read sequentially afterwards.
    Any remaining keyword arguments go to the parent initializer unchanged.
    """
    # Set before super().__init__(); keep this order unless the parent initializer is
    # confirmed not to rely on the attribute.
    self._concurrent_source = concurrent_source
    super().__init__(**kwargs)
ConcurrentSourceAdapter is a Source that wraps a concurrent source and exposes it as a regular source.
The source's streams are still defined through the streams() method. Streams wrapped in a StreamFacade will be processed concurrently. Other streams will be processed sequentially as a later step.
def
read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """
    Read the configured streams, handing concurrent-capable ones to the ConcurrentSource first.

    Streams picked by ``_select_abstract_streams`` are read by the wrapped concurrent
    source. Every other configured stream is then read through the parent
    ``read`` using a catalog restricted to those streams; ``state`` is only passed
    to that second, sequential read.
    """
    concurrent_streams = self._select_abstract_streams(config, catalog)
    handled_concurrently = {abstract.name for abstract in concurrent_streams}
    leftover_entries = [
        configured
        for configured in catalog.streams
        if configured.stream.name not in handled_concurrently
    ]
    sequential_catalog = ConfiguredAirbyteCatalog(streams=leftover_entries)

    if concurrent_streams:
        yield from self._concurrent_source.read(concurrent_streams)
    if sequential_catalog.streams:
        yield from super().read(logger, config, sequential_catalog, state)
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
def
convert_to_concurrent_stream( self, logger: logging.Logger, stream: airbyte_cdk.Stream, state_manager: airbyte_cdk.ConnectorStateManager, cursor: Optional[airbyte_cdk.Cursor] = None) -> airbyte_cdk.Stream:
def convert_to_concurrent_stream(
    self,
    logger: logging.Logger,
    stream: Stream,
    state_manager: ConnectorStateManager,
    cursor: Optional[Cursor] = None,
) -> Stream:
    """
    Wrap ``stream`` in a StreamFacade so it can be read concurrently.

    Without a cursor, a FinalStateCursor is created and the facade is built with an
    empty state. With a cursor, the stream's saved state is fetched from
    ``state_manager`` and the cursor is attached to the stream (and to its ``parent``
    when the stream has that attribute) before the facade is built.
    """
    if not cursor:
        final_cursor = FinalStateCursor(
            stream_name=stream.name,
            stream_namespace=stream.namespace,
            message_repository=self.message_repository,  # type: ignore[arg-type] # _default_message_repository will be returned in the worst case
        )
        return StreamFacade.create_from_stream(stream, self, logger, {}, final_cursor)

    saved_state: MutableMapping[str, Any] = state_manager.get_stream_state(
        stream.name, stream.namespace
    )
    stream.cursor = cursor  # type: ignore[assignment] # cursor is of type ConcurrentCursor, which inherits from Cursor
    # A sub-stream exposing ``parent`` gets the same cursor on its parent.
    if hasattr(stream, "parent"):
        stream.parent.cursor = cursor
    return StreamFacade.create_from_stream(stream, self, logger, saved_state, cursor)
Prepares a stream for concurrent processing by assigning the given cursor (or creating a FinalStateCursor when none is given), loading the stream's saved state when a cursor is given, and returning a StreamFacade that wraps the stream.
def
initialize_cursor( self, stream: airbyte_cdk.Stream, state_manager: airbyte_cdk.ConnectorStateManager, converter: airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter, slice_boundary_fields: Optional[Tuple[str, str]], start: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], lookback_window: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None, slice_range: Optional[airbyte_cdk.sources.streams.concurrent.cursor_types.GapType] = None) -> Optional[airbyte_cdk.ConcurrentCursor]:
def initialize_cursor(
    self,
    stream: Stream,
    state_manager: ConnectorStateManager,
    converter: AbstractStreamStateConverter,
    slice_boundary_fields: Optional[Tuple[str, str]],
    start: Optional[CursorValueType],
    end_provider: Callable[[], CursorValueType],
    lookback_window: Optional[GapType] = None,
    slice_range: Optional[GapType] = None,
) -> Optional[ConcurrentCursor]:
    """
    Build a ConcurrentCursor for ``stream`` when it declares a cursor field.

    :param stream: stream whose ``cursor_field``, ``name`` and ``namespace`` are used.
    :param state_manager: provides the stream's current state and is passed to the cursor.
    :param converter: state converter passed to the cursor.
    :param slice_boundary_fields: passed through to the cursor.
    :param start: passed through to the cursor.
    :param end_provider: passed through to the cursor.
    :param lookback_window: passed through to the cursor; when None, defaults to
        ``timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)``.
    :param slice_range: passed through to the cursor.
    :return: a ConcurrentCursor, or None when ``stream.cursor_field`` is falsy.
    :raises ValueError: if ``stream.cursor_field`` is set but is not a ``str``.
    """
    # Only substitute the default when no value was supplied. A falsy-but-explicit
    # gap (e.g. int 0 for an integer cursor) keeps the caller's type instead of
    # being replaced by a timedelta.
    if lookback_window is None:
        lookback_window = timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)

    cursor_field_name = stream.cursor_field
    if not cursor_field_name:
        return None

    if not isinstance(cursor_field_name, str):
        raise ValueError(
            f"Cursor field type must be a string, but received {type(cursor_field_name).__name__}."
        )

    return ConcurrentCursor(
        stream.name,
        stream.namespace,
        state_manager.get_stream_state(stream.name, stream.namespace),
        self.message_repository,  # type: ignore[arg-type] # _default_message_repository will be returned in the worst case
        state_manager,
        converter,
        CursorField(cursor_field_name),
        slice_boundary_fields,
        start,
        end_provider,
        lookback_window,
        slice_range,
    )