airbyte_cdk.connector_builder.connector_builder_handler
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from dataclasses import asdict, dataclass, field
from typing import Any, ClassVar, Dict, List, Mapping

from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    Type,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

# Fallback limits used when the test-read configuration does not provide a value.
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
DEFAULT_MAXIMUM_STREAMS = 100

# Keys looked up inside the "__test_read_config" mapping of the connector config.
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"


@dataclass
class TestLimits:
    """Limits applied to connector builder test operations.

    Every field defaults to the matching ``DEFAULT_MAXIMUM_*`` constant.
    """

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
    max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
    max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
    max_streams: int = field(default=DEFAULT_MAXIMUM_STREAMS)


def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """
    Build the test limits from the "__test_read_config" entry of the config.

    A limit that is missing or falsy (``None``, ``0``) falls back to its default constant.
    A missing or ``None`` "__test_read_config" entry yields all defaults.

    :param config: The connector configuration
    :return: The limits to apply
    """
    # `or {}` also covers an explicit `"__test_read_config": None`, which would
    # otherwise raise AttributeError on the `.get` calls below.
    command_config = config.get("__test_read_config") or {}
    return TestLimits(
        max_records=command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS,
        max_pages_per_slice=(
            command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
        ),
        max_slices=command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES,
        max_streams=command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS,
    )


def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Determines whether the manifest should be migrated,
    based on the presence of the "__should_migrate" key in the config.

    This flag is set by the UI.
    """
    return config.get("__should_migrate", False)


def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Check if the manifest should be normalized.
    :param config: The configuration to check
    :return: True if the manifest should be normalized, False otherwise.
    """
    return config.get("__should_normalize", False)


def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
    """
    Create the declarative source from the manifest stored in the config
    under "__injected_declarative_manifest".

    The component factory is created with connector builder messages enabled,
    retries and cache disabled, and the page/slice limits from `limits`.

    :param config: The connector configuration; must contain the manifest key
    :param limits: The limits forwarded to the component factory
    :return: The configured source
    """
    manifest = config["__injected_declarative_manifest"]
    return ManifestDeclarativeSource(
        config=config,
        emit_connector_builder_messages=True,
        source_config=manifest,
        migrate_manifest=should_migrate_manifest(config),
        normalize_manifest=should_normalize_manifest(config),
        component_factory=ModelToComponentFactory(
            emit_connector_builder_messages=True,
            limit_pages_fetched_per_slice=limits.max_pages_per_slice,
            limit_slices_fetched=limits.max_slices,
            disable_retries=True,
            disable_cache=True,
        ),
    )


def read_stream(
    source: DeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """
    Run a test read and return the result as a RECORD message.

    Any exception is converted into the message produced by AirbyteTracedException
    instead of being raised.

    :return: A RECORD message named after the first configured stream, or an error message
    """
    try:
        test_read_handler = TestReader(
            limits.max_pages_per_slice, limits.max_slices, limits.max_records
        )
        # The connector builder only supports a single stream
        stream_name = configured_catalog.streams[0].stream.name

        stream_read = test_read_handler.run_test_read(
            source, config, configured_catalog, state, limits.max_records
        )

        return AirbyteMessage(
            type=MessageType.RECORD,
            record=AirbyteRecordMessage(
                data=asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
            ),
        )
    except Exception as exc:
        # The message embeds the config, so it goes through filter_secrets first.
        error = AirbyteTracedException.from_exception(
            exc,
            message=filter_secrets(
                f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
            ),
        )
        return error.as_airbyte_message()


def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """
    Return the source's resolved manifest as a RECORD message on the
    "resolve_manifest" stream, or an error message if resolving raises.
    """
    try:
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": source.resolved_manifest},
                emitted_at=_emitted_at(),
                stream="resolve_manifest",
            ),
        )
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()


