airbyte_cdk.models.airbyte_protocol_serializers
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2from typing import Any, Dict 3 4from serpyco_rs import CustomType, Serializer 5 6from .airbyte_protocol import ( # type: ignore[attr-defined] # all classes are imported to airbyte_protocol via * 7 AirbyteCatalog, 8 AirbyteMessage, 9 AirbyteStateBlob, 10 AirbyteStateMessage, 11 AirbyteStream, 12 AirbyteStreamState, 13 ConfiguredAirbyteCatalog, 14 ConfiguredAirbyteStream, 15 ConnectorSpecification, 16) 17 18 19class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]): 20 def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]: 21 # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete" 22 return {k: v for k, v in value.__dict__.items()} 23 24 def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob: 25 return AirbyteStateBlob(value) 26 27 def get_json_schema(self) -> Dict[str, Any]: 28 return {"type": "object"} 29 30 31def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, Dict[str, Any]] | None: 32 return AirbyteStateBlobType() if t is AirbyteStateBlob else None 33 34 35AirbyteCatalogSerializer = Serializer(AirbyteCatalog, omit_none=True) 36AirbyteStreamSerializer = Serializer(AirbyteStream, omit_none=True) 37AirbyteStreamStateSerializer = Serializer( 38 AirbyteStreamState, omit_none=True, custom_type_resolver=custom_type_resolver 39) 40AirbyteStateMessageSerializer = Serializer( 41 AirbyteStateMessage, omit_none=True, custom_type_resolver=custom_type_resolver 42) 43AirbyteMessageSerializer = Serializer( 44 AirbyteMessage, omit_none=True, custom_type_resolver=custom_type_resolver 45) 46ConfiguredAirbyteCatalogSerializer = Serializer(ConfiguredAirbyteCatalog, omit_none=True) 47ConfiguredAirbyteStreamSerializer = Serializer(ConfiguredAirbyteStream, omit_none=True) 48ConnectorSpecificationSerializer = Serializer(ConnectorSpecification, omit_none=True)
class
AirbyteStateBlobType(serpyco_rs._custom_types.CustomType[airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob, typing.Dict[str, typing.Any]]):
20class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]): 21 def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]: 22 # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete" 23 return {k: v for k, v in value.__dict__.items()} 24 25 def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob: 26 return AirbyteStateBlob(value) 27 28 def get_json_schema(self) -> Dict[str, Any]: 29 return {"type": "object"}
Helper class that provides a standard way to create an ABC using inheritance.
def
serialize( self, value: airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob) -> Dict[str, Any]:
def
deserialize( self, value: Dict[str, Any]) -> airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob:
def
custom_type_resolver( t: type) -> Optional[serpyco_rs._custom_types.CustomType[airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob, Dict[str, Any]]]:
AirbyteCatalogSerializer =
<serpyco_rs._main.Serializer object>
AirbyteStreamSerializer =
<serpyco_rs._main.Serializer object>
AirbyteStreamStateSerializer =
<serpyco_rs._main.Serializer object>
AirbyteStateMessageSerializer =
<serpyco_rs._main.Serializer object>
AirbyteMessageSerializer =
<serpyco_rs._main.Serializer object>
ConfiguredAirbyteCatalogSerializer =
<serpyco_rs._main.Serializer object>
ConfiguredAirbyteStreamSerializer =
<serpyco_rs._main.Serializer object>
ConnectorSpecificationSerializer =
<serpyco_rs._main.Serializer object>