airbyte_cdk.models.airbyte_protocol_serializers

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2from typing import Any, Dict
 3
 4from serpyco_rs import CustomType, Serializer
 5
 6from .airbyte_protocol import (  # type: ignore[attr-defined] # all classes are imported to airbyte_protocol via *
 7    AirbyteCatalog,
 8    AirbyteMessage,
 9    AirbyteStateBlob,
10    AirbyteStateMessage,
11    AirbyteStream,
12    AirbyteStreamState,
13    ConfiguredAirbyteCatalog,
14    ConfiguredAirbyteStream,
15    ConnectorSpecification,
16)
17
18
19class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]):
20    def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
21        # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
22        return {k: v for k, v in value.__dict__.items()}
23
24    def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
25        return AirbyteStateBlob(value)
26
27    def get_json_schema(self) -> Dict[str, Any]:
28        return {"type": "object"}
29
30
31def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, Dict[str, Any]] | None:
32    return AirbyteStateBlobType() if t is AirbyteStateBlob else None
33
34
35AirbyteCatalogSerializer = Serializer(AirbyteCatalog, omit_none=True)
36AirbyteStreamSerializer = Serializer(AirbyteStream, omit_none=True)
37AirbyteStreamStateSerializer = Serializer(
38    AirbyteStreamState, omit_none=True, custom_type_resolver=custom_type_resolver
39)
40AirbyteStateMessageSerializer = Serializer(
41    AirbyteStateMessage, omit_none=True, custom_type_resolver=custom_type_resolver
42)
43AirbyteMessageSerializer = Serializer(
44    AirbyteMessage, omit_none=True, custom_type_resolver=custom_type_resolver
45)
46ConfiguredAirbyteCatalogSerializer = Serializer(ConfiguredAirbyteCatalog, omit_none=True)
47ConfiguredAirbyteStreamSerializer = Serializer(ConfiguredAirbyteStream, omit_none=True)
48ConnectorSpecificationSerializer = Serializer(ConnectorSpecification, omit_none=True)
class AirbyteStateBlobType(serpyco_rs._custom_types.CustomType[airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob, typing.Dict[str, typing.Any]]):
20class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]):
21    def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
22        # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
23        return {k: v for k, v in value.__dict__.items()}
24
25    def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
26        return AirbyteStateBlob(value)
27
28    def get_json_schema(self) -> Dict[str, Any]:
29        return {"type": "object"}

Helper class that provides a standard way to create an ABC using inheritance.

def serialize( self, value: airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob) -> Dict[str, Any]:
21    def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
22        # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
23        return {k: v for k, v in value.__dict__.items()}
def deserialize( self, value: Dict[str, Any]) -> airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob:
25    def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
26        return AirbyteStateBlob(value)
def get_json_schema(self) -> Dict[str, Any]:
28    def get_json_schema(self) -> Dict[str, Any]:
29        return {"type": "object"}
def custom_type_resolver( t: type) -> Optional[serpyco_rs._custom_types.CustomType[airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob, Dict[str, Any]]]:
32def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, Dict[str, Any]] | None:
33    return AirbyteStateBlobType() if t is AirbyteStateBlob else None
AirbyteCatalogSerializer = <serpyco_rs._main.Serializer object>
AirbyteStreamSerializer = <serpyco_rs._main.Serializer object>
AirbyteStreamStateSerializer = <serpyco_rs._main.Serializer object>
AirbyteStateMessageSerializer = <serpyco_rs._main.Serializer object>
AirbyteMessageSerializer = <serpyco_rs._main.Serializer object>
ConfiguredAirbyteCatalogSerializer = <serpyco_rs._main.Serializer object>
ConfiguredAirbyteStreamSerializer = <serpyco_rs._main.Serializer object>
ConnectorSpecificationSerializer = <serpyco_rs._main.Serializer object>