airbyte_cdk.models.airbyte_protocol
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Annotated, Any, Dict, List, Mapping, Optional, Union

from airbyte_protocol_dataclasses.models import *  # noqa: F403  # Allow '*'
from serpyco_rs.metadata import Alias

# ruff: noqa: F405  # ignore fuzzy import issues with 'import *'


@dataclass
class AirbyteStateBlob:
    """
    Free-form state container whose attributes are decided at construction time.

    Used to "mimic" pydantic BaseModel with the ConfigDict(extra='allow') option:
    whatever is handed to the constructor becomes an attribute of the instance.

    Construction:
        - every positional argument is merged into the instance ``__dict__`` via
          ``dict.update`` (so it has to be something ``dict.update`` accepts,
          e.g. a mapping);
        - every keyword argument is then assigned with ``setattr``, so keyword
          arguments win over positional entries that share a name.

    Equality:
        Two blobs are equal when their instance dictionaries are equal. Comparing
        with anything that is not an `AirbyteStateBlob` yields `False`.
    """

    # Declared as an InitVar only; __init__ below is hand-written and accepts
    # arbitrary positional and keyword arguments.
    kwargs: InitVar[Mapping[str, Any]]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Positional arguments are merged straight into the instance dictionary.
        for positional in args:
            self.__dict__.update(positional)
        # Keyword arguments go through setattr, one attribute at a time.
        for name, value in kwargs.items():
            setattr(self, name, value)

    def __eq__(self, other: object) -> bool:
        # Anything that is not a blob is never considered equal.
        if not isinstance(other, AirbyteStateBlob):
            return False
        return bool(self.__dict__ == other.__dict__)


# The following dataclasses have been redeclared to include the new version of AirbyteStateBlob
@dataclass
class AirbyteStreamState:
    """Pairs a stream descriptor with an optional `AirbyteStateBlob` for that stream."""

    stream_descriptor: StreamDescriptor  # type: ignore [name-defined]
    stream_state: Optional[AirbyteStateBlob] = None


@dataclass
class AirbyteGlobalState:
    """Holds a list of `AirbyteStreamState` entries plus an optional shared `AirbyteStateBlob`."""

    stream_states: List[AirbyteStreamState]
    shared_state: Optional[AirbyteStateBlob] = None


@dataclass
class AirbyteStateMessage:
    """State message redeclared so its nested state types use the local `AirbyteStateBlob`."""

    type: Optional[AirbyteStateType] = None  # type: ignore [name-defined]
    stream: Optional[AirbyteStreamState] = None
    # "global" is a reserved keyword in python ⇒ Alias is used for (de-)serialization
    global_: Annotated[AirbyteGlobalState | None, Alias("global")] = None
    data: Optional[Dict[str, Any]] = None
    sourceStats: Optional[AirbyteStateStats] = None  # type: ignore [name-defined]
    destinationStats: Optional[AirbyteStateStats] = None  # type: ignore [name-defined]


@dataclass
class AirbyteMessage:
    """Top-level message redeclared so that `state` uses the local `AirbyteStateMessage`."""

    # `Type` is not imported from `typing` here; it comes from the star import above.
    type: Type  # type: ignore [name-defined]
    log: Optional[AirbyteLogMessage] = None  # type: ignore [name-defined]
    spec: Optional[ConnectorSpecification] = None  # type: ignore [name-defined]
    connectionStatus: Optional[AirbyteConnectionStatus] = None  # type: ignore [name-defined]
    catalog: Optional[AirbyteCatalog] = None  # type: ignore [name-defined]
    record: Optional[AirbyteRecordMessage] = None  # type: ignore [name-defined]
    state: Optional[AirbyteStateMessage] = None
    trace: Optional[AirbyteTraceMessage] = None  # type: ignore [name-defined]
    control: Optional[AirbyteControlMessage] = None  # type: ignore [name-defined]
