airbyte_cdk.connector_builder.main

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5
  6import sys
  7from typing import Any, List, Mapping, Optional, Tuple
  8
  9import orjson
 10
 11from airbyte_cdk.connector import BaseConnector
 12from airbyte_cdk.connector_builder.connector_builder_handler import (
 13    TestLimits,
 14    create_source,
 15    full_resolve_manifest,
 16    get_limits,
 17    read_stream,
 18    resolve_manifest,
 19)
 20from airbyte_cdk.entrypoint import AirbyteEntrypoint
 21from airbyte_cdk.models import (
 22    AirbyteMessage,
 23    AirbyteMessageSerializer,
 24    AirbyteStateMessage,
 25    ConfiguredAirbyteCatalog,
 26    ConfiguredAirbyteCatalogSerializer,
 27)
 28from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 29from airbyte_cdk.sources.source import Source
 30from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 31
 32
 33def get_config_and_catalog_from_args(
 34    args: List[str],
 35) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
 36    # TODO: Add functionality for the `debug` logger.
 37    #  Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
 38    parsed_args = AirbyteEntrypoint.parse_args(args)
 39    config_path, catalog_path, state_path = (
 40        parsed_args.config,
 41        parsed_args.catalog,
 42        parsed_args.state,
 43    )
 44    if parsed_args.command != "read":
 45        raise ValueError("Only read commands are allowed for Connector Builder requests.")
 46
 47    config = BaseConnector.read_config(config_path)
 48
 49    if "__command" not in config:
 50        raise ValueError(
 51            f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}"
 52        )
 53
 54    command = config["__command"]
 55    if command == "test_read":
 56        catalog = ConfiguredAirbyteCatalogSerializer.load(BaseConnector.read_config(catalog_path))
 57        state = Source.read_state(state_path)
 58    else:
 59        catalog = None
 60        state = []
 61
 62    if "__injected_declarative_manifest" not in config:
 63        raise ValueError(
 64            f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
 65        )
 66
 67    return command, config, catalog, state
 68
 69
 70def handle_connector_builder_request(
 71    source: ManifestDeclarativeSource,
 72    command: str,
 73    config: Mapping[str, Any],
 74    catalog: Optional[ConfiguredAirbyteCatalog],
 75    state: List[AirbyteStateMessage],
 76    limits: TestLimits,
 77) -> AirbyteMessage:
 78    if command == "resolve_manifest":
 79        return resolve_manifest(source)
 80    elif command == "test_read":
 81        assert catalog is not None, (
 82            "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
 83        )
 84        return read_stream(source, config, catalog, state, limits)
 85    elif command == "full_resolve_manifest":
 86        return full_resolve_manifest(source, limits)
 87    else:
 88        raise ValueError(f"Unrecognized command {command}.")
 89
 90
 91def handle_request(args: List[str]) -> str:
 92    command, config, catalog, state = get_config_and_catalog_from_args(args)
 93    limits = get_limits(config)
 94    source = create_source(config, limits)
 95    return orjson.dumps(
 96        AirbyteMessageSerializer.dump(
 97            handle_connector_builder_request(source, command, config, catalog, state, limits)
 98        )
 99    ).decode()  # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage
100
101
if __name__ == "__main__":
    # Script entry point: serve the request described by the CLI arguments and
    # print the resulting message as JSON on stdout.
    try:
        print(handle_request(sys.argv[1:]))
    except Exception as exc:
        # Top-level boundary: any failure is converted into a traced Airbyte
        # message and printed as JSON, instead of surfacing as a raw traceback.
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error handling request: {exc!s}"
        )
        m = error.as_airbyte_message()
        print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
