airbyte_cdk.models.airbyte_protocol
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Annotated, Any, Dict, List, Mapping, Optional, Union

from airbyte_protocol_dataclasses.models import *  # noqa: F403  # Allow '*'
from serpyco_rs.metadata import Alias

from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage

# ruff: noqa: F405  # ignore fuzzy import issues with 'import *'


@dataclass
class AirbyteStateBlob:
    """
    Free-form state container whose attributes are defined by its constructor arguments.

    Used to "mimic" a pydantic BaseModel configured with ConfigDict(extra='allow'): the
    attribute names are not known ahead of time and are set at runtime from whatever the
    caller supplies.

    Attributes:
        kwargs (InitVar[Mapping[str, Any]]): Init-only pseudo-field declared on the dataclass.

    Methods:
        __init__(*args: Any, **kwargs: Any) -> None:
            Merges every positional argument into the instance ``__dict__`` and then assigns
            every keyword argument with ``setattr``; a keyword therefore overrides a
            positional entry of the same name.

        __eq__(other: object) -> bool:
            Returns `True` only when `other` is an `AirbyteStateBlob` whose ``__dict__`` equals
            this one's; returns `False` for any other type.
    """

    kwargs: InitVar[Mapping[str, Any]]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Positional arguments go straight into the instance dict.
        for positional in args:
            self.__dict__.update(positional)
        # Keyword arguments are applied afterwards, one attribute at a time.
        for name, value in kwargs.items():
            setattr(self, name, value)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, AirbyteStateBlob):
            return False
        return bool(self.__dict__ == other.__dict__)


# The following dataclasses have been redeclared to include the new version of AirbyteStateBlob
# Pairs a stream descriptor with that stream's optional state blob.
@dataclass
class AirbyteStreamState:
    stream_descriptor: StreamDescriptor  # type: ignore [name-defined]
    stream_state: Optional[AirbyteStateBlob] = None


# Holds a list of per-stream states plus an optional shared state blob.
@dataclass
class AirbyteGlobalState:
    stream_states: List[AirbyteStreamState]
    shared_state: Optional[AirbyteStateBlob] = None


# State message; every field is optional and defaults to None.
@dataclass
class AirbyteStateMessage:
    type: Optional[AirbyteStateType] = None  # type: ignore [name-defined]
    stream: Optional[AirbyteStreamState] = None
    global_: Annotated[AirbyteGlobalState | None, Alias("global")] = (
        None  # "global" is a reserved keyword in python ⇒ Alias is used for (de-)serialization
    )
    data: Optional[Dict[str, Any]] = None
    sourceStats: Optional[AirbyteStateStats] = None  # type: ignore [name-defined]
    destinationStats: Optional[AirbyteStateStats] = None  # type: ignore [name-defined]


# Top-level message: `type` is the only required field, every payload field defaults to None.
@dataclass
class AirbyteMessage:
    type: Type  # type: ignore [name-defined]
    log: Optional[AirbyteLogMessage] = None  # type: ignore [name-defined]
    spec: Optional[ConnectorSpecification] = None  # type: ignore [name-defined]
    connectionStatus: Optional[AirbyteConnectionStatus] = None  # type: ignore [name-defined]
    catalog: Optional[AirbyteCatalog] = None  # type: ignore [name-defined]
    # `record` accepts either the file-transfer record or the regular record message.
    record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None  # type: ignore [name-defined]
    state: Optional[AirbyteStateMessage] = None
    trace: Optional[AirbyteTraceMessage] = None  # type: ignore [name-defined]
    control: Optional[AirbyteControlMessage] = None  # type: ignore [name-defined]
