
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 5import traceback
 6from abc import ABC, abstractmethod
 7from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional
 9from airbyte_protocol_dataclasses.models import SyncMode
11from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
12from airbyte_cdk.models import Type as MessageType
13from airbyte_cdk.sources.streams import Stream
14from airbyte_cdk.sources.streams.checkpoint import Cursor
15from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
16from airbyte_cdk.utils.traced_exception import AirbyteTracedException
class IdentitiesStream(Stream, ABC):
    """
    The identities stream. A full refresh stream to sync identities from a certain domain.
    The load_identity_groups method manages the logic to get such data.

    Subclasses must implement ``load_identity_groups``; ``read_records`` wraps
    every record it yields into an Airbyte message for this stream.
    """

    IDENTITIES_STREAM_NAME = "identities"

    # This stream does not checkpoint sync progress between attempts.
    is_resumable = False

    def __init__(self) -> None:
        super().__init__()
        # Backing storage for the ``state`` property.
        self._cursor: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Return the state mapping currently held by this stream."""
        return self._cursor

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accepts state serialized by state getter."""
        self._cursor = value

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield every identity record produced by ``load_identity_groups`` as an Airbyte message.

        The ``sync_mode``, ``cursor_field``, ``stream_slice`` and ``stream_state``
        arguments are accepted for interface compatibility but are not used here.

        Raises:
            AirbyteTracedException: re-raised unchanged, since it is treated as fatal.

        Any other exception is not propagated; it is reported as an ERROR-level
        log message (including the stack trace) and iteration ends.
        """
        try:
            identity_groups = self.load_identity_groups()
            for record in identity_groups:
                yield stream_data_to_airbyte_message(self.name, record)
        except AirbyteTracedException:
            # Re-raise the exception to stop the whole sync immediately as this is a fatal error
            raise
        except Exception as e:
            yield AirbyteMessage(
                type=MessageType.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR,
                    message=f"Error trying to read identities: {e} stream={self.name}",
                    stack_trace=traceback.format_exc(),
                ),
            )

    @abstractmethod
    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
        """Return the identity records to emit; must be implemented by subclasses."""
        raise NotImplementedError("Implement this method to read identity records")

    @property
    def name(self) -> str:
        """Return the fixed stream name, ``IDENTITIES_STREAM_NAME``."""
        return self.IDENTITIES_STREAM_NAME

    def get_cursor(self) -> Optional[Cursor]:
        """Return ``None``: this stream does not provide a Cursor object."""
        return None
class IdentitiesStream(airbyte_cdk.sources.streams.core.Stream, abc.ABC):
class IdentitiesStream(Stream, ABC):
    """
    The identities stream. A full refresh stream to sync identities from a certain domain.
    The load_identity_groups method manages the logic to get such data.

    Subclasses must implement ``load_identity_groups``; ``read_records`` wraps
    every record it yields into an Airbyte message for this stream.
    """

    IDENTITIES_STREAM_NAME = "identities"

    # This stream does not checkpoint sync progress between attempts.
    is_resumable = False

    def __init__(self) -> None:
        super().__init__()
        # Backing storage for the ``state`` property.
        self._cursor: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Return the state mapping currently held by this stream."""
        return self._cursor

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        """State setter, accepts state serialized by state getter."""
        self._cursor = value

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
        """
        Yield every identity record produced by ``load_identity_groups`` as an Airbyte message.

        The ``sync_mode``, ``cursor_field``, ``stream_slice`` and ``stream_state``
        arguments are accepted for interface compatibility but are not used here.

        Raises:
            AirbyteTracedException: re-raised unchanged, since it is treated as fatal.

        Any other exception is not propagated; it is reported as an ERROR-level
        log message (including the stack trace) and iteration ends.
        """
        try:
            identity_groups = self.load_identity_groups()
            for record in identity_groups:
                yield stream_data_to_airbyte_message(self.name, record)
        except AirbyteTracedException:
            # Re-raise the exception to stop the whole sync immediately as this is a fatal error
            raise
        except Exception as e:
            yield AirbyteMessage(
                type=MessageType.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR,
                    message=f"Error trying to read identities: {e} stream={self.name}",
                    stack_trace=traceback.format_exc(),
                ),
            )

    @abstractmethod
    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
        """Return the identity records to emit; must be implemented by subclasses."""
        raise NotImplementedError("Implement this method to read identity records")

    @property
    def name(self) -> str:
        """Return the fixed stream name, ``IDENTITIES_STREAM_NAME``."""
        return self.IDENTITIES_STREAM_NAME

    def get_cursor(self) -> Optional[Cursor]:
        """Return ``None``: this stream does not provide a Cursor object."""
        return None

The identities stream. A full refresh stream to sync identities from a certain domain. The load_identity_groups method manages the logic to get such data.

is_resumable = False

True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.

state: MutableMapping[str, Any]
34    @property
35    def state(self) -> MutableMapping[str, Any]:
36        return self._cursor

State setter; accepts state as serialized by the state getter.

def read_records( self, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]] = None, stream_slice: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
43    def read_records(
44        self,
45        sync_mode: SyncMode,
46        cursor_field: Optional[List[str]] = None,
47        stream_slice: Optional[Mapping[str, Any]] = None,
48        stream_state: Optional[Mapping[str, Any]] = None,
49    ) -> Iterable[Mapping[str, Any] | AirbyteMessage]:
50        try:
51            identity_groups = self.load_identity_groups()
52            for record in identity_groups:
53                yield stream_data_to_airbyte_message(, record)
54        except AirbyteTracedException as exc:
55            # Re-raise the exception to stop the whole sync immediately as this is a fatal error
56            raise exc
57        except Exception as e:
58            yield AirbyteMessage(
59                type=MessageType.LOG,
60                log=AirbyteLogMessage(
61                    level=Level.ERROR,
62                    message=f"Error trying to read identities: {e} stream={}",
63                    stack_trace=traceback.format_exc(),
64                ),
65            )

This method should be overridden by subclasses to read records based on the inputs

def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
67    @abstractmethod
68    def load_identity_groups(self) -> Iterable[Dict[str, Any]]:
69        raise NotImplementedError("Implement this method to read identity records")
name: str
71    @property
72    def name(self) -> str:
73        return self.IDENTITIES_STREAM_NAME

Stream name. By default this is the implementing class name, but it can be overridden as needed.

def get_cursor(self) -> Optional[airbyte_cdk.sources.streams.checkpoint.Cursor]:
75    def get_cursor(self) -> Optional[Cursor]:
76        return None

A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.