airbyte_cdk.sources.abstract_source

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import logging
  6from abc import ABC, abstractmethod
  7from typing import (
  8    Any,
  9    Dict,
 10    Iterable,
 11    Iterator,
 12    List,
 13    Mapping,
 14    MutableMapping,
 15    Optional,
 16    Tuple,
 17    Union,
 18)
 19
 20from airbyte_cdk.exception_handler import generate_failed_streams_error_message
 21from airbyte_cdk.models import (
 22    AirbyteCatalog,
 23    AirbyteConnectionStatus,
 24    AirbyteMessage,
 25    AirbyteStateMessage,
 26    AirbyteStreamStatus,
 27    ConfiguredAirbyteCatalog,
 28    ConfiguredAirbyteStream,
 29    FailureType,
 30    Status,
 31    StreamDescriptor,
 32)
 33from airbyte_cdk.models import Type as MessageType
 34from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 35from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
 36from airbyte_cdk.sources.source import Source
 37from airbyte_cdk.sources.streams import Stream
 38from airbyte_cdk.sources.streams.core import StreamData
 39from airbyte_cdk.sources.streams.http.http import HttpStream
 40from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
 41from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
 42from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
 43from airbyte_cdk.utils.event_timing import create_timer
 44from airbyte_cdk.utils.stream_status_utils import (
 45    as_airbyte_message as stream_status_as_airbyte_message,
 46)
 47from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 48
 49_default_message_repository = InMemoryMessageRepository()
 50
 51
 52class AbstractSource(Source, ABC):
 53    """
 54    Abstract base class for an Airbyte Source. Consumers should implement any abstract methods
 55    in this class to create an Airbyte Specification compliant Source.
 56    """
 57
 58    @abstractmethod
 59    def check_connection(
 60        self, logger: logging.Logger, config: Mapping[str, Any]
 61    ) -> Tuple[bool, Optional[Any]]:
 62        """
 63        :param logger: source logger
 64        :param config: The user-provided configuration as specified by the source's spec.
 65          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
 66        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
 67          and we can connect to the underlying data source using the provided configuration.
 68          Otherwise, the input config cannot be used to connect to the underlying data source,
 69          and the "error" object should describe what went wrong.
 70          The error object will be cast to string to display the problem to the user.
 71        """
 72
 73    @abstractmethod
 74    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
 75        """
 76        :param config: The user-provided configuration as specified by the source's spec.
 77        Any stream construction related operation should happen here.
 78        :return: A list of the streams in this source connector.
 79        """
 80
 81    # Stream name to instance map for applying output object transformation
 82    _stream_to_instance_map: Dict[str, Stream] = {}
 83    _slice_logger: SliceLogger = DebugSliceLogger()
 84
 85    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
 86        """Implements the Discover operation from the Airbyte Specification.
 87        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
 88        """
 89        streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
 90        return AirbyteCatalog(streams=streams)
 91
 92    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
 93        """Implements the Check Connection operation from the Airbyte Specification.
 94        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
 95        """
 96        check_succeeded, error = self.check_connection(logger, config)
 97        if not check_succeeded:
 98            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
 99        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
