airbyte_cdk.test.state_builder

 1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 2
 3from typing import Any, List
 4
 5from airbyte_cdk.models import (
 6    AirbyteStateBlob,
 7    AirbyteStateMessage,
 8    AirbyteStateType,
 9    AirbyteStreamState,
10    StreamDescriptor,
11)
12
13
class StateBuilder:
    """Fluent helper that accumulates per-stream ``AirbyteStateMessage`` objects."""

    def __init__(self) -> None:
        """Start with an empty list of state messages."""
        self._state: List[AirbyteStateMessage] = []

    def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
        """Append a STREAM-type state message for ``stream_name``.

        Args:
            stream_name: Name placed in the message's stream descriptor.
            state: The stream state. Used as-is when it is already an
                ``AirbyteStateBlob``; otherwise it is passed to the
                ``AirbyteStateBlob`` constructor.

        Returns:
            This builder, so calls can be chained.
        """
        if isinstance(state, AirbyteStateBlob):
            blob = state
        else:
            blob = AirbyteStateBlob(state)

        descriptor = StreamDescriptor(**{"name": stream_name})
        stream_state = AirbyteStreamState(
            stream_state=blob,
            stream_descriptor=descriptor,
        )
        message = AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=stream_state,
        )
        self._state.append(message)
        return self

    def build(self) -> List[AirbyteStateMessage]:
        """Return the accumulated state messages.

        The builder's internal list itself is returned, not a copy.
        """
        messages = self._state
        return messages
class StateBuilder:
class StateBuilder:
    """Fluent helper that accumulates per-stream ``AirbyteStateMessage`` objects."""

    def __init__(self) -> None:
        """Start with an empty list of state messages."""
        self._state: List[AirbyteStateMessage] = []

    def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
        """Append a STREAM-type state message for ``stream_name``.

        Args:
            stream_name: Name placed in the message's stream descriptor.
            state: The stream state. Used as-is when it is already an
                ``AirbyteStateBlob``; otherwise it is passed to the
                ``AirbyteStateBlob`` constructor.

        Returns:
            This builder, so calls can be chained.
        """
        if isinstance(state, AirbyteStateBlob):
            blob = state
        else:
            blob = AirbyteStateBlob(state)

        descriptor = StreamDescriptor(**{"name": stream_name})
        stream_state = AirbyteStreamState(
            stream_state=blob,
            stream_descriptor=descriptor,
        )
        message = AirbyteStateMessage(
            type=AirbyteStateType.STREAM,
            stream=stream_state,
        )
        self._state.append(message)
        return self

    def build(self) -> List[AirbyteStateMessage]:
        """Return the accumulated state messages.

        The builder's internal list itself is returned, not a copy.
        """
        messages = self._state
        return messages
def with_stream_state(self, stream_name: str, state: Any) -> StateBuilder:
19    def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
20        self._state.append(
21            AirbyteStateMessage(
22                type=AirbyteStateType.STREAM,
23                stream=AirbyteStreamState(
24                    stream_state=state
25                    if isinstance(state, AirbyteStateBlob)
26                    else AirbyteStateBlob(state),
27                    stream_descriptor=StreamDescriptor(**{"name": stream_name}),
28                ),
29            )
30        )
31        return self
def build(self) -> List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]:
33    def build(self) -> List[AirbyteStateMessage]:
34        return self._state