@dataclass
class AirbyteStateBlob:
    """
    Free-form state container whose attributes are defined by its constructor arguments.

    Used to "mimic" a pydantic BaseModel configured with ConfigDict(extra='allow'): the
    attribute names are not known ahead of time and are set at runtime from whatever the
    caller supplies.

    Attributes:
        kwargs (InitVar[Mapping[str, Any]]): Init-only pseudo-field declared on the dataclass.

    Methods:
        __init__(*args: Any, **kwargs: Any) -> None:
            Merges every positional argument into the instance ``__dict__`` and then assigns
            every keyword argument with ``setattr``; a keyword therefore overrides a
            positional entry of the same name.

        __eq__(other: object) -> bool:
            Returns `True` only when `other` is an `AirbyteStateBlob` whose ``__dict__`` equals
            this one's; returns `False` for any other type.
    """

    kwargs: InitVar[Mapping[str, Any]]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Positional arguments go straight into the instance dict.
        for positional in args:
            self.__dict__.update(positional)
        # Keyword arguments are applied afterwards, one attribute at a time.
        for name, value in kwargs.items():
            setattr(self, name, value)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, AirbyteStateBlob):
            return False
        return bool(self.__dict__ == other.__dict__)
A dataclass that dynamically sets attributes based on provided keyword arguments and positional arguments. Used to "mimic" pydantic BaseModel with ConfigDict(extra='allow') option.
The AirbyteStateBlob class allows for flexible instantiation by accepting any number of keyword arguments and positional arguments. These are used to dynamically update the instance's attributes. This class is useful in scenarios where the attributes of an object are not known until runtime and need to be set dynamically.
Attributes:
- kwargs (InitVar[Mapping[str, Any]]): A dictionary of keyword arguments used to set attributes dynamically.
Methods:
- __init__(*args: Any, **kwargs: Any) -> None: Initializes the AirbyteStateBlob by setting attributes from the provided arguments.
- __eq__(other: object) -> bool: Checks equality between two AirbyteStateBlob instances based on their internal dictionaries. Returns False if the other object is not an instance of AirbyteStateBlob.
# Pairs a stream descriptor with that stream's optional state blob.
@dataclass
class AirbyteStreamState:
    # Required; the type comes from the star import of the protocol models.
    stream_descriptor: StreamDescriptor  # type: ignore [name-defined]
    # Optional free-form state; None when not provided.
    stream_state: Optional[AirbyteStateBlob] = None
# Holds a list of per-stream states plus an optional shared state blob.
@dataclass
class AirbyteGlobalState:
    # Required list of AirbyteStreamState entries.
    stream_states: List[AirbyteStreamState]
    # Optional free-form state; None when not provided.
    shared_state: Optional[AirbyteStateBlob] = None
# State message; every field is optional and defaults to None.
@dataclass
class AirbyteStateMessage:
    type: Optional[AirbyteStateType] = None  # type: ignore [name-defined]
    stream: Optional[AirbyteStreamState] = None
    global_: Annotated[AirbyteGlobalState | None, Alias("global")] = (
        None  # "global" is a reserved keyword in python ⇒ Alias is used for (de-)serialization
    )
    data: Optional[Dict[str, Any]] = None
    sourceStats: Optional[AirbyteStateStats] = None  # type: ignore [name-defined]
    destinationStats: Optional[AirbyteStateStats] = None  # type: ignore [name-defined]
# Top-level message: `type` is the only required field, every payload field defaults to None.
@dataclass
class AirbyteMessage:
    type: Type  # type: ignore [name-defined]
    log: Optional[AirbyteLogMessage] = None  # type: ignore [name-defined]
    spec: Optional[ConnectorSpecification] = None  # type: ignore [name-defined]
    connectionStatus: Optional[AirbyteConnectionStatus] = None  # type: ignore [name-defined]
    catalog: Optional[AirbyteCatalog] = None  # type: ignore [name-defined]
    # `record` accepts either the file-transfer record or the regular record message.
    record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None  # type: ignore [name-defined]
    state: Optional[AirbyteStateMessage] = None
    trace: Optional[AirbyteTraceMessage] = None  # type: ignore [name-defined]
    control: Optional[AirbyteControlMessage] = None  # type: ignore [name-defined]