airbyte_cdk.destinations
The destinations module provides classes for building destination connectors.
class
Destination(airbyte_cdk.connector.DefaultConnectorMixin, airbyte_cdk.connector.BaseConnector[typing.Mapping[str, typing.Any]], abc.ABC):
class Destination(Connector, ABC):
    """Abstract base class for destination connectors.

    Subclasses implement :meth:`write`. This class supplies the command-line
    plumbing for the ``spec``, ``check`` and ``write`` commands: argument
    parsing, config loading and validation, reading input messages from
    stdin, and printing the resulting messages as JSON lines.
    """

    # Commands accepted by parse_args() and run_cmd().
    VALID_CMDS = {"spec", "check", "write"}

    @abstractmethod
    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        """Implement to define how the connector writes data to the destination"""

    def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage:
        """Run ``self.check`` and wrap its result in a CONNECTION_STATUS message.

        :param config: the connector configuration
        :return: an AirbyteMessage carrying the connection status
        """
        check_result = self.check(logger, config)
        return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result)

    def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]:
        """Reads from stdin, converting to Airbyte messages

        Each line is parsed as JSON and loaded as an Airbyte message. Lines
        that are not valid JSON are logged and skipped; only JSON decode
        errors are caught here.

        :param input_stream: text stream yielding one serialized message per line
        :return: a lazy iterable of the messages that could be parsed
        """
        for line in input_stream:
            try:
                yield AirbyteMessageSerializer.load(orjson.loads(line))
            except orjson.JSONDecodeError:
                logger.info(
                    f"ignoring input which can't be deserialized as Airbyte Message: {line}"
                )

    def _run_write(
        self,
        config: Mapping[str, Any],
        configured_catalog_path: str,
        input_stream: io.TextIOWrapper,
    ) -> Iterable[AirbyteMessage]:
        """Load the configured catalog and stream input messages through ``write``.

        :param config: the connector configuration
        :param configured_catalog_path: path to the configured catalog JSON file
        :param input_stream: text stream to read input messages from
        :return: the messages yielded by :meth:`write`
        """
        # Use a context manager so the file handle is always closed, and read
        # as UTF-8 explicitly (matching the UTF-8 wrapping applied to stdin in
        # run_cmd) instead of relying on the platform's locale encoding.
        with open(configured_catalog_path, encoding="utf-8") as catalog_file:
            raw_catalog = catalog_file.read()
        catalog = ConfiguredAirbyteCatalogSerializer.load(orjson.loads(raw_catalog))
        input_messages = self._parse_input_stream(input_stream)
        logger.info("Begin writing to the destination...")
        yield from self.write(
            config=config, configured_catalog=catalog, input_messages=input_messages
        )
        # Only reached once the write() iterable has been fully consumed.
        logger.info("Writing complete.")

    def parse_args(self, args: List[str]) -> argparse.Namespace:
        """Parse the command-line arguments for the spec/check/write commands.

        :param args: commandline arguments
        :return: the parsed namespace; ``command`` holds the chosen command,
            ``config`` is set for ``check`` and ``write``, and ``catalog`` is
            set for ``write``
        :raises Exception: if no command was given, or the command is not in
            ``VALID_CMDS``
        """

        parent_parser = argparse.ArgumentParser(add_help=False)
        main_parser = argparse.ArgumentParser()
        subparsers = main_parser.add_subparsers(title="commands", dest="command")

        # spec
        subparsers.add_parser(
            "spec", help="outputs the json configuration specification", parents=[parent_parser]
        )

        # check
        check_parser = subparsers.add_parser(
            "check", help="checks the config can be used to connect", parents=[parent_parser]
        )
        required_check_parser = check_parser.add_argument_group("required named arguments")
        required_check_parser.add_argument(
            "--config", type=str, required=True, help="path to the json configuration file"
        )

        # write
        write_parser = subparsers.add_parser(
            "write", help="Writes data to the destination", parents=[parent_parser]
        )
        write_required = write_parser.add_argument_group("required named arguments")
        write_required.add_argument(
            "--config", type=str, required=True, help="path to the JSON configuration file"
        )
        write_required.add_argument(
            "--catalog", type=str, required=True, help="path to the configured catalog JSON file"
        )

        parsed_args = main_parser.parse_args(args)
        cmd = parsed_args.command
        if not cmd:
            raise Exception("No command entered. ")
        elif cmd not in self.VALID_CMDS:
            # With the default VALID_CMDS this is effectively dead code, since
            # main_parser.parse_args() rejects unknown subcommands itself. It is
            # kept as an explicit guard and shares the set that run_cmd checks.
            raise Exception(f"Unknown command entered: {cmd}")

        return parsed_args

    def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
        """Execute the parsed command and yield the messages it produces.

        This is a generator, so nothing (including the command validation)
        runs until the result is iterated.

        :param parsed_args: namespace produced by :meth:`parse_args`
        :return: the SPEC message, the CONNECTION_STATUS message, or the
            messages yielded by the write path, depending on the command
        :raises Exception: if the command is not in ``VALID_CMDS``
        """
        cmd = parsed_args.command
        if cmd not in self.VALID_CMDS:
            raise Exception(f"Unrecognized command: {cmd}")

        spec = self.spec(logger)
        if cmd == "spec":
            yield AirbyteMessage(type=Type.SPEC, spec=spec)
            return
        config = self.read_config(config_path=parsed_args.config)
        if self.check_config_against_spec or cmd == "check":
            try:
                check_config_against_spec_or_exit(config, spec)
            except AirbyteTracedException as traced_exc:
                # For "check", report the validation failure as a connection
                # status message when one is available; otherwise re-raise.
                connection_status = traced_exc.as_connection_status_message()
                if connection_status and cmd == "check":
                    yield connection_status
                    return
                raise traced_exc

        if cmd == "check":
            yield self._run_check(config=config)
        elif cmd == "write":
            # Wrap in UTF-8 to override any other input encodings
            wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
            yield from self._run_write(
                config=config,
                configured_catalog_path=parsed_args.catalog,
                input_stream=wrapped_stdin,
            )

    def run(self, args: List[str]) -> None:
        """Parse ``args``, run the command, and print each message as a JSON line.

        :param args: commandline arguments
        """
        init_uncaught_exception_handler(logger)
        parsed_args = self.parse_args(args)
        output_messages = self.run_cmd(parsed_args)
        for message in output_messages:
            print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
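Abstract base class for destination connectors: subclasses implement `write`, and the class handles the `spec`, `check`, and `write` commands. (The text inherited from `abc.ABC` reads: "Helper class that provides a standard way to create an ABC using inheritance.")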
@abstractmethod
def
write( self, config: Mapping[str, Any], configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, input_messages: Iterable[airbyte_cdk.AirbyteMessage]) -> Iterable[airbyte_cdk.AirbyteMessage]:
@abstractmethod
def write(
    self,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    input_messages: Iterable[AirbyteMessage],
) -> Iterable[AirbyteMessage]:
    """Implement to define how the connector writes data to the destination.

    :param config: the connector configuration
    :param configured_catalog: the configured catalog for this write
    :param input_messages: the Airbyte messages to be written
    :return: an iterable of Airbyte messages produced while writing
    """
Implement to define how the connector writes data to the destination
def
parse_args(self, args: List[str]) -> argparse.Namespace:
def parse_args(self, args: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments for the spec/check/write commands.

    :param args: commandline arguments
    :return: the parsed namespace; ``command`` holds the chosen command,
        ``config`` is set for ``check`` and ``write``, and ``catalog`` is
        set for ``write``
    :raises Exception: if no command was given, or the command is not in
        ``VALID_CMDS``
    """

    parent_parser = argparse.ArgumentParser(add_help=False)
    main_parser = argparse.ArgumentParser()
    subparsers = main_parser.add_subparsers(title="commands", dest="command")

    # spec
    subparsers.add_parser(
        "spec", help="outputs the json configuration specification", parents=[parent_parser]
    )

    # check
    check_parser = subparsers.add_parser(
        "check", help="checks the config can be used to connect", parents=[parent_parser]
    )
    required_check_parser = check_parser.add_argument_group("required named arguments")
    required_check_parser.add_argument(
        "--config", type=str, required=True, help="path to the json configuration file"
    )

    # write
    write_parser = subparsers.add_parser(
        "write", help="Writes data to the destination", parents=[parent_parser]
    )
    write_required = write_parser.add_argument_group("required named arguments")
    write_required.add_argument(
        "--config", type=str, required=True, help="path to the JSON configuration file"
    )
    write_required.add_argument(
        "--catalog", type=str, required=True, help="path to the configured catalog JSON file"
    )

    parsed_args = main_parser.parse_args(args)
    cmd = parsed_args.command
    if not cmd:
        raise Exception("No command entered. ")
    elif cmd not in self.VALID_CMDS:
        # With the default VALID_CMDS this is effectively dead code, since
        # main_parser.parse_args() rejects unknown subcommands itself. It is
        # kept as an explicit guard and shares the set that run_cmd checks,
        # so the two validations cannot drift apart.
        raise Exception(f"Unknown command entered: {cmd}")

    return parsed_args
Parameters
- args: command-line arguments
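Returns
- The parsed `argparse.Namespace`. Its `command` attribute is `spec`, `check`, or `write`; `config` is set for `check` and `write`, and `catalog` is set for `write`.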
def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
    """Execute the parsed command and yield the messages it produces.

    This is a generator, so nothing (including the command validation)
    runs until the result is iterated.

    :param parsed_args: parsed arguments; ``command`` is always read,
        ``config`` is read for ``check`` and ``write``, and ``catalog`` is
        read for ``write``
    :return: the SPEC message, the CONNECTION_STATUS message, or the
        messages yielded by the write path, depending on the command
    :raises Exception: if the command is not in ``VALID_CMDS``
    """
    command = parsed_args.command
    if command not in self.VALID_CMDS:
        raise Exception(f"Unrecognized command: {command}")

    connector_spec = self.spec(logger)
    if command == "spec":
        yield AirbyteMessage(type=Type.SPEC, spec=connector_spec)
        return

    config = self.read_config(config_path=parsed_args.config)
    is_check = command == "check"

    if self.check_config_against_spec or is_check:
        try:
            check_config_against_spec_or_exit(config, connector_spec)
        except AirbyteTracedException as traced_exc:
            status_message = traced_exc.as_connection_status_message()
            # Only "check" reports the failure as a connection status
            # message; in every other case the error propagates.
            if not (status_message and is_check):
                raise traced_exc
            yield status_message
            return

    if is_check:
        yield self._run_check(config=config)
        return

    if command == "write":
        # Wrap in UTF-8 to override any other input encodings
        utf8_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
        yield from self._run_write(
            config=config,
            configured_catalog_path=parsed_args.catalog,
            input_stream=utf8_stdin,
        )