airbyte_cdk.connector_builder.connector_builder_handler

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from dataclasses import asdict
from typing import Any, Dict, List, Mapping, Optional

from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    Type,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
    TestLimits,
)
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"

def get_limits(config: Mapping[str, Any]) -> TestLimits:
    command_config = config.get("__test_read_config", {})
    return TestLimits(
        max_records=command_config.get(MAX_RECORDS_KEY, TestLimits.DEFAULT_MAX_RECORDS),
        max_pages_per_slice=command_config.get(
            MAX_PAGES_PER_SLICE_KEY, TestLimits.DEFAULT_MAX_PAGES_PER_SLICE
        ),
        max_slices=command_config.get(MAX_SLICES_KEY, TestLimits.DEFAULT_MAX_SLICES),
        max_streams=command_config.get(MAX_STREAMS_KEY, TestLimits.DEFAULT_MAX_STREAMS),
    )

def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Determines whether the manifest should be migrated,
    based on the presence of the "__should_migrate" key in the config.

    This flag is set by the UI.
    """
    return config.get("__should_migrate", False)


def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Check if the manifest should be normalized.
    :param config: The configuration to check
    :return: True if the manifest should be normalized, False otherwise.
    """
    return config.get("__should_normalize", False)

def create_source(
    config: Mapping[str, Any],
    limits: TestLimits | None = None,
    catalog: ConfiguredAirbyteCatalog | None = None,
    state: List[AirbyteStateMessage] | None = None,
) -> ConcurrentDeclarativeSource:
    manifest = config["__injected_declarative_manifest"]

    # We enforce a concurrency level of 1 so that the stream is processed on a single thread
    # to retain ordering for the grouping of the builder message responses.
    if "concurrency_level" in manifest:
        manifest["concurrency_level"]["default_concurrency"] = 1
    else:
        manifest["concurrency_level"] = {"type": "ConcurrencyLevel", "default_concurrency": 1}

    return ConcurrentDeclarativeSource(
        catalog=catalog,
        config=config,
        state=state,
        source_config=manifest,
        emit_connector_builder_messages=True,
        migrate_manifest=should_migrate_manifest(config),
        normalize_manifest=should_normalize_manifest(config),
        limits=limits,
    )

def read_stream(
    source: ConcurrentDeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    try:
        test_read_handler = TestReader(
            limits.max_pages_per_slice, limits.max_slices, limits.max_records
        )
        # The connector builder only supports a single stream
        stream_name = configured_catalog.streams[0].stream.name

        stream_read = test_read_handler.run_test_read(
            source,
            config,
            configured_catalog,
            stream_name,
            state,
            limits.max_records,
        )

        return AirbyteMessage(
            type=MessageType.RECORD,
            record=AirbyteRecordMessage(
                data=asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
            ),
        )
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc,
            message=filter_secrets(
                f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
            ),
        )
        return error.as_airbyte_message()

def resolve_manifest(
    source: ConcurrentDeclarativeSource,
) -> AirbyteMessage:
    try:
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": source.resolved_manifest},
                emitted_at=_emitted_at(),
                stream="resolve_manifest",
            ),
        )
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()

def full_resolve_manifest(
    source: ConcurrentDeclarativeSource, limits: TestLimits
) -> AirbyteMessage:
    try:
        manifest = {**source.resolved_manifest}
        streams = manifest.get("streams", [])
        for stream in streams:
            stream["dynamic_stream_name"] = None

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

            if len(generated_streams) < limits.max_streams:
                generated_streams += [stream]

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()

def _emitted_at() -> int:
    return ab_datetime_now().to_epoch_millis()
MAX_PAGES_PER_SLICE_KEY = 'max_pages_per_slice'
MAX_SLICES_KEY = 'max_slices'
MAX_RECORDS_KEY = 'max_records'
MAX_STREAMS_KEY = 'max_streams'
def get_limits( config: Mapping[str, Any]) -> airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits:
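
A minimal usage sketch (the config values are illustrative): test-read limits ride along inside the connector config under the "__test_read_config" key, and any key that is omitted falls back to the corresponding TestLimits default.

from airbyte_cdk.connector_builder.connector_builder_handler import get_limits

# Hypothetical connector config: test-read limits are nested under "__test_read_config".
config = {
    "api_key": "<redacted>",
    "__test_read_config": {"max_records": 50, "max_slices": 2},
}

limits = get_limits(config)
print(limits.max_records)          # 50 (overridden)
print(limits.max_pages_per_slice)  # falls back to TestLimits.DEFAULT_MAX_PAGES_PER_SLICE
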
def should_migrate_manifest(config: Mapping[str, Any]) -> bool:

Determines whether the manifest should be migrated, based on the presence of the "__should_migrate" key in the config.

This flag is set by the UI.

def should_normalize_manifest(config: Mapping[str, Any]) -> bool:

Check if the manifest should be normalized.

Parameters
  • config: The configuration to check

Returns
  True if the manifest should be normalized, False otherwise.
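
A small sketch covering both flag helpers; the config shown is illustrative, and absent keys default to False. The Builder UI injects these flags next to the manifest when it wants the CDK to migrate or normalize it before use.

from airbyte_cdk.connector_builder.connector_builder_handler import (
    should_migrate_manifest,
    should_normalize_manifest,
)

config = {"__should_migrate": True}

assert should_migrate_manifest(config) is True
assert should_normalize_manifest(config) is False  # key absent, defaults to False
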

def create_source( config: Mapping[str, Any], limits: airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits | None = None, catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog | None = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource:
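
A hedged sketch of how a source is typically constructed: the declarative manifest is injected into the config under "__injected_declarative_manifest" (the manifest below is only a placeholder shape, not a complete, valid manifest), and create_source pins default_concurrency to 1 before building the ConcurrentDeclarativeSource.

from airbyte_cdk.connector_builder.connector_builder_handler import create_source, get_limits

# Placeholder manifest: a real call requires a complete declarative manifest.
manifest = {
    "version": "6.0.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["users"]},
    "streams": [],  # declarative stream definitions go here
}

config = {
    "__injected_declarative_manifest": manifest,
    "api_key": "<redacted>",
}

# Concurrency is forced to 1 so builder responses keep their ordering.
source = create_source(config, limits=get_limits(config))
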
def read_stream( source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource, config: Mapping[str, Any], configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], limits: airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits) -> airbyte_cdk.AirbyteMessage:
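
A sketch of a test read, assuming `manifest` is a complete declarative manifest dict and that the standard protocol models (ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, AirbyteStream, SyncMode, DestinationSyncMode) are available from airbyte_cdk.models; only the first stream in the catalog is read.

from airbyte_cdk.connector_builder.connector_builder_handler import (
    create_source,
    get_limits,
    read_stream,
)
from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
)

config = {"__injected_declarative_manifest": manifest}  # `manifest`: a complete declarative manifest
limits = get_limits(config)

# Single-stream catalog: the connector builder only ever reads the first entry.
catalog = ConfiguredAirbyteCatalog(
    streams=[
        ConfiguredAirbyteStream(
            stream=AirbyteStream(
                name="users", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]
            ),
            sync_mode=SyncMode.full_refresh,
            destination_sync_mode=DestinationSyncMode.overwrite,
        )
    ]
)

source = create_source(config, limits=limits, catalog=catalog, state=[])

# On success, message.record.data holds the grouped slices/pages from the test read;
# on failure, it is an AirbyteTracedException rendered as an Airbyte message.
message = read_stream(source, config, catalog, state=[], limits=limits)
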
def resolve_manifest( source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource) -> airbyte_cdk.AirbyteMessage:
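
A short sketch, assuming `source` was built with create_source(...) as above; the resolved manifest is returned inside a RECORD message on the synthetic stream name "resolve_manifest".

from airbyte_cdk.connector_builder.connector_builder_handler import resolve_manifest
from airbyte_cdk.models import Type

message = resolve_manifest(source)  # `source`: a ConcurrentDeclarativeSource from create_source(...)

if message.type == Type.RECORD:
    resolved = message.record.data["manifest"]  # the fully resolved manifest dict
    print(len(resolved.get("streams", [])))
else:
    # Errors are returned as traced-exception messages rather than raised.
    print(message)
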
def full_resolve_manifest( source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource, limits: airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits) -> airbyte_cdk.AirbyteMessage:
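
A sketch of the full resolution path, again assuming `source` comes from create_source(...): statically defined streams get dynamic_stream_name set to None, while streams generated from dynamic stream definitions are grouped by the definition that produced them and capped at limits.max_streams per group before being appended to the manifest.

from airbyte_cdk.connector_builder.connector_builder_handler import (
    full_resolve_manifest,
    get_limits,
)

limits = get_limits({"__test_read_config": {"max_streams": 5}})

# Assumes resolution succeeds and a RECORD message is returned.
message = full_resolve_manifest(source, limits)  # `source`: from create_source(...)
manifest = message.record.data["manifest"]

# Static streams report dynamic_stream_name=None; generated streams keep the name
# of the dynamic stream definition that produced them.
for stream in manifest["streams"]:
    print(stream.get("name"), stream.get("dynamic_stream_name"))
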