airbyte_cdk.connector_builder.main
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import sys
from typing import Any, List, Mapping, Optional, Tuple

import orjson

from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import (
    TestLimits,
    create_source,
    full_resolve_manifest,
    get_limits,
    read_stream,
    resolve_manifest,
)
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteMessageSerializer,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteCatalogSerializer,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def get_config_and_catalog_from_args(
    args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
    """Parse Connector Builder CLI arguments into (command, config, catalog, state).

    Args:
        args: Command-line arguments, handed to ``AirbyteEntrypoint.parse_args``.

    Returns:
        A tuple ``(command, config, catalog, state)`` where ``command`` is the
        value of the config's ``__command`` key. ``catalog`` and ``state`` are
        loaded from their files only for ``test_read``; otherwise they are
        ``None`` and ``[]`` respectively.

    Raises:
        ValueError: If the CLI command is not ``read``, or if the config lacks
            the ``__command`` or ``__injected_declarative_manifest`` key.
    """
    # TODO: Add functionality for the `debug` logger.
    #  Currently, no `debug`-level log is displayed while reading a stream for a
    #  connector created through `connector-builder`.
    parsed_args = AirbyteEntrypoint.parse_args(args)

    # Validate the command BEFORE touching the path options.
    # NOTE(review): sub-commands other than `read` presumably do not define
    # `catalog`/`state` on the parsed namespace; reading them first would raise
    # AttributeError instead of the ValueError below. Confirm against parse_args.
    if parsed_args.command != "read":
        raise ValueError("Only read commands are allowed for Connector Builder requests.")

    config = BaseConnector.read_config(parsed_args.config)

    if "__command" not in config:
        raise ValueError(
            f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}"
        )

    command = config["__command"]
    if command == "test_read":
        catalog = ConfiguredAirbyteCatalogSerializer.load(
            BaseConnector.read_config(parsed_args.catalog)
        )
        state = Source.read_state(parsed_args.state)
    else:
        catalog = None
        state = []

    if "__injected_declarative_manifest" not in config:
        raise ValueError(
            f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
        )

    return command, config, catalog, state


def handle_connector_builder_request(
    source: ManifestDeclarativeSource,
    command: str,
    config: Mapping[str, Any],
    catalog: Optional[ConfiguredAirbyteCatalog],
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """Dispatch a Connector Builder command to its handler.

    Args:
        source: Source passed through to the selected handler.
        command: One of ``resolve_manifest``, ``test_read`` or
            ``full_resolve_manifest``.
        config: Connector config; forwarded to ``read_stream`` for ``test_read``.
        catalog: Configured catalog; required (non-None) for ``test_read``.
        state: State messages; forwarded to ``read_stream`` for ``test_read``.
        limits: Test limits; forwarded to ``read_stream`` and
            ``full_resolve_manifest``.

    Returns:
        The message produced by the selected handler.

    Raises:
        ValueError: If ``command`` is not recognized, or if ``command`` is
            ``test_read`` and ``catalog`` is None.
    """
    if command == "resolve_manifest":
        return resolve_manifest(source)
    elif command == "test_read":
        # Explicit raise rather than `assert`: assertions are removed under
        # `python -O`, which would let a None catalog reach read_stream.
        if catalog is None:
            raise ValueError("`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None.")
        return read_stream(source, config, catalog, state, limits)
    elif command == "full_resolve_manifest":
        return full_resolve_manifest(source, limits)
    else:
        raise ValueError(f"Unrecognized command {command}.")


def handle_request(args: List[str]) -> str:
    """Serve one Connector Builder request and return the response as a JSON string.

    Args:
        args: Command-line arguments, passed to ``get_config_and_catalog_from_args``.

    Returns:
        The handler's message, dumped with ``AirbyteMessageSerializer`` and
        encoded to JSON text via ``orjson``.
    """
    command, config, catalog, state = get_config_and_catalog_from_args(args)
    limits = get_limits(config)
    source = create_source(config, limits)
    return orjson.dumps(
        AirbyteMessageSerializer.dump(
            handle_connector_builder_request(source, command, config, catalog, state, limits)
        )
    ).decode()  # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage


if __name__ == "__main__":
    try:
        print(handle_request(sys.argv[1:]))
    except Exception as exc:
        # Top-level boundary: convert any failure into a serialized Airbyte
        # message on stdout instead of letting the exception propagate.
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error handling request: {str(exc)}"
        )
        m = error.as_airbyte_message()
        print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
