airbyte_cdk.test.utils.reading

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3from typing import Any, List, Mapping, Optional
 4
 5from airbyte_cdk import AbstractSource
 6from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, SyncMode
 7from airbyte_cdk.test.catalog_builder import CatalogBuilder
 8from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
 9from airbyte_cdk.test.models.outcome import ExpectedOutcome
10
11
12def catalog(stream_name: str, sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
13    """Create a catalog with a single stream."""
14    return CatalogBuilder().with_stream(stream_name, sync_mode).build()
15
16
17def read_records(
18    source: AbstractSource,
19    config: Mapping[str, Any],
20    stream_name: str,
21    sync_mode: SyncMode,
22    state: Optional[List[AirbyteStateMessage]] = None,
23    expecting_exception: bool | None = None,  # Deprecated, use expected_outcome instead.
24    *,
25    expected_outcome: ExpectedOutcome | None = None,
26) -> EntrypointOutput:
27    """Read records from a stream."""
28    _catalog = catalog(stream_name, sync_mode)
29    return read(
30        source,
31        config,
32        _catalog,
33        state,
34        expecting_exception=expecting_exception,  # Deprecated, for backward compatibility.
35        expected_outcome=expected_outcome,
36    )
def catalog( stream_name: str, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog:
13def catalog(stream_name: str, sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
14    """Create a catalog with a single stream."""
15    return CatalogBuilder().with_stream(stream_name, sync_mode).build()

Create a catalog with a single stream.

def read_records( source: airbyte_cdk.AbstractSource, config: Mapping[str, Any], stream_name: str, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, expecting_exception: bool | None = None, *, expected_outcome: airbyte_cdk.test.models.ExpectedOutcome | None = None) -> airbyte_cdk.test.entrypoint_wrapper.EntrypointOutput:
18def read_records(
19    source: AbstractSource,
20    config: Mapping[str, Any],
21    stream_name: str,
22    sync_mode: SyncMode,
23    state: Optional[List[AirbyteStateMessage]] = None,
24    expecting_exception: bool | None = None,  # Deprecated, use expected_outcome instead.
25    *,
26    expected_outcome: ExpectedOutcome | None = None,
27) -> EntrypointOutput:
28    """Read records from a stream."""
29    _catalog = catalog(stream_name, sync_mode)
30    return read(
31        source,
32        config,
33        _catalog,
34        state,
35        expecting_exception=expecting_exception,  # Deprecated, for backward compatibility.
36        expected_outcome=expected_outcome,
37    )

Read records from a stream.