airbyte_cdk.sources.utils.record_helper
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import time
from collections.abc import Mapping as ABCMapping
from typing import Any, Mapping, Optional

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteRecordMessageFileReference,
    AirbyteTraceMessage,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer


def stream_data_to_airbyte_message(
    stream_name: str,
    data_or_message: StreamData,
    transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
    schema: Optional[Mapping[str, Any]] = None,
    file_reference: Optional[AirbyteRecordMessageFileReference] = None,
) -> AirbyteMessage:
    """Wrap one item produced by a stream in an ``AirbyteMessage`` envelope.

    Args:
        stream_name: Stream name stamped on the record when a mapping is wrapped.
        data_or_message: Either a mapping (emitted as a RECORD message), an
            ``AirbyteTraceMessage`` (emitted as TRACE) or an
            ``AirbyteLogMessage`` (emitted as LOG).
        transformer: Called as ``transformer.transform(record, schema)`` on a
            copy of the mapping before it is wrapped. Only used for mappings.
        schema: Schema handed to the transformer. ``None`` is replaced by ``{}``.
        file_reference: Passed through unchanged to the record message.

    Returns:
        The ``AirbyteMessage`` wrapping the input.

    Raises:
        ValueError: If ``data_or_message`` is none of the supported types.
    """
    if isinstance(data_or_message, ABCMapping):
        # Shallow copy: the transformer is handed the copy, not the caller's mapping object.
        record_data = dict(data_or_message)
        emitted_at_ms = time.time_ns() // 1_000_000
        # Transform object fields according to the transformer's config, typically
        # to normalize values against the JSON schema. The return value is ignored,
        # so whatever the transformer does must happen on ``record_data`` itself.
        # See docs/connector-development/cdk-python/schemas.md for details.
        transformer.transform(record_data, {} if schema is None else schema)
        record = AirbyteRecordMessage(
            stream=stream_name,
            data=record_data,
            emitted_at=emitted_at_ms,
            file_reference=file_reference,
        )
        return AirbyteMessage(type=MessageType.RECORD, record=record)

    if isinstance(data_or_message, AirbyteTraceMessage):
        return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)

    if isinstance(data_or_message, AirbyteLogMessage):
        return AirbyteMessage(type=MessageType.LOG, log=data_or_message)

    raise ValueError(
        f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}"
    )
def
stream_data_to_airbyte_message( stream_name: str, data_or_message: Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage], transformer: airbyte_cdk.TypeTransformer = <airbyte_cdk.TypeTransformer object>, schema: Optional[Mapping[str, Any]] = None, file_reference: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessageFileReference] = None) -> airbyte_cdk.AirbyteMessage:
def stream_data_to_airbyte_message(
    stream_name: str,
    data_or_message: StreamData,
    transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
    schema: Optional[Mapping[str, Any]] = None,
    file_reference: Optional[AirbyteRecordMessageFileReference] = None,
) -> AirbyteMessage:
    """Wrap one item produced by a stream in an ``AirbyteMessage`` envelope.

    Args:
        stream_name: Stream name stamped on the record when a mapping is wrapped.
        data_or_message: Either a mapping (emitted as a RECORD message), an
            ``AirbyteTraceMessage`` (emitted as TRACE) or an
            ``AirbyteLogMessage`` (emitted as LOG).
        transformer: Called as ``transformer.transform(record, schema)`` on a
            copy of the mapping before it is wrapped. Only used for mappings.
        schema: Schema handed to the transformer. ``None`` is replaced by ``{}``.
        file_reference: Passed through unchanged to the record message.

    Returns:
        The ``AirbyteMessage`` wrapping the input.

    Raises:
        ValueError: If ``data_or_message`` is none of the supported types.
    """
    if isinstance(data_or_message, ABCMapping):
        # Shallow copy: the transformer is handed the copy, not the caller's mapping object.
        record_data = dict(data_or_message)
        emitted_at_ms = time.time_ns() // 1_000_000
        # Transform object fields according to the transformer's config, typically
        # to normalize values against the JSON schema. The return value is ignored,
        # so whatever the transformer does must happen on ``record_data`` itself.
        # See docs/connector-development/cdk-python/schemas.md for details.
        transformer.transform(record_data, {} if schema is None else schema)
        record = AirbyteRecordMessage(
            stream=stream_name,
            data=record_data,
            emitted_at=emitted_at_ms,
            file_reference=file_reference,
        )
        return AirbyteMessage(type=MessageType.RECORD, record=record)

    if isinstance(data_or_message, AirbyteTraceMessage):
        return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)

    if isinstance(data_or_message, AirbyteLogMessage):
        return AirbyteMessage(type=MessageType.LOG, log=data_or_message)

    raise ValueError(
        f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}"
    )