airbyte_cdk.connector_builder.connector_builder_handler
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from dataclasses import asdict
from typing import Any, Dict, List, Mapping, Optional

from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    Type,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
    TestLimits,
)
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

# Keys looked up inside the "__test_read_config" section of the connector config.
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"


def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """
    Build the test-read limits from the connector config.

    Limits are read from the optional "__test_read_config" section of the config. Any limit
    that is not provided falls back to the matching ``TestLimits.DEFAULT_*`` value.

    :param config: The connector configuration.
    :return: The limits to apply.
    """
    # A missing section and an explicit null are both treated as "no overrides".
    command_config = config.get("__test_read_config") or {}
    return TestLimits(
        max_records=command_config.get(MAX_RECORDS_KEY, TestLimits.DEFAULT_MAX_RECORDS),
        max_pages_per_slice=command_config.get(
            MAX_PAGES_PER_SLICE_KEY, TestLimits.DEFAULT_MAX_PAGES_PER_SLICE
        ),
        max_slices=command_config.get(MAX_SLICES_KEY, TestLimits.DEFAULT_MAX_SLICES),
        max_streams=command_config.get(MAX_STREAMS_KEY, TestLimits.DEFAULT_MAX_STREAMS),
    )


def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Determine whether the manifest should be migrated.

    The decision is driven by the value of the "__should_migrate" flag in the config.
    This flag is set by the UI.

    :param config: The configuration to check
    :return: True if the flag is present and truthy, False otherwise.
    """
    return bool(config.get("__should_migrate", False))


def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Check if the manifest should be normalized.

    :param config: The configuration to check
    :return: True if the "__should_normalize" flag is present and truthy, False otherwise.
    """
    return bool(config.get("__should_normalize", False))


def create_source(
    config: Mapping[str, Any],
    limits: TestLimits | None = None,
    catalog: ConfiguredAirbyteCatalog | None = None,
    state: List[AirbyteStateMessage] | None = None,
) -> ConcurrentDeclarativeSource:
    """
    Create the declarative source used by the connector builder.

    :param config: The connector configuration; must contain the manifest under
        "__injected_declarative_manifest".
    :param limits: Optional limits forwarded to the source.
    :param catalog: Optional configured catalog forwarded to the source.
    :param state: Optional state messages forwarded to the source.
    :return: A source that emits connector builder messages.
    :raises KeyError: If "__injected_declarative_manifest" is missing from the config.
    """
    # Work on a shallow copy so the concurrency override below does not leak into
    # the caller's config.
    manifest = dict(config["__injected_declarative_manifest"])

    # We enforce a concurrency level of 1 so that the stream is processed on a single thread
    # to retain ordering for the grouping of the builder message responses.
    if "concurrency_level" in manifest:
        manifest["concurrency_level"] = {
            **manifest["concurrency_level"],
            "default_concurrency": 1,
        }
    else:
        manifest["concurrency_level"] = {"type": "ConcurrencyLevel", "default_concurrency": 1}

    return ConcurrentDeclarativeSource(
        catalog=catalog,
        config=config,
        state=state,
        source_config=manifest,
        emit_connector_builder_messages=True,
        migrate_manifest=should_migrate_manifest(config),
        normalize_manifest=should_normalize_manifest(config),
        limits=limits,
    )


def read_stream(
    source: ConcurrentDeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """
    Run a test read and wrap the result in a RECORD message.

    Any exception is converted into the message produced by ``AirbyteTracedException``
    instead of being raised.

    :param source: The source to read from.
    :param config: The connector configuration.
    :param configured_catalog: The catalog; only its first stream is read.
    :param state: The state messages passed to the test read.
    :param limits: The page, slice and record limits for the test read.
    :return: A RECORD message carrying the test read result, or an error message.
    """
    try:
        test_read_handler = TestReader(
            limits.max_pages_per_slice, limits.max_slices, limits.max_records
        )
        # The connector builder only supports a single stream
        stream_name = configured_catalog.streams[0].stream.name

        stream_read = test_read_handler.run_test_read(
            source,
            config,
            configured_catalog,
            stream_name,
            state,
            limits.max_records,
        )

        return AirbyteMessage(
            type=MessageType.RECORD,
            record=AirbyteRecordMessage(
                data=asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
            ),
        )
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc,
            message=filter_secrets(
                f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
            ),
        )
        return error.as_airbyte_message()


def resolve_manifest(
    source: ConcurrentDeclarativeSource,
) -> AirbyteMessage:
    """
    Return the source's resolved manifest as a RECORD message on the "resolve_manifest" stream.

    :param source: The source whose resolved manifest is returned.
    :return: A RECORD message with the manifest, or an error message if resolution fails.
    """
    try:
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": source.resolved_manifest},
                emitted_at=_emitted_at(),
                stream="resolve_manifest",
            ),
        )
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()


