airbyte_cdk.sources.streams.concurrent.adapters
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import json
import logging
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteStream,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource, Source
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils.slice_hasher import SliceHasher

"""
This module contains adapters to help enabling concurrency on Stream objects without needing to migrate to AbstractStream
"""


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
    """
    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
    The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
    """

    @classmethod
    def create_from_stream(
        cls,
        stream: Stream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: Cursor,
    ) -> Stream:
        """
        Create a ConcurrentStream from a Stream object.
        :param source: The source
        :param stream: The stream
        :param max_workers: The maximum number of worker thread to use
        :return:
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return StreamFacade(
            DefaultStream(
                partition_generator=StreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                ),
                name=stream.name,
                namespace=stream.namespace,
                json_schema=stream.get_json_schema(),
                primary_key=pk,
                cursor_field=cursor_field,
                logger=logger,
                cursor=cursor,
            ),
            stream,
            cursor,
            slice_logger=source._slice_logger,
            logger=logger,
        )

    @property
    def state(self) -> MutableMapping[str, Any]:
        raise NotImplementedError(
            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
        )

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        if "state" in dir(self._legacy_stream):
            self._legacy_stream.state = value  # type: ignore # validating `state` is attribute of stream using `if` above

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: Stream,
        cursor: Cursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        # This method is not expected to be called directly. It is only implemented for backward compatibility with the old interface
        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore # source_defined_primary_key is known to be an Optional[List[List[str]]]

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def cursor(self) -> Optional[Cursor]:  # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
        return self._cursor

    # FIXME the lru_cache seems to be mostly there because of typing issue
    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    def as_airbyte_stream(self) -> AirbyteStream:
        return self._abstract_stream.as_airbyte_stream()

    def log_stream_sync_configuration(self) -> None:
        self._abstract_stream.log_stream_sync_configuration()

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream


class SliceEncoder(json.JSONEncoder):
    def default(self, obj: Any) -> Any:
        if hasattr(obj, "__json_serializable__"):
            return obj.__json_serializable__()

        # Let the base class default method raise the TypeError
        return super().default(obj)


class StreamPartition(Partition):
    """
    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

    StreamPartitions are instantiated from a Stream and a stream_slice.

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param _slice: The partition's stream_slice
        :param message_repository: The message repository to use to emit non-record messages
        """
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._hash = SliceHasher.hash(self._stream.name, self._slice)

    def read(self) -> Iterable[Record]:
        """
        Read messages from the stream.
        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
        Otherwise, the message will be emitted on the message repository.
        """
        try:
            # using `stream_state=self._state` have a very different behavior than the current one as today the state is updated slice
            # by slice incrementally. We don't have this guarantee with Concurrent CDK. For HttpStream, `stream_state` is passed to:
            # * fetch_next_page
            # * parse_response
            # Both are not used for Stripe so we should be good for the first iteration of Concurrent CDK. However, Stripe still do
            # `if not stream_state` to know if it calls the Event stream or not
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade
                # For now, file-based connectors have their own stream facade
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(
                        data=data_to_return,
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                    yield Record(
                        data=record_data.record.data or {},
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        return self._slice

    def __hash__(self) -> int:
        return self._hash

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"StreamPartition({self._stream.name}, {self._slice})"


class StreamPartitionGenerator(PartitionGenerator):
    """
    This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param message_repository: The message repository to use to emit non-record messages
        """
        self.message_repository = message_repository
        self._stream = stream
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def generate(self) -> Iterable[Partition]:
        for s in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            yield StreamPartition(
                self._stream,
                copy.deepcopy(s),
                self.message_repository,
                self._sync_mode,
                self._cursor_field,
                self._state,
            )
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.
All methods either delegate to the wrapped AbstractStream or provide a default implementation. The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
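A minimal sketch of what this means in practice (illustrative only, assuming a `facade` already built with create_from_stream below and SyncMode imported from airbyte_cdk.models): the object is consumed exactly like any other Stream, while the work is delegated to the wrapped DefaultStream.

    print(facade.name)                  # delegated to the wrapped DefaultStream
    schema = facade.get_json_schema()   # delegated to the wrapped DefaultStream
    for record in facade.read_records(sync_mode=SyncMode.full_refresh):
        ...                             # plain record mappings, produced partition by partition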
def __init__(
    self,
    stream: DefaultStream,
    legacy_stream: Stream,
    cursor: Cursor,
    slice_logger: SliceLogger,
    logger: logging.Logger,
):
    """
    :param stream: The underlying AbstractStream
    """
    self._abstract_stream = stream
    self._legacy_stream = legacy_stream
    self._cursor = cursor
    self._slice_logger = slice_logger
    self._logger = logger
Parameters
- stream: The underlying AbstractStream
@classmethod
def create_from_stream(
    cls,
    stream: Stream,
    source: AbstractSource,
    logger: logging.Logger,
    state: Optional[MutableMapping[str, Any]],
    cursor: Cursor,
) -> Stream:
Create a ConcurrentStream from a Stream object.
Parameters
- source: The source
- stream: The stream
- max_workers: The maximum number of worker threads to use
Returns
Return the underlying stream facade object.
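As a rough illustration of this wiring (not code from the module), the sketch below wraps a hypothetical legacy stream for a full-refresh sync. `MyLegacyStream` and `source` are assumed to come from your connector, and the FinalStateCursor keyword arguments are an assumption about the concurrent cursor's constructor.

    import logging

    from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
    from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

    logger = logging.getLogger("airbyte")
    legacy_stream = MyLegacyStream(...)  # hypothetical Stream implementation from your connector

    # A FinalStateCursor signals a plain full-refresh read (no incremental state to track).
    cursor = FinalStateCursor(
        stream_name=legacy_stream.name,
        stream_namespace=legacy_stream.namespace,
        message_repository=source.message_repository,  # `source` is assumed to expose a repository
    )

    concurrent_ready_stream = StreamFacade.create_from_stream(
        stream=legacy_stream,
        source=source,
        logger=logger,
        state=None,   # no incoming state for a full refresh
        cursor=cursor,
    )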
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- name
- get_error_display_message
- read
- read_only_records
- read_records
- get_json_schema
- as_airbyte_stream
- supports_incremental
- is_resumable
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- primary_key
- stream_slices
- state_checkpoint_interval
- get_updated_state
- get_cursor
- log_stream_sync_configuration
- configured_json_schema
class SliceEncoder(json.JSONEncoder):
    def default(self, obj: Any) -> Any:
        if hasattr(obj, "__json_serializable__"):
            return obj.__json_serializable__()

        # Let the base class default method raise the TypeError
        return super().default(obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+
To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).
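For instance (an illustrative sketch, not code from the module; the Bookmark class is hypothetical), any object that implements __json_serializable__ can be embedded in an otherwise plain structure and dumped with SliceEncoder:

    import json

    from airbyte_cdk.sources.streams.concurrent.adapters import SliceEncoder

    class Bookmark:  # hypothetical example type
        def __init__(self, value: str):
            self.value = value

        def __json_serializable__(self):
            # Whatever this returns must itself be JSON-serializable.
            return {"value": self.value}

    print(json.dumps({"parent": "abc", "cursor": Bookmark("2023-01-01")}, cls=SliceEncoder))
    # {"parent": "abc", "cursor": {"value": "2023-01-01"}}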
def default(self, obj: Any) -> Any:
    if hasattr(obj, "__json_serializable__"):
        return obj.__json_serializable__()

    # Let the base class default method raise the TypeError
    return super().default(obj)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:

    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return JSONEncoder.default(self, o)
class StreamPartition(Partition):
This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface
StreamPartitions are instantiated from a Stream and a stream_slice.
This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
def __init__(
    self,
    stream: Stream,
    _slice: Optional[Mapping[str, Any]],
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
):
    """
    :param stream: The stream to delegate to
    :param _slice: The partition's stream_slice
    :param message_repository: The message repository to use to emit non-record messages
    """
    self._stream = stream
    self._slice = _slice
    self._message_repository = message_repository
    self._sync_mode = sync_mode
    self._cursor_field = cursor_field
    self._state = state
    self._hash = SliceHasher.hash(self._stream.name, self._slice)
Parameters
- stream: The stream to delegate to
- _slice: The partition's stream_slice
- message_repository: The message repository to use to emit non-record messages
def read(self) -> Iterable[Record]:
Read messages from the stream. If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. Otherwise, the message will be emitted on the message repository.
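As a loose sketch of that behavior (names are illustrative, not from the module; `legacy_stream`, `repo` and `state` are assumed to exist in your connector): record-like data from the legacy stream surfaces as Record objects, while anything else is routed to the message repository.

    partition = StreamPartition(
        stream=legacy_stream,
        _slice={"parent_id": 42},
        message_repository=repo,
        sync_mode=SyncMode.incremental,
        cursor_field=["updated_at"],
        state=state,
    )
    for record in partition.read():
        print(record.data, record.associated_slice)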
Converts the partition to a slice that can be serialized and deserialized.
Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing but some slices can have nested values (example).
Returns
A mapping representing a slice
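Continuing the sketch above (illustrative only), the slice comes back unchanged and can be serialized with the SliceEncoder described earlier; nested mappings are also allowed.

    slice_ = partition.to_slice()                 # {"parent_id": 42} for the partition sketched above
    print(json.dumps(slice_, cls=SliceEncoder))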
class StreamPartitionGenerator(PartitionGenerator):
This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices
This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
def __init__(
    self,
    stream: Stream,
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
):
    """
    :param stream: The stream to delegate to
    :param message_repository: The message repository to use to emit non-record messages
    """
    self.message_repository = message_repository
    self._stream = stream
    self._sync_mode = sync_mode
    self._cursor_field = cursor_field
    self._state = state
Parameters
- stream: The stream to delegate to
- message_repository: The message repository to use to emit non-record messages
def generate(self) -> Iterable[Partition]:
    for s in self._stream.stream_slices(
        sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
    ):
        yield StreamPartition(
            self._stream,
            copy.deepcopy(s),
            self.message_repository,
            self._sync_mode,
            self._cursor_field,
            self._state,
        )
Generates partitions for a given sync mode.
Returns
An iterable of partitions
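A rough sketch of what this produces (reusing the hypothetical `legacy_stream` and `repo` from above): one StreamPartition per slice returned by the legacy stream's stream_slices.

    generator = StreamPartitionGenerator(
        stream=legacy_stream,
        message_repository=repo,
        sync_mode=SyncMode.full_refresh,
        cursor_field=None,
        state=None,
    )
    for partition in generator.generate():
        print(partition)   # StreamPartition(<stream name>, <slice>), via __repr__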