airbyte_cdk.sources.utils.record_helper
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4import time 5from collections.abc import Mapping as ABCMapping 6from typing import Any, Mapping, Optional 7 8from airbyte_cdk.models import ( 9 AirbyteLogMessage, 10 AirbyteMessage, 11 AirbyteRecordMessage, 12 AirbyteTraceMessage, 13) 14from airbyte_cdk.models import Type as MessageType 15from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage 16from airbyte_cdk.sources.streams.core import StreamData 17from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer 18 19 20def stream_data_to_airbyte_message( 21 stream_name: str, 22 data_or_message: StreamData, 23 transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform), 24 schema: Optional[Mapping[str, Any]] = None, 25 is_file_transfer_message: bool = False, 26) -> AirbyteMessage: 27 if schema is None: 28 schema = {} 29 30 match data_or_message: 31 case ABCMapping(): 32 data = dict(data_or_message) 33 now_millis = time.time_ns() // 1_000_000 34 # Transform object fields according to config. Most likely you will 35 # need it to normalize values against json schema. By default no action 36 # taken unless configured. See 37 # docs/connector-development/cdk-python/schemas.md for details. 38 transformer.transform(data, schema) 39 if is_file_transfer_message: 40 message = AirbyteFileTransferRecordMessage( 41 stream=stream_name, file=data, emitted_at=now_millis, data={} 42 ) 43 else: 44 message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis) 45 return AirbyteMessage(type=MessageType.RECORD, record=message) 46 case AirbyteTraceMessage(): 47 return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message) 48 case AirbyteLogMessage(): 49 return AirbyteMessage(type=MessageType.LOG, log=data_or_message) 50 case _: 51 raise ValueError( 52 f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}" 53 )
def
stream_data_to_airbyte_message( stream_name: str, data_or_message: Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage], transformer: airbyte_cdk.TypeTransformer = <airbyte_cdk.TypeTransformer object>, schema: Optional[Mapping[str, Any]] = None, is_file_transfer_message: bool = False) -> airbyte_cdk.AirbyteMessage:
21def stream_data_to_airbyte_message( 22 stream_name: str, 23 data_or_message: StreamData, 24 transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform), 25 schema: Optional[Mapping[str, Any]] = None, 26 is_file_transfer_message: bool = False, 27) -> AirbyteMessage: 28 if schema is None: 29 schema = {} 30 31 match data_or_message: 32 case ABCMapping(): 33 data = dict(data_or_message) 34 now_millis = time.time_ns() // 1_000_000 35 # Transform object fields according to config. Most likely you will 36 # need it to normalize values against json schema. By default no action 37 # taken unless configured. See 38 # docs/connector-development/cdk-python/schemas.md for details. 39 transformer.transform(data, schema) 40 if is_file_transfer_message: 41 message = AirbyteFileTransferRecordMessage( 42 stream=stream_name, file=data, emitted_at=now_millis, data={} 43 ) 44 else: 45 message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis) 46 return AirbyteMessage(type=MessageType.RECORD, record=message) 47 case AirbyteTraceMessage(): 48 return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message) 49 case AirbyteLogMessage(): 50 return AirbyteMessage(type=MessageType.LOG, log=data_or_message) 51 case _: 52 raise ValueError( 53 f"Unexpected type for data_or_message: {type(data_or_message)}: {data_or_message}" 54 )