airbyte_cdk.sources.declarative.manifest_declarative_source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
import pkgutil
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import yaml
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate
from packaging.version import InvalidVersion, Version

from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
)
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    def __init__(
        self,
        source_config: ConnectionDefinition,
        *,
        config: Mapping[str, Any] | None = None,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
    ):
        """
        Args:
            config: The provided config dict.
            source_config: The manifest of low-code components that describe the source connector.
            debug: True if debug mode is enabled.
            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")
        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
        manifest = dict(source_config)
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"

        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)

        resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
        propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_source_config, {}
        )
        self._source_config = propagated_source_config
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )
        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        self._config = config or {}
        self._validate_source()

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        return self._source_config

    @property
    def message_repository(self) -> MessageRepository:
        return self._message_repository

    @property
    def dynamic_streams(self) -> List[Dict[str, Any]]:
        return self._dynamic_stream_configs(
            manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
        )

    @property
    def connection_checker(self) -> ConnectionChecker:
        check = self._source_config["check"]
        if "type" not in check:
            check["type"] = "CheckStream"
        check_stream = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if isinstance(check_stream, ConnectionChecker):
            return check_stream
        else:
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
            )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        self._emit_manifest_debug_message(
            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
        )

        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, config)

        source_streams = [
            self._constructor.create_component(
                StateDelegatingStreamModel
                if stream_config.get("type") == StateDelegatingStreamModel.__name__
                else DeclarativeStreamModel,
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]

        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        parent_streams = set()

        def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                if parent_config["stream"]["type"] == "StateDelegatingStream":
                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
                        "use_cache"
                    ] = True
                else:
                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                if stream_config["type"] == "StateDelegatingStream":
                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
                        True
                    )
                else:
                    stream_config["retriever"]["requester"]["use_cache"] = True

        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        self._configure_logger_level(logger)
        self._emit_manifest_debug_message(
            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
        )

        spec = self._source_config.get("spec")
        if spec:
            if "type" not in spec:
                spec["type"] = "Spec"
            spec_component = self._constructor.create_component(SpecModel, spec, dict())
            return spec_component.generate_spec()
        else:
            return super().spec(logger)

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        self._configure_logger_level(logger)
        return super().check(logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        self._configure_logger_level(logger)
        yield from super().read(logger, config, catalog, state)

    def _configure_logger_level(self, logger: logging.Logger) -> None:
        """
        Set the log level to logging.DEBUG if debug mode is enabled
        """
        if self._debug:
            logger.setLevel(logging.DEBUG)

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema
        """
        try:
            raw_component_schema = pkgutil.get_data(
                "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
            )
            if raw_component_schema is not None:
                declarative_component_schema = yaml.load(
                    raw_component_schema, Loader=yaml.SafeLoader
                )
            else:
                raise RuntimeError(
                    "Failed to read manifest component json schema required for validation"
                )
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f"Failed to read manifest component json schema required for validation: {e}"
            )

        streams = self._source_config.get("streams")
        dynamic_streams = self._source_config.get("dynamic_streams")
        if not (streams or dynamic_streams):
            raise ValidationError(
                f"A valid manifest should have at least one stream defined. Got {streams}"
            )

        try:
            validate(self._source_config, declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

        cdk_version_str = metadata.version("airbyte_cdk")
        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
        manifest_version_str = self._source_config.get("version")
        if manifest_version_str is None:
            raise RuntimeError(
                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
            )
        manifest_version = self._parse_version(manifest_version_str, "manifest")

        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
            # Skipping version compatibility check on unreleased dev branch
            pass
        elif (cdk_version.major, cdk_version.minor) < (
            manifest_version.major,
            manifest_version.minor,
        ):
            raise ValidationError(
                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
                f"manifest may contain features that are not in the current CDK version."
            )
        elif (manifest_version.major, manifest_version.minor) < (0, 29):
            raise ValidationError(
                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
                f"{cdk_version!s} which contains these breaking changes."
            )

    @staticmethod
    def _parse_version(
        version: str,
        version_type: str,
    ) -> Version:
        """Takes a semantic version represented as a string and splits it into a tuple.

        The fourth part (prerelease) is not returned in the tuple.

        Returns:
            Version: the parsed version object
        """
        try:
            parsed_version = Version(version)
        except InvalidVersion as ex:
            raise ValidationError(
                f"The {version_type} version '{version}' is not a valid version format."
            ) from ex
        else:
            # No exception
            return parsed_version

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
        for s in stream_configs:
            if "type" not in s:
                s["type"] = "DeclarativeStream"
        return stream_configs

    def _dynamic_stream_configs(
        self,
        manifest: Mapping[str, Any],
        config: Mapping[str, Any],
        with_dynamic_stream_name: Optional[bool] = None,
    ) -> List[Dict[str, Any]]:
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
            components_resolver_config = dynamic_definition["components_resolver"]

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            components_resolver = self._constructor.create_component(
                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
            )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                dynamic_stream = {
                    **ManifestComponentTransformer().propagate_types_and_parameters(
                        "", dynamic_stream, {}, use_parent_parameters=True
                    )
                }

                if "type" not in dynamic_stream:
                    dynamic_stream["type"] = "DeclarativeStream"

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if with_dynamic_stream_name:
                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                        "name", f"dynamic_stream_{dynamic_definition_index}"
                    )

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        self.logger.debug("declarative source created from manifest", extra=extra_args)
class ManifestDeclarativeSource(DeclarativeSource)
Declarative source defined by a manifest of low-code components that define source connector behavior
ManifestDeclarativeSource(
    source_config: ConnectionDefinition,
    *,
    config: Mapping[str, Any] | None = None,
    debug: bool = False,
    emit_connector_builder_messages: bool = False,
    component_factory: Optional[ModelToComponentFactory] = None,
)
Arguments:
- config: The provided config dict.
- source_config: The manifest of low-code components that describe the source connector.
- debug: True if debug mode is enabled.
- emit_connector_builder_messages: True if messages should be emitted to the connector builder.
- component_factory: Optional factory to use when ModelToComponentFactory's default behavior needs to be tweaked (a construction sketch follows below).
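For illustration, here is a minimal construction sketch. The manifest below is hypothetical and abbreviated: a real manifest must satisfy declarative_component_schema.yaml (which the constructor validates via _validate_source), so additional fields may be required, and the version and config values shown are placeholders only.

from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource

# Hypothetical, abbreviated manifest; real manifests may need more fields to validate.
manifest = {
    "version": "6.0.0",  # placeholder; use the CDK version your manifest targets
    "check": {"type": "CheckStream", "stream_names": ["users"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "users",
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.example.com",  # hypothetical API
                    "path": "/users",
                    "http_method": "GET",
                },
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {"type": "DpathExtractor", "field_path": []},
                },
            },
        }
    ],
}

source = ManifestDeclarativeSource(
    source_config=manifest,           # the manifest itself
    config={"api_key": "..."},        # hypothetical user-provided connector config
    debug=False,
)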
connection_checker: ConnectionChecker
Returns the ConnectionChecker to use for the check operation.
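As a sketch (reusing the hypothetical source built above), this property materializes the manifest's check block into a checker component; check_connection is the ConnectionChecker interface method and returns a (succeeded, error) pair.

import logging

# Built from manifest["check"]; when "type" is omitted it defaults to "CheckStream".
checker = source.connection_checker
succeeded, error = checker.check_connection(source, logging.getLogger("airbyte"), {"api_key": "..."})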
streams(self, config: Mapping[str, Any]) -> List[Stream]
Parameters
- config: The user-provided configuration as specified by the source's spec. Any stream-construction-related operation should happen in this method.

Returns
A list of the streams in this source connector.
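A brief usage sketch, reusing the hypothetical source and config from above; each returned object is a Stream built from a DeclarativeStream (or StateDelegatingStream) definition in the manifest.

streams = source.streams({"api_key": "..."})   # hypothetical config
print([stream.name for stream in streams])     # e.g. ["users"]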
spec(self, logger: logging.Logger) -> ConnectorSpecification
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block; otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
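As a sketch, a manifest can carry its own spec block; the commented block below is a hypothetical example of such an entry, and the connection_specification contents are illustrative only.

# Hypothetical "spec" entry inside the manifest dict:
# "spec": {
#     "type": "Spec",
#     "connection_specification": {
#         "type": "object",
#         "required": ["api_key"],
#         "properties": {"api_key": {"type": "string", "airbyte_secret": True}},
#     },
# }

import logging

specification = source.spec(logging.getLogger("airbyte"))
print(specification.connectionSpecification)  # JSON schema describing the configurable fields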
check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
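A minimal sketch of invoking the check operation with a hypothetical config; the returned AirbyteConnectionStatus carries a status (and, on failure, a message).

import logging

status = source.check(logging.getLogger("airbyte"), {"api_key": "..."})
print(status.status)  # Status.SUCCEEDED or Status.FAILED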
read(self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None) -> Iterator[AirbyteMessage]
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
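A hedged sketch of driving a read: the catalog is assumed to be a ConfiguredAirbyteCatalog deserialized elsewhere from the configured-catalog JSON that the platform passes to the connector, and the record handling shown is illustrative.

import logging

# `catalog` is assumed to be a ConfiguredAirbyteCatalog built from the
# platform-provided configured catalog JSON (not constructed here).
for message in source.read(logging.getLogger("airbyte"), {"api_key": "..."}, catalog, state=None):
    if message.record:
        print(message.record.data)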