airbyte_cdk.test.utils.reading
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2 3from typing import Any, List, Mapping, Optional 4 5from airbyte_cdk import AbstractSource 6from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, SyncMode 7from airbyte_cdk.test.catalog_builder import CatalogBuilder 8from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read 9from airbyte_cdk.test.models.outcome import ExpectedOutcome 10 11 12def catalog(stream_name: str, sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: 13 """Create a catalog with a single stream.""" 14 return CatalogBuilder().with_stream(stream_name, sync_mode).build() 15 16 17def read_records( 18 source: AbstractSource, 19 config: Mapping[str, Any], 20 stream_name: str, 21 sync_mode: SyncMode, 22 state: Optional[List[AirbyteStateMessage]] = None, 23 expecting_exception: bool | None = None, # Deprecated, use expected_outcome instead. 24 *, 25 expected_outcome: ExpectedOutcome | None = None, 26) -> EntrypointOutput: 27 """Read records from a stream.""" 28 _catalog = catalog(stream_name, sync_mode) 29 return read( 30 source, 31 config, 32 _catalog, 33 state, 34 expecting_exception=expecting_exception, # Deprecated, for backward compatibility. 35 expected_outcome=expected_outcome, 36 )
def
catalog( stream_name: str, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog:
13def catalog(stream_name: str, sync_mode: SyncMode) -> ConfiguredAirbyteCatalog: 14 """Create a catalog with a single stream.""" 15 return CatalogBuilder().with_stream(stream_name, sync_mode).build()
Create a catalog with a single stream.
def
read_records( source: airbyte_cdk.AbstractSource, config: Mapping[str, Any], stream_name: str, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, expecting_exception: bool | None = None, *, expected_outcome: airbyte_cdk.test.models.ExpectedOutcome | None = None) -> airbyte_cdk.test.entrypoint_wrapper.EntrypointOutput:
18def read_records( 19 source: AbstractSource, 20 config: Mapping[str, Any], 21 stream_name: str, 22 sync_mode: SyncMode, 23 state: Optional[List[AirbyteStateMessage]] = None, 24 expecting_exception: bool | None = None, # Deprecated, use expected_outcome instead. 25 *, 26 expected_outcome: ExpectedOutcome | None = None, 27) -> EntrypointOutput: 28 """Read records from a stream.""" 29 _catalog = catalog(stream_name, sync_mode) 30 return read( 31 source, 32 config, 33 _catalog, 34 state, 35 expecting_exception=expecting_exception, # Deprecated, for backward compatibility. 36 expected_outcome=expected_outcome, 37 )
Read records from a stream.