airbyte_cdk.models.airbyte_protocol_serializers
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2from typing import Any, Dict 3 4from serpyco_rs import CustomType, Serializer 5 6from .airbyte_protocol import ( # type: ignore[attr-defined] # all classes are imported to airbyte_protocol via * 7 AirbyteMessage, 8 AirbyteStateBlob, 9 AirbyteStateMessage, 10 AirbyteStreamState, 11 ConfiguredAirbyteCatalog, 12 ConfiguredAirbyteStream, 13 ConnectorSpecification, 14) 15 16 17class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]): 18 def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]: 19 # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete" 20 return {k: v for k, v in value.__dict__.items()} 21 22 def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob: 23 return AirbyteStateBlob(value) 24 25 def get_json_schema(self) -> Dict[str, Any]: 26 return {"type": "object"} 27 28 29def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, Dict[str, Any]] | None: 30 return AirbyteStateBlobType() if t is AirbyteStateBlob else None 31 32 33AirbyteStreamStateSerializer = Serializer( 34 AirbyteStreamState, omit_none=True, custom_type_resolver=custom_type_resolver 35) 36AirbyteStateMessageSerializer = Serializer( 37 AirbyteStateMessage, omit_none=True, custom_type_resolver=custom_type_resolver 38) 39AirbyteMessageSerializer = Serializer( 40 AirbyteMessage, omit_none=True, custom_type_resolver=custom_type_resolver 41) 42ConfiguredAirbyteCatalogSerializer = Serializer(ConfiguredAirbyteCatalog, omit_none=True) 43ConfiguredAirbyteStreamSerializer = Serializer(ConfiguredAirbyteStream, omit_none=True) 44ConnectorSpecificationSerializer = Serializer(ConnectorSpecification, omit_none=True)
class
AirbyteStateBlobType(serpyco_rs._custom_types.CustomType[airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob, typing.Dict[str, typing.Any]]):
18class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]): 19 def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]: 20 # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete" 21 return {k: v for k, v in value.__dict__.items()} 22 23 def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob: 24 return AirbyteStateBlob(value) 25 26 def get_json_schema(self) -> Dict[str, Any]: 27 return {"type": "object"}
Helper class that provides a standard way to create an ABC using inheritance.
def
serialize( self, value: airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob) -> Dict[str, Any]:
def
deserialize( self, value: Dict[str, Any]) -> airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob:
def
custom_type_resolver( t: type) -> Optional[serpyco_rs._custom_types.CustomType[airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob, Dict[str, Any]]]:
AirbyteStreamStateSerializer =
<serpyco_rs._main.Serializer object>
AirbyteStateMessageSerializer =
<serpyco_rs._main.Serializer object>
AirbyteMessageSerializer =
<serpyco_rs._main.Serializer object>
ConfiguredAirbyteCatalogSerializer =
<serpyco_rs._main.Serializer object>
ConfiguredAirbyteStreamSerializer =
<serpyco_rs._main.Serializer object>
ConnectorSpecificationSerializer =
<serpyco_rs._main.Serializer object>