airbyte_cdk.connector_builder.connector_builder_handler

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5
  6from dataclasses import asdict, dataclass, field
  7from typing import Any, ClassVar, Dict, List, Mapping
  8
  9from airbyte_cdk.connector_builder.test_reader import TestReader
 10from airbyte_cdk.models import (
 11    AirbyteMessage,
 12    AirbyteRecordMessage,
 13    AirbyteStateMessage,
 14    ConfiguredAirbyteCatalog,
 15    Type,
 16)
 17from airbyte_cdk.models import Type as MessageType
 18from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 19from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
 20from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 21    ModelToComponentFactory,
 22)
 23from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
 24from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
 25from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 26
 # Keys looked up in the "__test_read_config" section of the config, each followed by the
 # default that get_limits falls back to when the key is absent or falsy.
 MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
 DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5

 MAX_SLICES_KEY = "max_slices"
 DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5

 MAX_RECORDS_KEY = "max_records"
 DEFAULT_MAXIMUM_RECORDS = 100

 MAX_STREAMS_KEY = "max_streams"
 DEFAULT_MAXIMUM_STREAMS = 100
 36
 37
 @dataclass
 class TestLimits:
     """
     Caps applied to a connector builder test read.

     Each field defaults to the matching DEFAULT_* module constant.
     """

     # The "Test" prefix would otherwise make Pytest try to collect this class.
     __test__: ClassVar[bool] = False

     max_records: int = DEFAULT_MAXIMUM_RECORDS
     max_pages_per_slice: int = DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
     max_slices: int = DEFAULT_MAXIMUM_NUMBER_OF_SLICES
     max_streams: int = DEFAULT_MAXIMUM_STREAMS
 46
 47
 def get_limits(config: Mapping[str, Any]) -> TestLimits:
     """
     Build the TestLimits for a test read from the config's "__test_read_config" section.

     Each limit falls back to its DEFAULT_* constant when its key is absent or holds a falsy
     value (None, 0, ...). A missing or null "__test_read_config" section yields all defaults.

     :param config: the connector config, optionally holding a "__test_read_config" mapping
     :return: the resolved TestLimits
     """
     # `or {}` rather than `.get(..., {})`: an explicit null section would otherwise come back
     # as None and crash on the `.get` calls below.
     command_config = config.get("__test_read_config") or {}
     max_pages_per_slice = (
         command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
     )
     max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
     max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
     max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS
     # Keyword arguments keep this call correct even if TestLimits' field order changes.
     return TestLimits(
         max_records=max_records,
         max_pages_per_slice=max_pages_per_slice,
         max_slices=max_slices,
         max_streams=max_streams,
     )
 57
 58
 def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
     """
     Tell whether the manifest should be migrated.

     The answer is the value stored under "__should_migrate" in config (a flag set by the UI),
     or False when that key is absent.
     """
     migrate_flag = config.get("__should_migrate", False)
     return migrate_flag
 67
 68
 def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
     """
     Tell whether the manifest should be normalized.

     :param config: the connector config; its "__should_normalize" entry is consulted
     :return: the value stored under "__should_normalize", or False when the key is absent
     """
     normalize_flag = config.get("__should_normalize", False)
     return normalize_flag
 76
 77
 def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
     """
     Instantiate the ManifestDeclarativeSource used to serve a connector builder request.

     The manifest comes from config["__injected_declarative_manifest"]. Migration and
     normalization follow the config's "__should_migrate" / "__should_normalize" flags. The
     component factory emits connector builder messages, applies the page/slice limits from
     `limits`, and runs with retries and caching disabled.
     """
     injected_manifest = config["__injected_declarative_manifest"]
     migrate = should_migrate_manifest(config)
     normalize = should_normalize_manifest(config)
     factory = ModelToComponentFactory(
         emit_connector_builder_messages=True,
         limit_pages_fetched_per_slice=limits.max_pages_per_slice,
         limit_slices_fetched=limits.max_slices,
         disable_retries=True,
         disable_cache=True,
     )
     return ManifestDeclarativeSource(
         config=config,
         emit_connector_builder_messages=True,
         source_config=injected_manifest,
         migrate_manifest=migrate,
         normalize_manifest=normalize,
         component_factory=factory,
     )
 94
 95
 def read_stream(
     source: DeclarativeSource,
     config: Mapping[str, Any],
     configured_catalog: ConfiguredAirbyteCatalog,
     state: List[AirbyteStateMessage],
     limits: TestLimits,
 ) -> AirbyteMessage:
     """
     Run a limited test read of the catalog's first stream and wrap the result in a message.

     :param source: declarative source to read from
     :param config: connector config, forwarded to the test reader
     :param configured_catalog: catalog whose first stream is read
     :param state: incoming state messages, forwarded to the test reader
     :param limits: page, slice and record caps for the read
     :return: a RECORD message whose data is the test read result converted with `asdict`, or,
         if anything raises, a trace message whose text has been passed through filter_secrets
     """
     try:
         reader = TestReader(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
         # The connector builder only supports a single stream, so the first entry is the target.
         target_stream = configured_catalog.streams[0].stream.name
         result = reader.run_test_read(
             source, config, configured_catalog, target_stream, state, limits.max_records
         )
         record = AirbyteRecordMessage(
             data=asdict(result), stream=target_stream, emitted_at=_emitted_at()
         )
         return AirbyteMessage(type=MessageType.RECORD, record=record)
     except Exception as exc:
         failure_text = f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
         traced = AirbyteTracedException.from_exception(exc, message=filter_secrets(failure_text))
         return traced.as_airbyte_message()
133
134
def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """
    Emit the source's resolved manifest as a RECORD on the "resolve_manifest" stream.

    Any exception is converted into a trace message prefixed with "Error resolving manifest".
    """
    try:
        payload = {"manifest": source.resolved_manifest}
        record = AirbyteRecordMessage(
            data=payload,
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as exc:
        traced = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return traced.as_airbyte_message()
150
151
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
    """
    Emit the resolved manifest with the streams produced by dynamic streams appended.

    Static streams from the resolved manifest are emitted with "dynamic_stream_name" set to
    None. Streams from ``source.dynamic_streams`` are grouped by their "dynamic_stream_name";
    at most ``limits.max_streams`` streams are kept per group, and the groups are appended
    after the static streams in the order each group name is first seen.

    ``source.resolved_manifest`` is left untouched: the top-level mapping, the "streams" list
    and every static stream are copied before being modified.

    :param source: the manifest-based source to resolve
    :param limits: test limits; only ``max_streams`` is used here
    :return: a RECORD message on the "full_resolve_manifest" stream with data
        ``{"manifest": <manifest>}``, or a trace message if anything raises
    """
    try:
        manifest = {**source.resolved_manifest}
        # The `{**...}` copy above is shallow, so its "streams" list and stream dicts are shared
        # with `source`. Build fresh dicts (and a fresh list) instead of writing into them, so
        # the source's resolved manifest is not altered and repeated calls do not pile up
        # generated streams.
        streams: List[Dict[str, Any]] = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams", [])
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])
            # Cap how many generated streams are kept per dynamic stream definition.
            if len(generated_streams) < limits.max_streams:
                generated_streams.append(stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Traced exceptions are emitted as-is rather than being wrapped again.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()
185
186
def _emitted_at() -> int:
    """Return the current time from ab_datetime_now(), converted with to_epoch_millis()."""
    now = ab_datetime_now()
    return now.to_epoch_millis()
# Keys looked up in the "__test_read_config" section of the config, each followed by the
# default that get_limits falls back to when the key is absent or falsy.
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5

MAX_SLICES_KEY = "max_slices"
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5

MAX_RECORDS_KEY = "max_records"
DEFAULT_MAXIMUM_RECORDS = 100

MAX_STREAMS_KEY = "max_streams"
DEFAULT_MAXIMUM_STREAMS = 100
@dataclass
class TestLimits:
    """
    Caps applied to a connector builder test read.

    Each field defaults to the matching DEFAULT_* module constant.
    """

    # The "Test" prefix would otherwise make Pytest try to collect this class.
    __test__: ClassVar[bool] = False

    max_records: int = DEFAULT_MAXIMUM_RECORDS
    max_pages_per_slice: int = DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
    max_slices: int = DEFAULT_MAXIMUM_NUMBER_OF_SLICES
    max_streams: int = DEFAULT_MAXIMUM_STREAMS
def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """
    Build the TestLimits for a test read from the config's "__test_read_config" section.

    Each limit falls back to its DEFAULT_* constant when its key is absent or holds a falsy
    value (None, 0, ...). A missing or null "__test_read_config" section yields all defaults.

    :param config: the connector config, optionally holding a "__test_read_config" mapping
    :return: the resolved TestLimits
    """
    # `or {}` rather than `.get(..., {})`: an explicit null section would otherwise come back
    # as None and crash on the `.get` calls below.
    command_config = config.get("__test_read_config") or {}
    max_pages_per_slice = (
        command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
    )
    max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
    max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
    max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS
    # Keyword arguments keep this call correct even if TestLimits' field order changes.
    return TestLimits(
        max_records=max_records,
        max_pages_per_slice=max_pages_per_slice,
        max_slices=max_slices,
        max_streams=max_streams,
    )
def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Tell whether the manifest should be migrated.

    The answer is the value stored under "__should_migrate" in config (a flag set by the UI),
    or False when that key is absent.
    """
    migrate_flag = config.get("__should_migrate", False)
    return migrate_flag

Determines whether the manifest should be migrated, based on the value of the "__should_migrate" key in the config (False when the key is absent).

This flag is set by the UI.

def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Tell whether the manifest should be normalized.

    :param config: the connector config; its "__should_normalize" entry is consulted
    :return: the value stored under "__should_normalize", or False when the key is absent
    """
    normalize_flag = config.get("__should_normalize", False)
    return normalize_flag

Check if the manifest should be normalized.

Parameters
  • config: The configuration to check
Returns

True if the manifest should be normalized, False otherwise.

def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
    """
    Instantiate the ManifestDeclarativeSource used to serve a connector builder request.

    The manifest comes from config["__injected_declarative_manifest"]. Migration and
    normalization follow the config's "__should_migrate" / "__should_normalize" flags. The
    component factory emits connector builder messages, applies the page/slice limits from
    `limits`, and runs with retries and caching disabled.
    """
    injected_manifest = config["__injected_declarative_manifest"]
    migrate = should_migrate_manifest(config)
    normalize = should_normalize_manifest(config)
    factory = ModelToComponentFactory(
        emit_connector_builder_messages=True,
        limit_pages_fetched_per_slice=limits.max_pages_per_slice,
        limit_slices_fetched=limits.max_slices,
        disable_retries=True,
        disable_cache=True,
    )
    return ManifestDeclarativeSource(
        config=config,
        emit_connector_builder_messages=True,
        source_config=injected_manifest,
        migrate_manifest=migrate,
        normalize_manifest=normalize,
        component_factory=factory,
    )
def read_stream(
    source: DeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """
    Run a limited test read of the catalog's first stream and wrap the result in a message.

    :param source: declarative source to read from
    :param config: connector config, forwarded to the test reader
    :param configured_catalog: catalog whose first stream is read
    :param state: incoming state messages, forwarded to the test reader
    :param limits: page, slice and record caps for the read
    :return: a RECORD message whose data is the test read result converted with `asdict`, or,
        if anything raises, a trace message whose text has been passed through filter_secrets
    """
    try:
        reader = TestReader(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
        # The connector builder only supports a single stream, so the first entry is the target.
        target_stream = configured_catalog.streams[0].stream.name
        result = reader.run_test_read(
            source, config, configured_catalog, target_stream, state, limits.max_records
        )
        record = AirbyteRecordMessage(
            data=asdict(result), stream=target_stream, emitted_at=_emitted_at()
        )
        return AirbyteMessage(type=MessageType.RECORD, record=record)
    except Exception as exc:
        failure_text = f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
        traced = AirbyteTracedException.from_exception(exc, message=filter_secrets(failure_text))
        return traced.as_airbyte_message()
def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """
    Emit the source's resolved manifest as a RECORD on the "resolve_manifest" stream.

    Any exception is converted into a trace message prefixed with "Error resolving manifest".
    """
    try:
        payload = {"manifest": source.resolved_manifest}
        record = AirbyteRecordMessage(
            data=payload,
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as exc:
        traced = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return traced.as_airbyte_message()
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
    """
    Emit the resolved manifest with the streams produced by dynamic streams appended.

    Static streams from the resolved manifest are emitted with "dynamic_stream_name" set to
    None. Streams from ``source.dynamic_streams`` are grouped by their "dynamic_stream_name";
    at most ``limits.max_streams`` streams are kept per group, and the groups are appended
    after the static streams in the order each group name is first seen.

    ``source.resolved_manifest`` is left untouched: the top-level mapping, the "streams" list
    and every static stream are copied before being modified.

    :param source: the manifest-based source to resolve
    :param limits: test limits; only ``max_streams`` is used here
    :return: a RECORD message on the "full_resolve_manifest" stream with data
        ``{"manifest": <manifest>}``, or a trace message if anything raises
    """
    try:
        manifest = {**source.resolved_manifest}
        # The `{**...}` copy above is shallow, so its "streams" list and stream dicts are shared
        # with `source`. Build fresh dicts (and a fresh list) instead of writing into them, so
        # the source's resolved manifest is not altered and repeated calls do not pile up
        # generated streams.
        streams: List[Dict[str, Any]] = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams", [])
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])
            # Cap how many generated streams are kept per dynamic stream definition.
            if len(generated_streams) < limits.max_streams:
                generated_streams.append(stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Traced exceptions are emitted as-is rather than being wrapped again.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()