airbyte_cdk.sources.declarative.types

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from __future__ import annotations
 6
 7from airbyte_cdk.sources.types import (
 8    Config,
 9    ConnectionDefinition,
10    FieldPointer,
11    Record,
12    StreamSlice,
13    StreamState,
14)
15
16# Note: This package originally contained class definitions for low-code CDK types, but we promoted them into the Python CDK.
17# We've migrated connectors in the repository to reference the new location, but these assignments are used to retain backwards
18# compatibility for sources created by OSS customers or on forks. This can be removed when we start bumping major versions.
19
20FieldPointer = FieldPointer
21Config = Config
22ConnectionDefinition = ConnectionDefinition
23StreamState = StreamState
24Record = Record
25StreamSlice = StreamSlice
FieldPointer = typing.List[str]
Config = typing.Mapping[str, typing.Any]
ConnectionDefinition = typing.Mapping[str, typing.Any]
StreamState = typing.Mapping[str, typing.Any]
class Record(typing.Mapping[str, typing.Any]):
21class Record(Mapping[str, Any]):
22    def __init__(
23        self,
24        data: Mapping[str, Any],
25        stream_name: str,
26        associated_slice: Optional[StreamSlice] = None,
27        is_file_transfer_message: bool = False,
28    ):
29        self._data = data
30        self._associated_slice = associated_slice
31        self.stream_name = stream_name
32        self.is_file_transfer_message = is_file_transfer_message
33
34    @property
35    def data(self) -> Mapping[str, Any]:
36        return self._data
37
38    @property
39    def associated_slice(self) -> Optional[StreamSlice]:
40        return self._associated_slice
41
42    def __repr__(self) -> str:
43        return repr(self._data)
44
45    def __getitem__(self, key: str) -> Any:
46        return self._data[key]
47
48    def __len__(self) -> int:
49        return len(self._data)
50
51    def __iter__(self) -> Any:
52        return iter(self._data)
53
54    def __contains__(self, item: object) -> bool:
55        return item in self._data
56
57    def __eq__(self, other: object) -> bool:
58        if isinstance(other, Record):
59            # noinspection PyProtectedMember
60            return self._data == other._data
61        return False
62
63    def __ne__(self, other: object) -> bool:
64        return not self.__eq__(other)

A Mapping is a generic container for associating key/value pairs.

This class provides concrete generic implementations of all methods except for __getitem__, __iter__, and __len__.

Record( data: Mapping[str, Any], stream_name: str, associated_slice: Optional[StreamSlice] = None, is_file_transfer_message: bool = False)
22    def __init__(
23        self,
24        data: Mapping[str, Any],
25        stream_name: str,
26        associated_slice: Optional[StreamSlice] = None,
27        is_file_transfer_message: bool = False,
28    ):
29        self._data = data
30        self._associated_slice = associated_slice
31        self.stream_name = stream_name
32        self.is_file_transfer_message = is_file_transfer_message
stream_name
is_file_transfer_message
data: Mapping[str, Any]
34    @property
35    def data(self) -> Mapping[str, Any]:
36        return self._data
associated_slice: Optional[StreamSlice]
38    @property
39    def associated_slice(self) -> Optional[StreamSlice]:
40        return self._associated_slice
class StreamSlice(typing.Mapping[str, typing.Any]):
 67class StreamSlice(Mapping[str, Any]):
 68    def __init__(
 69        self,
 70        *,
 71        partition: Mapping[str, Any],
 72        cursor_slice: Mapping[str, Any],
 73        extra_fields: Optional[Mapping[str, Any]] = None,
 74    ) -> None:
 75        """
 76        :param partition: The partition keys representing a unique partition in the stream.
 77        :param cursor_slice: The incremental cursor slice keys, such as dates or pagination tokens.
 78        :param extra_fields: Additional fields that should not be part of the partition but passed along, such as metadata from the parent stream.
 79        """
 80        self._partition = partition
 81        self._cursor_slice = cursor_slice
 82        self._extra_fields = extra_fields or {}
 83
 84        # Ensure that partition keys do not overlap with cursor slice keys
 85        if partition.keys() & cursor_slice.keys():
 86            raise ValueError("Keys for partition and incremental sync cursor should not overlap")
 87
 88        self._stream_slice = dict(partition) | dict(cursor_slice)
 89
 90    @property
 91    def partition(self) -> Mapping[str, Any]:
 92        """Returns the partition portion of the stream slice."""
 93        p = self._partition
 94        while isinstance(p, StreamSlice):
 95            p = p.partition
 96        return p
 97
 98    @property
 99    def cursor_slice(self) -> Mapping[str, Any]:
