airbyte_cdk.sources
1# 2# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 3# 4 5import dpath.options 6 7from .abstract_source import AbstractSource 8from .config import BaseConfig 9from .source import Source 10 11# As part of the CDK sources, we do not control what the APIs return and it is possible that a key is empty. 12# Reasons why we are doing this at the airbyte_cdk level: 13# * As of today, all the use cases should allow for empty keys 14# * Cases as of 2023-08-31: oauth/session token provider responses, extractor, transformation and substream) 15# * The behavior is explicit at the package level and not hidden in every package that needs dpath.options.ALLOW_EMPTY_STRING_KEYS = True 16# There is a downside in enforcing this option preemptively in the module __init__.py: the runtime code will import dpath even though the it 17# might not need dpath leading to longer initialization time. 18# There is a downside in using dpath as a library since the options are global: if we have two pieces of code that want different options, 19# this will not be thread-safe. 20dpath.options.ALLOW_EMPTY_STRING_KEYS = True 21 22__all__ = [ 23 "AbstractSource", 24 "BaseConfig", 25 "Source", 26]
class AbstractSource(Source, ABC):
    """
    Abstract base class for an Airbyte Source. Consumers should implement any abstract methods
    in this class to create an Airbyte Specification compliant Source.

    Subclasses must provide `check_connection` and `streams`; `discover`, `check` and `read`
    are implemented here on top of those two methods.
    """

    @abstractmethod
    def check_connection(
        self, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Optional[Any]]:
        """
        Verify that the provided configuration can be used to connect to the underlying data source.

        :param logger: source logger
        :param config: The user-provided configuration as specified by the source's spec.
          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
          and we can connect to the underlying data source using the provided configuration.
          Otherwise, the input config cannot be used to connect to the underlying data source,
          and the "error" object should describe what went wrong.
          `check` places `repr(error)` in the message of the failed connection status.
        """

    @abstractmethod
    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Build the streams exposed by this source.

        :param config: The user-provided configuration as specified by the source's spec.
          Any stream construction related operation should happen here.
        :return: A list of the streams in this source connector.
        """

    # Stream name to instance map for applying output object transformation
    # (class-level default; `read` rebinds it on the instance rather than mutating it)
    _stream_to_instance_map: Dict[str, Stream] = {}
    # Slice logger handed to every `Stream.read` call made by `_read_stream`
    _slice_logger: SliceLogger = DebugSliceLogger()

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """Implements the Discover operation from the Airbyte Specification.
        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.

        :param logger: source logger (not used by this implementation)
        :param config: The user-provided configuration as specified by the source's spec.
        :return: a catalog holding `as_airbyte_stream()` of every stream returned by `streams`
        """
        streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
        return AirbyteCatalog(streams=streams)

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Implements the Check Connection operation from the Airbyte Specification.
        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

        :param logger: source logger, forwarded to `check_connection`
        :param config: The user-provided configuration as specified by the source's spec.
        :return: a SUCCEEDED status when `check_connection` reports success, otherwise a FAILED
          status whose message is `repr(error)`
        """
        check_succeeded, error = self.check_connection(logger, config)
        if not check_succeeded:
            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.

        Iterates over the configured streams in catalog order and, for each one, yields stream status
        messages (STARTED, then COMPLETE or INCOMPLETE) around the messages produced by `_read_stream`.
        An exception raised while syncing a stream is emitted as a trace message and recorded; the sync
        then either stops (`stop_sync_on_stream_failure`) or moves on to the next stream.

        :param logger: source logger
        :param config: user-provided configuration; it is split into connector config and internal config
        :param catalog: the configured catalog listing the streams to sync
        :param state: optional list of state messages from a previous sync
        :return: an iterator of Airbyte messages
        :raises AirbyteTracedException: once the loop over streams has ended, if at least one stream
          recorded an exception
        """
        logger.info(f"Starting syncing {self.name}")
        config, internal_config = split_config(config)
        # TODO assert all streams exist in the connector
        # get the streams once in case the connector needs to make any queries to generate them
        stream_instances = {s.name: s for s in self.streams(config)}
        state_manager = ConnectorStateManager(state=state)
        self._stream_to_instance_map = stream_instances

        # Exceptions recorded per stream name; used to build the final combined error
        stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}

        with create_timer(self.name) as timer:
            for configured_stream in catalog.streams:
                stream_instance = stream_instances.get(configured_stream.stream.name)
                # Captured before `stream_instance` may be rebound below, so `finally` still knows
                # whether a timer event was ever started for this stream
                is_stream_exist = bool(stream_instance)
                try:
                    # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors
                    if not stream_instance:
                        if not self.raise_exception_on_missing_stream:
                            # Missing stream is tolerated: report it as INCOMPLETE and skip it
                            yield stream_status_as_airbyte_message(
                                configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
                            )
                            continue

                        error_message = (
                            f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
                            f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
                        )

                        # Use configured_stream as stream_instance to support references in error handling.
                        stream_instance = configured_stream.stream

                        raise AirbyteTracedException(
                            message="A stream listed in your configuration was not found in the source. Please check the logs for more "
                            "details.",
                            internal_message=error_message,
                            failure_type=FailureType.config_error,
                        )

                    timer.start_event(f"Syncing stream {configured_stream.stream.name}")
                    logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
                    yield stream_status_as_airbyte_message(
                        configured_stream.stream, AirbyteStreamStatus.STARTED
                    )
                    yield from self._read_stream(
                        logger=logger,
                        stream_instance=stream_instance,
                        configured_stream=configured_stream,
                        state_manager=state_manager,
                        internal_config=internal_config,
                    )
                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
                    yield stream_status_as_airbyte_message(
                        configured_stream.stream, AirbyteStreamStatus.COMPLETE
                    )

                except Exception as e:
                    # Flush anything still queued in the message repository before reporting the failure
                    yield from self._emit_queued_messages()
                    logger.exception(
                        f"Encountered an exception while reading stream {configured_stream.stream.name}"
                    )
                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
                    yield stream_status_as_airbyte_message(
                        configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
                    )

                    stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)

                    # An already-traced exception is emitted as is; anything else is wrapped first
                    if isinstance(e, AirbyteTracedException):
                        traced_exception = e
                        info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
                    else:
                        traced_exception = self._serialize_exception(
                            stream_descriptor, e, stream_instance=stream_instance
                        )
                        info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}"

                    yield traced_exception.as_sanitized_airbyte_message(
                        stream_descriptor=stream_descriptor
                    )
                    stream_name_to_exception[stream_instance.name] = traced_exception  # type: ignore # use configured_stream if stream_instance is None
                    if self.stop_sync_on_stream_failure:
                        logger.info(info_message)
                        break
                finally:
                    # Finish read event only if the stream instance exists;
                    # otherwise, there's no need as it never started
                    if is_stream_exist:
                        timer.finish_event()
                        logger.info(f"Finished syncing {configured_stream.stream.name}")
                        logger.info(timer.report())

        if len(stream_name_to_exception) > 0:
            error_message = generate_failed_streams_error_message(
                {key: [value] for key, value in stream_name_to_exception.items()}
            )
            logger.info(error_message)
            # We still raise at least one exception when a stream raises an exception because the platform currently relies
            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
            raise AirbyteTracedException(
                message=error_message, failure_type=FailureType.config_error
            )
        logger.info(f"Finished syncing {self.name}")

    @staticmethod
    def _serialize_exception(
        stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None
    ) -> AirbyteTracedException:
        """
        Wrap an arbitrary exception into an AirbyteTracedException tied to a stream.

        :param stream_descriptor: descriptor of the stream the exception belongs to
        :param e: the exception to wrap
        :param stream_instance: optional stream asked for a display message via `get_error_display_message`
        :return: the traced exception; the display message is used as its message only when it is truthy
        """
        display_message = stream_instance.get_error_display_message(e) if stream_instance else None
        if display_message:
            return AirbyteTracedException.from_exception(
                e, message=display_message, stream_descriptor=stream_descriptor
            )
        return AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor)

    @property
    def raise_exception_on_missing_stream(self) -> bool:
        """
        Whether `read` raises when a configured stream is not among the streams returned by `streams`.
        With the default of False, `read` emits an INCOMPLETE status for that stream and skips it.
        """
        return False

    def _read_stream(
        self,
        logger: logging.Logger,
        stream_instance: Stream,
        configured_stream: ConfiguredAirbyteStream,
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterator[AirbyteMessage]:
        """
        Read a single stream and yield its messages.

        :param logger: source logger
        :param stream_instance: the stream to read from
        :param configured_stream: the catalog entry for this stream
        :param state_manager: provides the stored state for the stream
        :param internal_config: internal settings; a truthy `page_size` is applied to HttpStream instances
        :return: an iterator of Airbyte messages; a RUNNING status message precedes the first record
        """
        if internal_config.page_size and isinstance(stream_instance, HttpStream):
            logger.info(
                f"Setting page size for {stream_instance.name} to {internal_config.page_size}"
            )
            stream_instance.page_size = internal_config.page_size
        logger.debug(
            f"Syncing configured stream: {configured_stream.stream.name}",
            extra={
                "sync_mode": configured_stream.sync_mode,
                "primary_key": configured_stream.primary_key,
                "cursor_field": configured_stream.cursor_field,
            },
        )
        stream_instance.log_stream_sync_configuration()

        stream_name = configured_stream.stream.name
        stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)

        # This is a hack. Existing full refresh streams that are converted into resumable full refresh need to discard
        # the state because the terminal state for a full refresh sync is not compatible with substream resumable full
        # refresh state. This is only required when running live traffic regression testing since the platform normally
        # handles whether to pass state
        if stream_state == {"__ab_no_cursor_state_message": True}:
            stream_state = {}

        # Only streams exposing a `state` attribute receive the stored state
        if "state" in dir(stream_instance):
            stream_instance.state = stream_state  # type: ignore # we check that state in the dir(stream_instance)
            logger.info(f"Setting state of {self.name} stream to {stream_state}")

        record_iterator = stream_instance.read(
            configured_stream,
            logger,
            self._slice_logger,
            stream_state,
            state_manager,
            internal_config,
        )

        record_counter = 0
        logger.info(f"Syncing stream: {stream_name} ")
        for record_data_or_message in record_iterator:
            record = self._get_message(record_data_or_message, stream_instance)
            if record.type == MessageType.RECORD:
                record_counter += 1
                if record_counter == 1:
                    logger.info(f"Marking stream {stream_name} as RUNNING")
                    # If we just read the first record of the stream, emit the transition to the RUNNING state
                    yield stream_status_as_airbyte_message(
                        configured_stream.stream, AirbyteStreamStatus.RUNNING
                    )
            # Queued repository messages are emitted ahead of the message that was just read
            yield from self._emit_queued_messages()
            yield record

        logger.info(f"Read {record_counter} records from {stream_name} stream")

    def _emit_queued_messages(self) -> Iterable[AirbyteMessage]:
        """Yield everything consumed from the message repository queue, if a repository is set."""
        if self.message_repository:
            yield from self.message_repository.consume_queue()
        return

    def _get_message(
        self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream
    ) -> AirbyteMessage:
        """
        Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage

        :param record_data_or_message: the item produced by the stream's `read`
        :param stream: the stream whose name, transformer and JSON schema are used for the conversion
        :return: the resulting AirbyteMessage
        """
        match record_data_or_message:
            case AirbyteMessage():
                return record_data_or_message
            case _:
                return stream_data_to_airbyte_message(
                    stream.name,
                    record_data_or_message,
                    stream.transformer,
                    stream.get_json_schema(),
                )

    @property
    def message_repository(self) -> Union[None, MessageRepository]:
        """The message repository whose queue is drained by `_emit_queued_messages`."""
        return _default_message_repository

    @property
    def stop_sync_on_stream_failure(self) -> bool:
        """
        WARNING: This property is in development, which means it is subject to change. Use at your own risk.

        By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
        continue syncing the next stream. This can be overridden on a per-source basis so that the source will stop the sync
        on the first error seen and emit a single error trace message for that stream.
        """
        return False
Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.
@abstractmethod
def check_connection(
    self, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Optional[Any]]:
    """
    Verify that the provided configuration can be used to connect to the underlying data source.

    :param logger: source logger
    :param config: The user-provided configuration as specified by the source's spec.
      This usually contains information required to check connection e.g. tokens, secrets and keys etc.
    :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
      and we can connect to the underlying data source using the provided configuration.
      Otherwise, the input config cannot be used to connect to the underlying data source,
      and the "error" object should describe what went wrong.
      `check` places `repr(error)` in the message of the failed connection status.
    """
Parameters
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
@abstractmethod
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Build the streams exposed by this source.

    :param config: The user-provided configuration as specified by the source's spec.
      Any stream construction related operation should happen here.
    :return: A list of the streams in this source connector.
    """
