airbyte_cdk.sources.declarative.types

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from __future__ import annotations
 6
 7from airbyte_cdk.sources.types import (
 8    Config,
 9    ConnectionDefinition,
10    FieldPointer,
11    Record,
12    StreamSlice,
13    StreamState,
14)
15
16# Note: This package originally contained class definitions for low-code CDK types, but we promoted them into the Python CDK.
17# We've migrated connectors in the repository to reference the new location, but these assignments are used to retain backwards
18# compatibility for sources created by OSS customers or on forks. This can be removed when we start bumping major versions.
19
# Backwards-compatibility re-exports: every name imported from
# airbyte_cdk.sources.types above is rebound at module level, in the same
# order as the import list, so code that still imports it from this legacy
# module keeps working.
Config = Config
ConnectionDefinition = ConnectionDefinition
FieldPointer = FieldPointer
Record = Record
StreamSlice = StreamSlice
StreamState = StreamState
# Type alias: a list of strings.
# NOTE(review): presumably the key path to a field inside a nested record — confirm against callers.
FieldPointer = typing.List[str]
# Type alias: a read-only mapping from string keys to values of any type.
Config = typing.Mapping[str, typing.Any]
# Type alias with the same shape as Config (string keys, values of any type).
ConnectionDefinition = typing.Mapping[str, typing.Any]
# Type alias with the same shape as Config (string keys, values of any type).
StreamState = typing.Mapping[str, typing.Any]
class Record(typing.Mapping[str, typing.Any]):
class Record(Mapping[str, Any]):
    """Read-only mapping view over one record's data, tagged with its stream name.

    Item access, iteration, length, membership tests and ``repr`` all delegate to
    the wrapped ``data`` mapping. Equality only ever succeeds against another
    ``Record`` and compares the wrapped data alone: the stream name, associated
    slice and file reference take no part, and comparing with any other object
    (a plain ``dict`` included) yields ``False``. Because ``__eq__`` is defined
    without ``__hash__``, instances are unhashable.
    """

    def __init__(
        self,
        data: Mapping[str, Any],
        stream_name: str,
        associated_slice: Optional[StreamSlice] = None,
        file_reference: Optional[AirbyteRecordMessageFileReference] = None,
    ) -> None:
        """
        :param data: The mapping exposed through this record's Mapping interface.
        :param stream_name: Name of the stream; stored as the public ``stream_name`` attribute.
        :param associated_slice: Optional slice stored alongside the record.
        :param file_reference: Optional file reference stored alongside the record.
        """
        self._data = data
        self._associated_slice = associated_slice
        self.stream_name = stream_name
        self._file_reference = file_reference

    @property
    def data(self) -> Mapping[str, Any]:
        """Returns the wrapped data mapping."""
        return self._data

    @property
    def associated_slice(self) -> Optional[StreamSlice]:
        """Returns the slice given at construction time, or None."""
        return self._associated_slice

    @property
    def file_reference(self) -> Optional[AirbyteRecordMessageFileReference]:
        """Returns the stored file reference; None unless one was given or assigned."""
        # The constructor defaults this value to None, so the annotation must admit it.
        return self._file_reference

    @file_reference.setter
    def file_reference(self, value: Optional[AirbyteRecordMessageFileReference]) -> None:
        self._file_reference = value

    def __repr__(self) -> str:
        # Deliberately the repr of the payload only, so a Record prints like its data.
        return repr(self._data)

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __len__(self) -> int:
        return len(self._data)

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __contains__(self, item: object) -> bool:
        return item in self._data

    def __eq__(self, other: object) -> bool:
        if isinstance(other, Record):
            # noinspection PyProtectedMember
            return self._data == other._data
        return False

    def __ne__(self, other: object) -> bool:
        return not self.__eq__(other)

A Mapping is a generic container for associating key/value pairs.

This class provides concrete generic implementations of all methods except for __getitem__, __iter__, and __len__.

Record( data: Mapping[str, Any], stream_name: str, associated_slice: Optional[StreamSlice] = None, file_reference: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessageFileReference] = None)
23    def __init__(
24        self,
25        data: Mapping[str, Any],
26        stream_name: str,
27        associated_slice: Optional[StreamSlice] = None,
28        file_reference: Optional[AirbyteRecordMessageFileReference] = None,
29    ):
30        self._data = data
31        self._associated_slice = associated_slice
32        self.stream_name = stream_name
33        self._file_reference = file_reference
stream_name
data: Mapping[str, Any]
35    @property
36    def data(self) -> Mapping[str, Any]:
37        return self._data
associated_slice: Optional[StreamSlice]
39    @property
40    def associated_slice(self) -> Optional[StreamSlice]:
41        return self._associated_slice
file_reference: airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteRecordMessageFileReference
43    @property
44    def file_reference(self) -> AirbyteRecordMessageFileReference:
45        return self._file_reference
class StreamSlice(typing.Mapping[str, typing.Any]):
 76class StreamSlice(Mapping[str, Any]):
 77    def __init__(
 78        self,
 79        *,
 80        partition: Mapping[str, Any],
 81        cursor_slice: Mapping[str, Any],
 82        extra_fields: Optional[Mapping[str, Any]] = None,
 83    ) -> None:
 84        """
 85        :param partition: The partition keys representing a unique partition in the stream.
 86        :param cursor_slice: The incremental cursor slice keys, such as dates or pagination tokens.
 87        :param extra_fields: Additional fields that should not be part of the partition but passed along, such as metadata from the parent stream.
 88        """
 89        self._partition = partition
 90        self._cursor_slice = cursor_slice
 91        self._extra_fields = extra_fields or {}
 92
 93        # Ensure that partition keys do not overlap with cursor slice keys
 94        if partition.keys() & cursor_slice.keys():
 95            raise ValueError("Keys for partition and incremental sync cursor should not overlap")
 96
 97        self._stream_slice = dict(partition) | dict(cursor_slice)
 98
 99    @property
100    def partition(self) -> Mapping[str, Any]:
101        """Returns the partition portion of the stream slice."""
102        p = self._partition
103        while isinstance(p, StreamSlice):
104            p = p.partition
105        return p
106
107    @property
108    def cursor_slice(self) -> Mapping[str, Any]:
109        """Returns the cursor slice portion of the stream slice."""
110        c = self._cursor_slice
111        while isinstance(c, StreamSlice):
112            c = c.cursor_slice
113        return c
114
115    @property
116    def extra_fields(self) -> Mapping[str, Any]:
117        """Returns the extra fields that are not part of the partition."""
118        return self._extra_fields
119
120    def __repr__(self) -> str:
121        return repr(self._stream_slice)
122
123    def __setitem__(self, key: str, value: Any) -> None:
124        raise ValueError("StreamSlice is immutable")
125
126    def __getitem__(self, key: str) -> Any:
127        return self._stream_slice[key]
128
129    def __len__(self) -> int:
130        return len(self._stream_slice)
131
132    def __iter__(self) -> Iterator[str]:
133        return iter(self._stream_slice)
134
135    def __contains__(self, item: Any) -> bool:
136        return item in self._stream_slice
137
138    def keys(self) -> KeysView[str]:
139        return self._stream_slice.keys()
140
141    def items(self) -> ItemsView[str, Any]:
142        return self._stream_slice.items()
143
144    def values(self) -> ValuesView[Any]:
145        return self._stream_slice.values()
146
147    def get(self, key: str, default: Any = None) -> Optional[Any]:
148        return self._stream_slice.get(key, default)
149
150    def __eq__(self, other: Any) -> bool:
151        if isinstance(other, dict):
152            return self._stream_slice == other
153        if isinstance(other, StreamSlice):
154            # noinspection PyProtectedMember
155            return self._partition == other._partition and self._cursor_slice == other._cursor_slice
156        return False
157
158    def __ne__(self, other: Any) -> bool:
159        return not self.__eq__(other)
160
161    def __json_serializable__(self) -> Any:
162        return self._stream_slice
163
164    def __hash__(self) -> int:
165        return SliceHasher.hash(
166            stream_slice=self._stream_slice
167        )  # no need to provide stream_name here as this is used for slicing the cursor
168
169    def __bool__(self) -> bool:
170        return bool(self._stream_slice) or bool(self._extra_fields)

A Mapping is a generic container for associating key/value pairs.

This class provides concrete generic implementations of all methods except for __getitem__, __iter__, and __len__.

StreamSlice( *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any], extra_fields: Optional[Mapping[str, Any]] = None)
77    def __init__(
78        self,
79        *,
80        partition: Mapping[str, Any],
81        cursor_slice: Mapping[str, Any],
82        extra_fields: Optional[Mapping[str, Any]] = None,
83    ) -> None:
84        """
85        :param partition: The partition keys representing a unique partition in the stream.
86        :param cursor_slice: The incremental cursor slice keys, such as dates or pagination tokens.
87        :param extra_fields: Additional fields that should not be part of the partition but passed along, such as metadata from the parent stream.
88        """
89        self._partition = partition
90        self._cursor_slice = cursor_slice
91        self._extra_fields = extra_fields or {}
92
93        # Ensure that partition keys do not overlap with cursor slice keys
94        if partition.keys() & cursor_slice.keys():
95            raise ValueError("Keys for partition and incremental sync cursor should not overlap")
96
97        self._stream_slice = dict(partition) | dict(cursor_slice)
Parameters
  • partition: The partition keys representing a unique partition in the stream.
  • cursor_slice: The incremental cursor slice keys, such as dates or pagination tokens.
  • extra_fields: Additional fields that should not be part of the partition but passed along, such as metadata from the parent stream.
partition: Mapping[str, Any]
 99    @property
100    def partition(self) -> Mapping[str, Any]:
101        """Returns the partition portion of the stream slice."""
102        p = self._partition
103        while isinstance(p, StreamSlice):
104            p = p.partition
105        return p

Returns the partition portion of the stream slice.

cursor_slice: Mapping[str, Any]
107    @property
108    def cursor_slice(self) -> Mapping[str, Any]:
109        """Returns the cursor slice portion of the stream slice."""
110        c = self._cursor_slice
111        while isinstance(c, StreamSlice):
112            c = c.cursor_slice
113        return c

Returns the cursor slice portion of the stream slice.

extra_fields: Mapping[str, Any]
115    @property
116    def extra_fields(self) -> Mapping[str, Any]:
117        """Returns the extra fields that are not part of the partition."""
118        return self._extra_fields

Returns the extra fields that are not part of the partition.

def keys(self) -> KeysView[str]:
138    def keys(self) -> KeysView[str]:
139        return self._stream_slice.keys()

D.keys() -> a set-like object providing a view on D's keys

def items(self) -> ItemsView[str, Any]:
141    def items(self) -> ItemsView[str, Any]:
142        return self._stream_slice.items()

D.items() -> a set-like object providing a view on D's items

def values(self) -> ValuesView[Any]:
144    def values(self) -> ValuesView[Any]:
145        return self._stream_slice.values()

D.values() -> an object providing a view on D's values

def get(self, key: str, default: Any = None) -> Optional[Any]:
147    def get(self, key: str, default: Any = None) -> Optional[Any]:
148        return self._stream_slice.get(key, default)

D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.