airbyte_cdk.connector_builder.main
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5 6import sys 7from typing import Any, List, Mapping, Optional, Tuple 8 9import orjson 10 11from airbyte_cdk.connector import BaseConnector 12from airbyte_cdk.connector_builder.connector_builder_handler import ( 13 TestLimits, 14 create_source, 15 full_resolve_manifest, 16 get_limits, 17 read_stream, 18 resolve_manifest, 19) 20from airbyte_cdk.entrypoint import AirbyteEntrypoint 21from airbyte_cdk.models import ( 22 AirbyteMessage, 23 AirbyteMessageSerializer, 24 AirbyteStateMessage, 25 ConfiguredAirbyteCatalog, 26 ConfiguredAirbyteCatalogSerializer, 27) 28from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( 29 ConcurrentDeclarativeSource, 30) 31from airbyte_cdk.sources.source import Source 32from airbyte_cdk.utils.traced_exception import AirbyteTracedException 33 34 35def get_config_and_catalog_from_args( 36 args: List[str], 37) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]]: 38 # TODO: Add functionality for the `debug` logger. 39 # Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`. 40 parsed_args = AirbyteEntrypoint.parse_args(args) 41 config_path, catalog_path, state_path = ( 42 parsed_args.config, 43 parsed_args.catalog, 44 parsed_args.state, 45 ) 46 if parsed_args.command != "read": 47 raise ValueError("Only read commands are allowed for Connector Builder requests.") 48 49 config = BaseConnector.read_config(config_path) 50 51 if "__command" not in config: 52 raise ValueError( 53 f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}" 54 ) 55 56 command = config["__command"] 57 if command == "test_read": 58 catalog = ConfiguredAirbyteCatalogSerializer.load(BaseConnector.read_config(catalog_path)) 59 state = Source.read_state(state_path) 60 else: 61 catalog = None 62 state = [] 63 64 if "__injected_declarative_manifest" not in config: 65 raise ValueError( 66 f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}" 67 ) 68 69 return command, config, catalog, state 70 71 72def handle_connector_builder_request( 73 source: ConcurrentDeclarativeSource, 74 command: str, 75 config: Mapping[str, Any], 76 catalog: Optional[ConfiguredAirbyteCatalog], 77 state: List[AirbyteStateMessage], 78 limits: TestLimits, 79) -> AirbyteMessage: 80 if command == "resolve_manifest": 81 return resolve_manifest(source) 82 elif command == "test_read": 83 assert catalog is not None, ( 84 "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None." 85 ) 86 return read_stream(source, config, catalog, state, limits) 87 elif command == "full_resolve_manifest": 88 return full_resolve_manifest(source, limits) 89 else: 90 raise ValueError(f"Unrecognized command {command}.") 91 92 93def handle_request(args: List[str]) -> str: 94 command, config, catalog, state = get_config_and_catalog_from_args(args) 95 limits = get_limits(config) 96 source = create_source(config=config, limits=limits, catalog=catalog, state=state) 97 return orjson.dumps( # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage 98 AirbyteMessageSerializer.dump( 99 handle_connector_builder_request(source, command, config, catalog, state, limits) 100 ) 101 ).decode() 102 103 104if __name__ == "__main__": 105 try: 106 print(handle_request(sys.argv[1:])) 107 except Exception as exc: 108 error = AirbyteTracedException.from_exception( 109 exc, message=f"Error handling request: {str(exc)}" 110 ) 111 m = error.as_airbyte_message() 112 print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
def
get_config_and_catalog_from_args( args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]]:
36def get_config_and_catalog_from_args( 37 args: List[str], 38) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]]: 39 # TODO: Add functionality for the `debug` logger. 40 # Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`. 41 parsed_args = AirbyteEntrypoint.parse_args(args) 42 config_path, catalog_path, state_path = ( 43 parsed_args.config, 44 parsed_args.catalog, 45 parsed_args.state, 46 ) 47 if parsed_args.command != "read": 48 raise ValueError("Only read commands are allowed for Connector Builder requests.") 49 50 config = BaseConnector.read_config(config_path) 51 52 if "__command" not in config: 53 raise ValueError( 54 f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}" 55 ) 56 57 command = config["__command"] 58 if command == "test_read": 59 catalog = ConfiguredAirbyteCatalogSerializer.load(BaseConnector.read_config(catalog_path)) 60 state = Source.read_state(state_path) 61 else: 62 catalog = None 63 state = [] 64 65 if "__injected_declarative_manifest" not in config: 66 raise ValueError( 67 f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}" 68 ) 69 70 return command, config, catalog, state
def
handle_connector_builder_request( source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource, command: str, config: Mapping[str, Any], catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], limits: airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits) -> airbyte_cdk.AirbyteMessage:
73def handle_connector_builder_request( 74 source: ConcurrentDeclarativeSource, 75 command: str, 76 config: Mapping[str, Any], 77 catalog: Optional[ConfiguredAirbyteCatalog], 78 state: List[AirbyteStateMessage], 79 limits: TestLimits, 80) -> AirbyteMessage: 81 if command == "resolve_manifest": 82 return resolve_manifest(source) 83 elif command == "test_read": 84 assert catalog is not None, ( 85 "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None." 86 ) 87 return read_stream(source, config, catalog, state, limits) 88 elif command == "full_resolve_manifest": 89 return full_resolve_manifest(source, limits) 90 else: 91 raise ValueError(f"Unrecognized command {command}.")
def
handle_request(args: List[str]) -> str:
94def handle_request(args: List[str]) -> str: 95 command, config, catalog, state = get_config_and_catalog_from_args(args) 96 limits = get_limits(config) 97 source = create_source(config=config, limits=limits, catalog=catalog, state=state) 98 return orjson.dumps( # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage 99 AirbyteMessageSerializer.dump( 100 handle_connector_builder_request(source, command, config, catalog, state, limits) 101 ) 102 ).decode()