airbyte_cdk.sources.streams.concurrent.adapters
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import json
import logging
from functools import lru_cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteStream,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource, Source
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
    AbstractAvailabilityStrategy,
    AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
from airbyte_cdk.utils.slice_hasher import SliceHasher

"""
This module contains adapters that help enable concurrency on Stream objects without needing to migrate them to AbstractStream.
"""
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class StreamFacade(AbstractStreamFacade[DefaultStream], Stream):
    """
    The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.

    All methods either delegate to the wrapped AbstractStream or provide a default implementation.
    The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
    """

    @classmethod
    def create_from_stream(
        cls,
        stream: Stream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: Cursor,
    ) -> Stream:
        """
        Create a ConcurrentStream from a Stream object.
        :param stream: The stream to wrap
        :param source: The source owning the stream
        :param logger: The logger to use
        :param state: The stream's initial state, if any
        :param cursor: The concurrent cursor to use
        :return: A Stream (StreamFacade) wrapping the given stream
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return StreamFacade(
            DefaultStream(
                partition_generator=StreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                ),
                name=stream.name,
                namespace=stream.namespace,
                json_schema=stream.get_json_schema(),
                availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                primary_key=pk,
                cursor_field=cursor_field,
                logger=logger,
                cursor=cursor,
            ),
            stream,
            cursor,
            slice_logger=source._slice_logger,
            logger=logger,
        )

    @property
    def state(self) -> MutableMapping[str, Any]:
        raise NotImplementedError(
            "This should not be called as part of the Concurrent CDK code. Please report the problem to Airbyte"
        )

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        if "state" in dir(self._legacy_stream):
            self._legacy_stream.state = value  # type: ignore  # validating `state` is an attribute of the stream using the `if` above

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: Stream,
        cursor: Cursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        # This property is not expected to be called directly. It is only implemented for backward compatibility with the old interface.
        return self.as_airbyte_stream().source_defined_primary_key  # type: ignore  # source_defined_primary_key is known to be an Optional[List[List[str]]]

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def cursor(self) -> Optional[Cursor]:  # type: ignore[override]  # StreamFacade expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
        return self._cursor

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    def check_availability(
        self, logger: logging.Logger, source: Optional["Source"] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters.
        :param logger: (ignored)
        :param source: (ignored)
        :return: A tuple of (is_available, message); if the stream is unavailable, message describes why.
        """
        availability = self._abstract_stream.check_availability()
        return availability.is_available(), availability.message()

    def as_airbyte_stream(self) -> AirbyteStream:
        return self._abstract_stream.as_airbyte_stream()

    def log_stream_sync_configuration(self) -> None:
        self._abstract_stream.log_stream_sync_configuration()

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream
The StreamFacade is a Stream that wraps an AbstractStream and exposes it as a Stream.
All methods either delegate to the wrapped AbstractStream or provide a default implementation. The default implementations define restrictions imposed on Streams migrated to the new interface. For instance, only source-defined cursors are supported.
def __init__(
    self,
    stream: DefaultStream,
    legacy_stream: Stream,
    cursor: Cursor,
    slice_logger: SliceLogger,
    logger: logging.Logger,
):
    """
    :param stream: The underlying AbstractStream
    """
    self._abstract_stream = stream
    self._legacy_stream = legacy_stream
    self._cursor = cursor
    self._slice_logger = slice_logger
    self._logger = logger
Parameters
- stream: The underlying AbstractStream
@classmethod
def create_from_stream(
    cls,
    stream: Stream,
    source: AbstractSource,
    logger: logging.Logger,
    state: Optional[MutableMapping[str, Any]],
    cursor: Cursor,
) -> Stream:
    """
    Create a ConcurrentStream from a Stream object.
    :param stream: The stream to wrap
    :param source: The source owning the stream
    :param logger: The logger to use
    :param state: The stream's initial state, if any
    :param cursor: The concurrent cursor to use
    :return: A Stream (StreamFacade) wrapping the given stream
    """
    pk = get_primary_key_from_stream(stream.primary_key)
    cursor_field = get_cursor_field_from_stream(stream)

    if not source.message_repository:
        raise ValueError(
            "A message repository is required to emit non-record messages. Please set the message repository on the source."
        )

    message_repository = source.message_repository
    return StreamFacade(
        DefaultStream(
            partition_generator=StreamPartitionGenerator(
                stream,
                message_repository,
                SyncMode.full_refresh
                if isinstance(cursor, FinalStateCursor)
                else SyncMode.incremental,
                [cursor_field] if cursor_field is not None else None,
                state,
            ),
            name=stream.name,
            namespace=stream.namespace,
            json_schema=stream.get_json_schema(),
            availability_strategy=AlwaysAvailableAvailabilityStrategy(),
            primary_key=pk,
            cursor_field=cursor_field,
            logger=logger,
            cursor=cursor,
        ),
        stream,
        cursor,
        slice_logger=source._slice_logger,
        logger=logger,
    )
Create a ConcurrentStream from a Stream object.
Parameters
- stream: The stream to wrap
- source: The source owning the stream
- logger: The logger to use
- state: The stream's initial state, if any
- cursor: The concurrent cursor to use
Returns
A Stream (StreamFacade) wrapping the given stream.
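A hedged sketch of how this factory can be wired up; my_stream and my_source are placeholders for an existing Stream/AbstractSource pair, and the source must expose a message repository as required above:

    import logging

    from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
    from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor

    # A FinalStateCursor makes the partition generator run in full_refresh mode (see above).
    cursor = FinalStateCursor(
        stream_name=my_stream.name,
        stream_namespace=my_stream.namespace,
        message_repository=my_source.message_repository,
    )
    concurrent_stream = StreamFacade.create_from_stream(
        stream=my_stream,
        source=my_source,
        logger=logging.getLogger("airbyte"),
        state=None,
        cursor=cursor,
    )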
def check_availability(
    self, logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
    """
    Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters.
    :param logger: (ignored)
    :param source: (ignored)
    :return: A tuple of (is_available, message); if the stream is unavailable, message describes why.
    """
    availability = self._abstract_stream.check_availability()
    return availability.is_available(), availability.message()
Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters.
Parameters
- logger: (ignored)
- source: (ignored)
Returns
A tuple of (is_available, message); if the stream is unavailable, message describes why.
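Continuing the sketch above with the hypothetical concurrent_stream built by create_from_stream; since that factory wires in AlwaysAvailableAvailabilityStrategy, the check should always pass:

    import logging

    is_available, reason = concurrent_stream.check_availability(logging.getLogger("airbyte"))
    assert is_available  # AlwaysAvailableAvailabilityStrategy always reports available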
Return the underlying AbstractStream wrapped by this facade.
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- name
- get_error_display_message
- read
- read_only_records
- read_records
- get_json_schema
- as_airbyte_stream
- supports_incremental
- is_resumable
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- primary_key
- stream_slices
- state_checkpoint_interval
- get_updated_state
- get_cursor
- log_stream_sync_configuration
- configured_json_schema
class SliceEncoder(json.JSONEncoder):
    def default(self, obj: Any) -> Any:
        if hasattr(obj, "__json_serializable__"):
            return obj.__json_serializable__()

        # Let the base class default method raise the TypeError
        return super().default(obj)
Extensible JSON (https://json.org) encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+
To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).
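A minimal, self-contained sketch of the hook SliceEncoder looks for; DateRangeSlice is an illustrative class, not part of the CDK:

    import json

    from airbyte_cdk.sources.streams.concurrent.adapters import SliceEncoder

    class DateRangeSlice:
        def __init__(self, start: str, end: str):
            self.start = start
            self.end = end

        def __json_serializable__(self) -> dict:
            # Hook recognized by SliceEncoder.default
            return {"start": self.start, "end": self.end}

    encoded = json.dumps({"slice": DateRangeSlice("2023-01-01", "2023-01-31")}, cls=SliceEncoder)
    # encoded == '{"slice": {"start": "2023-01-01", "end": "2023-01-31"}}'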
def default(self, obj: Any) -> Any:
    if hasattr(obj, "__json_serializable__"):
        return obj.__json_serializable__()

    # Let the base class default method raise the TypeError
    return super().default(obj)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:

    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return JSONEncoder.default(self, o)
class StreamPartition(Partition):
    """
    This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface

    StreamPartitions are instantiated from a Stream and a stream_slice.

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param _slice: The partition's stream_slice
        :param message_repository: The message repository to use to emit non-record messages
        """
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._hash = SliceHasher.hash(self._stream.name, self._slice)

    def read(self) -> Iterable[Record]:
        """
        Read messages from the stream.
        If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
        Otherwise, the message will be emitted on the message repository.
        """
        try:
            # Using `stream_state=self._state` has very different behavior from the current one, as today the state is updated
            # incrementally, slice by slice. We don't have this guarantee with the Concurrent CDK. For HttpStream, `stream_state` is passed to:
            # * fetch_next_page
            # * parse_response
            # Neither is used by Stripe, so we should be good for the first iteration of the Concurrent CDK. However, Stripe still does
            # `if not stream_state` to decide whether to call the Event stream.
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                # Note: we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade.
                # For now, file-based connectors have their own stream facade.
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(
                        data=data_to_return,
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                    yield Record(
                        data=record_data.record.data or {},
                        stream_name=self.stream_name(),
                        associated_slice=self._slice,  # type: ignore [arg-type]
                    )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        return self._slice

    def __hash__(self) -> int:
        return self._hash

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"StreamPartition({self._stream.name}, {self._slice})"
This class acts as an adapter between the new Partition interface and the Stream's stream_slice interface
StreamPartitions are instantiated from a Stream and a stream_slice.
This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
def __init__(
    self,
    stream: Stream,
    _slice: Optional[Mapping[str, Any]],
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
):
    """
    :param stream: The stream to delegate to
    :param _slice: The partition's stream_slice
    :param message_repository: The message repository to use to emit non-record messages
    """
    self._stream = stream
    self._slice = _slice
    self._message_repository = message_repository
    self._sync_mode = sync_mode
    self._cursor_field = cursor_field
    self._state = state
    self._hash = SliceHasher.hash(self._stream.name, self._slice)
Parameters
- stream: The stream to delegate to
- _slice: The partition's stream_slice
- message_repository: The message repository to use to emit non-record messages
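An illustrative construction of a single partition; my_stream and message_repository are placeholders for an existing Stream and MessageRepository, and the slice contents are made up:

    from airbyte_cdk.models import SyncMode
    from airbyte_cdk.sources.streams.concurrent.adapters import StreamPartition

    partition = StreamPartition(
        stream=my_stream,
        _slice={"start": "2023-01-01", "end": "2023-01-31"},  # made-up slice shape
        message_repository=message_repository,
        sync_mode=SyncMode.full_refresh,
        cursor_field=None,
        state=None,
    )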
def read(self) -> Iterable[Record]:
    """
    Read messages from the stream.
    If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record.
    Otherwise, the message will be emitted on the message repository.
    """
    try:
        # Using `stream_state=self._state` has very different behavior from the current one, as today the state is updated
        # incrementally, slice by slice. We don't have this guarantee with the Concurrent CDK. For HttpStream, `stream_state` is passed to:
        # * fetch_next_page
        # * parse_response
        # Neither is used by Stripe, so we should be good for the first iteration of the Concurrent CDK. However, Stripe still does
        # `if not stream_state` to decide whether to call the Event stream.
        for record_data in self._stream.read_records(
            cursor_field=self._cursor_field,
            sync_mode=SyncMode.full_refresh,
            stream_slice=copy.deepcopy(self._slice),
            stream_state=self._state,
        ):
            # Note: we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade.
            # For now, file-based connectors have their own stream facade.
            if isinstance(record_data, Mapping):
                data_to_return = dict(record_data)
                self._stream.transformer.transform(
                    data_to_return, self._stream.get_json_schema()
                )
                yield Record(
                    data=data_to_return,
                    stream_name=self.stream_name(),
                    associated_slice=self._slice,  # type: ignore [arg-type]
                )
            elif isinstance(record_data, AirbyteMessage) and record_data.record is not None:
                yield Record(
                    data=record_data.record.data or {},
                    stream_name=self.stream_name(),
                    associated_slice=self._slice,  # type: ignore [arg-type]
                )
            else:
                self._message_repository.emit_message(record_data)
    except Exception as e:
        display_message = self._stream.get_error_display_message(e)
        if display_message:
            raise ExceptionWithDisplayMessage(display_message) from e
        else:
            raise e
Read messages from the stream. If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. Otherwise, the message will be emitted on the message repository.
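Continuing the sketch, draining the hypothetical partition from above yields Record objects whose data has already been passed through the stream's transformer:

    for record in partition.read():
        print(record.data)  # transformed record payload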
Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values.
Returns
A mapping representing a slice
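Since nested slice values may not be plain JSON types, the module's SliceEncoder can serialize the result; a sketch using the hypothetical partition from above:

    import json

    from airbyte_cdk.sources.streams.concurrent.adapters import SliceEncoder

    print(json.dumps(partition.to_slice(), cls=SliceEncoder))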
class StreamPartitionGenerator(PartitionGenerator):
    """
    This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices

    This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream.
    In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
    """

    def __init__(
        self,
        stream: Stream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        """
        :param stream: The stream to delegate to
        :param message_repository: The message repository to use to emit non-record messages
        """
        self.message_repository = message_repository
        self._stream = stream
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def generate(self) -> Iterable[Partition]:
        for s in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            yield StreamPartition(
                self._stream,
                copy.deepcopy(s),
                self.message_repository,
                self._sync_mode,
                self._cursor_field,
                self._state,
            )
This class acts as an adapter between the new PartitionGenerator and Stream.stream_slices
This class can be used to help enable concurrency on existing connectors without having to rewrite everything as AbstractStream. In the long-run, it would be preferable to update the connectors, but we don't have the tooling or need to justify the effort at this time.
def __init__(
    self,
    stream: Stream,
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
):
    """
    :param stream: The stream to delegate to
    :param message_repository: The message repository to use to emit non-record messages
    """
    self.message_repository = message_repository
    self._stream = stream
    self._sync_mode = sync_mode
    self._cursor_field = cursor_field
    self._state = state
Parameters
- stream: The stream to delegate to
- message_repository: The message repository to use to emit non-record messages
def generate(self) -> Iterable[Partition]:
    for s in self._stream.stream_slices(
        sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
    ):
        yield StreamPartition(
            self._stream,
            copy.deepcopy(s),
            self.message_repository,
            self._sync_mode,
            self._cursor_field,
            self._state,
        )
Generates partitions for a given sync mode.
Returns
An iterable of partitions
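A sketch of enumerating partitions from an existing stream; my_stream and message_repository are again placeholders:

    from airbyte_cdk.models import SyncMode
    from airbyte_cdk.sources.streams.concurrent.adapters import StreamPartitionGenerator

    generator = StreamPartitionGenerator(
        stream=my_stream,
        message_repository=message_repository,
        sync_mode=SyncMode.full_refresh,
        cursor_field=None,
        state=None,
    )
    for partition in generator.generate():
        print(partition.to_slice())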
@deprecated(
    "Availability strategy has been soft deprecated. Do not use. Class is subject to removal",
    category=ExperimentalClassWarning,
)
class AvailabilityStrategyFacade(AvailabilityStrategy):
    def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
        self._abstract_availability_strategy = abstract_availability_strategy

    def check_availability(
        self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
    ) -> Tuple[bool, Optional[str]]:
        """
        Checks stream availability.

        Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

        :param stream: (unused)
        :param logger: logger object to use
        :param source: (unused)
        :return: A tuple of (boolean, str). If boolean is true, the stream is available; otherwise, the str describes why it is unavailable.
        """
        stream_availability = self._abstract_availability_strategy.check_availability(logger)
        return stream_availability.is_available(), stream_availability.message()
Abstract base class for checking stream availability.
def check_availability(
    self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
    """
    Checks stream availability.

    Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

    :param stream: (unused)
    :param logger: logger object to use
    :param source: (unused)
    :return: A tuple of (boolean, str). If boolean is true, the stream is available; otherwise, the str describes why it is unavailable.
    """
    stream_availability = self._abstract_availability_strategy.check_availability(logger)
    return stream_availability.is_available(), stream_availability.message()
Checks stream availability.
Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.
Parameters
- stream: (unused)
- logger: logger object to use
- source: (unused)
Returns
A tuple of (boolean, str). If boolean is true, the stream is available; otherwise, the str describes why it is unavailable.
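A brief sketch of adapting the always-available strategy (imported at the top of this module) to the legacy interface; my_stream is a placeholder and is ignored by the wrapped strategy:

    import logging

    from airbyte_cdk.sources.streams.concurrent.adapters import AvailabilityStrategyFacade
    from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
        AlwaysAvailableAvailabilityStrategy,
    )

    facade = AvailabilityStrategyFacade(AlwaysAvailableAvailabilityStrategy())
    is_available, reason = facade.check_availability(my_stream, logging.getLogger("airbyte"))
    assert is_available and reason is None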