airbyte_cdk.sources

 1#
 2# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 3#
 4
 5import dpath.options
 6
 7from .abstract_source import AbstractSource
 8from .config import BaseConfig
 9from .source import Source
10
11# As part of the CDK sources, we do not control what the APIs return and it is possible that a key is empty.
12# Reasons why we are doing this at the airbyte_cdk level:
13# * As of today, all the use cases should allow for empty keys
 14#     * (Cases as of 2023-08-31: oauth/session token provider responses, extractor, transformation and substream)
15# * The behavior is explicit at the package level and not hidden in every package that needs dpath.options.ALLOW_EMPTY_STRING_KEYS = True
 16# There is a downside in enforcing this option preemptively in the module __init__.py: the runtime code will import dpath even though it
 17# might not need dpath, leading to a longer initialization time.
18# There is a downside in using dpath as a library since the options are global: if we have two pieces of code that want different options,
19# this will not be thread-safe.
20dpath.options.ALLOW_EMPTY_STRING_KEYS = True
21
22__all__ = [
23    "AbstractSource",
24    "BaseConfig",
25    "Source",
26]
 53class AbstractSource(Source, ABC):
 54    """
 55    Abstract base class for an Airbyte Source. Consumers should implement any abstract methods
 56    in this class to create an Airbyte Specification compliant Source.
 57    """
 58
 59    @abstractmethod
 60    def check_connection(
 61        self, logger: logging.Logger, config: Mapping[str, Any]
 62    ) -> Tuple[bool, Optional[Any]]:
 63        """
 64        :param logger: source logger
 65        :param config: The user-provided configuration as specified by the source's spec.
 66          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
 67        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
 68          and we can connect to the underlying data source using the provided configuration.
 69          Otherwise, the input config cannot be used to connect to the underlying data source,
 70          and the "error" object should describe what went wrong.
 71          The error object will be cast to string to display the problem to the user.
 72        """
 73
 74    @abstractmethod
 75    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
 76        """
 77        :param config: The user-provided configuration as specified by the source's spec.
 78        Any stream construction related operation should happen here.
 79        :return: A list of the streams in this source connector.
 80        """
 81
 82    # Stream name to instance map for applying output object transformation
 83    _stream_to_instance_map: Dict[str, Stream] = {}
 84    _slice_logger: SliceLogger = DebugSliceLogger()
 85
 86    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
 87        """Implements the Discover operation from the Airbyte Specification.
 88        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
 89        """
 90        streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
 91        return AirbyteCatalog(streams=streams)
 92
 93    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
 94        """Implements the Check Connection operation from the Airbyte Specification.
 95        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
 96        """
 97        check_succeeded, error = self.check_connection(logger, config)
 98        if not check_succeeded:
 99            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
100        return AirbyteConnectionStatus(status=Status.SUCCEEDED)
101
102    def read(
103        self,
104        logger: logging.Logger,
105        config: Mapping[str, Any],
106        catalog: ConfiguredAirbyteCatalog,
107        state: Optional[List[AirbyteStateMessage]] = None,
108    ) -> Iterator[AirbyteMessage]:
109        """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
110        logger.info(f"Starting syncing {self.name}")
111        config, internal_config = split_config(config)
112        # TODO assert all streams exist in the connector
113        # get the streams once in case the connector needs to make any queries to generate them
114        stream_instances = {s.name: s for s in self.streams(config)}
115        state_manager = ConnectorStateManager(state=state)
116        self._stream_to_instance_map = stream_instances
117
118        stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}
119
120        with create_timer(self.name) as timer:
121            for configured_stream in catalog.streams:
122                stream_instance = stream_instances.get(configured_stream.stream.name)
123                is_stream_exist = bool(stream_instance)
124                try:
125                    # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors
126                    if not stream_instance:
127                        if not self.raise_exception_on_missing_stream:
128                            yield stream_status_as_airbyte_message(
129                                configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
130                            )
131                            continue
132
133                        error_message = (
134                            f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
135                            f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
136                        )
137
138                        # Use configured_stream as stream_instance to support references in error handling.
139                        stream_instance = configured_stream.stream
140
141                        raise AirbyteTracedException(
142                            message="A stream listed in your configuration was not found in the source. Please check the logs for more "
143                            "details.",
144                            internal_message=error_message,
145                            failure_type=FailureType.config_error,
146                        )
147
148                    timer.start_event(f"Syncing stream {configured_stream.stream.name}")
149                    logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
150                    yield stream_status_as_airbyte_message(
151                        configured_stream.stream, AirbyteStreamStatus.STARTED
152                    )
153                    yield from self._read_stream(
154                        logger=logger,
155                        stream_instance=stream_instance,
156                        configured_stream=configured_stream,
157                        state_manager=state_manager,
158                        internal_config=internal_config,
159                    )
160                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
161                    yield stream_status_as_airbyte_message(
162                        configured_stream.stream, AirbyteStreamStatus.COMPLETE
163                    )
164
165                except Exception as e:
166                    yield from self._emit_queued_messages()
167                    logger.exception(
168                        f"Encountered an exception while reading stream {configured_stream.stream.name}"
169                    )
170                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
171                    yield stream_status_as_airbyte_message(
172                        configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
173                    )
174
175                    stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)
176
177                    if isinstance(e, AirbyteTracedException):
178                        traced_exception = e
179                        info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
180                    else:
181                        traced_exception = self._serialize_exception(
182                            stream_descriptor, e, stream_instance=stream_instance
183                        )
184                        info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}"
185
186                    yield traced_exception.as_sanitized_airbyte_message(
187                        stream_descriptor=stream_descriptor
188                    )
189                    stream_name_to_exception[stream_instance.name] = traced_exception  # type: ignore # use configured_stream if stream_instance is None
190                    if self.stop_sync_on_stream_failure:
191                        logger.info(info_message)
192                        break
193                finally:
194                    # Finish read event only if the stream instance exists;
195                    # otherwise, there's no need as it never started
196                    if is_stream_exist:
197                        timer.finish_event()
198                        logger.info(f"Finished syncing {configured_stream.stream.name}")
199                        logger.info(timer.report())
200
201        if len(stream_name_to_exception) > 0:
202            error_message = generate_failed_streams_error_message(
203                {key: [value] for key, value in stream_name_to_exception.items()}
204            )
205            logger.info(error_message)
206            # We still raise at least one exception when a stream raises an exception because the platform currently relies
207            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
208            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
209            raise AirbyteTracedException(
210                message=error_message, failure_type=FailureType.config_error
211            )
212        logger.info(f"Finished syncing {self.name}")
213
214    @staticmethod
215    def _serialize_exception(
216        stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None
217    ) -> AirbyteTracedException:
218        display_message = stream_instance.get_error_display_message(e) if stream_instance else None
219        if display_message:
220            return AirbyteTracedException.from_exception(
221                e, message=display_message, stream_descriptor=stream_descriptor
222            )
223        return AirbyteTracedException.from_exception(e, stream_descriptor=stream_descriptor)
224
225    @property
226    def raise_exception_on_missing_stream(self) -> bool:
227        return False
228
229    def _read_stream(
230        self,
231        logger: logging.Logger,
232        stream_instance: Stream,
233        configured_stream: ConfiguredAirbyteStream,
234        state_manager: ConnectorStateManager,
235        internal_config: InternalConfig,
236    ) -> Iterator[AirbyteMessage]:
237        if internal_config.page_size and isinstance(stream_instance, HttpStream):
238            logger.info(
239                f"Setting page size for {stream_instance.name} to {internal_config.page_size}"
240            )
241            stream_instance.page_size = internal_config.page_size
242        logger.debug(
243            f"Syncing configured stream: {configured_stream.stream.name}",
244            extra={
245                "sync_mode": configured_stream.sync_mode,
246                "primary_key": configured_stream.primary_key,
247                "cursor_field": configured_stream.cursor_field,
248            },
249        )
250        stream_instance.log_stream_sync_configuration()
251
252        stream_name = configured_stream.stream.name
253        stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)
254
255        # This is a hack. Existing full refresh streams that are converted into resumable full refresh need to discard
256        # the state because the terminal state for a full refresh sync is not compatible with substream resumable full
257        # refresh state. This is only required when running live traffic regression testing since the platform normally
258        # handles whether to pass state
259        if stream_state == {"__ab_no_cursor_state_message": True}:
260            stream_state = {}
261
262        if "state" in dir(stream_instance):
263            stream_instance.state = stream_state  # type: ignore # we check that state in the dir(stream_instance)
264            logger.info(f"Setting state of {self.name} stream to {stream_state}")
265
266        record_iterator = stream_instance.read(
267            configured_stream,
268            logger,
269            self._slice_logger,
270            stream_state,
271            state_manager,
272            internal_config,
273        )
274
275        record_counter = 0
276        logger.info(f"Syncing stream: {stream_name} ")
277        for record_data_or_message in record_iterator:
278            record = self._get_message(record_data_or_message, stream_instance)
279            if record.type == MessageType.RECORD:
280                record_counter += 1
281                if record_counter == 1:
282                    logger.info(f"Marking stream {stream_name} as RUNNING")
283                    # If we just read the first record of the stream, emit the transition to the RUNNING state
284                    yield stream_status_as_airbyte_message(
285                        configured_stream.stream, AirbyteStreamStatus.RUNNING
286                    )
287            yield from self._emit_queued_messages()
288            yield record
289
290        logger.info(f"Read {record_counter} records from {stream_name} stream")
291
292    def _emit_queued_messages(self) -> Iterable[AirbyteMessage]:
293        if self.message_repository:
294            yield from self.message_repository.consume_queue()
295        return
296
297    def _get_message(
298        self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream
299    ) -> AirbyteMessage:
300        """
301        Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
302        """
303        match record_data_or_message:
304            case AirbyteMessage():
305                return record_data_or_message
306            case _:
307                return stream_data_to_airbyte_message(
308                    stream.name,
309                    record_data_or_message,
310                    stream.transformer,
311                    stream.get_json_schema(),
312                )
313
314    @property
315    def message_repository(self) -> Union[None, MessageRepository]:
316        return _default_message_repository
317
318    @property
319    def stop_sync_on_stream_failure(self) -> bool:
320        """
321        WARNING: This function is in-development which means it is subject to change. Use at your own risk.
322
323        By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
324        continue syncing the next stream. This can be overwritten on a per-source basis so that the source will stop the sync
325        on the first error seen and emit a single error trace message for that stream.
326        """
327        return False

Abstract base class for an Airbyte Source. Consumers should implement any abstract methods in this class to create an Airbyte Specification compliant Source.

@abstractmethod
def check_connection( self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
59    @abstractmethod
60    def check_connection(
61        self, logger: logging.Logger, config: Mapping[str, Any]
62    ) -> Tuple[bool, Optional[Any]]:
63        """
64        :param logger: source logger
65        :param config: The user-provided configuration as specified by the source's spec.
66          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
67        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
68          and we can connect to the underlying data source using the provided configuration.
69          Otherwise, the input config cannot be used to connect to the underlying data source,
70          and the "error" object should describe what went wrong.
71          The error object will be cast to string to display the problem to the user.
72        """
Parameters
  • logger: source logger
  • config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.

@abstractmethod
def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
74    @abstractmethod
75    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
76        """
77        :param config: The user-provided configuration as specified by the source's spec.
78        Any stream construction related operation should happen here.
79        :return: A list of the streams in this source connector.
80        """
Parameters
  • config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns

A list of the streams in this source connector.

def discover( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
86    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
87        """Implements the Discover operation from the Airbyte Specification.
88        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
89        """
90        streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
91        return AirbyteCatalog(streams=streams)

Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
 93    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
 94        """Implements the Check Connection operation from the Airbyte Specification.
 95        See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
 96        """
 97        check_succeeded, error = self.check_connection(logger, config)
 98        if not check_succeeded:
 99            return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
100        return AirbyteConnectionStatus(status=Status.SUCCEEDED)

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
102    def read(
103        self,
104        logger: logging.Logger,
105        config: Mapping[str, Any],
106        catalog: ConfiguredAirbyteCatalog,
107        state: Optional[List[AirbyteStateMessage]] = None,
108    ) -> Iterator[AirbyteMessage]:
109        """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
110        logger.info(f"Starting syncing {self.name}")
111        config, internal_config = split_config(config)
112        # TODO assert all streams exist in the connector
113        # get the streams once in case the connector needs to make any queries to generate them
114        stream_instances = {s.name: s for s in self.streams(config)}
115        state_manager = ConnectorStateManager(state=state)
116        self._stream_to_instance_map = stream_instances
117
118        stream_name_to_exception: MutableMapping[str, AirbyteTracedException] = {}
119
120        with create_timer(self.name) as timer:
121            for configured_stream in catalog.streams:
122                stream_instance = stream_instances.get(configured_stream.stream.name)
123                is_stream_exist = bool(stream_instance)
124                try:
125                    # Used direct reference to `stream_instance` instead of `is_stream_exist` to avoid mypy type checking errors
126                    if not stream_instance:
127                        if not self.raise_exception_on_missing_stream:
128                            yield stream_status_as_airbyte_message(
129                                configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
130                            )
131                            continue
132
133                        error_message = (
134                            f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
135                            f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
136                        )
137
138                        # Use configured_stream as stream_instance to support references in error handling.
139                        stream_instance = configured_stream.stream
140
141                        raise AirbyteTracedException(
142                            message="A stream listed in your configuration was not found in the source. Please check the logs for more "
143                            "details.",
144                            internal_message=error_message,
145                            failure_type=FailureType.config_error,
146                        )
147
148                    timer.start_event(f"Syncing stream {configured_stream.stream.name}")
149                    logger.info(f"Marking stream {configured_stream.stream.name} as STARTED")
150                    yield stream_status_as_airbyte_message(
151                        configured_stream.stream, AirbyteStreamStatus.STARTED
152                    )
153                    yield from self._read_stream(
154                        logger=logger,
155                        stream_instance=stream_instance,
156                        configured_stream=configured_stream,
157                        state_manager=state_manager,
158                        internal_config=internal_config,
159                    )
160                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
161                    yield stream_status_as_airbyte_message(
162                        configured_stream.stream, AirbyteStreamStatus.COMPLETE
163                    )
164
165                except Exception as e:
166                    yield from self._emit_queued_messages()
167                    logger.exception(
168                        f"Encountered an exception while reading stream {configured_stream.stream.name}"
169                    )
170                    logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
171                    yield stream_status_as_airbyte_message(
172                        configured_stream.stream, AirbyteStreamStatus.INCOMPLETE
173                    )
174
175                    stream_descriptor = StreamDescriptor(name=configured_stream.stream.name)
176
177                    if isinstance(e, AirbyteTracedException):
178                        traced_exception = e
179                        info_message = f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
180                    else:
181                        traced_exception = self._serialize_exception(
182                            stream_descriptor, e, stream_instance=stream_instance
183                        )
184                        info_message = f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}"
185
186                    yield traced_exception.as_sanitized_airbyte_message(
187                        stream_descriptor=stream_descriptor
188                    )
189                    stream_name_to_exception[stream_instance.name] = traced_exception  # type: ignore # use configured_stream if stream_instance is None
190                    if self.stop_sync_on_stream_failure:
191                        logger.info(info_message)
192                        break
193                finally:
194                    # Finish read event only if the stream instance exists;
195                    # otherwise, there's no need as it never started
196                    if is_stream_exist:
197                        timer.finish_event()
198                        logger.info(f"Finished syncing {configured_stream.stream.name}")
199                        logger.info(timer.report())
200
201        if len(stream_name_to_exception) > 0:
202            error_message = generate_failed_streams_error_message(
203                {key: [value] for key, value in stream_name_to_exception.items()}
204            )
205            logger.info(error_message)
206            # We still raise at least one exception when a stream raises an exception because the platform currently relies
207            # on a non-zero exit code to determine if a sync attempt has failed. We also raise the exception as a config_error
208            # type because this combined error isn't actionable, but rather the previously emitted individual errors.
209            raise AirbyteTracedException(
210                message=error_message, failure_type=FailureType.config_error
211            )
212        logger.info(f"Finished syncing {self.name}")

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.

raise_exception_on_missing_stream: bool
225    @property
226    def raise_exception_on_missing_stream(self) -> bool:
227        return False
message_repository: Optional[airbyte_cdk.MessageRepository]
314    @property
315    def message_repository(self) -> Union[None, MessageRepository]:
316        return _default_message_repository
stop_sync_on_stream_failure: bool
318    @property
319    def stop_sync_on_stream_failure(self) -> bool:
320        """
321        WARNING: This function is in-development which means it is subject to change. Use at your own risk.
322
323        By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
324        continue syncing the next stream. This can be overwritten on a per-source basis so that the source will stop the sync
325        on the first error seen and emit a single error trace message for that stream.
326        """
327        return False

WARNING: This function is in-development which means it is subject to change. Use at your own risk.

By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then continue syncing the next stream. This can be overridden on a per-source basis so that the source will stop the sync on the first error seen and emit a single error trace message for that stream.

class BaseConfig(pydantic.v1.main.BaseModel):
class BaseConfig(BaseModel):
    """Base class for connector spec, adds the following behaviour:

    - resolve $ref and replace it with definition
    - replace all occurrences of anyOf with oneOf
    - drop description
    """

    # NOTE(review): the class docstring above is left untouched on purpose. `schema` strips a
    # "description" that is said to come from the docstring, so its text may surface at runtime.

    @classmethod
    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """Return the model schema after post-processing it for connector specs.

        All arguments are forwarded to the parent `schema` implementation.
        """
        spec = super().schema(*args, **kwargs)
        # UI supports only oneOf
        rename_key(spec, old_key="anyOf", new_key="oneOf")
        expand_refs(spec)
        # description added from the docstring
        spec.pop("description", None)
        return spec

Base class for connector spec, adds the following behaviour:

  • resolve $ref and replace it with definition
  • replace all occurrences of anyOf with oneOf
  • drop description
@classmethod
def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
21    @classmethod
22    def schema(cls, *args: Any, **kwargs: Any) -> Dict[str, Any]:
23        """We're overriding the schema classmethod to enable some post-processing"""
24        schema = super().schema(*args, **kwargs)
25        rename_key(schema, old_key="anyOf", new_key="oneOf")  # UI supports only oneOf
26        expand_refs(schema)
27        schema.pop("description", None)  # description added from the docstring
28        return schema

We're overriding the schema classmethod to enable some post-processing

class Source(
    DefaultConnectorMixin,
    BaseSource[Mapping[str, Any], List[AirbyteStateMessage], ConfiguredAirbyteCatalog],
    ABC,
):
    """
    Source base class that loads the input state and the configured catalog from JSON files
    and exposes the name of the source.
    """

    # can be overridden to change an input state.
    @classmethod
    def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
        """
        Load the input state of a sync from a JSON file.

        Each entry of the parsed file is deserialized into an AirbyteStateMessage. An empty
        path or an empty file content yields an empty list.

        NOTE(review): only iteration over the parsed content is visible here; whether a legacy
        single-object state is still supported depends on `_read_json_file` - confirm.

        :param state_path: The filepath to where the stream states are located
        :return: The state messages of the connector's previous sync
        :raises ValueError: if a message has none of its stream, data and global fields set
        """
        messages: List[AirbyteStateMessage] = []
        if not state_path:
            return messages

        raw_state = BaseConnector._read_json_file(state_path)
        if not raw_state:
            return messages

        for entry in raw_state:  # type: ignore  # the parsed content is expected to be a list
            message = AirbyteStateMessageSerializer.load(entry)
            # A state message must carry at least one of the three payloads.
            if not (message.stream or message.data or message.global_):
                raise ValueError(
                    "AirbyteStateMessage should contain either a stream, global, or state field"
                )
            messages.append(message)
        return messages

    # can be overridden to change an input catalog
    @classmethod
    def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """Load the configured catalog stored as JSON at `catalog_path`."""
        raw_catalog = cls._read_json_file(catalog_path)
        return ConfiguredAirbyteCatalogSerializer.load(raw_catalog)

    @property
    def name(self) -> str:
        """Source name"""
        source_class = self.__class__
        return source_class.__name__

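Base class for Airbyte sources: it reads the input state and the configured catalog of a sync from JSON files and exposes the name of the source. (`Source` defines no docstring of its own; the sentence "Helper class that provides a standard way to create an ABC using inheritance." previously shown here is inherited from `abc.ABC`.)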

@classmethod
def read_state( cls, state_path: str) -> List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]:
62    @classmethod
63    def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
64        """
65        Retrieves the input state of a sync by reading from the specified JSON file. Incoming state can be deserialized into either
66        a JSON object for legacy state input or as a list of AirbyteStateMessages for the per-stream state format. Regardless of the
67        incoming input type, it will always be transformed and output as a list of AirbyteStateMessage(s).
68        :param state_path: The filepath to where the stream states are located
69        :return: The complete stream state based on the connector's previous sync
70        """
71        parsed_state_messages = []
72        if state_path:
73            state_obj = BaseConnector._read_json_file(state_path)
74            if state_obj:
75                for state in state_obj:  # type: ignore  # `isinstance(state_obj, List)` ensures that this is a list
76                    parsed_message = AirbyteStateMessageSerializer.load(state)
77                    if (
78                        not parsed_message.stream
79                        and not parsed_message.data
80                        and not parsed_message.global_
81                    ):
82                        raise ValueError(
83                            "AirbyteStateMessage should contain either a stream, global, or state field"
84                        )
85                    parsed_state_messages.append(parsed_message)
86        return parsed_state_messages

Retrieves the input state of a sync by reading from the specified JSON file. Incoming state can be deserialized into either a JSON object for legacy state input or as a list of AirbyteStateMessages for the per-stream state format. Regardless of the incoming input type, it will always be transformed and output as a list of AirbyteStateMessage(s).

Parameters
  • state_path: The filepath to where the stream states are located
Returns

The complete stream state based on the connector's previous sync

@classmethod
def read_catalog( cls, catalog_path: str) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog:
89    @classmethod
90    def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
91        return ConfiguredAirbyteCatalogSerializer.load(cls._read_json_file(catalog_path))
name: str
93    @property
94    def name(self) -> str:
95        """Source name"""
96        return self.__class__.__name__

Source name