100        """Returns the cursor slice portion of the stream slice."""
101        c = self._cursor_slice
102        while isinstance(c, StreamSlice):
103            c = c.cursor_slice
104        return c
105
106    @property
107    def extra_fields(self) -> Mapping[str, Any]:
108        """Returns the extra fields that are not part of the partition."""
109        return self._extra_fields
110
111    def __repr__(self) -> str:
112        return repr(self._stream_slice)
113
114    def __setitem__(self, key: str, value: Any) -> None:
115        raise ValueError("StreamSlice is immutable")
116
117    def __getitem__(self, key: str) -> Any:
118        return self._stream_slice[key]
119
120    def __len__(self) -> int:
121        return len(self._stream_slice)
122
123    def __iter__(self) -> Iterator[str]:
124        return iter(self._stream_slice)
125
126    def __contains__(self, item: Any) -> bool:
127        return item in self._stream_slice
128
129    def keys(self) -> KeysView[str]:
130        return self._stream_slice.keys()
131
132    def items(self) -> ItemsView[str, Any]:
133        return self._stream_slice.items()
134
135    def values(self) -> ValuesView[Any]:
136        return self._stream_slice.values()
137
138    def get(self, key: str, default: Any = None) -> Optional[Any]:
139        return self._stream_slice.get(key, default)
140
141    def __eq__(self, other: Any) -> bool:
142        if isinstance(other, dict):
143            return self._stream_slice == other
144        if isinstance(other, StreamSlice):
145            # noinspection PyProtectedMember
146            return self._partition == other._partition and self._cursor_slice == other._cursor_slice
147        return False
148
149    def __ne__(self, other: Any) -> bool:
150        return not self.__eq__(other)
151
152    def __json_serializable__(self) -> Any:
153        return self._stream_slice
154
155    def __hash__(self) -> int:
156        return SliceHasher.hash(
157            stream_slice=self._stream_slice
158        )  # no need to provide stream_name here as this is used for slicing the cursor
159
160    def __bool__(self) -> bool:
161        return bool(self._stream_slice) or bool(self._extra_fields)

A Mapping is a generic container for associating key/value pairs.

This class provides concrete generic implementations of all methods except for __getitem__, __iter__, and __len__.

StreamSlice( *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any], extra_fields: Optional[Mapping[str, Any]] = None)
68    def __init__(
69        self,
70        *,
71        partition: Mapping[str, Any],
72        cursor_slice: Mapping[str, Any],
73        extra_fields: Optional[Mapping[str, Any]] = None,
74    ) -> None:
75        """
76        :param partition: The partition keys representing a unique partition in the stream.
77        :param cursor_slice: The incremental cursor slice keys, such as dates or pagination tokens.
78        :param extra_fields: Additional fields that should not be part of the partition but passed along, such as metadata from the parent stream.
79        """
80        self._partition = partition
81        self._cursor_slice = cursor_slice
82        self._extra_fields = extra_fields or {}
83
84        # Ensure that partition keys do not overlap with cursor slice keys
85        if partition.keys() & cursor_slice.keys():
86            raise ValueError("Keys for partition and incremental sync cursor should not overlap")
87
88        self._stream_slice = dict(partition) | dict(cursor_slice)
Parameters
  • partition: The partition keys representing a unique partition in the stream.
  • cursor_slice: The incremental cursor slice keys, such as dates or pagination tokens.
  • extra_fields: Additional fields that should not be part of the partition but passed along, such as metadata from the parent stream.
partition: Mapping[str, Any]
90    @property
91    def partition(self) -> Mapping[str, Any]:
92        """Returns the partition portion of the stream slice."""
93        p = self._partition
94        while isinstance(p, StreamSlice):
95            p = p.partition
96        return p

Returns the partition portion of the stream slice.

cursor_slice: Mapping[str, Any]
 98    @property
 99    def cursor_slice(self) -> Mapping[str, Any]:
100        """Returns the cursor slice portion of the stream slice."""
101        c = self._cursor_slice
102        while isinstance(c, StreamSlice):
103            c = c.cursor_slice
104        return c

Returns the cursor slice portion of the stream slice.

extra_fields: Mapping[str, Any]
106    @property
107    def extra_fields(self) -> Mapping[str, Any]:
108        """Returns the extra fields that are not part of the partition."""
109        return self._extra_fields

Returns the extra fields that are not part of the partition.

def keys(self) -> KeysView[str]:
129    def keys(self) -> KeysView[str]:
130        return self._stream_slice.keys()

D.keys() -> a set-like object providing a view on D's keys

def items(self) -> ItemsView[str, Any]:
132    def items(self) -> ItemsView[str, Any]:
133        return self._stream_slice.items()

D.items() -> a set-like object providing a view on D's items

def values(self) -> ValuesView[Any]:
135    def values(self) -> ValuesView[Any]:
136        return self._stream_slice.values()

D.values() -> an object providing a view on D's values

def get(self, key: str, default: Any = None) -> Optional[Any]:
138    def get(self, key: str, default: Any = None) -> Optional[Any]:
139        return self._stream_slice.get(key, default)

D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.