@dataclass
class AirbyteStateBlob:
    """
    Free-form state container whose attributes are decided at construction time.

    Used to "mimic" pydantic BaseModel with the ConfigDict(extra='allow') option:
    whatever is handed to the constructor becomes an attribute of the instance.

    Construction:
        - every positional argument is merged into the instance ``__dict__`` via
          ``dict.update`` (so it has to be something ``dict.update`` accepts,
          e.g. a mapping);
        - every keyword argument is then assigned with ``setattr``, so keyword
          arguments win over positional entries that share a name.

    Equality:
        Two blobs are equal when their instance dictionaries are equal. Comparing
        with anything that is not an `AirbyteStateBlob` yields `False`.
    """

    # Declared as an InitVar only; __init__ below is hand-written and accepts
    # arbitrary positional and keyword arguments.
    kwargs: InitVar[Mapping[str, Any]]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Positional arguments are merged straight into the instance dictionary.
        for positional in args:
            self.__dict__.update(positional)
        # Keyword arguments go through setattr, one attribute at a time.
        for name, value in kwargs.items():
            setattr(self, name, value)

    def __eq__(self, other: object) -> bool:
        # Anything that is not a blob is never considered equal.
        if not isinstance(other, AirbyteStateBlob):
            return False
        return bool(self.__dict__ == other.__dict__)
A dataclass that dynamically sets attributes based on the provided keyword and positional arguments. Used to "mimic" pydantic `BaseModel` with the `ConfigDict(extra='allow')` option.
The AirbyteStateBlob
class allows for flexible instantiation by accepting any number of keyword arguments
and positional arguments. These are used to dynamically update the instance's attributes. This class is useful
in scenarios where the attributes of an object are not known until runtime and need to be set dynamically.
Attributes:
- kwargs (InitVar[Mapping[str, Any]]): A dictionary of keyword arguments used to set attributes dynamically.
Methods:
- `__init__(*args: Any, **kwargs: Any) -> None`: Initializes the `AirbyteStateBlob` by setting attributes from the provided arguments.
- `__eq__(other: object) -> bool`: Checks equality between two `AirbyteStateBlob` instances based on their internal dictionaries. Returns `False` if the other object is not an instance of `AirbyteStateBlob`.
@dataclass
class AirbyteStreamState:
    """Pairs a stream descriptor with an optional `AirbyteStateBlob` for that stream."""

    # Required: there is no default for the descriptor.
    stream_descriptor: StreamDescriptor  # type: ignore [name-defined]
    # Optional state payload; defaults to None.
    stream_state: Optional[AirbyteStateBlob] = None
@dataclass
class AirbyteGlobalState:
    """Holds a list of `AirbyteStreamState` entries plus an optional shared `AirbyteStateBlob`."""

    # Required: the per-stream states.
    stream_states: List[AirbyteStreamState]
    # Optional shared state payload; defaults to None.
    shared_state: Optional[AirbyteStateBlob] = None
@dataclass
class AirbyteStateMessage:
    """State message whose nested stream/global state types are `AirbyteStreamState` and `AirbyteGlobalState`.

    Every field is optional and defaults to None.
    """

    type: Optional[AirbyteStateType] = None  # type: ignore [name-defined]
    stream: Optional[AirbyteStreamState] = None
    # "global" is a reserved keyword in python ⇒ Alias is used for (de-)serialization
    global_: Annotated[AirbyteGlobalState | None, Alias("global")] = None
    data: Optional[Dict[str, Any]] = None
    sourceStats: Optional[AirbyteStateStats] = None  # type: ignore [name-defined]
    destinationStats: Optional[AirbyteStateStats] = None  # type: ignore [name-defined]
@dataclass
class AirbyteMessage:
    """Top-level message: a required `type` plus one optional field per payload kind.

    Every payload field defaults to None; `state` is typed as `AirbyteStateMessage`.
    """

    # NOTE(review): `Type` is presumably the message-type enum supplied by the
    # module's star import rather than `typing.Type` — confirm against the imports.
    type: Type  # type: ignore [name-defined]
    log: Optional[AirbyteLogMessage] = None  # type: ignore [name-defined]
    spec: Optional[ConnectorSpecification] = None  # type: ignore [name-defined]
    connectionStatus: Optional[AirbyteConnectionStatus] = None  # type: ignore [name-defined]
    catalog: Optional[AirbyteCatalog] = None  # type: ignore [name-defined]
    record: Optional[AirbyteRecordMessage] = None  # type: ignore [name-defined]
    state: Optional[AirbyteStateMessage] = None
    trace: Optional[AirbyteTraceMessage] = None  # type: ignore [name-defined]
    control: Optional[AirbyteControlMessage] = None  # type: ignore [name-defined]