100
101    def read(
102        self,
103        logger: logging.Logger,
104        config: Mapping[str, Any],
105        catalog: ConfiguredAirbyteCatalog,
106        state: Optional[List[AirbyteStateMessage]] = None,
107    ) -> Iterator[AirbyteMessage]:
108        """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
109        logger.info(f"Starting syncing {self.name}")
110        config, internal_config = split_config(config)
111        # TODO assert all streams exist in the connector
112        # get the streams once in case the connector needs to make any queries to generate them
113        stream_instances = {s.name: s for s in self.streams(config)}
114        state_manager = ConnectorStateManager(state=state)
115        self._stream_to_instance_map = stream_instances
116
117        stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}
118
119        with create_timer(self.name) as timer:
120            for configured_stream in catalog.streams:
121                stream_instance = stream_instances.get(configured_stream.stream.name)
122                is_stream_exist = bool(stream_instance)
123                try:
124                    # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors
125                    if not stream_instance:
126                        if not self.raise_exception_on_missing_stream:
127                            yield stream_status_as_airbyte_message(
128                                configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
129                            )
130                            continue
131
132                        error_message = (
133                            f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
134                            f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
135                        )
136
137                        # Use configured_stream as stream_instance to support references in error handling.
138                        stream_instance = configured_stream.stream
139
140                        raise AirbyteTracedException(
141                            message="A stream listed in your configuration was not found in the source. Please check the logs for more "
142                            "details.",
143                            internal_message=error_message,
144                            failure_type=FailureType.config_error,
145                        )
146
147                    timer.start_event(f"Syncing stream {configured_stream.stream.name}")
148                    logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
149                    yield stream_status_as_airbyte_message(
150                        configured_stream.stream, AirbyteStreamStatus.STARTED
151                    )
152                    yield from self._read_stream(
153                        logger=logger,
154                        stream_instance=stream_instance,
155                        configured_stream=configured_stream,
156                        state_manager=state_manager,
157                        internal_config=internal_config,
158                    )
159                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
160                    yield stream_status_as_airbyte_message(
161                        configured_stream.stream, AirbyteStreamStatus.COMPLETE
162                    )
163
164                except Exception as e:
165                    yield from self._emit_queued_messages()
166                    logger.exception(
167                        f"Encountered an exception while reading stream {configured_stream.stream.name}"
168                    )
169                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
170                    yield stream_status_as_airbyte_message(
171                        configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
172                    )
173
174                    stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)
175
176                    if isinstance(e, AirbyteTracedException):
177                        traced_exception = e
178                        info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
179                    else:
180                        traced_exception = self._serialize_exception(
181                            stream_descriptor, e, stream_instance=stream_instance
182                        )
183                        info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}"
184
185                    yield traced_exception.as_sanitized_airbyte_message(
186                        stream_descriptor=stream_descriptor
187                    )
188                    stream_name_to_exception[stream_instance.name] = traced_exception  # type: ignore # use configured_stream if stream_instance is None
189                    if self.stop_sync_on_stream_failure:
190                        logger.info(info_message)
191                        break
192                finally:
193                    # Finish read event only if the stream instance exists;
194                    # otherwise, there's no need as it never started
195                    if is_stream_exist:
196                        timer.finish_event()
197                        logger.info(f"Finished syncing {configured_stream.stream.name}")
198                        logger.info(timer.report())
199
200        if len(stream_name_to_exception) > 0:
201            error_message = generate_failed_streams_error_message(
202                {key: [value] for key, value in stream_name_to_exception.items()}
203            )
204            logger.info(error_message)
205            # We still raise at least one exception when a stream raises an exception because the platform currently relies
206            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
207            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
208            raise AirbyteTracedException(
209                message=error_message, failure_type=FailureType.config_error
210            )
211        logger.info(f"Finished syncing {self.name}")
212
213    @staticmethod
214    def _serialize_exception(
215        stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None
216    ) -> AirbyteTracedException:
217        display_message = stream_instance.get_error_display_message(e) if stream_instance else None
218        if display_message:
219            return AirbyteTracedException.from_exception(
220                e, message=display_message, stream_descriptor=stream_descriptor
221            )
222        return AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor)
223
224    @property
225    def raise_exception_on_missing_stream(self) -> bool:
226        return False
227
228    def _read_stream(
229        self,
230        logger: logging.Logger,
231        stream_instance: Stream,
232        configured_stream: ConfiguredAirbyteStream,
233        state_manager: ConnectorStateManager,
234        internal_config: InternalConfig,
235    ) -> Iterator[AirbyteMessage]:
236        if internal_config.page_size and isinstance(stream_instance, HttpStream):
237            logger.info(
238                f"Setting page size for {stream_instance.name} to {internal_config.page_size}"
239            )
240            stream_instance.page_size = internal_config.page_size
241        logger.debug(
242            f"Syncing configured stream: {configured_stream.stream.name}",
243            extra={
244                "sync_mode": configured_stream.sync_mode,
245                "primary_key": configured_stream.primary_key,
246                "cursor_field": configured_stream.cursor_field,
247            },
248        )
249        stream_instance.log_stream_sync_configuration()
250
251        stream_name = configured_stream.stream.name
252        stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)
253
254        # This is a hack. Existing full refresh streams that are converted into resumable full refresh need to discard
255        # the state because the terminal state for a full refresh sync is not compatible with substream resumable full
256        # refresh state. This is only required when running live traffic regression testing since the platform normally
257        # handles whether to pass state
258        if stream_state == {"__ab_no_cursor_state_message": True}:
259            stream_state = {}
260
261        if "state" in dir(stream_instance):
262            stream_instance.state = stream_state  # type: ignore # we check that state in the dir(stream_instance)
263            logger.info(f"Setting state of {self.name} stream to {stream_state}")
264
265        record_iterator = stream_instance.read(
266            configured_stream,
267            logger,
268            self._slice_logger,
269            stream_state,
270            state_manager,
271            internal_config,
272        )
273
274        record_counter = 0
275        logger.info(f"Syncing stream: {stream_name} ")
276        for record_data_or_message in record_iterator:
277            record = self._get_message(record_data_or_message, stream_instance)
278            if record.type == MessageType.RECORD:
279                record_counter += 1
280                if record_counter == 1:
281                    logger.info(f"Marking stream {stream_name} as RUNNING")
282                    # If we just read the first record of the stream, emit the transition to the RUNNING state
283                    yield stream_status_as_airbyte_message(
284                        configured_stream.stream, AirbyteStreamStatus.RUNNING
285                    )
286            yield from self._emit_queued_messages()
287            yield record
288
289        logger.info(f"Read {record_counter} records from {stream_name} stream")
290
291    def _emit_queued_messages(self) -> Iterable[AirbyteMessage]:
292        if self.message_repository:
293            yield from self.message_repository.consume_queue()
294        return
295
296    def _get_message(
297        self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream
298    ) -> AirbyteMessage:
299        """
300        Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
301        """
302        match record_data_or_message:
303            case AirbyteMessage():
304                return record_data_or_message
305            case _:
306                return stream_data_to_airbyte_message(
307                    stream.name,
308                    record_data_or_message,
309                    stream.transformer,
310                    stream.get_json_schema(),
311                )
312
313    @property
314    def message_repository(self) -> Union[None, MessageRepository]:
315        return _default_message_repository
316
317    @property
318    def stop_sync_on_stream_failure(self) -> bool:
319        """
320        WARNING: This function is in-development which means it is subject to change. Use at your own risk.
321
322        By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
323        continue syncing the next stream. This can be overwritten on a per-source basis so that the source will stop the sync
324        on the first error seen and emit a single error trace message for that stream.
325        """
326        return False
 53class AbstractSource(Source, ABC):
 54    """
 55    Abstract base class for an Airbyte Source. Consumers should implement any abstract methods
 56    in this class to create an Airbyte Specification compliant Source.
 57    """
 58
 59    @abstractmethod
 60    def check_connection(
 61        self, logger: logging.Logger, config: Mapping[str, Any]
 62    ) -> Tuple[bool, Optional[Any]]:
 63        """
 64        :param logger: source logger
 65        :param config: The user-provided configuration as specified by the source's spec.
 66          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
 67        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
 68          and we can connect to the underlying data source using the provided configuration.
 69          Otherwise, the input config cannot be used to connect to the underlying data source,
 70          and the "error" object should describe what went wrong.
 71          The error object will be cast to string to display the problem to the user.
 72        """
 73
 74    @abstractmethod
 75    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
 76        """
 77        :param config: The user-provided configuration as specified by the source's spec.
 78        Any stream construction related operation should happen here.
 79        :return: A list of the streams in this source connector.
 80        """
 81
 82    # Stream name to instance map for applying output object transformation
 83    _stream_to_instance_map: Dict[str, Stream] = {}
 84    _slice_logger: SliceLogger = DebugSliceLogger()
 85
 86    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
 87        """Implements the Discover operation from the Airbyte Specification.
 88        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
 89        """
 90        streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
 91        return AirbyteCatalog(streams=streams)
 92
 93    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
 94        """Implements the Check Connection operation from the Airbyte Specification.
 95        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
 96        """
 97        check_succeeded, error = self.check_connection(logger, config)
 98        if not check_succeeded:
 99            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
100        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
101
102    def read(
103        self,
104        logger: logging.Logger,
105        config: Mapping[str, Any],
106        catalog: ConfiguredAirbyteCatalog,
107        state: Optional[List[AirbyteStateMessage]] = None,
108    ) -> Iterator[AirbyteMessage]:
109        """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
110        logger.info(f"Starting syncing {self.name}")
111        config, internal_config = split_config(config)
112        # TODO assert all streams exist in the connector
113        # get the streams once in case the connector needs to make any queries to generate them
114        stream_instances = {s.name: s for s in self.streams(config)}
115        state_manager = ConnectorStateManager(state=state)
116        self._stream_to_instance_map = stream_instances
117
118        stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}
119
120        with create_timer(self.name) as timer:
121            for configured_stream in catalog.streams:
122                stream_instance = stream_instances.get(configured_stream.stream.name)
123                is_stream_exist = bool(stream_instance)
124                try:
125                    # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors
126                    if not stream_instance:
127                        if not self.raise_exception_on_missing_stream:
128                            yield stream_status_as_airbyte_message(
129                                configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
130                            )
131                            continue
132
133                        error_message = (
134                            f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
135                            f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
136                        )
137
138                        # Use configured_stream as stream_instance to support references in error handling.
139                        stream_instance = configured_stream.stream
140
141                        raise AirbyteTracedException(
142                            message="A stream listed in your configuration was not found in the source. Please check the logs for more "
143                            "details.",
144                            internal_message=error_message,
145                            failure_type=FailureType.config_error,
146                        )
147
148                    timer.start_event(f"Syncing stream {configured_stream.stream.name}")
149                    logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
150                    yield stream_status_as_airbyte_message(
151                        configured_stream.stream, AirbyteStreamStatus.STARTED
152                    )
153                    yield from self._read_stream(
154                        logger=logger,
155                        stream_instance=stream_instance,
156                        configured_stream=configured_stream,
157                        state_manager=state_manager,
158                        internal_config=internal_config,
159                    )
160                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
161                    yield stream_status_as_airbyte_message(
162                        configured_stream.stream, AirbyteStreamStatus.COMPLETE
163                    )
164
165                except Exception as e:
166                    yield from self._emit_queued_messages()
167                    logger.exception(
168                        f"Encountered an exception while reading stream {configured_stream.stream.name}"
169                    )
170                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
171                    yield stream_status_as_airbyte_message(
172                        configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
173                    )
174
175                    stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)
176
177                    if isinstance(e, AirbyteTracedException):
178                        traced_exception = e
179                        info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
180                    else:
181                        traced_exception = self._serialize_exception(
182                            stream_descriptor, e, stream_instance=stream_instance
183                        )
184                        info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}"
185
186                    yield traced_exception.as_sanitized_airbyte_message(
187                        stream_descriptor=stream_descriptor
188                    )
189                    stream_name_to_exception[stream_instance.name] = traced_exception  # type: ignore # use configured_stream if stream_instance is None
190                    if self.stop_sync_on_stream_failure:
191                        logger.info(info_message)
192                        break
193                finally:
194                    # Finish read event only if the stream instance exists;
195                    # otherwise, there's no need as it never started
196                    if is_stream_exist:
197                        timer.finish_event()
198                        logger.info(f"Finished syncing {configured_stream.stream.name}")
199                        logger.info(timer.report())
200
201        if len(stream_name_to_exception) > 0:
202            error_message = generate_failed_streams_error_message(
203                {key: [value] for key, value in stream_name_to_exception.items()}
204            )
205            logger.info(error_message)
206            # We still raise at least one exception when a stream raises an exception because the platform currently relies
207            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
208            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
209            raise AirbyteTracedException(
210                message=error_message, failure_type=FailureType.config_error
211            )
212        logger.info(f"Finished syncing {self.name}")
213
214    @staticmethod
215    def _serialize_exception(
216        stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None
217    ) -> AirbyteTracedException:
218        display_message = stream_instance.get_error_display_message(e) if stream_instance else None
219        if display_message:
220            return AirbyteTracedException.from_exception(
221                e, message=display_message, stream_descriptor=stream_descriptor
222            )
223        return AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor)
224
225    @property
226    def raise_exception_on_missing_stream(self) -> bool:
227        return False
228
229    def _read_stream(
230        self,
231        logger: logging.Logger,
232        stream_instance: Stream,
233        configured_stream: ConfiguredAirbyteStream,
234        state_manager: ConnectorStateManager,
235        internal_config: InternalConfig,
236    ) -> Iterator[AirbyteMessage]:
237        if internal_config.page_size and isinstance(stream_instance, HttpStream):
238            logger.info(
239                f"Setting page size for {stream_instance.name} to {internal_config.page_size}"
240            )
241            stream_instance.page_size = internal_config.page_size
242        logger.debug(
243            f"Syncing configured stream: {configured_stream.stream.name}",
244            extra={
245                "sync_mode": configured_stream.sync_mode,
246                "primary_key": configured_stream.primary_key,
247                "cursor_field": configured_stream.cursor_field,
248            },
249        )
250        stream_instance.log_stream_sync_configuration()
251
252        stream_name = configured_stream.stream.name
253        stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)
254
255        # This is a hack. Existing full refresh streams that are converted into resumable full refresh need to discard
256        # the state because the terminal state for a full refresh sync is not compatible with substream resumable full
257        # refresh state. This is only required when running live traffic regression testing since the platform normally
258        # handles whether to pass state
259        if stream_state == {"__ab_no_cursor_state_message": True}:
260            stream_state = {}
261
262        if "state" in dir(stream_instance):
263            stream_instance.state = stream_state  # type: ignore # we check that state in the dir(stream_instance)
264            logger.info(f"Setting state of {self.name} stream to {stream_state}")
265
266        record_iterator = stream_instance.read(
267            configured_stream,
268            logger,
269            self._slice_logger,
270            stream_state,
271            state_manager,
272            internal_config,
273        )
274
275        record_counter = 0
276        logger.info(f"Syncing stream: {stream_name} ")
277        for record_data_or_message in record_iterator:
278            record = self._get_message(record_data_or_message, stream_instance)
279            if record.type == MessageType.RECORD:
280                record_counter += 1
281                if record_counter == 1:
282                    logger.info(f"Marking stream {stream_name} as RUNNING")
283                    # If we just read the first record of the stream, emit the transition to the RUNNING state
284                    yield stream_status_as_airbyte_message(
285                        configured_stream.stream, AirbyteStreamStatus.RUNNING
286                    )
287            yield from self._emit_queued_messages()
288            yield record
289
290        logger.info(f"Read {record_counter} records from {stream_name} stream")
291
292    def _emit_queued_messages(self) -> Iterable[AirbyteMessage]:
293        if self.message_repository:
294            yield from self.message_repository.consume_queue()
295        return
296
297    def _get_message(
298        self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream
299    ) -> AirbyteMessage:
300        """
301        Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
302        """
303        match record_data_or_message:
304            case AirbyteMessage():
305                return record_data_or_message
306            case _:
307                return stream_data_to_airbyte_message(
308                    stream.name,
309                    record_data_or_message,
310                    stream.transformer,
311                    stream.get_json_schema(),
312                )
313
314    @property
315    def message_repository(self) -> Union[None, MessageRepository]:
316        return _default_message_repository
317
318    @property
319    def stop_sync_on_stream_failure(self) -> bool:
320        """
321        WARNING: This function is in-development which means it is subject to change. Use at your own risk.
322
323        By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
324        continue syncing the next stream. This can be overwritten on a per-source basis so that the source will stop the sync
325        on the first error seen and emit a single error trace message for that stream.
326        """
327        return False

Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.