def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
    """
    Return the resolved manifest with the source's dynamic streams appended to "streams".

    Streams already in the manifest get "dynamic_stream_name" set to None. Dynamic streams
    are grouped by their "dynamic_stream_name" and at most `limits.max_streams` are kept
    per group. The manifest held by `source` is left unmodified.

    :return: A RECORD message on the "full_resolve_manifest" stream, or an error message
    """
    try:
        manifest = {**source.resolved_manifest}
        # The copy above is shallow: rebuild the stream list and each stream dict so that
        # tagging and extending them does not modify the objects owned by the source.
        streams = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams", [])
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for dynamic_stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(
                dynamic_stream["dynamic_stream_name"], []
            )

            if len(generated_streams) < limits.max_streams:
                generated_streams.append(dynamic_stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Already traced: return its message as is instead of wrapping it again.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()


def _emitted_at() -> int:
    """Return the current time as epoch milliseconds."""
    return ab_datetime_now().to_epoch_millis()
DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE =
5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES =
5
DEFAULT_MAXIMUM_RECORDS =
100
DEFAULT_MAXIMUM_STREAMS =
100
MAX_PAGES_PER_SLICE_KEY =
'max_pages_per_slice'
MAX_SLICES_KEY =
'max_slices'
MAX_RECORDS_KEY =
'max_records'
MAX_STREAMS_KEY =
'max_streams'
@dataclass
class
TestLimits:
@dataclass
class TestLimits:
    """Limits applied to connector builder test operations.

    Each field defaults to the matching module-level ``DEFAULT_MAXIMUM_*`` constant.
    """

    # The class name starts with "Test"; this flag tells Pytest not to collect it.
    __test__: ClassVar[bool] = False

    max_records: int = DEFAULT_MAXIMUM_RECORDS
    max_pages_per_slice: int = DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
    max_slices: int = DEFAULT_MAXIMUM_NUMBER_OF_SLICES
    max_streams: int = DEFAULT_MAXIMUM_STREAMS
def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """
    Build the test limits from the "__test_read_config" entry of the config.

    A limit that is missing or falsy (``None``, ``0``) falls back to its default constant.
    A missing or ``None`` "__test_read_config" entry yields all defaults.

    :param config: The connector configuration
    :return: The limits to apply
    """
    # `or {}` also covers an explicit `"__test_read_config": None`, which would
    # otherwise raise AttributeError on the `.get` calls below.
    command_config = config.get("__test_read_config") or {}
    return TestLimits(
        max_records=command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS,
        max_pages_per_slice=(
            command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
        ),
        max_slices=command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES,
        max_streams=command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS,
    )
def
should_migrate_manifest(config: Mapping[str, Any]) -> bool:
def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Tell whether the manifest should be migrated.

    The answer is the value stored under the "__should_migrate" key of the
    config (a flag set by the UI); it is False when the key is absent.

    :param config: The configuration to check
    :return: The value of the flag, or False if it is not set.
    """
    migrate_flag = config.get("__should_migrate", False)
    return migrate_flag
Determines whether the manifest should be migrated, based on the presence of the "__should_migrate" key in the config.
This flag is set by the UI.
def
should_normalize_manifest(config: Mapping[str, Any]) -> bool:
def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Tell whether the manifest should be normalized.

    The answer is the value stored under the "__should_normalize" key of the
    config; it is False when the key is absent.

    :param config: The configuration to check
    :return: The value of the flag, or False if it is not set.
    """
    normalize_flag = config.get("__should_normalize", False)
    return normalize_flag
Check if the manifest should be normalized.
Parameters
- config: The configuration to check
Returns
True if the manifest should be normalized, False otherwise.
def
create_source( config: Mapping[str, Any], limits: TestLimits) -> airbyte_cdk.ManifestDeclarativeSource:
def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
    """
    Create the declarative source from the manifest stored in the config
    under "__injected_declarative_manifest".

    The component factory is created with connector builder messages enabled,
    retries and cache disabled, and the page/slice limits from `limits`.

    :param config: The connector configuration; must contain the manifest key
    :param limits: The limits forwarded to the component factory
    :return: The configured source
    """
    declarative_manifest = config["__injected_declarative_manifest"]
    migrate = should_migrate_manifest(config)
    normalize = should_normalize_manifest(config)

    builder_factory = ModelToComponentFactory(
        emit_connector_builder_messages=True,
        limit_pages_fetched_per_slice=limits.max_pages_per_slice,
        limit_slices_fetched=limits.max_slices,
        disable_retries=True,
        disable_cache=True,
    )

    return ManifestDeclarativeSource(
        config=config,
        emit_connector_builder_messages=True,
        source_config=declarative_manifest,
        migrate_manifest=migrate,
        normalize_manifest=normalize,
        component_factory=builder_factory,
    )
def
read_stream( source: airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource, config: Mapping[str, Any], configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], limits: TestLimits) -> airbyte_cdk.AirbyteMessage:
def read_stream(
    source: DeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """
    Run a test read and return the result as a RECORD message.

    Any exception is converted into the message produced by AirbyteTracedException
    instead of being raised.

    :param source: The source to read from
    :param config: The connector configuration
    :param configured_catalog: The catalog; only its first stream names the record
    :param state: The state messages handed to the test read
    :param limits: The page, slice and record limits for the read
    :return: A RECORD message holding the test-read result as a dict, or an error message
    """
    try:
        reader = TestReader(limits.max_pages_per_slice, limits.max_slices, limits.max_records)

        # The connector builder only supports a single stream
        first_configured_stream = configured_catalog.streams[0]
        stream_name = first_configured_stream.stream.name

        result = reader.run_test_read(
            source, config, configured_catalog, state, limits.max_records
        )

        record = AirbyteRecordMessage(
            data=asdict(result), stream=stream_name, emitted_at=_emitted_at()
        )
        return AirbyteMessage(type=MessageType.RECORD, record=record)
    except Exception as exc:
        # The message embeds the config, so it goes through filter_secrets first.
        details = filter_secrets(
            f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
        )
        traced = AirbyteTracedException.from_exception(exc, message=details)
        return traced.as_airbyte_message()
def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
    """
    Return the source's resolved manifest as a RECORD message.

    :param source: The source whose `resolved_manifest` is reported
    :return: A RECORD message on the "resolve_manifest" stream, or an error
        message if anything raises while building it.
    """
    try:
        record = AirbyteRecordMessage(
            data={"manifest": source.resolved_manifest},
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as exc:
        traced = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return traced.as_airbyte_message()
def
full_resolve_manifest( source: airbyte_cdk.ManifestDeclarativeSource, limits: TestLimits) -> airbyte_cdk.AirbyteMessage:
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
    """
    Return the resolved manifest with the source's dynamic streams appended to "streams".

    Streams already in the manifest get "dynamic_stream_name" set to None. Dynamic streams
    are grouped by their "dynamic_stream_name" and at most `limits.max_streams` are kept
    per group. The manifest held by `source` is left unmodified.

    :param source: The source providing `resolved_manifest` and `dynamic_streams`
    :param limits: The limits; only `max_streams` is used here
    :return: A RECORD message on the "full_resolve_manifest" stream, or an error message
    """
    try:
        manifest = {**source.resolved_manifest}
        # The copy above is shallow: rebuild the stream list and each stream dict so that
        # tagging and extending them does not modify the objects owned by the source.
        streams = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams", [])
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for dynamic_stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(
                dynamic_stream["dynamic_stream_name"], []
            )

            if len(generated_streams) < limits.max_streams:
                generated_streams.append(dynamic_stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Already traced: return its message as is instead of wrapping it again.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()