airbyte_cdk.connector_builder.connector_builder_handler
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

"""Connector builder helpers.

Builds a limited ``ManifestDeclarativeSource`` from a config and wraps the
results of a test read or a manifest resolution in ``AirbyteMessage`` records.
"""

from dataclasses import asdict, dataclass, field
from typing import Any, ClassVar, Dict, List, Mapping

from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    Type,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

# Fallback limits used when the test-read config does not provide a usable value.
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
DEFAULT_MAXIMUM_STREAMS = 100

# Keys read from the "__test_read_config" section of the connector config.
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"


@dataclass
class TestLimits:
    """Upper bounds applied to a connector builder run.

    Each field falls back to the matching ``DEFAULT_MAXIMUM_*`` constant.
    """

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
    max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
    max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
    max_streams: int = field(default=DEFAULT_MAXIMUM_STREAMS)


def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """
    Build the limits from the "__test_read_config" section of the config.

    A limit that is missing or falsy (``None``, ``0``) is replaced by its default.

    :param config: The connector configuration
    :return: The limits to apply
    """
    # `or {}` also covers a key that is present but null; `.get(key, {})` would
    # return None in that case and the lookups below would fail.
    command_config = config.get("__test_read_config") or {}
    return TestLimits(
        max_records=command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS,
        max_pages_per_slice=(
            command_config.get(MAX_PAGES_PER_SLICE_KEY)
            or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
        ),
        max_slices=command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES,
        max_streams=command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS,
    )


def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Determines whether the manifest should be migrated,
    based on the value of the "__should_migrate" key in the config
    (False when the key is absent).

    This flag is set by the UI.
    """
    return config.get("__should_migrate", False)


def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Check if the manifest should be normalized.
    :param config: The configuration to check
    :return: True if the manifest should be normalized, False otherwise.
    """
    return config.get("__should_normalize", False)


def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
    """
    Build a ManifestDeclarativeSource from the manifest stored under the
    "__injected_declarative_manifest" key of the config.

    The component factory is created with retries and cache disabled and with
    the page and slice limits taken from `limits`.

    :param config: The connector configuration, including the injected manifest
    :param limits: The limits to apply to pages and slices
    :return: The configured source
    :raises KeyError: if the config has no "__injected_declarative_manifest" key
    """
    manifest = config["__injected_declarative_manifest"]
    return ManifestDeclarativeSource(
        config=config,
        emit_connector_builder_messages=True,
        source_config=manifest,
        migrate_manifest=should_migrate_manifest(config),
        normalize_manifest=should_normalize_manifest(config),
        component_factory=ModelToComponentFactory(
            emit_connector_builder_messages=True,
            limit_pages_fetched_per_slice=limits.max_pages_per_slice,
            limit_slices_fetched=limits.max_slices,
            disable_retries=True,
            disable_cache=True,
        ),
    )


def read_stream(
    source: DeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """
    Run a test read of the first stream of the catalog and wrap the result
    in a RECORD message.

    Any exception is converted into the message produced by
    AirbyteTracedException; its text is passed through `filter_secrets` first
    because it embeds the config.

    :return: A RECORD message holding the test read, or the error message
    """
    try:
        test_read_handler = TestReader(
            limits.max_pages_per_slice, limits.max_slices, limits.max_records
        )
        # The connector builder only supports a single stream
        stream_name = configured_catalog.streams[0].stream.name

        stream_read = test_read_handler.run_test_read(
            source,
            config,
            configured_catalog,
            stream_name,
            state,
            limits.max_records,
        )

        return AirbyteMessage(
            type=MessageType.RECORD,
            record=AirbyteRecordMessage(
                data=asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
            ),
        )
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc,
            message=filter_secrets(
                f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
            ),
        )
        return error.as_airbyte_message()


def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """
    Wrap `source.resolved_manifest` in a RECORD message on the
    "resolve_manifest" stream; any exception is returned as an error message.
    """
    try:
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": source.resolved_manifest},
                emitted_at=_emitted_at(),
                stream="resolve_manifest",
            ),
        )
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()