def full_resolve_manifest(
    source: ConcurrentDeclarativeSource, limits: TestLimits
) -> AirbyteMessage:
    """
    Return the resolved manifest with the generated dynamic streams appended to its streams.

    Streams already in the manifest are tagged with ``dynamic_stream_name = None``. Streams
    from ``source.dynamic_streams`` are grouped by their "dynamic_stream_name" and at most
    ``limits.max_streams`` are kept per group.

    :param source: The source providing the resolved manifest and the dynamic streams.
    :param limits: Limits; only ``max_streams`` is used here.
    :return: A RECORD message on the "full_resolve_manifest" stream, or an error message.
    """
    try:
        manifest = {**source.resolved_manifest}
        # Build a fresh list of copied stream dicts: the manifest copy above is shallow, so
        # editing the original list or its entries would modify the source's own manifest.
        streams = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams", [])
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

            if len(generated_streams) < limits.max_streams:
                generated_streams.append(stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()


def _emitted_at() -> int:
    """Return the current time as epoch milliseconds."""
    return ab_datetime_now().to_epoch_millis()
# Keys looked up inside the "__test_read_config" section of the connector config.
MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
MAX_SLICES_KEY = "max_slices"
MAX_RECORDS_KEY = "max_records"
MAX_STREAMS_KEY = "max_streams"
def
get_limits( config: Mapping[str, Any]) -> airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits:
def get_limits(config: Mapping[str, Any]) -> TestLimits:
    """
    Build the test-read limits from the connector config.

    Limits are read from the optional "__test_read_config" section of the config. Any limit
    that is not provided falls back to the matching ``TestLimits.DEFAULT_*`` value.

    :param config: The connector configuration.
    :return: The limits to apply.
    """
    # A missing section and an explicit null are both treated as "no overrides".
    command_config = config.get("__test_read_config") or {}
    return TestLimits(
        max_records=command_config.get(MAX_RECORDS_KEY, TestLimits.DEFAULT_MAX_RECORDS),
        max_pages_per_slice=command_config.get(
            MAX_PAGES_PER_SLICE_KEY, TestLimits.DEFAULT_MAX_PAGES_PER_SLICE
        ),
        max_slices=command_config.get(MAX_SLICES_KEY, TestLimits.DEFAULT_MAX_SLICES),
        max_streams=command_config.get(MAX_STREAMS_KEY, TestLimits.DEFAULT_MAX_STREAMS),
    )
def
should_migrate_manifest(config: Mapping[str, Any]) -> bool:
def should_migrate_manifest(config: Mapping[str, Any]) -> bool:
    """
    Determine whether the manifest should be migrated.

    The decision is driven by the value of the "__should_migrate" flag in the config.
    This flag is set by the UI.

    :param config: The configuration to check
    :return: True if the flag is present and truthy, False otherwise.
    """
    # Coerce so the declared ``bool`` return type holds even for null or non-bool values.
    return bool(config.get("__should_migrate", False))
Determines whether the manifest should be migrated, based on the value of the "__should_migrate" flag in the config (treated as False when the key is absent).
This flag is set by the UI.
def
should_normalize_manifest(config: Mapping[str, Any]) -> bool:
def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
    """
    Check if the manifest should be normalized.

    :param config: The configuration to check
    :return: True if the "__should_normalize" flag is present and truthy, False otherwise.
    """
    # Coerce so the declared ``bool`` return type holds even for null or non-bool values.
    return bool(config.get("__should_normalize", False))
Check if the manifest should be normalized.
Parameters
- config: The configuration to check
Returns
True if the manifest should be normalized, False otherwise.
def
create_source( config: Mapping[str, Any], limits: airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits | None = None, catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog | None = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource:
def create_source(
    config: Mapping[str, Any],
    limits: TestLimits | None = None,
    catalog: ConfiguredAirbyteCatalog | None = None,
    state: List[AirbyteStateMessage] | None = None,
) -> ConcurrentDeclarativeSource:
    """
    Create the declarative source used by the connector builder.

    :param config: The connector configuration; must contain the manifest under
        "__injected_declarative_manifest".
    :param limits: Optional limits forwarded to the source.
    :param catalog: Optional configured catalog forwarded to the source.
    :param state: Optional state messages forwarded to the source.
    :return: A source that emits connector builder messages.
    :raises KeyError: If "__injected_declarative_manifest" is missing from the config.
    """
    # Work on a shallow copy so the concurrency override below does not leak into
    # the caller's config.
    manifest = dict(config["__injected_declarative_manifest"])

    # We enforce a concurrency level of 1 so that the stream is processed on a single thread
    # to retain ordering for the grouping of the builder message responses.
    if "concurrency_level" in manifest:
        manifest["concurrency_level"] = {
            **manifest["concurrency_level"],
            "default_concurrency": 1,
        }
    else:
        manifest["concurrency_level"] = {"type": "ConcurrencyLevel", "default_concurrency": 1}

    return ConcurrentDeclarativeSource(
        catalog=catalog,
        config=config,
        state=state,
        source_config=manifest,
        emit_connector_builder_messages=True,
        migrate_manifest=should_migrate_manifest(config),
        normalize_manifest=should_normalize_manifest(config),
        limits=limits,
    )
def
read_stream( source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource, config: Mapping[str, Any], configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], limits: airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits) -> airbyte_cdk.AirbyteMessage:
def read_stream(
    source: ConcurrentDeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    limits: TestLimits,
) -> AirbyteMessage:
    """
    Run a test read and wrap the result in a RECORD message.

    Any exception is converted into the message produced by ``AirbyteTracedException``
    instead of being raised.

    :param source: The source to read from.
    :param config: The connector configuration.
    :param configured_catalog: The catalog; only its first stream is read.
    :param state: The state messages passed to the test read.
    :param limits: The page, slice and record limits for the test read.
    :return: A RECORD message carrying the test read result, or an error message.
    """
    try:
        reader = TestReader(limits.max_pages_per_slice, limits.max_slices, limits.max_records)

        # The connector builder only supports a single stream
        target_stream = configured_catalog.streams[0].stream.name

        result = reader.run_test_read(
            source, config, configured_catalog, target_stream, state, limits.max_records
        )

        record = AirbyteRecordMessage(
            data=asdict(result), stream=target_stream, emitted_at=_emitted_at()
        )
        return AirbyteMessage(type=MessageType.RECORD, record=record)
    except Exception as exc:
        failure_text = filter_secrets(
            f"Error reading stream with config={config} and catalog={configured_catalog}: {exc!s}"
        )
        traced = AirbyteTracedException.from_exception(exc, message=failure_text)
        return traced.as_airbyte_message()
def
resolve_manifest( source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource) -> airbyte_cdk.AirbyteMessage:
def resolve_manifest(
    source: ConcurrentDeclarativeSource,
) -> AirbyteMessage:
    """
    Return the source's resolved manifest as a RECORD message on the "resolve_manifest" stream.

    :param source: The source whose resolved manifest is returned.
    :return: A RECORD message with the manifest, or an error message if resolution fails.
    """
    try:
        payload = {"manifest": source.resolved_manifest}
        record = AirbyteRecordMessage(
            data=payload,
            emitted_at=_emitted_at(),
            stream="resolve_manifest",
        )
        return AirbyteMessage(type=Type.RECORD, record=record)
    except Exception as exc:
        traced = AirbyteTracedException.from_exception(
            exc, message=f"Error resolving manifest: {exc!s}"
        )
        return traced.as_airbyte_message()
def
full_resolve_manifest( source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource, limits: airbyte_cdk.sources.declarative.concurrent_declarative_source.TestLimits) -> airbyte_cdk.AirbyteMessage:
def full_resolve_manifest(
    source: ConcurrentDeclarativeSource, limits: TestLimits
) -> AirbyteMessage:
    """
    Return the resolved manifest with the generated dynamic streams appended to its streams.

    Streams already in the manifest are tagged with ``dynamic_stream_name = None``. Streams
    from ``source.dynamic_streams`` are grouped by their "dynamic_stream_name" and at most
    ``limits.max_streams`` are kept per group.

    :param source: The source providing the resolved manifest and the dynamic streams.
    :param limits: Limits; only ``max_streams`` is used here.
    :return: A RECORD message on the "full_resolve_manifest" stream, or an error message.
    """
    try:
        manifest = {**source.resolved_manifest}
        # Build a fresh list of copied stream dicts: the manifest copy above is shallow, so
        # editing the original list or its entries would modify the source's own manifest.
        streams = [
            {**stream, "dynamic_stream_name": None} for stream in manifest.get("streams", [])
        ]

        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in source.dynamic_streams:
            generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])

            # Cap the number of generated streams kept per dynamic stream definition.
            if len(generated_streams) < limits.max_streams:
                generated_streams.append(stream)

        for generated_streams_list in mapped_streams.values():
            streams.extend(generated_streams_list)

        manifest["streams"] = streams
        return AirbyteMessage(
            type=Type.RECORD,
            record=AirbyteRecordMessage(
                data={"manifest": manifest},
                emitted_at=_emitted_at(),
                stream="full_resolve_manifest",
            ),
        )
    except AirbyteTracedException as exc:
        # Already traced: return its message unchanged rather than re-wrapping it.
        return exc.as_airbyte_message()
    except Exception as exc:
        error = AirbyteTracedException.from_exception(
            exc, message=f"Error full resolving manifest: {str(exc)}"
        )
        return error.as_airbyte_message()