airbyte_cdk.connector_builder.main

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5
  6import sys
  7from typing import Any, List, Mapping, Optional, Tuple
  8
  9import orjson
 10
 11from airbyte_cdk.connector import BaseConnector
 12from airbyte_cdk.connector_builder.connector_builder_handler import (
 13    TestLimits,
 14    create_source,
 15    full_resolve_manifest,
 16    get_limits,
 17    read_stream,
 18    resolve_manifest,
 19)
 20from airbyte_cdk.entrypoint import AirbyteEntrypoint
 21from airbyte_cdk.models import (
 22    AirbyteMessage,
 23    AirbyteMessageSerializer,
 24    AirbyteStateMessage,
 25    ConfiguredAirbyteCatalog,
 26    ConfiguredAirbyteCatalogSerializer,
 27)
 28from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
 29    ConcurrentDeclarativeSource,
 30)
 31from airbyte_cdk.sources.source import Source
 32from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 33
 34
 35def get_config_and_catalog_from_args(
 36    args: List[str],
 37) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]]:
 38    # TODO: Add functionality for the `debug` logger.
 39    #  Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
 40    parsed_args = AirbyteEntrypoint.parse_args(args)
 41    config_path, catalog_path, state_path = (
 42        parsed_args.config,
 43        parsed_args.catalog,
 44        parsed_args.state,
 45    )
 46    if parsed_args.command != "read":
 47        raise ValueError("Only read commands are allowed for Connector Builder requests.")
 48
 49    config = BaseConnector.read_config(config_path)
 50
 51    if "__command" not in config:
 52        raise ValueError(
 53            f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}"
 54        )
 55
 56    command = config["__command"]
 57    if command == "test_read":
 58        catalog = ConfiguredAirbyteCatalogSerializer.load(BaseConnector.read_config(catalog_path))
 59        state = Source.read_state(state_path)
 60    else:
 61        catalog = None
 62        state = []
 63
 64    if "__injected_declarative_manifest" not in config:
 65        raise ValueError(
 66            f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
 67        )
 68
 69    return command, config, catalog, state
 70
 71
 72def handle_connector_builder_request(
 73    source: ConcurrentDeclarativeSource,
 74    command: str,
 75    config: Mapping[str, Any],
 76    catalog: Optional[ConfiguredAirbyteCatalog],
 77    state: List[AirbyteStateMessage],
 78    limits: TestLimits,
 79) -> AirbyteMessage:
 80    if command == "resolve_manifest":
 81        return resolve_manifest(source)
 82    elif command == "test_read":
 83        assert catalog is not None, (
 84            "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
 85        )
 86        return read_stream(source, config, catalog, state, limits)
 87    elif command == "full_resolve_manifest":
 88        return full_resolve_manifest(source, limits)
 89    else:
 90        raise ValueError(f"Unrecognized command {command}.")
 91
 92
 93def handle_request(args: List[str]) -> str:
 94    command, config, catalog, state = get_config_and_catalog_from_args(args)
 95    limits = get_limits(config)
 96    source = create_source(config=config, limits=limits, catalog=catalog, state=state)
 97    return orjson.dumps(  # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage
 98        AirbyteMessageSerializer.dump(
 99            handle_connector_builder_request(source, command, config, catalog, state, limits)
100        )
101    ).decode()
102
103
104if __name__ == "__main__":
105    try:
106        print(handle_request(sys.argv[1:]))
107    except Exception as exc:
108        error = AirbyteTracedException.from_exception(
109            exc, message=f"Error handling request: {str(exc)}"
110        )
111        m = error.as_airbyte_message()
112        print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
def get_config_and_catalog_from_args( args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]]:
36def get_config_and_catalog_from_args(
37    args: List[str],
38) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], List[AirbyteStateMessage]]:
39    # TODO: Add functionality for the `debug` logger.
40    #  Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
41    parsed_args = AirbyteEntrypoint.parse_args(args)
42    config_path, catalog_path, state_path = (
43        parsed_args.config,
44        parsed_args.catalog,
45        parsed_args.state,
46    )
47    if parsed_args.command != "read":
48        raise ValueError("Only read commands are allowed for Connector Builder requests.")
49
50    config = BaseConnector.read_config(config_path)
51
52    if "__command" not in config:
53        raise ValueError(
54            f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}"
55        )
56
57    command = config["__command"]
58    if command == "test_read":
59        catalog = ConfiguredAirbyteCatalogSerializer.load(BaseConnector.read_config(catalog_path))
60        state = Source.read_state(state_path)
61    else:
62        catalog = None
63        state = []
64
65    if "__injected_declarative_manifest" not in config:
66        raise ValueError(
67            f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
68        )
69
70    return command, config, catalog, state
def handle_connector_builder_request( source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource, command: str, config: Mapping[str, Any], catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], limits: airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits) -> airbyte_cdk.AirbyteMessage:
73def handle_connector_builder_request(
74    source: ConcurrentDeclarativeSource,
75    command: str,
76    config: Mapping[str, Any],
77    catalog: Optional[ConfiguredAirbyteCatalog],
78    state: List[AirbyteStateMessage],
79    limits: TestLimits,
80) -> AirbyteMessage:
81    if command == "resolve_manifest":
82        return resolve_manifest(source)
83    elif command == "test_read":
84        assert catalog is not None, (
85            "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
86        )
87        return read_stream(source, config, catalog, state, limits)
88    elif command == "full_resolve_manifest":
89        return full_resolve_manifest(source, limits)
90    else:
91        raise ValueError(f"Unrecognized command {command}.")
def handle_request(args: List[str]) -> str:
 94def handle_request(args: List[str]) -> str:
 95    command, config, catalog, state = get_config_and_catalog_from_args(args)
 96    limits = get_limits(config)
 97    source = create_source(config=config, limits=limits, catalog=catalog, state=state)
 98    return orjson.dumps(  # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage
 99        AirbyteMessageSerializer.dump(
100            handle_connector_builder_request(source, command, config, catalog, state, limits)
101        )
102    ).decode()