airbyte_cdk.sources.abstract_source
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5import logging 6from abc import ABC, abstractmethod 7from typing import ( 8 Any, 9 Dict, 10 Iterable, 11 Iterator, 12 List, 13 Mapping, 14 MutableMapping, 15 Optional, 16 Tuple, 17 Union, 18) 19 20from airbyte_cdk.exception_handler import generate_failed_streams_error_message 21from airbyte_cdk.models import ( 22 AirbyteCatalog, 23 AirbyteConnectionStatus, 24 AirbyteMessage, 25 AirbyteStateMessage, 26 AirbyteStreamStatus, 27 ConfiguredAirbyteCatalog, 28 ConfiguredAirbyteStream, 29 FailureType, 30 Status, 31 StreamDescriptor, 32) 33from airbyte_cdk.models import Type as MessageType 34from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager 35from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository 36from airbyte_cdk.sources.source import Source 37from airbyte_cdk.sources.streams import Stream 38from airbyte_cdk.sources.streams.core import StreamData 39from airbyte_cdk.sources.streams.http.http import HttpStream 40from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message 41from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config 42from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger 43from airbyte_cdk.utils.event_timing import create_timer 44from airbyte_cdk.utils.stream_status_utils import ( 45 as_airbyte_message as stream_status_as_airbyte_message, 46) 47from airbyte_cdk.utils.traced_exception import AirbyteTracedException 48 49_default_message_repository = InMemoryMessageRepository() 50 51 52class AbstractSource(Source, ABC): 53 """ 54 Abstract base class for an Airbyte Source. Consumers should implement any abstract methods 55 in this class to create an Airbyte Specification compliant Source. 
56 """ 57 58 @abstractmethod 59 def check_connection( 60 self, logger: logging.Logger, config: Mapping[str, Any] 61 ) -> Tuple[bool, Optional[Any]]: 62 """ 63 :param logger: source logger 64 :param config: The user-provided configuration as specified by the source's spec. 65 This usually contains information required to check connection e.g. tokens, secrets and keys etc. 66 :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful 67 and we can connect to the underlying data source using the provided configuration. 68 Otherwise, the input config cannot be used to connect to the underlying data source, 69 and the "error" object should describe what went wrong. 70 The error object will be cast to string to display the problem to the user. 71 """ 72 73 @abstractmethod 74 def streams(self, config: Mapping[str, Any]) -> List[Stream]: 75 """ 76 :param config: The user-provided configuration as specified by the source's spec. 77 Any stream construction related operation should happen here. 78 :return: A list of the streams in this source connector. 79 """ 80 81 # Stream name to instance map for applying output object transformation 82 _stream_to_instance_map: Dict[str, Stream] = {} 83 _slice_logger: SliceLogger = DebugSliceLogger() 84 85 def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: 86 """Implements the Discover operation from the Airbyte Specification. 87 See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover. 88 """ 89 streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)] 90 return AirbyteCatalog(streams=streams) 91 92 def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: 93 """Implements the Check Connection operation from the Airbyte Specification. 94 See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check. 
95 """ 96 check_succeeded, error = self.check_connection(logger, config) 97 if not check_succeeded: 98 return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) 99 return AirbyteConnectionStatus(status=Status.SUCCEEDED) 100 101 def read( 102 self, 103 logger: logging.Logger, 104 config: Mapping[str, Any], 105 catalog: ConfiguredAirbyteCatalog, 106 state: Optional[List[AirbyteStateMessage]] = None, 107 ) -> Iterator[AirbyteMessage]: 108 """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.""" 109 logger.info(f"Starting syncing {self.name}") 110 config, internal_config = split_config(config) 111 # TODO assert all streams exist in the connector 112 # get the streams once in case the connector needs to make any queries to generate them 113 stream_instances = {s.name: s for s in self.streams(config)} 114 state_manager = ConnectorStateManager(state=state) 115 self._stream_to_instance_map = stream_instances 116 117 stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {} 118 119 with create_timer(self.name) as timer: 120 for configured_stream in catalog.streams: 121 stream_instance = stream_instances.get(configured_stream.stream.name) 122 is_stream_exist = bool(stream_instance) 123 try: 124 # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors 125 if not stream_instance: 126 if not self.raise_exception_on_missing_stream: 127 yield stream_status_as_airbyte_message( 128 configured_stream.stream, AirbyteStreamStatus.INCOMPLETE 129 ) 130 continue 131 132 error_message = ( 133 f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. " 134 f"Refresh the schema in your replication settings and remove this stream from future sync attempts." 135 ) 136 137 # Use configured_stream as stream_instance to support references in error handling. 
138 stream_instance = configured_stream.stream 139 140 raise AirbyteTracedException( 141 message="A stream listed in your configuration was not found in the source. Please check the logs for more " 142 "details.", 143 internal_message=error_message, 144 failure_type=FailureType.config_error, 145 ) 146 147 timer.start_event(f"Syncing stream {configured_stream.stream.name}") 148 logger.info(f"Marking stream {configured_stream.stream.name} as STARTED") 149 yield stream_status_as_airbyte_message( 150 configured_stream.stream, AirbyteStreamStatus.STARTED 151 ) 152 yield from self._read_stream( 153 logger=logger, 154 stream_instance=stream_instance, 155 configured_stream=configured_stream, 156 state_manager=state_manager, 157 internal_config=internal_config, 158 ) 159 logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED") 160 yield stream_status_as_airbyte_message( 161 configured_stream.stream, AirbyteStreamStatus.COMPLETE 162 ) 163 164 except Exception as e: 165 yield from self._emit_queued_messages() 166 logger.exception( 167 f"Encountered an exception while reading stream {configured_stream.stream.name}" 168 ) 169 logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED") 170 yield stream_status_as_airbyte_message( 171 configured_stream.stream, AirbyteStreamStatus.INCOMPLETE 172 ) 173 174 stream_descriptor = StreamDescriptor(name=configured_stream.stream.name) 175 176 if isinstance(e, AirbyteTracedException): 177 traced_exception = e 178 info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error." 
179 else: 180 traced_exception = self._serialize_exception( 181 stream_descriptor, e, stream_instance=stream_instance 182 ) 183 info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}" 184 185 yield traced_exception.as_sanitized_airbyte_message( 186 stream_descriptor=stream_descriptor 187 ) 188 stream_name_to_exception[stream_instance.name] = traced_exception # type: ignore # use configured_stream if stream_instance is None 189 if self.stop_sync_on_stream_failure: 190 logger.info(info_message) 191 break 192 finally: 193 # Finish read event only if the stream instance exists; 194 # otherwise, there's no need as it never started 195 if is_stream_exist: 196 timer.finish_event() 197 logger.info(f"Finished syncing {configured_stream.stream.name}") 198 logger.info(timer.report()) 199 200 if len(stream_name_to_exception) > 0: 201 error_message = generate_failed_streams_error_message( 202 {key: [value] for key, value in stream_name_to_exception.items()} 203 ) 204 logger.info(error_message) 205 # We still raise at least one exception when a stream raises an exception because the platform currently relies 206 # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error 207 # type because this combined error isn't actionable, but rather the previously emitted individual errors. 
208 raise AirbyteTracedException( 209 message=error_message, failure_type=FailureType.config_error 210 ) 211 logger.info(f"Finished syncing {self.name}") 212 213 @staticmethod 214 def _serialize_exception( 215 stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None 216 ) -> AirbyteTracedException: 217 display_message = stream_instance.get_error_display_message(e) if stream_instance else None 218 if display_message: 219 return AirbyteTracedException.from_exception( 220 e, message=display_message, stream_descriptor=stream_descriptor 221 ) 222 return AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor) 223 224 @property 225 def raise_exception_on_missing_stream(self) -> bool: 226 return False 227 228 def _read_stream( 229 self, 230 logger: logging.Logger, 231 stream_instance: Stream, 232 configured_stream: ConfiguredAirbyteStream, 233 state_manager: ConnectorStateManager, 234 internal_config: InternalConfig, 235 ) -> Iterator[AirbyteMessage]: 236 if internal_config.page_size and isinstance(stream_instance, HttpStream): 237 logger.info( 238 f"Setting page size for {stream_instance.name} to {internal_config.page_size}" 239 ) 240 stream_instance.page_size = internal_config.page_size 241 logger.debug( 242 f"Syncing configured stream: {configured_stream.stream.name}", 243 extra={ 244 "sync_mode": configured_stream.sync_mode, 245 "primary_key": configured_stream.primary_key, 246 "cursor_field": configured_stream.cursor_field, 247 }, 248 ) 249 stream_instance.log_stream_sync_configuration() 250 251 stream_name = configured_stream.stream.name 252 stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace) 253 254 # This is a hack. Existing full refresh streams that are converted into resumable full refresh need to discard 255 # the state because the terminal state for a full refresh sync is not compatible with substream resumable full 256 # refresh state. 
This is only required when running live traffic regression testing since the platform normally 257 # handles whether to pass state 258 if stream_state == {"__ab_no_cursor_state_message": True}: 259 stream_state = {} 260 261 if "state" in dir(stream_instance): 262 stream_instance.state = stream_state # type: ignore # we check that state in the dir(stream_instance) 263 logger.info(f"Setting state of {self.name} stream to {stream_state}") 264 265 record_iterator = stream_instance.read( 266 configured_stream, 267 logger, 268 self._slice_logger, 269 stream_state, 270 state_manager, 271 internal_config, 272 ) 273 274 record_counter = 0 275 logger.info(f"Syncing stream: {stream_name} ") 276 for record_data_or_message in record_iterator: 277 record = self._get_message(record_data_or_message, stream_instance) 278 if record.type == MessageType.RECORD: 279 record_counter += 1 280 if record_counter == 1: 281 logger.info(f"Marking stream {stream_name} as RUNNING") 282 # If we just read the first record of the stream, emit the transition to the RUNNING state 283 yield stream_status_as_airbyte_message( 284 configured_stream.stream, AirbyteStreamStatus.RUNNING 285 ) 286 yield from self._emit_queued_messages() 287 yield record 288 289 logger.info(f"Read {record_counter} records from {stream_name} stream") 290 291 def _emit_queued_messages(self) -> Iterable[AirbyteMessage]: 292 if self.message_repository: 293 yield from self.message_repository.consume_queue() 294 return 295 296 def _get_message( 297 self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream 298 ) -> AirbyteMessage: 299 """ 300 Converts the input to an AirbyteMessage if it is a StreamData. 
Returns the input as is if it is already an AirbyteMessage 301 """ 302 match record_data_or_message: 303 case AirbyteMessage(): 304 return record_data_or_message 305 case _: 306 return stream_data_to_airbyte_message( 307 stream.name, 308 record_data_or_message, 309 stream.transformer, 310 stream.get_json_schema(), 311 ) 312 313 @property 314 def message_repository(self) -> Union[None, MessageRepository]: 315 return _default_message_repository 316 317 @property 318 def stop_sync_on_stream_failure(self) -> bool: 319 """ 320 WARNING: This function is in-development which means it is subject to change. Use at your own risk. 321 322 By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then 323 continue syncing the next stream. This can be overwritten on a per-source basis so that the source will stop the sync 324 on the first error seen and emit a single error trace message for that stream. 325 """ 326 return False
53class AbstractSource(Source, ABC): 54 """ 55 Abstract base class for an Airbyte Source. Consumers should implement any abstract methods 56 in this class to create an Airbyte Specification compliant Source. 57 """ 58 59 @abstractmethod 60 def check_connection( 61 self, logger: logging.Logger, config: Mapping[str, Any] 62 ) -> Tuple[bool, Optional[Any]]: 63 """ 64 :param logger: source logger 65 :param config: The user-provided configuration as specified by the source's spec. 66 This usually contains information required to check connection e.g. tokens, secrets and keys etc. 67 :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful 68 and we can connect to the underlying data source using the provided configuration. 69 Otherwise, the input config cannot be used to connect to the underlying data source, 70 and the "error" object should describe what went wrong. 71 The error object will be cast to string to display the problem to the user. 72 """ 73 74 @abstractmethod 75 def streams(self, config: Mapping[str, Any]) -> List[Stream]: 76 """ 77 :param config: The user-provided configuration as specified by the source's spec. 78 Any stream construction related operation should happen here. 79 :return: A list of the streams in this source connector. 80 """ 81 82 # Stream name to instance map for applying output object transformation 83 _stream_to_instance_map: Dict[str, Stream] = {} 84 _slice_logger: SliceLogger = DebugSliceLogger() 85 86 def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: 87 """Implements the Discover operation from the Airbyte Specification. 88 See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover. 
89 """ 90 streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)] 91 return AirbyteCatalog(streams=streams) 92 93 def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: 94 """Implements the Check Connection operation from the Airbyte Specification. 95 See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check. 96 """ 97 check_succeeded, error = self.check_connection(logger, config) 98 if not check_succeeded: 99 return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) 100 return AirbyteConnectionStatus(status=Status.SUCCEEDED) 101 102 def read( 103 self, 104 logger: logging.Logger, 105 config: Mapping[str, Any], 106 catalog: ConfiguredAirbyteCatalog, 107 state: Optional[List[AirbyteStateMessage]] = None, 108 ) -> Iterator[AirbyteMessage]: 109 """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.""" 110 logger.info(f"Starting syncing {self.name}") 111 config, internal_config = split_config(config) 112 # TODO assert all streams exist in the connector 113 # get the streams once in case the connector needs to make any queries to generate them 114 stream_instances = {s.name: s for s in self.streams(config)} 115 state_manager = ConnectorStateManager(state=state) 116 self._stream_to_instance_map = stream_instances 117 118 stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {} 119 120 with create_timer(self.name) as timer: 121 for configured_stream in catalog.streams: 122 stream_instance = stream_instances.get(configured_stream.stream.name) 123 is_stream_exist = bool(stream_instance) 124 try: 125 # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors 126 if not stream_instance: 127 if not self.raise_exception_on_missing_stream: 128 yield stream_status_as_airbyte_message( 129 configured_stream.stream, 
AirbyteStreamStatus.INCOMPLETE 130 ) 131 continue 132 133 error_message = ( 134 f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. " 135 f"Refresh the schema in your replication settings and remove this stream from future sync attempts." 136 ) 137 138 # Use configured_stream as stream_instance to support references in error handling. 139 stream_instance = configured_stream.stream 140 141 raise AirbyteTracedException( 142 message="A stream listed in your configuration was not found in the source. Please check the logs for more " 143 "details.", 144 internal_message=error_message, 145 failure_type=FailureType.config_error, 146 ) 147 148 timer.start_event(f"Syncing stream {configured_stream.stream.name}") 149 logger.info(f"Marking stream {configured_stream.stream.name} as STARTED") 150 yield stream_status_as_airbyte_message( 151 configured_stream.stream, AirbyteStreamStatus.STARTED 152 ) 153 yield from self._read_stream( 154 logger=logger, 155 stream_instance=stream_instance, 156 configured_stream=configured_stream, 157 state_manager=state_manager, 158 internal_config=internal_config, 159 ) 160 logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED") 161 yield stream_status_as_airbyte_message( 162 configured_stream.stream, AirbyteStreamStatus.COMPLETE 163 ) 164 165 except Exception as e: 166 yield from self._emit_queued_messages() 167 logger.exception( 168 f"Encountered an exception while reading stream {configured_stream.stream.name}" 169 ) 170 logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED") 171 yield stream_status_as_airbyte_message( 172 configured_stream.stream, AirbyteStreamStatus.INCOMPLETE 173 ) 174 175 stream_descriptor = StreamDescriptor(name=configured_stream.stream.name) 176 177 if isinstance(e, AirbyteTracedException): 178 traced_exception = e 179 info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not 
support continuing syncs on error." 180 else: 181 traced_exception = self._serialize_exception( 182 stream_descriptor, e, stream_instance=stream_instance 183 ) 184 info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}" 185 186 yield traced_exception.as_sanitized_airbyte_message( 187 stream_descriptor=stream_descriptor 188 ) 189 stream_name_to_exception[stream_instance.name] = traced_exception # type: ignore # use configured_stream if stream_instance is None 190 if self.stop_sync_on_stream_failure: 191 logger.info(info_message) 192 break 193 finally: 194 # Finish read event only if the stream instance exists; 195 # otherwise, there's no need as it never started 196 if is_stream_exist: 197 timer.finish_event() 198 logger.info(f"Finished syncing {configured_stream.stream.name}") 199 logger.info(timer.report()) 200 201 if len(stream_name_to_exception) > 0: 202 error_message = generate_failed_streams_error_message( 203 {key: [value] for key, value in stream_name_to_exception.items()} 204 ) 205 logger.info(error_message) 206 # We still raise at least one exception when a stream raises an exception because the platform currently relies 207 # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error 208 # type because this combined error isn't actionable, but rather the previously emitted individual errors. 
209 raise AirbyteTracedException( 210 message=error_message, failure_type=FailureType.config_error 211 ) 212 logger.info(f"Finished syncing {self.name}") 213 214 @staticmethod 215 def _serialize_exception( 216 stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None 217 ) -> AirbyteTracedException: 218 display_message = stream_instance.get_error_display_message(e) if stream_instance else None 219 if display_message: 220 return AirbyteTracedException.from_exception( 221 e, message=display_message, stream_descriptor=stream_descriptor 222 ) 223 return AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor) 224 225 @property 226 def raise_exception_on_missing_stream(self) -> bool: 227 return False 228 229 def _read_stream( 230 self, 231 logger: logging.Logger, 232 stream_instance: Stream, 233 configured_stream: ConfiguredAirbyteStream, 234 state_manager: ConnectorStateManager, 235 internal_config: InternalConfig, 236 ) -> Iterator[AirbyteMessage]: 237 if internal_config.page_size and isinstance(stream_instance, HttpStream): 238 logger.info( 239 f"Setting page size for {stream_instance.name} to {internal_config.page_size}" 240 ) 241 stream_instance.page_size = internal_config.page_size 242 logger.debug( 243 f"Syncing configured stream: {configured_stream.stream.name}", 244 extra={ 245 "sync_mode": configured_stream.sync_mode, 246 "primary_key": configured_stream.primary_key, 247 "cursor_field": configured_stream.cursor_field, 248 }, 249 ) 250 stream_instance.log_stream_sync_configuration() 251 252 stream_name = configured_stream.stream.name 253 stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace) 254 255 # This is a hack. Existing full refresh streams that are converted into resumable full refresh need to discard 256 # the state because the terminal state for a full refresh sync is not compatible with substream resumable full 257 # refresh state. 
This is only required when running live traffic regression testing since the platform normally 258 # handles whether to pass state 259 if stream_state == {"__ab_no_cursor_state_message": True}: 260 stream_state = {} 261 262 if "state" in dir(stream_instance): 263 stream_instance.state = stream_state # type: ignore # we check that state in the dir(stream_instance) 264 logger.info(f"Setting state of {self.name} stream to {stream_state}") 265 266 record_iterator = stream_instance.read( 267 configured_stream, 268 logger, 269 self._slice_logger, 270 stream_state, 271 state_manager, 272 internal_config, 273 ) 274 275 record_counter = 0 276 logger.info(f"Syncing stream: {stream_name} ") 277 for record_data_or_message in record_iterator: 278 record = self._get_message(record_data_or_message, stream_instance) 279 if record.type == MessageType.RECORD: 280 record_counter += 1 281 if record_counter == 1: 282 logger.info(f"Marking stream {stream_name} as RUNNING") 283 # If we just read the first record of the stream, emit the transition to the RUNNING state 284 yield stream_status_as_airbyte_message( 285 configured_stream.stream, AirbyteStreamStatus.RUNNING 286 ) 287 yield from self._emit_queued_messages() 288 yield record 289 290 logger.info(f"Read {record_counter} records from {stream_name} stream") 291 292 def _emit_queued_messages(self) -> Iterable[AirbyteMessage]: 293 if self.message_repository: 294 yield from self.message_repository.consume_queue() 295 return 296 297 def _get_message( 298 self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream 299 ) -> AirbyteMessage: 300 """ 301 Converts the input to an AirbyteMessage if it is a StreamData. 
Returns the input as is if it is already an AirbyteMessage 302 """ 303 match record_data_or_message: 304 case AirbyteMessage(): 305 return record_data_or_message 306 case _: 307 return stream_data_to_airbyte_message( 308 stream.name, 309 record_data_or_message, 310 stream.transformer, 311 stream.get_json_schema(), 312 ) 313 314 @property 315 def message_repository(self) -> Union[None, MessageRepository]: 316 return _default_message_repository 317 318 @property 319 def stop_sync_on_stream_failure(self) -> bool: 320 """ 321 WARNING: This function is in-development which means it is subject to change. Use at your own risk. 322 323 By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then 324 continue syncing the next stream. This can be overwritten on a per-source basis so that the source will stop the sync 325 on the first error seen and emit a single error trace message for that stream. 326 """ 327 return False
Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.
@abstractmethod
def check_connection(
    self, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Optional[Any]]:
    """
    Check whether the provided configuration can be used to connect to the underlying data source.

    :param logger: source logger
    :param config: The user-provided configuration as specified by the source's spec.
      This usually contains the information required to check the connection, e.g. tokens, secrets and keys.
    :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
      and we can connect to the underlying data source using the provided configuration.
      Otherwise, the input config cannot be used to connect to the underlying data source,
      and the "error" object should describe what went wrong.
      `check` renders the error object with `repr()` to display the problem to the user.
    """
Parameters
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains the information required to check the connection, e.g. tokens, secrets and keys.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
@abstractmethod
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Build the streams exposed by this source connector.

    :param config: The user-provided configuration as specified by the source's spec.
      Any stream construction related operation should happen here.
    :return: A list of the streams in this source connector.
    """
