airbyte_cdk.sources.source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import logging
from abc import ABC, abstractmethod
from typing import Any, Generic, Iterable, List, Mapping, Optional, TypeVar

from airbyte_cdk.connector import BaseConnector, DefaultConnectorMixin, TConfig
from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteMessage,
    AirbyteStateMessage,
    AirbyteStateMessageSerializer,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteCatalogSerializer,
)

TState = TypeVar("TState")
TCatalog = TypeVar("TCatalog")


class ExperimentalClassWarning(DeprecationWarning):
    """DeprecationWarning subclass with no extra behavior; by its name, used to flag experimental classes."""


class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]):
    """
    Abstract source connector interface, generic over its config (TConfig), state (TState) and catalog (TCatalog) types.
    Subclasses must implement read_state, read_catalog, read and discover.
    """

    @abstractmethod
    def read_state(self, state_path: str) -> TState:
        """Returns the input state loaded from state_path, in the subclass's TState representation."""

    @abstractmethod
    def read_catalog(self, catalog_path: str) -> TCatalog:
        """Returns the catalog loaded from catalog_path, in the subclass's TCatalog representation."""

    @abstractmethod
    def read(
        self,
        logger: logging.Logger,
        config: TConfig,
        catalog: TCatalog,
        state: Optional[TState] = None,
    ) -> Iterable[AirbyteMessage]:
        """
        Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
        """

    @abstractmethod
    def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
        """
        Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a
        Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
        """


class Source(
    DefaultConnectorMixin,
    BaseSource[Mapping[str, Any], List[AirbyteStateMessage], ConfiguredAirbyteCatalog],
    ABC,
):
    """
    Source base class whose config is a Mapping[str, Any], whose state is a list of AirbyteStateMessages and whose
    catalog is a ConfiguredAirbyteCatalog. Provides default JSON-file readers for the state and the catalog.
    """

    # can be overridden to change an input state.
    @classmethod
    def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
        """
        Retrieves the input state of a sync by reading from the specified JSON file. The file must contain a JSON
        list of AirbyteStateMessages (e.g. per-stream or global state); each entry is deserialized into an
        AirbyteStateMessage and the messages are returned in file order. A legacy single JSON object is not converted.
        :param state_path: The filepath to where the stream states are located; a falsy path means "no state"
        :return: The complete stream state based on the connector's previous sync; an empty list when no path is
            given or the file content is falsy (e.g. an empty list, an empty object or null)
        :raises ValueError: If the file content is not a JSON list, or if a message has none of the stream,
            global or data fields set
        """
        if not state_path:
            return []
        state_obj = BaseConnector._read_json_file(state_path)
        if not state_obj:
            # An empty / null state file is treated exactly like having no state at all.
            return []
        if not isinstance(state_obj, list):
            # Iterating a mapping would hand its keys (strings) to the serializer and fail with an unrelated
            # error, so reject anything that is not a list with an explicit message.
            raise ValueError(
                f"Expected the state file at {state_path} to contain a JSON list of AirbyteStateMessages, "
                f"got {type(state_obj).__name__}"
            )
        parsed_state_messages: List[AirbyteStateMessage] = []
        for state in state_obj:
            parsed_message = AirbyteStateMessageSerializer.load(state)
            # A message with no stream, data, or global payload carries no state to resume from.
            if (
                not parsed_message.stream
                and not parsed_message.data
                and not parsed_message.global_
            ):
                raise ValueError(
                    "AirbyteStateMessage should contain either a stream, global, or state field"
                )
            parsed_state_messages.append(parsed_message)
        return parsed_state_messages

    # can be overridden to change an input catalog
    @classmethod
    def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """Loads the configured catalog from the JSON file at catalog_path."""
        return ConfiguredAirbyteCatalogSerializer.load(cls._read_json_file(catalog_path))

    @property
    def name(self) -> str:
        """Source name; defaults to the concrete class name."""
        return self.__class__.__name__
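Base class for warnings about deprecated features. (This docstring is inherited from `DeprecationWarning`; `ExperimentalClassWarning` subclasses it and adds no behavior of its own.)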
class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]):
    """
    Abstract interface every source connector implements.

    Generic over three types: the connector config (TConfig), the sync state (TState)
    and the catalog (TCatalog). Concrete subclasses supply all four abstract methods.
    """

    @abstractmethod
    def read_state(self, state_path: str) -> TState:
        """Load the input state stored at ``state_path`` into the TState representation."""

    @abstractmethod
    def read_catalog(self, catalog_path: str) -> TCatalog:
        """Load the catalog stored at ``catalog_path`` into the TCatalog representation."""

    @abstractmethod
    def read(
        self,
        logger: logging.Logger,
        config: TConfig,
        catalog: TCatalog,
        state: Optional[TState] = None,
    ) -> Iterable[AirbyteMessage]:
        """
        Read the source and yield the resulting AirbyteMessages.

        The read is driven by the given configuration, catalog and (optional) state.
        """

    @abstractmethod
    def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
        """
        Describe the streams and fields this integration exposes as an AirbyteCatalog.

        Example: given valid credentials for a Postgres database, the catalog contains one
        stream per table and one field per table column.
        """
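Abstract source connector interface, generic over its config (`TConfig`), state (`TState`) and catalog (`TCatalog`) types; subclasses must implement `read_state`, `read_catalog`, `read` and `discover`. (The text "Helper class that provides a standard way to create an ABC using inheritance." is the docstring inherited from `abc.ABC`.)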
@abstractmethod
def read(
    self,
    logger: logging.Logger,
    config: TConfig,
    catalog: TCatalog,
    state: Optional[TState] = None,
) -> Iterable[AirbyteMessage]:
    """
    Read the source and yield the AirbyteMessages it produces.

    :param logger: Logger for the read operation
    :param config: Connector configuration
    :param catalog: Catalog selecting what to read
    :param state: Optional state from a previous sync
    :return: An iterable (typically a generator) of AirbyteMessages
    """
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
@abstractmethod
def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
    """
    Describe the streams and fields available in this integration.

    Example: for a Postgres database reachable with valid credentials, the returned
    catalog has one stream per table and one field per table column.

    :param logger: Logger for the discover operation
    :param config: Connector configuration
    :return: An AirbyteCatalog of the available streams and their fields
    """
Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
class Source(
    DefaultConnectorMixin,
    BaseSource[Mapping[str, Any], List[AirbyteStateMessage], ConfiguredAirbyteCatalog],
    ABC,
):
    """
    Source base class whose config is a Mapping[str, Any], whose state is a list of AirbyteStateMessages and whose
    catalog is a ConfiguredAirbyteCatalog. Provides default JSON-file readers for the state and the catalog.
    """

    # can be overridden to change an input state.
    @classmethod
    def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
        """
        Retrieves the input state of a sync by reading from the specified JSON file. The file must contain a JSON
        list of AirbyteStateMessages (e.g. per-stream or global state); each entry is deserialized into an
        AirbyteStateMessage and the messages are returned in file order. A legacy single JSON object is not converted.
        :param state_path: The filepath to where the stream states are located; a falsy path means "no state"
        :return: The complete stream state based on the connector's previous sync; an empty list when no path is
            given or the file content is falsy (e.g. an empty list, an empty object or null)
        :raises ValueError: If the file content is not a JSON list, or if a message has none of the stream,
            global or data fields set
        """
        if not state_path:
            return []
        state_obj = BaseConnector._read_json_file(state_path)
        if not state_obj:
            # An empty / null state file is treated exactly like having no state at all.
            return []
        if not isinstance(state_obj, list):
            # Iterating a mapping would hand its keys (strings) to the serializer and fail with an unrelated
            # error, so reject anything that is not a list with an explicit message.
            raise ValueError(
                f"Expected the state file at {state_path} to contain a JSON list of AirbyteStateMessages, "
                f"got {type(state_obj).__name__}"
            )
        parsed_state_messages: List[AirbyteStateMessage] = []
        for state in state_obj:
            parsed_message = AirbyteStateMessageSerializer.load(state)
            # A message with no stream, data, or global payload carries no state to resume from.
            if (
                not parsed_message.stream
                and not parsed_message.data
                and not parsed_message.global_
            ):
                raise ValueError(
                    "AirbyteStateMessage should contain either a stream, global, or state field"
                )
            parsed_state_messages.append(parsed_message)
        return parsed_state_messages

    # can be overridden to change an input catalog
    @classmethod
    def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """Loads the configured catalog from the JSON file at catalog_path."""
        return ConfiguredAirbyteCatalogSerializer.load(cls._read_json_file(catalog_path))

    @property
    def name(self) -> str:
        """Source name; defaults to the concrete class name."""
        return self.__class__.__name__
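Source base class whose config is `Mapping[str, Any]`, whose state is `List[AirbyteStateMessage]` and whose catalog is `ConfiguredAirbyteCatalog`; it provides default `read_state`, `read_catalog` and `name` implementations. (The text "Helper class that provides a standard way to create an ABC using inheritance." is the docstring inherited from `abc.ABC`.)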
@classmethod
def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
    """
    Retrieves the input state of a sync by reading from the specified JSON file. The file must contain a JSON
    list of AirbyteStateMessages (e.g. per-stream or global state); each entry is deserialized into an
    AirbyteStateMessage and the messages are returned in file order. A legacy single JSON object is not converted.
    :param state_path: The filepath to where the stream states are located; a falsy path means "no state"
    :return: The complete stream state based on the connector's previous sync; an empty list when no path is
        given or the file content is falsy (e.g. an empty list, an empty object or null)
    :raises ValueError: If the file content is not a JSON list, or if a message has none of the stream,
        global or data fields set
    """
    if not state_path:
        return []
    state_obj = BaseConnector._read_json_file(state_path)
    if not state_obj:
        # An empty / null state file is treated exactly like having no state at all.
        return []
    if not isinstance(state_obj, list):
        # Iterating a mapping would hand its keys (strings) to the serializer and fail with an unrelated
        # error, so reject anything that is not a list with an explicit message.
        raise ValueError(
            f"Expected the state file at {state_path} to contain a JSON list of AirbyteStateMessages, "
            f"got {type(state_obj).__name__}"
        )
    parsed_state_messages: List[AirbyteStateMessage] = []
    for state in state_obj:
        parsed_message = AirbyteStateMessageSerializer.load(state)
        # A message with no stream, data, or global payload carries no state to resume from.
        if (
            not parsed_message.stream
            and not parsed_message.data
            and not parsed_message.global_
        ):
            raise ValueError(
                "AirbyteStateMessage should contain either a stream, global, or state field"
            )
        parsed_state_messages.append(parsed_message)
    return parsed_state_messages
Retrieves the input state of a sync by reading from the specified JSON file. The file is expected to contain a JSON list of AirbyteStateMessages (per-stream or global state format); each entry is deserialized into an AirbyteStateMessage, so the result is always a list of AirbyteStateMessage(s). A legacy single JSON object is not converted by this method.
Parameters
- state_path: The filepath to where the stream states are located
Returns
The complete stream state based on the connector's previous sync