@abstractmethod
def check_connection( self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
59    @abstractmethod
60    def check_connection(
61        self, logger: logging.Logger, config: Mapping[str, Any]
62    ) -> Tuple[bool, Optional[Any]]:
63        """
64        :param logger: source logger
65        :param config: The user-provided configuration as specified by the source's spec.
66          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
67        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
68          and we can connect to the underlying data source using the provided configuration.
69          Otherwise, the input config cannot be used to connect to the underlying data source,
70          and the "error" object should describe what went wrong.
71          The error object will be cast to string to display the problem to the user.
72        """
Parameters
  • logger: source logger
  • config: The user-provided configuration as specified by the source's spec. This usually contains the information required to check the connection, e.g. tokens, secrets, and keys.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.

@abstractmethod
def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
74    @abstractmethod
75    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
76        """
77        :param config: The user-provided configuration as specified by the source's spec.
78        Any stream construction related operation should happen here.
79        :return: A list of the streams in this source connector.
80        """
Parameters
  • config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns

A list of the streams in this source connector.

def discover( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
86    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
87        """Implements the Discover operation from the Airbyte Specification.
88        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
89        """
90        streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
91        return AirbyteCatalog(streams=streams)

Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
 93    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
 94        """Implements the Check Connection operation from the Airbyte Specification.
 95        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
 96        """
 97        check_succeeded, error = self.check_connection(logger, config)
 98        if not check_succeeded:
 99            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
100        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
102    def read(
103        self,
104        logger: logging.Logger,
105        config: Mapping[str, Any],
106        catalog: ConfiguredAirbyteCatalog,
107        state: Optional[List[AirbyteStateMessage]] = None,
108    ) -> Iterator[AirbyteMessage]:
109        """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
110        logger.info(f"Starting syncing {self.name}")
111        config, internal_config = split_config(config)
112        # TODO assert all streams exist in the connector
113        # get the streams once in case the connector needs to make any queries to generate them
114        stream_instances = {s.name: s for s in self.streams(config)}
115        state_manager = ConnectorStateManager(state=state)
116        self._stream_to_instance_map = stream_instances
117
118        stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}
119
120        with create_timer(self.name) as timer:
121            for configured_stream in catalog.streams:
122                stream_instance = stream_instances.get(configured_stream.stream.name)
123                is_stream_exist = bool(stream_instance)
124                try:
125                    # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors
126                    if not stream_instance:
127                        if not self.raise_exception_on_missing_stream:
128                            yield stream_status_as_airbyte_message(
129                                configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
130                            )
131                            continue
132
133                        error_message = (
134                            f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
135                            f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
136                        )
137
138                        # Use configured_stream as stream_instance to support references in error handling.
139                        stream_instance = configured_stream.stream
140
141                        raise AirbyteTracedException(
142                            message="A stream listed in your configuration was not found in the source. Please check the logs for more "
143                            "details.",
144                            internal_message=error_message,
145                            failure_type=FailureType.config_error,
146                        )
147
148                    timer.start_event(f"Syncing stream {configured_stream.stream.name}")
149                    logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
150                    yield stream_status_as_airbyte_message(
151                        configured_stream.stream, AirbyteStreamStatus.STARTED
152                    )
153                    yield from self._read_stream(
154                        logger=logger,
155                        stream_instance=stream_instance,
156                        configured_stream=configured_stream,
157                        state_manager=state_manager,
158                        internal_config=internal_config,
159                    )
160                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
161                    yield stream_status_as_airbyte_message(
162                        configured_stream.stream, AirbyteStreamStatus.COMPLETE
163                    )
164
165                except Exception as e:
166                    yield from self._emit_queued_messages()
167                    logger.exception(
168                        f"Encountered an exception while reading stream {configured_stream.stream.name}"
169                    )
170                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
171                    yield stream_status_as_airbyte_message(
172                        configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
173                    )
174
175                    stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)
176
177                    if isinstance(e, AirbyteTracedException):
178                        traced_exception = e
179                        info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
180                    else:
181                        traced_exception = self._serialize_exception(
182                            stream_descriptor, e, stream_instance=stream_instance
183                        )
184                        info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}"
185
186                    yield traced_exception.as_sanitized_airbyte_message(
187                        stream_descriptor=stream_descriptor
188                    )
189                    stream_name_to_exception[stream_instance.name] = traced_exception  # type: ignore # use configured_stream if stream_instance is None
190                    if self.stop_sync_on_stream_failure:
191                        logger.info(info_message)
192                        break
193                finally:
194                    # Finish read event only if the stream instance exists;
195                    # otherwise, there's no need as it never started
196                    if is_stream_exist:
197                        timer.finish_event()
198                        logger.info(f"Finished syncing {configured_stream.stream.name}")
199                        logger.info(timer.report())
200
201        if len(stream_name_to_exception) > 0:
202            error_message = generate_failed_streams_error_message(
203                {key: [value] for key, value in stream_name_to_exception.items()}
204            )
205            logger.info(error_message)
206            # We still raise at least one exception when a stream raises an exception because the platform currently relies
207            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
208            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
209            raise AirbyteTracedException(
210                message=error_message, failure_type=FailureType.config_error
211            )
212        logger.info(f"Finished syncing {self.name}")

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.

raise_exception_on_missing_stream: bool
225    @property
226    def raise_exception_on_missing_stream(self) -> bool:
227        return False
message_repository: Optional[airbyte_cdk.MessageRepository]
314    @property
315    def message_repository(self) -> Union[None, MessageRepository]:
316        return _default_message_repository
stop_sync_on_stream_failure: bool
318    @property
319    def stop_sync_on_stream_failure(self) -> bool:
320        """
321        WARNING: This function is in-development which means it is subject to change. Use at your own risk.
322
323        By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
324        continue syncing the next stream. This can be overwritten on a per-source basis so that the source will stop the sync
325        on the first error seen and emit a single error trace message for that stream.
326        """
327        return False

WARNING: This function is in development, which means it is subject to change. Use at your own risk.

By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then continue syncing the next stream. This can be overridden on a per-source basis so that the source will stop the sync on the first error seen and emit a single error trace message for that stream.