airbyte_cdk.test.state_builder
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from typing import Any, List

from airbyte_cdk.models import (
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)


class StateBuilder:
    """Fluent builder that accumulates per-stream ``AirbyteStateMessage`` objects."""

    def __init__(self) -> None:
        # Messages are kept in the order in which they were added.
        self._state: List[AirbyteStateMessage] = []

    def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
        """Append a STREAM-type state message for ``stream_name``.

        Args:
            stream_name: Name placed in the message's ``StreamDescriptor``.
            state: Used as-is when it already is an ``AirbyteStateBlob``;
                any other value is passed to ``AirbyteStateBlob(...)``.

        Returns:
            This builder, so calls can be chained.
        """
        if isinstance(state, AirbyteStateBlob):
            blob = state
        else:
            blob = AirbyteStateBlob(state)

        self._state.append(
            AirbyteStateMessage(
                type=AirbyteStateType.STREAM,
                stream=AirbyteStreamState(
                    stream_state=blob,
                    stream_descriptor=StreamDescriptor(name=stream_name),
                ),
            )
        )
        return self

    def build(self) -> List[AirbyteStateMessage]:
        """Return the accumulated state messages, in insertion order.

        A new list is returned on every call (shallow copy), so mutating the
        result does not affect the builder, and later ``with_stream_state``
        calls do not alter a list that was already handed out.
        """
        return list(self._state)
class
StateBuilder:
class StateBuilder:
    """Collect stream state messages through chained calls, then ``build()`` them."""

    def __init__(self) -> None:
        # Backing store for the messages; insertion order is preserved.
        self._state: List[AirbyteStateMessage] = []

    def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
        """Add one STREAM-type ``AirbyteStateMessage`` and return the builder.

        Args:
            stream_name: Value used as the ``name`` of the ``StreamDescriptor``.
            state: Kept unchanged if it is already an ``AirbyteStateBlob``;
                otherwise it is handed to the ``AirbyteStateBlob`` constructor.

        Returns:
            ``self``, to allow method chaining.
        """
        stream_state = (
            state if isinstance(state, AirbyteStateBlob) else AirbyteStateBlob(state)
        )
        message = AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=AirbyteStreamState(
                stream_state=stream_state,
                stream_descriptor=StreamDescriptor(name=stream_name),
            ),
        )
        self._state.append(message)
        return self

    def build(self) -> List[AirbyteStateMessage]:
        """Return the messages added so far, in insertion order.

        The result is a fresh list (shallow copy) rather than the builder's
        internal one, which keeps the builder and previously returned results
        independent of each other.
        """
        return list(self._state)
def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
    """Queue a STREAM state message for ``stream_name`` and return the builder.

    ``state`` is stored unchanged when it already is an ``AirbyteStateBlob``;
    any other value is passed to the ``AirbyteStateBlob`` constructor first.
    Returning ``self`` lets calls be chained.
    """
    if isinstance(state, AirbyteStateBlob):
        blob = state
    else:
        blob = AirbyteStateBlob(state)

    descriptor = StreamDescriptor(**{"name": stream_name})
    per_stream = AirbyteStreamState(stream_state=blob, stream_descriptor=descriptor)
    message = AirbyteStateMessage(type=AirbyteStateType.STREAM, stream=per_stream)

    self._state.append(message)
    return self