def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
    """
    Return the resolved manifest with the generated dynamic streams appended.

    Declared streams get "dynamic_stream_name" set to None. The entries of
    `source.dynamic_streams` are grouped by their "dynamic_stream_name" and at
    most `limits.max_streams` entries are kept per group.

    :param source: The source providing the resolved manifest and dynamic streams
    :param limits: The limits; only `max_streams` is read here
    :return: A RECORD message on the "full_resolve_manifest" stream, or the error message
    """
    try:
        manifest = {**source.resolved_manifest}
        # The copy above is shallow: the "streams" list and its dicts are still
        # the ones held by the source. Work on copies so nothing is written
        # back into the source's manifest.
        streams = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams") or []
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

            if len(generated_streams) < limits.max_streams:
                generated_streams.append(stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Already traced: return it as-is rather than wrapping it a second time.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()


def _emitted_at() -> int:
    """Return the value of `ab_datetime_now().to_epoch_millis()` for record timestamps."""
    return ab_datetime_now().to_epoch_millis()
# Fallback limits used when the test-read config does not provide a usable value.
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
DEFAULT_MAXIMUM_STREAMS = 100

# Keys read from the "__test_read_config" section of the connector config.
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"
@dataclass
class TestLimits:
    """Upper bounds applied to a connector builder run.

    Each field falls back to the matching ``DEFAULT_MAXIMUM_*`` constant.
    """

    # The "Test" prefix would otherwise make Pytest try to collect this class.
    __test__: ClassVar[bool] = False

    max_records: int = DEFAULT_MAXIMUM_RECORDS
    max_pages_per_slice: int = DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
    max_slices: int = DEFAULT_MAXIMUM_NUMBER_OF_SLICES
    max_streams: int = DEFAULT_MAXIMUM_STREAMS
def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """
    Build the limits from the "__test_read_config" section of the config.

    A limit that is missing or falsy (``None``, ``0``) is replaced by its default.

    :param config: The connector configuration
    :return: The limits to apply
    """
    # `or {}` also covers a key that is present but null; `.get(key, {})` would
    # return None in that case and the lookups below would fail.
    command_config = config.get("__test_read_config") or {}
    # Keyword arguments keep each value tied to its field regardless of the
    # declaration order in TestLimits.
    return TestLimits(
        max_records=command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS,
        max_pages_per_slice=(
            command_config.get(MAX_PAGES_PER_SLICE_KEY)
            or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
        ),
        max_slices=command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES,
        max_streams=command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS,
    )
def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Tell whether the manifest should be migrated.

    Returns the value stored under the "__should_migrate" key of the config,
    or False when the key is absent.

    This flag is set by the UI.
    """
    migrate_flag = config.get("__should_migrate", False)
    return migrate_flag
Determines whether the manifest should be migrated, based on the value of the "__should_migrate" key in the config (False when the key is absent).
This flag is set by the UI.
def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Tell whether the manifest should be normalized.
    :param config: The configuration holding the "__should_normalize" key
    :return: The value stored under that key, or False when the key is absent.
    """
    normalize_flag = config.get("__should_normalize", False)
    return normalize_flag
Check if the manifest should be normalized.
Parameters
- config: The configuration to check
Returns
True if the manifest should be normalized, False otherwise.
def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
    """
    Build a ManifestDeclarativeSource from the manifest stored under the
    "__injected_declarative_manifest" key of the config.

    The component factory is created with retries and cache disabled and with
    the page and slice limits taken from `limits`.

    :param config: The connector configuration, including the injected manifest
    :param limits: The limits to apply to pages and slices
    :return: The configured source
    :raises KeyError: if the config has no "__injected_declarative_manifest" key
    """
    # Looked up first so a missing manifest fails before anything is built.
    injected_manifest = config["__injected_declarative_manifest"]
    migrate = should_migrate_manifest(config)
    normalize = should_normalize_manifest(config)

    factory = ModelToComponentFactory(
        emit_connector_builder_messages=True,
        limit_pages_fetched_per_slice=limits.max_pages_per_slice,
        limit_slices_fetched=limits.max_slices,
        disable_retries=True,
        disable_cache=True,
    )

    return ManifestDeclarativeSource(
        config=config,
        emit_connector_builder_messages=True,
        source_config=injected_manifest,
        migrate_manifest=migrate,
        normalize_manifest=normalize,
        component_factory=factory,
    )
def read_stream(
    source: DeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """
    Run a test read of the first stream of the catalog and wrap the result
    in a RECORD message.

    Any exception is converted into the message produced by
    AirbyteTracedException; its text is passed through `filter_secrets` first
    because it embeds the config.

    :param source: The source to read from
    :param config: The connector configuration
    :param configured_catalog: The catalog; only its first stream is read
    :param state: The state messages handed to the test read
    :param limits: The page, slice and record limits for the read
    :return: A RECORD message holding the test read, or the error message
    """
    try:
        reader = TestReader(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
        # The connector builder only supports a single stream
        first_stream = configured_catalog.streams[0]
        stream_name = first_stream.stream.name

        result = reader.run_test_read(
            source, config, configured_catalog, stream_name, state, limits.max_records
        )

        record = AirbyteRecordMessage(
            data=asdict(result), stream=stream_name, emitted_at=_emitted_at()
        )
        return AirbyteMessage(type=MessageType.RECORD, record=record)
    except Exception as exc:
        sanitized_message = filter_secrets(
            f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
        )
        return AirbyteTracedException.from_exception(
            exc, message=sanitized_message
        ).as_airbyte_message()
def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """
    Wrap `source.resolved_manifest` in a RECORD message on the
    "resolve_manifest" stream.

    :param source: The source whose resolved manifest is returned
    :return: The RECORD message, or the error message if anything raised
    """
    try:
        record = AirbyteRecordMessage(
            data={"manifest": source.resolved_manifest},
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as exc:
        return AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        ).as_airbyte_message()
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
    """
    Return the resolved manifest with the generated dynamic streams appended.

    Declared streams get "dynamic_stream_name" set to None. The entries of
    `source.dynamic_streams` are grouped by their "dynamic_stream_name" and at
    most `limits.max_streams` entries are kept per group.

    :param source: The source providing the resolved manifest and dynamic streams
    :param limits: The limits; only `max_streams` is read here
    :return: A RECORD message on the "full_resolve_manifest" stream, or the error message
    """
    try:
        manifest = {**source.resolved_manifest}
        # The copy above is shallow: the "streams" list and its dicts are still
        # the ones held by the source. Work on copies so nothing is written
        # back into the source's manifest (and, if the source hands out the
        # same object each time, so repeated calls do not pile up streams).
        streams = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams") or []
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

            # Cap the number of generated streams kept for each dynamic stream.
            if len(generated_streams) < limits.max_streams:
                generated_streams.append(stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Already traced: return it as-is rather than wrapping it a second time.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()