airbyte_cdk.sources.streams.permissions.identities_stream
1# 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 3# 4 5import traceback 6from abc import ABC, abstractmethod 7from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional 8 9from airbyte_protocol_dataclasses.models import SyncMode 10 11from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level 12from airbyte_cdk.models import Type as MessageType 13from airbyte_cdk.sources.streams import Stream 14from airbyte_cdk.sources.streams.checkpoint import Cursor 15from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message 16from airbyte_cdk.utils.traced_exception import AirbyteTracedException 17 18 19class IdentitiesStream(Stream, ABC): 20 """ 21 The identities stream. A full refresh stream to sync identities from a certain domain. 22 The load_identity_groups method manage the logic to get such data. 23 """ 24 25 IDENTITIES_STREAM_NAME = "identities" 26 27 is_resumable = False 28 29 def __init__(self) -> None: 30 super().__init__() 31 self._cursor: MutableMapping[str, Any] = {} 32 33 @property 34 def state(self) -> MutableMapping[str, Any]: 35 return self._cursor 36 37 @state.setter 38 def state(self, value: MutableMapping[str, Any]) -> None: 39 """State setter, accept state serialized by state getter.""" 40 self._cursor = value 41 42 def read_records( 43 self, 44 sync_mode: SyncMode, 45 cursor_field: Optional[List[str]] = None, 46 stream_slice: Optional[Mapping[str, Any]] = None, 47 stream_state: Optional[Mapping[str, Any]] = None, 48 ) -> Iterable[Mapping[str, Any] | AirbyteMessage]: 49 try: 50 identity_groups = self.load_identity_groups() 51 for record in identity_groups: 52 yield stream_data_to_airbyte_message(self.name, record) 53 except AirbyteTracedException as exc: 54 # Re-raise the exception to stop the whole sync immediately as this is a fatal error 55 raise exc 56 except Exception as e: 57 yield AirbyteMessage( 58 type=MessageType.LOG, 59 log=AirbyteLogMessage( 60 level=Level.ERROR, 61 message=f"Error trying to read identities: {e} stream={self.name}", 62 stack_trace=traceback.format_exc(), 63 ), 64 ) 65 66 @abstractmethod 67 def load_identity_groups(self) -> Iterable[Dict[str, Any]]: 68 raise NotImplementedError("Implement this method to read identity records") 69 70 @property 71 def name(self) -> str: 72 return self.IDENTITIES_STREAM_NAME 73 74 def get_cursor(self) -> Optional[Cursor]: 75 return None
20class IdentitiesStream(Stream, ABC): 21 """ 22 The identities stream. A full refresh stream to sync identities from a certain domain. 23 The load_identity_groups method manage the logic to get such data. 24 """ 25 26 IDENTITIES_STREAM_NAME = "identities" 27 28 is_resumable = False 29 30 def __init__(self) -> None: 31 super().__init__() 32 self._cursor: MutableMapping[str, Any] = {} 33 34 @property 35 def state(self) -> MutableMapping[str, Any]: 36 return self._cursor 37 38 @state.setter 39 def state(self, value: MutableMapping[str, Any]) -> None: 40 """State setter, accept state serialized by state getter.""" 41 self._cursor = value 42 43 def read_records( 44 self, 45 sync_mode: SyncMode, 46 cursor_field: Optional[List[str]] = None, 47 stream_slice: Optional[Mapping[str, Any]] = None, 48 stream_state: Optional[Mapping[str, Any]] = None, 49 ) -> Iterable[Mapping[str, Any] | AirbyteMessage]: 50 try: 51 identity_groups = self.load_identity_groups() 52 for record in identity_groups: 53 yield stream_data_to_airbyte_message(self.name, record) 54 except AirbyteTracedException as exc: 55 # Re-raise the exception to stop the whole sync immediately as this is a fatal error 56 raise exc 57 except Exception as e: 58 yield AirbyteMessage( 59 type=MessageType.LOG, 60 log=AirbyteLogMessage( 61 level=Level.ERROR, 62 message=f"Error trying to read identities: {e} stream={self.name}", 63 stack_trace=traceback.format_exc(), 64 ), 65 ) 66 67 @abstractmethod 68 def load_identity_groups(self) -> Iterable[Dict[str, Any]]: 69 raise NotImplementedError("Implement this method to read identity records") 70 71 @property 72 def name(self) -> str: 73 return self.IDENTITIES_STREAM_NAME 74 75 def get_cursor(self) -> Optional[Cursor]: 76 return None
The identities stream. A full refresh stream to sync identities from a certain domain. The load_identity_groups method manage the logic to get such data.
Returns
True if this stream allows the checkpointing of sync progress and can resume from it on subsequent attempts. This differs from supports_incremental because certain kinds of streams like those supporting resumable full refresh can checkpoint progress in between attempts for improved fault tolerance. However, they will start from the beginning on the next sync job.
43 def read_records( 44 self, 45 sync_mode: SyncMode, 46 cursor_field: Optional[List[str]] = None, 47 stream_slice: Optional[Mapping[str, Any]] = None, 48 stream_state: Optional[Mapping[str, Any]] = None, 49 ) -> Iterable[Mapping[str, Any] | AirbyteMessage]: 50 try: 51 identity_groups = self.load_identity_groups() 52 for record in identity_groups: 53 yield stream_data_to_airbyte_message(self.name, record) 54 except AirbyteTracedException as exc: 55 # Re-raise the exception to stop the whole sync immediately as this is a fatal error 56 raise exc 57 except Exception as e: 58 yield AirbyteMessage( 59 type=MessageType.LOG, 60 log=AirbyteLogMessage( 61 level=Level.ERROR, 62 message=f"Error trying to read identities: {e} stream={self.name}", 63 stack_trace=traceback.format_exc(), 64 ), 65 )
This method should be overridden by subclasses to read records based on the inputs
Returns
Stream name. By default this is the implementing class name, but it can be overridden as needed.
A Cursor is an interface that a stream can implement to manage how its internal state is read and updated while reading records. Historically, Python connectors had no concept of a cursor to manage state. Python streams need to define a cursor implementation and override this method to manage state through a Cursor.
Inherited Members
- airbyte_cdk.sources.streams.core.Stream
- logger
- transformer
- cursor
- has_multiple_slices
- get_error_display_message
- read
- read_only_records
- get_json_schema
- as_airbyte_stream
- supports_incremental
- cursor_field
- namespace
- source_defined_cursor
- exit_on_rate_limit
- primary_key
- stream_slices
- state_checkpoint_interval
- get_updated_state
- log_stream_sync_configuration
- configured_json_schema