Parameters
- config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns
A list of the streams in this source connector.
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
    """Implements the Discover operation from the Airbyte Specification.

    Each stream returned by ``self.streams(config=config)`` is converted with
    ``as_airbyte_stream()`` and the results are wrapped in an ``AirbyteCatalog``.
    See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.

    :param logger: source logger (not used by this implementation)
    :param config: The user-provided configuration as specified by the source's spec.
    :return: the catalog describing the streams of this source
    """
    catalog_entries = []
    for available_stream in self.streams(config=config):
        catalog_entries.append(available_stream.as_airbyte_stream())
    return AirbyteCatalog(streams=catalog_entries)
Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """Implements the Check Connection operation from the Airbyte Specification.

    Delegates to ``check_connection`` and translates its ``(succeeded, error)`` result
    into a connection status.
    See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

    :param logger: source logger, forwarded to ``check_connection``
    :param config: The user-provided configuration as specified by the source's spec.
    :return: a SUCCEEDED status on success, otherwise a FAILED status whose message is ``repr(error)``
    """
    connection_ok, failure = self.check_connection(logger, config)
    if connection_ok:
        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
    return AirbyteConnectionStatus(status=Status.FAILED, message=repr(failure))
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.

    Iterates over the configured streams in catalog order and, for each one, yields stream status
    messages (STARTED, then COMPLETE or INCOMPLETE) around the messages produced by `_read_stream`.
    An exception raised while syncing a stream is emitted as a trace message and recorded; the sync
    then either stops (`stop_sync_on_stream_failure`) or moves on to the next stream.

    :param logger: source logger
    :param config: user-provided configuration; it is split into connector config and internal config
    :param catalog: the configured catalog listing the streams to sync
    :param state: optional list of state messages from a previous sync
    :return: an iterator of Airbyte messages
    :raises AirbyteTracedException: once the loop over streams has ended, if at least one stream
      recorded an exception
    """
    logger.info(f"Starting syncing {self.name}")
    config, internal_config = split_config(config)
    # TODO assert all streams exist in the connector
    # get the streams once in case the connector needs to make any queries to generate them
    stream_instances = {s.name: s for s in self.streams(config)}
    state_manager = ConnectorStateManager(state=state)
    self._stream_to_instance_map = stream_instances

    # Exceptions recorded per stream name; used to build the final combined error
    stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}

    with create_timer(self.name) as timer:
        for configured_stream in catalog.streams:
            stream_instance = stream_instances.get(configured_stream.stream.name)
            # Captured before `stream_instance` may be rebound below, so `finally` still knows
            # whether a timer event was ever started for this stream
            is_stream_exist = bool(stream_instance)
            try:
                # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors
                if not stream_instance:
                    if not self.raise_exception_on_missing_stream:
                        # Missing stream is tolerated: report it as INCOMPLETE and skip it
                        yield stream_status_as_airbyte_message(
                            configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
                        )
                        continue

                    error_message = (
                        f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
                        f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
                    )

                    # Use configured_stream as stream_instance to support references in error handling.
                    stream_instance = configured_stream.stream

                    raise AirbyteTracedException(
                        message="A stream listed in your configuration was not found in the source. Please check the logs for more "
                        "details.",
                        internal_message=error_message,
                        failure_type=FailureType.config_error,
                    )

                timer.start_event(f"Syncing stream {configured_stream.stream.name}")
                logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
                yield stream_status_as_airbyte_message(
                    configured_stream.stream, AirbyteStreamStatus.STARTED
                )
                yield from self._read_stream(
                    logger=logger,
                    stream_instance=stream_instance,
                    configured_stream=configured_stream,
                    state_manager=state_manager,
                    internal_config=internal_config,
                )
                logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
                yield stream_status_as_airbyte_message(
                    configured_stream.stream, AirbyteStreamStatus.COMPLETE
                )

            except Exception as e:
                # Flush anything still queued in the message repository before reporting the failure
                yield from self._emit_queued_messages()
                logger.exception(
                    f"Encountered an exception while reading stream {configured_stream.stream.name}"
                )
                logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
                yield stream_status_as_airbyte_message(
                    configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
                )

                stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)

                # An already-traced exception is emitted as is; anything else is wrapped first
                if isinstance(e, AirbyteTracedException):
                    traced_exception = e
                    info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
                else:
                    traced_exception = self._serialize_exception(
                        stream_descriptor, e, stream_instance=stream_instance
                    )
                    info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}"

                yield traced_exception.as_sanitized_airbyte_message(
                    stream_descriptor=stream_descriptor
                )
                stream_name_to_exception[stream_instance.name] = traced_exception  # type: ignore # use configured_stream if stream_instance is None
                if self.stop_sync_on_stream_failure:
                    logger.info(info_message)
                    break
            finally:
                # Finish read event only if the stream instance exists;
                # otherwise, there's no need as it never started
                if is_stream_exist:
                    timer.finish_event()
                    logger.info(f"Finished syncing {configured_stream.stream.name}")
                    logger.info(timer.report())

    if len(stream_name_to_exception) > 0:
        error_message = generate_failed_streams_error_message(
            {key: [value] for key, value in stream_name_to_exception.items()}
        )
        logger.info(error_message)
        # We still raise at least one exception when a stream raises an exception because the platform currently relies
        # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
        # type because this combined error isn't actionable, but rather the previously emitted individual errors.
        raise AirbyteTracedException(
            message=error_message, failure_type=FailureType.config_error
        )
    logger.info(f"Finished syncing {self.name}")
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
@property
def stop_sync_on_stream_failure(self) -> bool:
    """
    WARNING: This property is in development, which means it is subject to change. Use at your own risk.

    By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
    continue syncing the next stream. This can be overridden on a per-source basis so that the source will stop the sync
    on the first error seen and emit a single error trace message for that stream.
    """
    return False
WARNING: This function is in-development which means it is subject to change. Use at your own risk.
By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then continue syncing the next stream. This can be overridden on a per-source basis so that the source will stop the sync on the first error seen and emit a single error trace message for that stream.
class BaseConfig(BaseModel):
    """Base class for connector spec, adds the following behaviour:

    - resolve $ref and replace it with definition
    - replace all occurrences of anyOf with oneOf
    - drop description
    """

    # NOTE(review): the class docstring above is left verbatim because the schema
    # `description` is reported to come from the docstring (see `schema` below).

    @classmethod
    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """Generate the model schema and post-process it for use as a connector spec.

        :param args: positional arguments forwarded to the parent ``schema`` implementation
        :param kwargs: keyword arguments forwarded to the parent ``schema`` implementation
        :return: the schema with ``anyOf`` keys renamed to ``oneOf``, references expanded
          and the top-level ``description`` removed
        """
        generated = super().schema(*args, **kwargs)
        # UI supports only oneOf
        rename_key(generated, old_key="anyOf", new_key="oneOf")
        expand_refs(generated)
        # description added from the docstring
        if "description" in generated:
            del generated["description"]
        return generated
Base class for connector spec, adds the following behaviour:
- resolve $ref and replace it with definition
- replace all occurrences of anyOf with oneOf
- drop description
@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
    """Generate the model schema and post-process it for use as a connector spec.

    :param args: positional arguments forwarded to the parent ``schema`` implementation
    :param kwargs: keyword arguments forwarded to the parent ``schema`` implementation
    :return: the schema with ``anyOf`` keys renamed to ``oneOf``, references expanded
      and the top-level ``description`` removed
    """
    generated = super().schema(*args, **kwargs)
    # UI supports only oneOf
    rename_key(generated, old_key="anyOf", new_key="oneOf")
    expand_refs(generated)
    # description added from the docstring
    if "description" in generated:
        del generated["description"]
    return generated
We're overriding the schema classmethod to enable some post-processing
class Source(
    DefaultConnectorMixin,
    BaseSource[Mapping[str, Any], List[AirbyteStateMessage], ConfiguredAirbyteCatalog],
    ABC,
):
    # can be overridden to change an input state.
    @classmethod
    def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
        """
        Load the input state of a sync from the JSON file at ``state_path``.

        Every entry of the parsed JSON is deserialized into an AirbyteStateMessage. An empty
        ``state_path`` or empty file content yields an empty list.

        NOTE(review): the parsed JSON is iterated directly, so it presumably has to be a list of
        state messages; confirm whether object-shaped (legacy) state is still expected here.

        :param state_path: The filepath to where the stream states are located
        :return: The list of state messages from the connector's previous sync
        :raises ValueError: if a message has none of ``stream``, ``data`` or ``global_`` set
        """
        messages: List[AirbyteStateMessage] = []
        if not state_path:
            return messages

        raw_state = BaseConnector._read_json_file(state_path)
        if not raw_state:
            return messages

        for raw_message in raw_state:  # type: ignore  # the parsed JSON is expected to be a list here
            message = AirbyteStateMessageSerializer.load(raw_message)
            has_payload = message.stream or message.data or message.global_
            if not has_payload:
                raise ValueError(
                    "AirbyteStateMessage should contain either a stream, global, or state field"
                )
            messages.append(message)
        return messages

    # can be overridden to change an input catalog
    @classmethod
    def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """
        Load the configured catalog from the JSON file at ``catalog_path``.

        :param catalog_path: The filepath to the configured catalog
        :return: The deserialized configured catalog
        """
        raw_catalog = cls._read_json_file(catalog_path)
        return ConfiguredAirbyteCatalogSerializer.load(raw_catalog)

    @property
    def name(self) -> str:
        """Source name, taken from the name of the concrete class"""
        source_class = self.__class__
        return source_class.__name__
Helper class that provides a standard way to create an ABC using inheritance.
@classmethod
def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
    """
    Load the input state of a sync from the JSON file at ``state_path``.

    Every entry of the parsed JSON is deserialized into an AirbyteStateMessage. An empty
    ``state_path`` or empty file content yields an empty list.

    NOTE(review): the parsed JSON is iterated directly, so it presumably has to be a list of
    state messages; confirm whether object-shaped (legacy) state is still expected here.

    :param state_path: The filepath to where the stream states are located
    :return: The list of state messages from the connector's previous sync
    :raises ValueError: if a message has none of ``stream``, ``data`` or ``global_`` set
    """
    messages: List[AirbyteStateMessage] = []
    if not state_path:
        return messages

    raw_state = BaseConnector._read_json_file(state_path)
    if not raw_state:
        return messages

    for raw_message in raw_state:  # type: ignore  # the parsed JSON is expected to be a list here
        message = AirbyteStateMessageSerializer.load(raw_message)
        has_payload = message.stream or message.data or message.global_
        if not has_payload:
            raise ValueError(
                "AirbyteStateMessage should contain either a stream, global, or state field"
            )
        messages.append(message)
    return messages
Retrieves the input state of a sync by reading from the specified JSON file. Incoming state can be deserialized into either a JSON object for legacy state input or as a list of AirbyteStateMessages for the per-stream state format. Regardless of the incoming input type, it will always be transformed and output as a list of AirbyteStateMessage(s).
Parameters
- state_path: The filepath to where the stream states are located
Returns
The complete stream state based on the connector's previous sync