airbyte_cdk.sources.streams.concurrent.adapters
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import json
import logging
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteStream,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource, Source
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils.slice_hasher import SliceHasher

"""
This module contains adapters to help enabling concurrency on Stream objects without needing to migrate to AbstractStream
"""


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
    """
    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
    The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
    """

    @classmethod
    def create_from_stream(
        cls,
        stream: Stream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: Cursor,
    ) -> Stream:
        """
        Create a ConcurrentStream from a Stream object.

        :param stream: The synchronous stream to wrap
        :param source: The source; must have a message repository set
        :param logger: The logger handed to the underlying DefaultStream
        :param state: The stream's incremental state, if any
        :param cursor: The concurrent cursor; a FinalStateCursor signals a full-refresh sync
        :return: A StreamFacade wrapping a DefaultStream built from the inputs
        :raises ValueError: if the source has no message repository configured
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return StreamFacade(
            DefaultStream(
                partition_generator=StreamPartitionGenerator(
                    stream,
                    message_repository,
                    # A FinalStateCursor carries no incremental state, so partitions are
                    # generated in full_refresh mode; otherwise incremental is used.
                    SyncMode.full_refresh
                    if isinstance(cursor, FinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                ),
                name=stream.name,
                namespace=stream.namespace,
                json_schema=stream.get_json_schema(),
                primary_key=pk,
                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
                logger=logger,
                cursor=cursor,
            ),
            stream,
            cursor,
            # NOTE(review): reaches into the source's private slice logger — confirm this
            # stays in sync with how AbstractSource exposes it.
            slice_logger=source._slice_logger,
            logger=logger,
        )

    @property
    def state(self) -> MutableMapping[str, Any]:
        # The concurrent framework reads state from the Cursor, never from the facade.
        raise NotImplementedError(
            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
        )

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        # Forward state to the wrapped legacy stream only if it actually defines `state`.
        if "state" in dir(self._legacy_stream):
            self._legacy_stream.state = value  # type: ignore # validating `state` is attribute of stream using `if` above

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: Stream,
        cursor: Cursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        :param legacy_stream: The original synchronous Stream being wrapped
        :param cursor: The concurrent cursor tracking state for this stream
        :param slice_logger: Emits slice log messages when slice logging is enabled
        :param logger: Logger used to decide whether slice messages should be logged
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        # All arguments are ignored: reading is fully delegated to the wrapped
        # AbstractStream's partitions (configuration was captured at construction time).
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            # Surface the cursor state as a LOG message before re-raising so failures
            # can be correlated with the state at the time of the error.
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        # Iterate partitions sequentially, optionally emitting a slice log message
        # before each partition's records.
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        # This method is not expected to be called directly. It is only implemented for backward compatibility with the old interface
        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]]

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        # The legacy Stream interface expects [] (not None) when there is no cursor field.
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def cursor(self) -> Optional[Cursor]:  # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
        return self._cursor

    @property
    def block_simultaneous_read(self) -> str:
        """Returns the blocking group name from the underlying stream"""
        return self._abstract_stream.block_simultaneous_read

    # FIXME the lru_cache seems to be mostly there because of typing issue
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    def as_airbyte_stream(self) -> AirbyteStream:
        return self._abstract_stream.as_airbyte_stream()

    def log_stream_sync_configuration(self) -> None:
        self._abstract_stream.log_stream_sync_configuration()

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream


class SliceEncoder(json.JSONEncoder):
    """JSON encoder that serializes any object exposing a `__json_serializable__()` hook."""

    def default(self, obj: Any) -> Any:
        if hasattr(obj, "__json_serializable__"):
            return obj.__json_serializable__()

        # Let the base class default method raise the TypeError
        return super().default(obj)


class StreamPartition(Partition):
    """
    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

    StreamPartitions are instantiated from a Stream and a stream_slice.

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param _slice: The partition's stream_slice
        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: Sync mode to report (note: read() always delegates with full_refresh)
        :param cursor_field: Cursor field passed through to the stream's read_records
        :param state: Stream state passed through to the stream's read_records
        """
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        # Hash is precomputed from the stream name and slice so the partition is
        # usable as a stable dict/set member.
        self._hash = SliceHasher.hash(self._stream.name, self._slice)

    def read(self) -> Iterable[Record]:
        """
        Read messages from the stream.
        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
        Otherwise, the message will be emitted on the message repository.
        """
        try:
            # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice
            # by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to:
            # * fetch_next_page
            # * parse_response
            # Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still do
            # `if not stream_state` to know if it calls the Event stream or not
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                # deep-copied so the delegate stream cannot mutate the partition's slice
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
                # For now, file-based connectors have their own stream facade
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    # transform() mutates data_to_return in place per the stream's JSON schema
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(
                        data=data_to_return,
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                    yield Record(
                        data=record_data.record.data or {},
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                else:
                    # Non-record messages (logs, traces, ...) are emitted out-of-band
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            # Prefer the stream's user-facing display message when one is available
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        return self._slice

    def __hash__(self) -> int:
        return self._hash

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"StreamPartition({self._stream.name}, {self._slice})"


class StreamPartitionGenerator(PartitionGenerator):
    """
    This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param message_repository: The message repository to use to emit non-record messages
        :param sync_mode: Sync mode forwarded to stream_slices and to each StreamPartition
        :param cursor_field: Cursor field forwarded to stream_slices and to each StreamPartition
        :param state: Stream state forwarded to stream_slices and to each StreamPartition
        """
        self.message_repository = message_repository
        self._stream = stream
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def generate(self) -> Iterable[Partition]:
        # One StreamPartition per stream_slice; each gets its own deep copy of the
        # slice so partitions read concurrently cannot share mutable slice state.
        for s in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            yield StreamPartition(
                self._stream,
                copy.deepcopy(s),
                self.message_repository,
                self._sync_mode,
                self._cursor_field,
                self._state,
            )
49@deprecated( 50 "This class is experimental. Use at your own risk.", 51 category=ExperimentalClassWarning, 52) 53class StreamFacade(AbstractStreamFacade[DefaultStream], Stream): 54 """ 55 The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream. 56 57 All methods either delegate to the wrapped AbstractStream or provide a default implementation. 58 The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported. 59 """ 60 61 @classmethod 62 def create_from_stream( 63 cls, 64 stream: Stream, 65 source: AbstractSource, 66 logger: logging.Logger, 67 state: Optional[MutableMapping[str, Any]], 68 cursor: Cursor, 69 ) -> Stream: 70 """ 71 Create a ConcurrentStream from a Stream object. 72 :param source: The source 73 :param stream: The stream 74 :param max_workers: The maximum number of worker thread to use 75 :return: 76 """ 77 pk = get_primary_key_from_stream(stream.primary_key) 78 cursor_field = get_cursor_field_from_stream(stream) 79 80 if not source.message_repository: 81 raise ValueError( 82 "A message repository is required to emit non-record messages. Please set the message repository on the source." 
83 ) 84 85 message_repository = source.message_repository 86 return StreamFacade( 87 DefaultStream( 88 partition_generator=StreamPartitionGenerator( 89 stream, 90 message_repository, 91 SyncMode.full_refresh 92 if isinstance(cursor, FinalStateCursor) 93 else SyncMode.incremental, 94 [cursor_field] if cursor_field is not None else None, 95 state, 96 ), 97 name=stream.name, 98 namespace=stream.namespace, 99 json_schema=stream.get_json_schema(), 100 primary_key=pk, 101 cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None, 102 logger=logger, 103 cursor=cursor, 104 ), 105 stream, 106 cursor, 107 slice_logger=source._slice_logger, 108 logger=logger, 109 ) 110 111 @property 112 def state(self) -> MutableMapping[str, Any]: 113 raise NotImplementedError( 114 "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte" 115 ) 116 117 @state.setter 118 def state(self, value: Mapping[str, Any]) -> None: 119 if "state" in dir(self._legacy_stream): 120 self._legacy_stream.state = value # type: ignore # validating `state` is attribute of stream using `if` above 121 122 def __init__( 123 self, 124 stream: DefaultStream, 125 legacy_stream: Stream, 126 cursor: Cursor, 127 slice_logger: SliceLogger, 128 logger: logging.Logger, 129 ): 130 """ 131 :param stream: The underlying AbstractStream 132 """ 133 self._abstract_stream = stream 134 self._legacy_stream = legacy_stream 135 self._cursor = cursor 136 self._slice_logger = slice_logger 137 self._logger = logger 138 139 def read( 140 self, 141 configured_stream: ConfiguredAirbyteStream, 142 logger: logging.Logger, 143 slice_logger: SliceLogger, 144 stream_state: MutableMapping[str, Any], 145 state_manager: ConnectorStateManager, 146 internal_config: InternalConfig, 147 ) -> Iterable[StreamData]: 148 yield from self._read_records() 149 150 def read_records( 151 self, 152 sync_mode: SyncMode, 153 cursor_field: Optional[List[str]] = None, 154 stream_slice: 
Optional[Mapping[str, Any]] = None, 155 stream_state: Optional[Mapping[str, Any]] = None, 156 ) -> Iterable[StreamData]: 157 try: 158 yield from self._read_records() 159 except Exception as exc: 160 if hasattr(self._cursor, "state"): 161 state = str(self._cursor.state) 162 else: 163 # This shouldn't happen if the ConcurrentCursor was used 164 state = "unknown; no state attribute was available on the cursor" 165 yield AirbyteMessage( 166 type=Type.LOG, 167 log=AirbyteLogMessage( 168 level=Level.ERROR, message=f"Cursor State at time of exception: {state}" 169 ), 170 ) 171 raise exc 172 173 def _read_records(self) -> Iterable[StreamData]: 174 for partition in self._abstract_stream.generate_partitions(): 175 if self._slice_logger.should_log_slice_message(self._logger): 176 yield self._slice_logger.create_slice_log_message(partition.to_slice()) 177 for record in partition.read(): 178 yield record.data 179 180 @property 181 def name(self) -> str: 182 return self._abstract_stream.name 183 184 @property 185 def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: 186 # This method is not expected to be called directly. 
It is only implemented for backward compatibility with the old interface 187 return self.as_airbyte_stream().source_defined_primary_key # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]] 188 189 @property 190 def cursor_field(self) -> Union[str, List[str]]: 191 if self._abstract_stream.cursor_field is None: 192 return [] 193 else: 194 return self._abstract_stream.cursor_field 195 196 @property 197 def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor 198 return self._cursor 199 200 @property 201 def block_simultaneous_read(self) -> str: 202 """Returns the blocking group name from the underlying stream""" 203 return self._abstract_stream.block_simultaneous_read 204 205 # FIXME the lru_cache seems to be mostly there because of typing issue 206 @lru_cache(maxsize=None) 207 def get_json_schema(self) -> Mapping[str, Any]: 208 return self._abstract_stream.get_json_schema() 209 210 @property 211 def supports_incremental(self) -> bool: 212 return self._legacy_stream.supports_incremental 213 214 def as_airbyte_stream(self) -> AirbyteStream: 215 return self._abstract_stream.as_airbyte_stream() 216 217 def log_stream_sync_configuration(self) -> None: 218 self._abstract_stream.log_stream_sync_configuration() 219 220 def get_underlying_stream(self) -> DefaultStream: 221 return self._abstract_stream
The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.
All methods either delegate to the wrapped AbstractStream or provide a default implementation. The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
122 def __init__( 123 self, 124 stream: DefaultStream, 125 legacy_stream: Stream, 126 cursor: Cursor, 127 slice_logger: SliceLogger, 128 logger: logging.Logger, 129 ): 130 """ 131 :param stream: The underlying AbstractStream 132 """ 133 self._abstract_stream = stream 134 self._legacy_stream = legacy_stream 135 self._cursor = cursor 136 self._slice_logger = slice_logger 137 self._logger = logger
Parameters
- stream: The underlying AbstractStream
61 @classmethod 62 def create_from_stream( 63 cls, 64 stream: Stream, 65 source: AbstractSource, 66 logger: logging.Logger, 67 state: Optional[MutableMapping[str, Any]], 68 cursor: Cursor, 69 ) -> Stream: 70 """ 71 Create a ConcurrentStream from a Stream object. 72 :param source: The source 73 :param stream: The stream 74 :param max_workers: The maximum number of worker thread to use 75 :return: 76 """ 77 pk = get_primary_key_from_stream(stream.primary_key) 78 cursor_field = get_cursor_field_from_stream(stream) 79 80 if not source.message_repository: 81 raise ValueError( 82 "A message repository is required to emit non-record messages. Please set the message repository on the source." 83 ) 84 85 message_repository = source.message_repository 86 return StreamFacade( 87 DefaultStream( 88 partition_generator=StreamPartitionGenerator( 89 stream, 90 message_repository, 91 SyncMode.full_refresh 92 if isinstance(cursor, FinalStateCursor) 93 else SyncMode.incremental, 94 [cursor_field] if cursor_field is not None else None, 95 state, 96 ), 97 name=stream.name, 98 namespace=stream.namespace, 99 json_schema=stream.get_json_schema(), 100 primary_key=pk, 101 cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None, 102 logger=logger, 103 cursor=cursor, 104 ), 105 stream, 106 cursor, 107 slice_logger=source._slice_logger, 108 logger=logger, 109 )
Create a ConcurrentStream from a Stream object.
Parameters
- source: The source
- stream: The stream
- max_workers: The maximum number of worker threads to use
Returns
200 @property 201 def block_simultaneous_read(self) -> str: 202 """Returns the blocking group name from the underlying stream""" 203 return self._abstract_stream.block_simultaneous_read
Returns the blocking group name from the underlying stream
Return the underlying stream facade object.
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- name
- get_error_display_message
- read
- read_only_records
- read_records
- get_json_schema
- as_airbyte_stream
- supports_incremental
- is_resumable
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- primary_key
- stream_slices
- state_checkpoint_interval
- get_updated_state
- get_cursor
- log_stream_sync_configuration
- configured_json_schema
224class SliceEncoder(json.JSONEncoder): 225 def default(self, obj: Any) -> Any: 226 if hasattr(obj, "__json_serializable__"): 227 return obj.__json_serializable__() 228 229 # Let the base class default method raise the TypeError 230 return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
Python → JSON type mapping: dict → object; list, tuple → array; str → string; int, float → number; True → true; False → false; None → null.
To extend this to recognize other objects, subclass and implement a
.default() method with another method that returns a serializable
object for o if possible, otherwise it should call the superclass
implementation (to raise TypeError).
225 def default(self, obj: Any) -> Any: 226 if hasattr(obj, "__json_serializable__"): 227 return obj.__json_serializable__() 228 229 # Let the base class default method raise the TypeError 230 return super().default(obj)
Implement this method in a subclass such that it returns
a serializable object for o, or calls the base implementation
(to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return JSONEncoder.default(self, o)
233class StreamPartition(Partition): 234 """ 235 This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface 236 237 StreamPartitions are instantiated from a Stream and a stream_slice. 238 239 This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. 240 In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time. 241 """ 242 243 def __init__( 244 self, 245 stream: Stream, 246 _slice: Optional[Mapping[str, Any]], 247 message_repository: MessageRepository, 248 sync_mode: SyncMode, 249 cursor_field: Optional[List[str]], 250 state: Optional[MutableMapping[str, Any]], 251 ): 252 """ 253 :param stream: The stream to delegate to 254 :param _slice: The partition's stream_slice 255 :param message_repository: The message repository to use to emit non-record messages 256 """ 257 self._stream = stream 258 self._slice = _slice 259 self._message_repository = message_repository 260 self._sync_mode = sync_mode 261 self._cursor_field = cursor_field 262 self._state = state 263 self._hash = SliceHasher.hash(self._stream.name, self._slice) 264 265 def read(self) -> Iterable[Record]: 266 """ 267 Read messages from the stream. 268 If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. 269 Otherwise, the message will be emitted on the message repository. 270 """ 271 try: 272 # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice 273 # by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to: 274 # * fetch_next_page 275 # * parse_response 276 # Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. 
However, Stripe still do 277 # `if not stream_state` to know if it calls the Event stream or not 278 for record_data in self._stream.read_records( 279 cursor_field=self._cursor_field, 280 sync_mode=SyncMode.full_refresh, 281 stream_slice=copy.deepcopy(self._slice), 282 stream_state=self._state, 283 ): 284 # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade 285 # For now, file-based connectors have their own stream facade 286 if isinstance(record_data, Mapping): 287 data_to_return = dict(record_data) 288 self._stream.transformer.transform( 289 data_to_return, self._stream.get_json_schema() 290 ) 291 yield Record( 292 data=data_to_return, 293 stream_name=self.stream_name(), 294 associated_slice=self._slice, # type: ignore [arg-type] 295 ) 296 elif isinstance(record_data, AirbyteMessage) and record_data.record is not None: 297 yield Record( 298 data=record_data.record.data or {}, 299 stream_name=self.stream_name(), 300 associated_slice=self._slice, # type: ignore [arg-type] 301 ) 302 else: 303 self._message_repository.emit_message(record_data) 304 except Exception as e: 305 display_message = self._stream.get_error_display_message(e) 306 if display_message: 307 raise ExceptionWithDisplayMessage(display_message) from e 308 else: 309 raise e 310 311 def to_slice(self) -> Optional[Mapping[str, Any]]: 312 return self._slice 313 314 def __hash__(self) -> int: 315 return self._hash 316 317 def stream_name(self) -> str: 318 return self._stream.name 319 320 def __repr__(self) -> str: 321 return f"StreamPartition({self._stream.name}, {self._slice})"
This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface
StreamPartitions are instantiated from a Stream and a stream_slice.
This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
243 def __init__( 244 self, 245 stream: Stream, 246 _slice: Optional[Mapping[str, Any]], 247 message_repository: MessageRepository, 248 sync_mode: SyncMode, 249 cursor_field: Optional[List[str]], 250 state: Optional[MutableMapping[str, Any]], 251 ): 252 """ 253 :param stream: The stream to delegate to 254 :param _slice: The partition's stream_slice 255 :param message_repository: The message repository to use to emit non-record messages 256 """ 257 self._stream = stream 258 self._slice = _slice 259 self._message_repository = message_repository 260 self._sync_mode = sync_mode 261 self._cursor_field = cursor_field 262 self._state = state 263 self._hash = SliceHasher.hash(self._stream.name, self._slice)
Parameters
- stream: The stream to delegate to
- _slice: The partition's stream_slice
- message_repository: The message repository to use to emit non-record messages
265 def read(self) -> Iterable[Record]: 266 """ 267 Read messages from the stream. 268 If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. 269 Otherwise, the message will be emitted on the message repository. 270 """ 271 try: 272 # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice 273 # by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to: 274 # * fetch_next_page 275 # * parse_response 276 # Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still do 277 # `if not stream_state` to know if it calls the Event stream or not 278 for record_data in self._stream.read_records( 279 cursor_field=self._cursor_field, 280 sync_mode=SyncMode.full_refresh, 281 stream_slice=copy.deepcopy(self._slice), 282 stream_state=self._state, 283 ): 284 # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade 285 # For now, file-based connectors have their own stream facade 286 if isinstance(record_data, Mapping): 287 data_to_return = dict(record_data) 288 self._stream.transformer.transform( 289 data_to_return, self._stream.get_json_schema() 290 ) 291 yield Record( 292 data=data_to_return, 293 stream_name=self.stream_name(), 294 associated_slice=self._slice, # type: ignore [arg-type] 295 ) 296 elif isinstance(record_data, AirbyteMessage) and record_data.record is not None: 297 yield Record( 298 data=record_data.record.data or {}, 299 stream_name=self.stream_name(), 300 associated_slice=self._slice, # type: ignore [arg-type] 301 ) 302 else: 303 self._message_repository.emit_message(record_data) 304 except Exception as e: 305 display_message = self._stream.get_error_display_message(e) 306 if display_message: 307 raise ExceptionWithDisplayMessage(display_message) from e 308 else: 309 raise e
Read messages from the stream. If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. Otherwise, the message will be emitted on the message repository.
Converts the partition to a slice that can be serialized and deserialized.
Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing but some slices can have nested
values (example)
Returns
A mapping representing a slice
324class StreamPartitionGenerator(PartitionGenerator): 325 """ 326 This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices 327 328 This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. 329 In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time. 330 """ 331 332 def __init__( 333 self, 334 stream: Stream, 335 message_repository: MessageRepository, 336 sync_mode: SyncMode, 337 cursor_field: Optional[List[str]], 338 state: Optional[MutableMapping[str, Any]], 339 ): 340 """ 341 :param stream: The stream to delegate to 342 :param message_repository: The message repository to use to emit non-record messages 343 """ 344 self.message_repository = message_repository 345 self._stream = stream 346 self._sync_mode = sync_mode 347 self._cursor_field = cursor_field 348 self._state = state 349 350 def generate(self) -> Iterable[Partition]: 351 for s in self._stream.stream_slices( 352 sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state 353 ): 354 yield StreamPartition( 355 self._stream, 356 copy.deepcopy(s), 357 self.message_repository, 358 self._sync_mode, 359 self._cursor_field, 360 self._state, 361 )
This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices
This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
332 def __init__( 333 self, 334 stream: Stream, 335 message_repository: MessageRepository, 336 sync_mode: SyncMode, 337 cursor_field: Optional[List[str]], 338 state: Optional[MutableMapping[str, Any]], 339 ): 340 """ 341 :param stream: The stream to delegate to 342 :param message_repository: The message repository to use to emit non-record messages 343 """ 344 self.message_repository = message_repository 345 self._stream = stream 346 self._sync_mode = sync_mode 347 self._cursor_field = cursor_field 348 self._state = state
Parameters
- stream: The stream to delegate to
- message_repository: The message repository to use to emit non-record messages
350 def generate(self) -> Iterable[Partition]: 351 for s in self._stream.stream_slices( 352 sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state 353 ): 354 yield StreamPartition( 355 self._stream, 356 copy.deepcopy(s), 357 self.message_repository, 358 self._sync_mode, 359 self._cursor_field, 360 self._state, 361 )
Generates partitions for a given sync mode.
Returns
An iterable of partitions