def
get_config_and_catalog_from_args( args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], Any]:
def get_config_and_catalog_from_args(
    args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
    """Parse Connector Builder CLI arguments into (command, config, catalog, state).

    Args:
        args: Command-line arguments, handed to ``AirbyteEntrypoint.parse_args``.

    Returns:
        A tuple ``(command, config, catalog, state)`` where ``command`` is the
        value of the config's ``__command`` key. ``catalog`` and ``state`` are
        loaded from their files only for ``test_read``; otherwise they are
        ``None`` and ``[]`` respectively.

    Raises:
        ValueError: If the CLI command is not ``read``, or if the config lacks
            the ``__command`` or ``__injected_declarative_manifest`` key.
    """
    # TODO: Add functionality for the `debug` logger.
    #  Currently, no `debug`-level log is displayed while reading a stream for a
    #  connector created through `connector-builder`.
    parsed_args = AirbyteEntrypoint.parse_args(args)

    # Validate the command BEFORE touching the path options.
    # NOTE(review): sub-commands other than `read` presumably do not define
    # `catalog`/`state` on the parsed namespace; reading them first would raise
    # AttributeError instead of the ValueError below. Confirm against parse_args.
    if parsed_args.command != "read":
        raise ValueError("Only read commands are allowed for Connector Builder requests.")

    config = BaseConnector.read_config(parsed_args.config)

    if "__command" not in config:
        raise ValueError(
            f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}"
        )

    command = config["__command"]
    if command == "test_read":
        # Only `test_read` consumes a catalog and state, so only then are the
        # corresponding files read.
        catalog = ConfiguredAirbyteCatalogSerializer.load(
            BaseConnector.read_config(parsed_args.catalog)
        )
        state = Source.read_state(parsed_args.state)
    else:
        catalog = None
        state = []

    if "__injected_declarative_manifest" not in config:
        raise ValueError(
            f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
        )

    return command, config, catalog, state
def
handle_connector_builder_request( source: airbyte_cdk.ManifestDeclarativeSource, command: str, config: Mapping[str, Any], catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], limits: airbyte_cdk.connector_builder.connector_builder_handler.TestLimits) -> airbyte_cdk.AirbyteMessage:
def handle_connector_builder_request(
    source: ManifestDeclarativeSource,
    command: str,
    config: Mapping[str, Any],
    catalog: Optional[ConfiguredAirbyteCatalog],
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """Dispatch a Connector Builder command to its handler.

    Args:
        source: Source passed through to the selected handler.
        command: One of ``resolve_manifest``, ``test_read`` or
            ``full_resolve_manifest``.
        config: Connector config; forwarded to ``read_stream`` for ``test_read``.
        catalog: Configured catalog; required (non-None) for ``test_read``.
        state: State messages; forwarded to ``read_stream`` for ``test_read``.
        limits: Test limits; forwarded to ``read_stream`` and
            ``full_resolve_manifest``.

    Returns:
        The message produced by the selected handler.

    Raises:
        ValueError: If ``command`` is not recognized, or if ``command`` is
            ``test_read`` and ``catalog`` is None.
    """
    if command == "resolve_manifest":
        return resolve_manifest(source)
    if command == "test_read":
        # Explicit raise rather than `assert`: assertions are removed under
        # `python -O`, which would let a None catalog reach read_stream.
        if catalog is None:
            raise ValueError("`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None.")
        return read_stream(source, config, catalog, state, limits)
    if command == "full_resolve_manifest":
        return full_resolve_manifest(source, limits)
    raise ValueError(f"Unrecognized command {command}.")
def
handle_request(args: List[str]) -> str:
def handle_request(args: List[str]) -> str:
    """Serve one Connector Builder request and return the response as a JSON string.

    Args:
        args: Command-line arguments, passed to ``get_config_and_catalog_from_args``.

    Returns:
        The handler's message, dumped with ``AirbyteMessageSerializer`` and
        encoded to JSON text via ``orjson``.
    """
    command, config, catalog, state = get_config_and_catalog_from_args(args)

    # Limits are derived from the config and are needed both to build the
    # source and to handle the request.
    limits = get_limits(config)
    source = create_source(config, limits)

    message = handle_connector_builder_request(source, command, config, catalog, state, limits)
    payload = AirbyteMessageSerializer.dump(message)
    return orjson.dumps(payload).decode()  # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage