airbyte_cdk.models.airbyte_protocol_serializers

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2from typing import Any, Dict
 3
 4from serpyco_rs import CustomType, Serializer
 5
 6from .airbyte_protocol import (  # type: ignore[attr-defined] # all classes are imported to airbyte_protocol via *
 7    AirbyteMessage,
 8    AirbyteStateBlob,
 9    AirbyteStateMessage,
10    AirbyteStreamState,
11    ConfiguredAirbyteCatalog,
12    ConfiguredAirbyteStream,
13    ConnectorSpecification,
14)
15
16
17class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]):
18    def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
19        # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
20        return {k: v for k, v in value.__dict__.items()}
21
22    def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
23        return AirbyteStateBlob(value)
24
25    def get_json_schema(self) -> Dict[str, Any]:
26        return {"type": "object"}
27
28
29def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, Dict[str, Any]] | None:
30    return AirbyteStateBlobType() if t is AirbyteStateBlob else None
31
32
33AirbyteStreamStateSerializer = Serializer(
34    AirbyteStreamState, omit_none=True, custom_type_resolver=custom_type_resolver
35)
36AirbyteStateMessageSerializer = Serializer(
37    AirbyteStateMessage, omit_none=True, custom_type_resolver=custom_type_resolver
38)
39AirbyteMessageSerializer = Serializer(
40    AirbyteMessage, omit_none=True, custom_type_resolver=custom_type_resolver
41)
42ConfiguredAirbyteCatalogSerializer = Serializer(ConfiguredAirbyteCatalog, omit_none=True)
43ConfiguredAirbyteStreamSerializer = Serializer(ConfiguredAirbyteStream, omit_none=True)
44ConnectorSpecificationSerializer = Serializer(ConnectorSpecification, omit_none=True)
class AirbyteStateBlobType(serpyco_rs._custom_types.CustomType[airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob, typing.Dict[str, typing.Any]]):
18class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]):
19    def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
20        # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
21        return {k: v for k, v in value.__dict__.items()}
22
23    def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
24        return AirbyteStateBlob(value)
25
26    def get_json_schema(self) -> Dict[str, Any]:
27        return {"type": "object"}

Helper class that provides a standard way to create an ABC using inheritance.

def serialize( self, value: airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob) -> Dict[str, Any]:
19    def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
20        # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
21        return {k: v for k, v in value.__dict__.items()}
def deserialize( self, value: Dict[str, Any]) -> airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob:
23    def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
24        return AirbyteStateBlob(value)
def get_json_schema(self) -> Dict[str, Any]:
26    def get_json_schema(self) -> Dict[str, Any]:
27        return {"type": "object"}
def custom_type_resolver( t: type) -> Optional[serpyco_rs._custom_types.CustomType[airbyte_cdk.models.airbyte_protocol.AirbyteStateBlob, Dict[str, Any]]]:
30def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, Dict[str, Any]] | None:
31    return AirbyteStateBlobType() if t is AirbyteStateBlob else None
AirbyteStreamStateSerializer = <serpyco_rs._main.Serializer object>
AirbyteStateMessageSerializer = <serpyco_rs._main.Serializer object>
AirbyteMessageSerializer = <serpyco_rs._main.Serializer object>
ConfiguredAirbyteCatalogSerializer = <serpyco_rs._main.Serializer object>
ConfiguredAirbyteStreamSerializer = <serpyco_rs._main.Serializer object>
ConnectorSpecificationSerializer = <serpyco_rs._main.Serializer object>