airbyte_cdk.connector_builder.connector_builder_handler

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5
  6from dataclasses import asdict, dataclass, field
  7from typing import Any, ClassVar, Dict, List, Mapping
  8
  9from airbyte_cdk.connector_builder.test_reader import TestReader
 10from airbyte_cdk.models import (
 11    AirbyteMessage,
 12    AirbyteRecordMessage,
 13    AirbyteStateMessage,
 14    ConfiguredAirbyteCatalog,
 15    Type,
 16)
 17from airbyte_cdk.models import Type as MessageType
 18from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 19from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 20from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 21    ModelToComponentFactory,
 22)
 23from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
 24from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
 25from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 26
# Fallback limits: used as the TestLimits field defaults and by get_limits()
# whenever the test-read config has no (or a falsy) value for a limit.
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
DEFAULT_MAXIMUM_STREAMS = 100

# Keys that get_limits() looks up inside the "__test_read_config" section of the config.
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"
 36
 37
 38@dataclass
 39class TestLimits:
 40    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name
 41
 42    max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
 43    max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
 44    max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
 45    max_streams: int = field(default=DEFAULT_MAXIMUM_STREAMS)
 46
 47
 48def get_limits(config: Mapping[str, Any]) -> TestLimits:
 49    command_config = config.get("__test_read_config", {})
 50    max_pages_per_slice = (
 51        command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
 52    )
 53    max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
 54    max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
 55    max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS
 56    return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)
 57
 58
 59def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
 60    """
 61    Determines whether the manifest should be migrated,
 62    based on the presence of the "__should_migrate" key in the config.
 63
 64    This flag is set by the UI.
 65    """
 66    return config.get("__should_migrate", False)
 67
 68
 69def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
 70    """
 71    Check if the manifest should be normalized.
 72    :param config: The configuration to check
 73    :return: True if the manifest should be normalized, False otherwise.
 74    """
 75    return config.get("__should_normalize", False)
 76
 77
 78def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
 79    manifest = config["__injected_declarative_manifest"]
 80    return ManifestDeclarativeSource(
 81        config=config,
 82        emit_connector_builder_messages=True,
 83        source_config=manifest,
 84        migrate_manifest=should_migrate_manifest(config),
 85        normalize_manifest=should_normalize_manifest(config),
 86        component_factory=ModelToComponentFactory(
 87            emit_connector_builder_messages=True,
 88            limit_pages_fetched_per_slice=limits.max_pages_per_slice,
 89            limit_slices_fetched=limits.max_slices,
 90            disable_retries=True,
 91            disable_cache=True,
 92        ),
 93    )
 94
 95
 96def read_stream(
 97    source: DeclarativeSource,
 98    config: Mapping[str, Any],
 99    configured_catalog: ConfiguredAirbyteCatalog,
100    state: List[AirbyteStateMessage],
101    limits: TestLimits,
102) -> AirbyteMessage:
103    try:
104        test_read_handler = TestReader(
105            limits.max_pages_per_slice, limits.max_slices, limits.max_records
106        )
107        # The connector builder only supports a single stream
108        stream_name = configured_catalog.streams[0].stream.name
109
110        stream_read = test_read_handler.run_test_read(
111            source, config, configured_catalog, state, limits.max_records
112        )
113
114        return AirbyteMessage(
115            type=MessageType.RECORD,
116            record=AirbyteRecordMessage(
117                data=asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
118            ),
119        )
120    except Exception as exc:
121        error = AirbyteTracedException.from_exception(
122            exc,
123            message=filter_secrets(
124                f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
125            ),
126        )
127        return error.as_airbyte_message()
128
129
def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """
    Wrap the source's resolved manifest in a RECORD message.

    :param source: The source whose ``resolved_manifest`` is emitted.
    :return: A RECORD message on the "resolve_manifest" stream carrying
        ``{"manifest": ...}``, or the traced-exception message if anything fails.
    """
    try:
        record = AirbyteRecordMessage(
            data={"manifest": source.resolved_manifest},
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as exc:
        traced = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return traced.as_airbyte_message()
145
146
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
    """
    Return the resolved manifest with the generated dynamic streams appended.

    Streams already listed in the manifest are tagged with
    ``"dynamic_stream_name": None``. Streams from ``source.dynamic_streams`` are
    grouped by their "dynamic_stream_name" and at most ``limits.max_streams``
    of each group are appended after them.

    :param source: Supplies ``resolved_manifest`` and ``dynamic_streams``.
    :param limits: ``max_streams`` caps the generated streams kept per group.
    :return: A RECORD message on the "full_resolve_manifest" stream carrying
        ``{"manifest": ...}``, or the traced-exception message if anything fails.
    """
    try:
        manifest = {**source.resolved_manifest}
        # Copy the list and every stream so that tagging and extending below do not
        # write through to the objects held by source.resolved_manifest (the dict
        # copy above is shallow).
        # NOTE(review): assumes nothing relies on the source's own manifest being
        # tagged in place before dynamic_streams is read - confirm.
        streams = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams", [])
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

            # Keep at most max_streams generated streams per dynamic_stream_name.
            if len(generated_streams) < limits.max_streams:
                generated_streams.append(stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Already a traced exception: emit it as-is instead of re-wrapping it.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()
180
181
def _emitted_at() -> int:
    """Return ``ab_datetime_now()`` as epoch milliseconds, used for ``emitted_at``."""
    now = ab_datetime_now()
    return now.to_epoch_millis()
# Fallback limits: used as the TestLimits field defaults and by get_limits()
# whenever the test-read config has no (or a falsy) value for a limit.
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
DEFAULT_MAXIMUM_STREAMS = 100
# Keys that get_limits() looks up inside the "__test_read_config" section of the config.
MAX_PAGES_PER_SLICE_KEY = 'max_pages_per_slice'
MAX_SLICES_KEY = 'max_slices'
MAX_RECORDS_KEY = 'max_records'
MAX_STREAMS_KEY = 'max_streams'
@dataclass
class TestLimits:
@dataclass
class TestLimits:
    """Limits applied to connector-builder test operations.

    Each field defaults to the matching module-level ``DEFAULT_MAXIMUM_*`` constant.
    """

    # Pytest collects classes whose name starts with "Test"; opt this one out.
    __test__: ClassVar[bool] = False

    max_records: int = DEFAULT_MAXIMUM_RECORDS
    max_pages_per_slice: int = DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
    max_slices: int = DEFAULT_MAXIMUM_NUMBER_OF_SLICES
    max_streams: int = DEFAULT_MAXIMUM_STREAMS
TestLimits( max_records: int = 100, max_pages_per_slice: int = 5, max_slices: int = 5, max_streams: int = 100)
max_records: int = 100
max_pages_per_slice: int = 5
max_slices: int = 5
max_streams: int = 100
def get_limits( config: Mapping[str, Any]) -> TestLimits:
def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """
    Build the TestLimits for a test read from the connector config.

    Limits are read from the optional "__test_read_config" section of the config.
    A limit that is missing or falsy (e.g. ``None`` or ``0``) falls back to its
    module-level default.

    :param config: The connector configuration; may contain "__test_read_config".
    :return: The limits to apply, with defaults filled in.
    """
    # `or {}` (instead of a .get default) also covers the section being present
    # with a null value, which would otherwise break the .get calls below.
    command_config = config.get("__test_read_config") or {}
    return TestLimits(
        max_records=command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS,
        max_pages_per_slice=(
            command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
        ),
        max_slices=command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES,
        max_streams=command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS,
    )
def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Tell whether the manifest should be migrated.

    The answer is the value stored under the "__should_migrate" key of the
    config (a flag set by the UI); a config without that key yields False.
    """
    migrate_flag = config.get("__should_migrate", False)
    return migrate_flag

Determines whether the manifest should be migrated, based on the presence of the "__should_migrate" key in the config.

This flag is set by the UI.

def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Tell whether the manifest should be normalized.

    :param config: The configuration holding the optional "__should_normalize" flag
    :return: The value of the flag, or False when the config does not contain it.
    """
    normalize_flag = config.get("__should_normalize", False)
    return normalize_flag

Check if the manifest should be normalized.

Parameters
  • config: The configuration to check
Returns

True if the manifest should be normalized, False otherwise.

def create_source( config: Mapping[str, Any], limits: TestLimits) -> airbyte_cdk.ManifestDeclarativeSource:
def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
    """
    Build a ManifestDeclarativeSource from the manifest injected into the config.

    The component factory is configured with the page and slice limits, with
    retries and caching disabled and connector-builder messages enabled.

    :param config: The connector configuration; must contain "__injected_declarative_manifest".
    :param limits: Supplies the per-slice page limit and the slice limit for the factory.
    :return: The configured source.
    :raises KeyError: If the config has no "__injected_declarative_manifest" entry.
    """
    injected_manifest = config["__injected_declarative_manifest"]
    migrate = should_migrate_manifest(config)
    normalize = should_normalize_manifest(config)

    factory = ModelToComponentFactory(
        emit_connector_builder_messages=True,
        limit_pages_fetched_per_slice=limits.max_pages_per_slice,
        limit_slices_fetched=limits.max_slices,
        disable_retries=True,
        disable_cache=True,
    )
    return ManifestDeclarativeSource(
        config=config,
        emit_connector_builder_messages=True,
        source_config=injected_manifest,
        migrate_manifest=migrate,
        normalize_manifest=normalize,
        component_factory=factory,
    )
def read_stream( source: airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource, config: Mapping[str, Any], configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], limits: TestLimits) -> airbyte_cdk.AirbyteMessage:
 97def read_stream(
 98    source: DeclarativeSource,
 99    config: Mapping[str, Any],
100    configured_catalog: ConfiguredAirbyteCatalog,
101    state: List[AirbyteStateMessage],
102    limits: TestLimits,
103) -> AirbyteMessage:
104    try:
105        test_read_handler = TestReader(
106            limits.max_pages_per_slice, limits.max_slices, limits.max_records
107        )
108        # The connector builder only supports a single stream
109        stream_name = configured_catalog.streams[0].stream.name
110
111        stream_read = test_read_handler.run_test_read(
112            source, config, configured_catalog, state, limits.max_records
113        )
114
115        return AirbyteMessage(
116            type=MessageType.RECORD,
117            record=AirbyteRecordMessage(
118                data=asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
119            ),
120        )
121    except Exception as exc:
122        error = AirbyteTracedException.from_exception(
123            exc,
124            message=filter_secrets(
125                f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
126            ),
127        )
128        return error.as_airbyte_message()
def resolve_manifest( source: airbyte_cdk.ManifestDeclarativeSource) -> airbyte_cdk.AirbyteMessage:
def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """
    Wrap the source's resolved manifest in a RECORD message.

    :param source: The source whose ``resolved_manifest`` is emitted.
    :return: A RECORD message on the "resolve_manifest" stream carrying
        ``{"manifest": ...}``, or the traced-exception message if anything fails.
    """
    try:
        record = AirbyteRecordMessage(
            data={"manifest": source.resolved_manifest},
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as exc:
        traced = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return traced.as_airbyte_message()
def full_resolve_manifest( source: airbyte_cdk.ManifestDeclarativeSource, limits: TestLimits) -> airbyte_cdk.AirbyteMessage:
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
    """
    Return the resolved manifest with the generated dynamic streams appended.

    Streams already listed in the manifest are tagged with
    ``"dynamic_stream_name": None``. Streams from ``source.dynamic_streams`` are
    grouped by their "dynamic_stream_name" and at most ``limits.max_streams``
    of each group are appended after them.

    :param source: Supplies ``resolved_manifest`` and ``dynamic_streams``.
    :param limits: ``max_streams`` caps the generated streams kept per group.
    :return: A RECORD message on the "full_resolve_manifest" stream carrying
        ``{"manifest": ...}``, or the traced-exception message if anything fails.
    """
    try:
        manifest = {**source.resolved_manifest}
        # Copy the list and every stream so that tagging and extending below do not
        # write through to the objects held by source.resolved_manifest (the dict
        # copy above is shallow).
        # NOTE(review): assumes nothing relies on the source's own manifest being
        # tagged in place before dynamic_streams is read - confirm.
        streams = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams", [])
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

            # Keep at most max_streams generated streams per dynamic_stream_name.
            if len(generated_streams) < limits.max_streams:
                generated_streams.append(stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Already a traced exception: emit it as-is instead of re-wrapping it.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()