Parameters
- config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns
A list of the streams in this source connector.
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    """Implements the Discover operation from the Airbyte Specification.
    See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.

    :param logger: source logger (not used by this implementation)
    :param config: The user-provided configuration, forwarded to `streams`.
    :return: A catalog holding the `as_airbyte_stream()` form of every stream returned by `streams`.
    """
    catalog_streams = []
    for source_stream in self.streams(config=config):
        catalog_streams.append(source_stream.as_airbyte_stream())
    return AirbyteCatalog(streams=catalog_streams)
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """Implements the Check Connection operation from the Airbyte Specification.
    See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

    :param logger: source logger, forwarded to `check_connection`
    :param config: The user-provided configuration, forwarded to `check_connection`
    :return: A SUCCEEDED status when `check_connection` reports success, otherwise a FAILED status
      whose message is `repr(error)`.
    """
    connected, failure_reason = self.check_connection(logger, config)
    if connected:
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
    return AirbyteConnectionStatus(status=Status.FAILED, message=repr(failure_reason))
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.

    Streams are read one after another in catalog order. A failure in one stream is reported with an
    INCOMPLETE status and a trace message; the sync then moves on to the next stream unless
    `stop_sync_on_stream_failure` is true.

    :param logger: source logger
    :param config: The user-provided configuration; it is split into connector config and internal config.
    :param catalog: The configured catalog listing the streams to sync.
    :param state: Optional incoming state messages used to build the `ConnectorStateManager`.
    :return: An iterator of stream status, record and trace messages.
    :raises AirbyteTracedException: after all streams were processed, if at least one stream failed.
    """
    logger.info(f"Starting syncing {self.name}")
    config, internal_config = split_config(config)
    # TODO assert all streams exist in the connector
    # get the streams once in case the connector needs to make any queries to generate them
    stream_instances = {s.name: s for s in self.streams(config)}
    state_manager = ConnectorStateManager(state=state)
    self._stream_to_instance_map = stream_instances

    stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}

    with create_timer(self.name) as timer:
        for configured_stream in catalog.streams:
            stream_name = configured_stream.stream.name
            stream_instance = stream_instances.get(stream_name)
            is_stream_exist = bool(stream_instance)
            try:
                # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors
                if not stream_instance:
                    if not self.raise_exception_on_missing_stream:
                        yield stream_status_as_airbyte_message(
                            configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
                        )
                        continue

                    error_message = (
                        f"The stream '{stream_name}' in your connection configuration was not found in the source. "
                        "Refresh the schema in your replication settings and remove this stream from future sync attempts."
                    )

                    # Handled by the `except` block below like any other stream failure.
                    raise AirbyteTracedException(
                        message="A stream listed in your configuration was not found in the source. Please check the logs for more "
                        "details.",
                        internal_message=error_message,
                        failure_type=FailureType.config_error,
                    )

                timer.start_event(f"Syncing stream {stream_name}")
                logger.info(f"Marking stream {stream_name} as STARTED")
                yield stream_status_as_airbyte_message(
                    configured_stream.stream, AirbyteStreamStatus.STARTED
                )
                yield from self._read_stream(
                    logger=logger,
                    stream_instance=stream_instance,
                    configured_stream=configured_stream,
                    state_manager=state_manager,
                    internal_config=internal_config,
                )
                logger.info(f"Marking stream {stream_name} as STOPPED")
                yield stream_status_as_airbyte_message(
                    configured_stream.stream, AirbyteStreamStatus.COMPLETE
                )

            except Exception as e:
                # Flush anything still queued in the message repository before reporting the failure.
                yield from self._emit_queued_messages()
                logger.exception(
                    f"Encountered an exception while reading stream {stream_name}"
                )
                logger.info(f"Marking stream {stream_name} as STOPPED")
                yield stream_status_as_airbyte_message(
                    configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
                )

                stream_descriptor = StreamDescriptor(name=stream_name)

                if isinstance(e, AirbyteTracedException):
                    traced_exception = e
                    info_message = f"Stopping sync on error from stream {stream_name} because {self.name} does not support continuing syncs on error."
                else:
                    traced_exception = self._serialize_exception(
                        stream_descriptor, e, stream_instance=stream_instance
                    )
                    info_message = f"{self.name} does not support continuing syncs on error from stream {stream_name}"

                yield traced_exception.as_sanitized_airbyte_message(
                    stream_descriptor=stream_descriptor
                )
                # Keyed by the configured stream name, which is also the key the instance was looked up
                # with, so this works whether or not a stream instance exists.
                stream_name_to_exception[stream_name] = traced_exception
                if self.stop_sync_on_stream_failure:
                    logger.info(info_message)
                    break
            finally:
                # Finish read event only if the stream instance exists;
                # otherwise, there's no need as it never started
                if is_stream_exist:
                    timer.finish_event()
                logger.info(f"Finished syncing {stream_name}")
                logger.info(timer.report())

    if stream_name_to_exception:
        error_message = generate_failed_streams_error_message(
            {key: [value] for key, value in stream_name_to_exception.items()}
        )
        logger.info(error_message)
        # We still raise at least one exception when a stream raises an exception because the platform currently relies
        # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
        # type because this combined error isn't actionable, but rather the previously emitted individual errors.
        raise AirbyteTracedException(
            message=error_message, failure_type=FailureType.config_error
        )
    logger.info(f"Finished syncing {self.name}")
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
@property
def stop_sync_on_stream_failure(self) -> bool:
    """
    WARNING: This function is in development, which means it is subject to change. Use at your own risk.

    By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
    continue syncing the next stream. This can be overridden on a per-source basis so that the source will stop the sync
    on the first error seen and emit a single error trace message for that stream.
    """
    return False
WARNING: This function is in development, which means it is subject to change. Use at your own risk.
By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then continue syncing the next stream. This can be overridden on a per-source basis so that the source will stop the sync on the first error seen and emit a single error trace message for that stream.