def get_config_and_catalog_from_args(args: List[str]) -> Tuple[str, Mapping[str, Any], Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], Any]:
def get_config_and_catalog_from_args(
    args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
    """Parse Connector Builder CLI arguments into the inputs needed to serve a request.

    Args:
        args: Command-line arguments, parsed with ``AirbyteEntrypoint.parse_args``.

    Returns:
        A ``(command, config, catalog, state)`` tuple. ``command`` is the value of
        the config's ``__command`` key. ``catalog`` and ``state`` are loaded from
        their files only when ``command`` is ``"test_read"``; for any other command
        they are ``None`` and ``[]`` respectively.

    Raises:
        ValueError: If the CLI command is not ``read``, or if the config does not
            contain ``__command`` or ``__injected_declarative_manifest`` at its root.
    """
    # TODO: Add functionality for the `debug` logger.
    #  Currently, no `debug`-level logs are displayed while reading a stream for a
    #  connector created through `connector-builder`.
    parsed_args = AirbyteEntrypoint.parse_args(args)

    # Validate the command before touching the path attributes: a non-`read`
    # subcommand may not define `catalog`/`state` on the parsed namespace, in which
    # case reading them first would raise AttributeError instead of this error.
    if parsed_args.command != "read":
        raise ValueError("Only read commands are allowed for Connector Builder requests.")

    config_path, catalog_path, state_path = (
        parsed_args.config,
        parsed_args.catalog,
        parsed_args.state,
    )

    config = BaseConnector.read_config(config_path)

    if "__command" not in config:
        raise ValueError(
            f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}"
        )

    command = config["__command"]
    if command == "test_read":
        catalog = ConfiguredAirbyteCatalogSerializer.load(BaseConnector.read_config(catalog_path))
        state = Source.read_state(state_path)
    else:
        # Only `test_read` consumes a catalog and state.
        catalog = None
        state = []

    if "__injected_declarative_manifest" not in config:
        raise ValueError(
            f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
        )

    return command, config, catalog, state
def handle_connector_builder_request(source: airbyte_cdk.ManifestDeclarativeSource, command: str, config: Mapping[str, Any], catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog], state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], limits: airbyte_cdk.connector_builder.connector_builder_handler.TestLimits) -> airbyte_cdk.AirbyteMessage:
def handle_connector_builder_request(
    source: ManifestDeclarativeSource,
    command: str,
    config: Mapping[str, Any],
    catalog: Optional[ConfiguredAirbyteCatalog],
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """Dispatch a Connector Builder command to its handler.

    Args:
        source: The declarative source the command operates on.
        command: One of ``"resolve_manifest"``, ``"test_read"`` or
            ``"full_resolve_manifest"``.
        config: Connector configuration; forwarded to ``read_stream`` for ``test_read``.
        catalog: Configured catalog; must not be ``None`` for ``test_read``.
        state: State messages; forwarded to ``read_stream`` for ``test_read``.
        limits: Test limits; forwarded to ``read_stream`` and ``full_resolve_manifest``.

    Returns:
        The ``AirbyteMessage`` produced by the selected handler.

    Raises:
        AssertionError: If ``command`` is ``"test_read"`` and ``catalog`` is ``None``.
        ValueError: If ``command`` is not one of the recognized commands.
    """
    if command == "resolve_manifest":
        return resolve_manifest(source)
    elif command == "test_read":
        # Raised explicitly rather than via `assert` so the check is not stripped
        # when Python runs with -O; the exception type and message are unchanged.
        if catalog is None:
            raise AssertionError(
                "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
            )
        return read_stream(source, config, catalog, state, limits)
    elif command == "full_resolve_manifest":
        return full_resolve_manifest(source, limits)
    else:
        raise ValueError(f"Unrecognized command {command}.")
def handle_request(args: List[str]) -> str:
 92def handle_request(args: List[str]) -> str:
 93    command, config, catalog, state = get_config_and_catalog_from_args(args)
 94    limits = get_limits(config)
 95    source = create_source(config, limits)
 96    return orjson.dumps(
 97        AirbyteMessageSerializer.dump(
 98            handle_connector_builder_request(source, command, config, catalog, state, limits)
 99        )
100    ).decode()  # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage