airbyte_cdk.sources.utils.record_helper

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4import time
 5from collections.abc import Mapping as ABCMapping
 6from typing import Any, Mapping, Optional
 7
 8from airbyte_cdk.models import (
 9    AirbyteLogMessage,
10    AirbyteMessage,
11    AirbyteRecordMessage,
12    AirbyteTraceMessage,
13)
14from airbyte_cdk.models import Type as MessageType
15from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
16from airbyte_cdk.sources.streams.core import StreamData
17from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
18
19
def stream_data_to_airbyte_message(
    stream_name: str,
    data_or_message: StreamData,
    transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
    schema: Optional[Mapping[str, Any]] = None,
    is_file_transfer_message: bool = False,
) -> AirbyteMessage:
    """Wrap a piece of stream output in an ``AirbyteMessage`` envelope.

    Args:
        stream_name: Stream name stamped on the record built from a mapping.
        data_or_message: Either a mapping (treated as record data), an
            ``AirbyteTraceMessage`` or an ``AirbyteLogMessage``.
        transformer: Object whose ``transform(data, schema)`` is called on a
            shallow ``dict`` copy of a mapping input before the record is
            built; its return value is not used.
        schema: Schema handed to ``transformer``. ``None`` is replaced by an
            empty dict.
        is_file_transfer_message: When true, a mapping input is placed in the
            ``file`` field of an ``AirbyteFileTransferRecordMessage`` (with
            empty ``data``) instead of an ``AirbyteRecordMessage``.

    Returns:
        An ``AirbyteMessage`` of type ``RECORD`` for mappings, ``TRACE`` for
        trace messages, or ``LOG`` for log messages.

    Raises:
        ValueError: If ``data_or_message`` is none of the supported types.
    """
    effective_schema = {} if schema is None else schema

    if isinstance(data_or_message, ABCMapping):
        record_data = dict(data_or_message)
        # Milliseconds since the epoch, captured before the transform runs.
        emitted_at_ms = time.time_ns() // 1_000_000
        # Transform object fields according to config. Most likely you will
        # need it to normalize values against json schema. By default no action
        # taken unless configured. See
        # docs/connector-development/cdk-python/schemas.md for details.
        transformer.transform(record_data, effective_schema)
        if not is_file_transfer_message:
            record = AirbyteRecordMessage(
                stream=stream_name, data=record_data, emitted_at=emitted_at_ms
            )
        else:
            record = AirbyteFileTransferRecordMessage(
                stream=stream_name, file=record_data, emitted_at=emitted_at_ms, data={}
            )
        return AirbyteMessage(type=MessageType.RECORD, record=record)

    if isinstance(data_or_message, AirbyteTraceMessage):
        return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)

    if isinstance(data_or_message, AirbyteLogMessage):
        return AirbyteMessage(type=MessageType.LOG, log=data_or_message)

    raise ValueError(
        f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}"
    )
def stream_data_to_airbyte_message( stream_name: str, data_or_message: Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage], transformer: airbyte_cdk.TypeTransformer = <airbyte_cdk.TypeTransformer object>, schema: Optional[Mapping[str, Any]] = None, is_file_transfer_message: bool = False) -> airbyte_cdk.AirbyteMessage:
def stream_data_to_airbyte_message(
    stream_name: str,
    data_or_message: StreamData,
    transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
    schema: Optional[Mapping[str, Any]] = None,
    is_file_transfer_message: bool = False,
) -> AirbyteMessage:
    """Wrap a piece of stream output in an ``AirbyteMessage`` envelope.

    Args:
        stream_name: Stream name stamped on the record built from a mapping.
        data_or_message: Either a mapping (treated as record data), an
            ``AirbyteTraceMessage`` or an ``AirbyteLogMessage``.
        transformer: Object whose ``transform(data, schema)`` is called on a
            shallow ``dict`` copy of a mapping input before the record is
            built; its return value is not used.
        schema: Schema handed to ``transformer``. ``None`` is replaced by an
            empty dict.
        is_file_transfer_message: When true, a mapping input is placed in the
            ``file`` field of an ``AirbyteFileTransferRecordMessage`` (with
            empty ``data``) instead of an ``AirbyteRecordMessage``.

    Returns:
        An ``AirbyteMessage`` of type ``RECORD`` for mappings, ``TRACE`` for
        trace messages, or ``LOG`` for log messages.

    Raises:
        ValueError: If ``data_or_message`` is none of the supported types.
    """
    effective_schema = {} if schema is None else schema

    if isinstance(data_or_message, ABCMapping):
        record_data = dict(data_or_message)
        # Milliseconds since the epoch, captured before the transform runs.
        emitted_at_ms = time.time_ns() // 1_000_000
        # Transform object fields according to config. Most likely you will
        # need it to normalize values against json schema. By default no action
        # taken unless configured. See
        # docs/connector-development/cdk-python/schemas.md for details.
        transformer.transform(record_data, effective_schema)
        if not is_file_transfer_message:
            record = AirbyteRecordMessage(
                stream=stream_name, data=record_data, emitted_at=emitted_at_ms
            )
        else:
            record = AirbyteFileTransferRecordMessage(
                stream=stream_name, file=record_data, emitted_at=emitted_at_ms, data={}
            )
        return AirbyteMessage(type=MessageType.RECORD, record=record)

    if isinstance(data_or_message, AirbyteTraceMessage):
        return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)

    if isinstance(data_or_message, AirbyteLogMessage):
        return AirbyteMessage(type=MessageType.LOG, log=data_or_message)

    raise ValueError(
        f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}"
    )