airbyte_cdk.sources.declarative.manifest_declarative_source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
import pkgutil
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import yaml
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate
from packaging.version import InvalidVersion, Version

from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteStateMessage,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
)
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    StateDelegatingStream as StateDelegatingStreamModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
    get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
    ManifestComponentTransformer,
)
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
    ManifestReferenceResolver,
)
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
    ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
    AlwaysLogSliceLogger,
    DebugSliceLogger,
    SliceLogger,
)
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    def __init__(
        self,
        source_config: ConnectionDefinition,
        *,
        config: Mapping[str, Any] | None = None,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
    ):
        """
        Args:
            config: The provided config dict.
            source_config: The manifest of low-code components that describe the source connector.
            debug: True if debug mode is enabled.
            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.

        Raises:
            Whatever `_validate_source` raises when the processed manifest is rejected.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")
        # For ease of use we don't require the type to be specified at the top level manifest,
        # but it should be included during processing. Work on a shallow copy so the caller's
        # top-level mapping does not gain the defaulted key.
        manifest = dict(source_config)
        manifest.setdefault("type", "DeclarativeSource")

        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)

        resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
        propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_source_config, {}
        )
        self._source_config = propagated_source_config
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        # A caller-supplied factory wins; otherwise build the default one from the manifest settings.
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )
        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        self._validate_source()

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """The manifest after reference resolution and type/parameter propagation."""
        return self._source_config

    @property
    def message_repository(self) -> MessageRepository:
        """The message repository obtained from the component factory."""
        return self._message_repository

    @property
    def connection_checker(self) -> ConnectionChecker:
        """Build the connection checker described by the manifest's "check" block.

        The block's "type" defaults to "CheckStream" when it is not specified.

        Raises:
            ValueError: if the created component is not a ConnectionChecker.
        """
        check = self._source_config["check"]
        check.setdefault("type", "CheckStream")
        check_stream = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if isinstance(check_stream, ConnectionChecker):
            return check_stream
        else:
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
            )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Create the stream components for the static and dynamic stream definitions.

        Args:
            config: The user-provided configuration passed to every created component.

        Returns:
            The components created for each stream definition, static streams first.
        """
        self._emit_manifest_debug_message(
            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
        )

        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, config)

        # The cache flags are applied to a deep copy so the stored manifest is not modified by them.
        source_streams = [
            self._constructor.create_component(
                StateDelegatingStreamModel
                if stream_config.get("type") == StateDelegatingStreamModel.__name__
                else DeclarativeStreamModel,
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]

        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Set `use_cache` on the requester of every stream that is used as a parent stream.

        The flag is set both on the parent definitions nested inside child streams and on the
        top-level stream definitions that share a name with one of those parents.

        Args:
            stream_configs: Stream definitions; they are modified in place.

        Returns:
            The same list that was passed in.
        """
        parent_streams: Set[str] = set()

        def enable_cache(stream: Dict[str, Any]) -> None:
            # A StateDelegatingStream wraps two inner streams, each with its own requester.
            if stream["type"] == "StateDelegatingStream":
                stream["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = True
                stream["incremental_stream"]["retriever"]["requester"]["use_cache"] = True
            else:
                stream["retriever"]["requester"]["use_cache"] = True

        def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                enable_cache(parent_config["stream"])

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                # The partition router may be a single definition or a list of definitions.
                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        # Second pass: flag the top-level definitions of the streams collected above.
        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                enable_cache(stream_config)

        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        self._configure_logger_level(logger)
        self._emit_manifest_debug_message(
            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
        )

        spec = self._source_config.get("spec")
        if spec:
            spec.setdefault("type", "Spec")
            spec_component = self._constructor.create_component(SpecModel, spec, dict())
            return spec_component.generate_spec()
        else:
            return super().spec(logger)

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Apply the debug log level if enabled, then delegate to the parent `check`."""
        self._configure_logger_level(logger)
        return super().check(logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """Apply the debug log level if enabled, then yield the messages of the parent `read`."""
        self._configure_logger_level(logger)
        yield from super().read(logger, config, catalog, state)

    def _configure_logger_level(self, logger: logging.Logger) -> None:
        """
        Set the log level to logging.DEBUG if debug mode is enabled
        """
        if self._debug:
            logger.setLevel(logging.DEBUG)

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema

        Raises:
            RuntimeError: if the packaged schema data is unavailable, or the manifest has no "version".
            FileNotFoundError: if reading the packaged schema raises FileNotFoundError.
            ValidationError: if the manifest defines no streams, fails schema validation, has an
                unparseable version, or has a version incompatible with the installed airbyte-cdk.
        """
        try:
            raw_component_schema = pkgutil.get_data(
                "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
            )
            if raw_component_schema is not None:
                declarative_component_schema = yaml.load(
                    raw_component_schema, Loader=yaml.SafeLoader
                )
            else:
                raise RuntimeError(
                    "Failed to read manifest component json schema required for validation"
                )
        except FileNotFoundError as e:
            # Chain the original error so its traceback is kept as the explicit cause.
            raise FileNotFoundError(
                f"Failed to read manifest component json schema required for validation: {e}"
            ) from e

        streams = self._source_config.get("streams")
        dynamic_streams = self._source_config.get("dynamic_streams")
        if not (streams or dynamic_streams):
            raise ValidationError(
                f"A valid manifest should have at least one stream defined. Got {streams}"
            )

        try:
            validate(self._source_config, declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

        cdk_version_str = metadata.version("airbyte_cdk")
        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
        manifest_version_str = self._source_config.get("version")
        if manifest_version_str is None:
            raise RuntimeError(
                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
            )
        manifest_version = self._parse_version(manifest_version_str, "manifest")

        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
            # Skipping version compatibility check on unreleased dev branch
            pass
        elif (cdk_version.major, cdk_version.minor) < (
            manifest_version.major,
            manifest_version.minor,
        ):
            raise ValidationError(
                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
                f"manifest may contain features that are not in the current CDK version."
            )
        elif (manifest_version.major, manifest_version.minor) < (0, 29):
            raise ValidationError(
                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
                f"{cdk_version!s} which contains these breaking changes."
            )

    @staticmethod
    def _parse_version(
        version: str,
        version_type: str,
    ) -> Version:
        """Parse a version string into a `packaging.version.Version` object.

        Args:
            version: The version string to parse.
            version_type: Label naming which version is being parsed; only used in the error message.

        Returns:
            Version: the parsed version object

        Raises:
            ValidationError: if `version` is not a valid version string.
        """
        try:
            parsed_version = Version(version)
        except InvalidVersion as ex:
            raise ValidationError(
                f"The {version_type} version '{version}' is not a valid version format."
            ) from ex
        else:
            # No exception
            return parsed_version

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        """Return the manifest's static stream definitions, defaulting each "type" to "DeclarativeStream".

        The definitions are updated in place when the default is applied.
        """
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
        for s in stream_configs:
            s.setdefault("type", "DeclarativeStream")
        return stream_configs

    def _dynamic_stream_configs(
        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
    ) -> List[Dict[str, Any]]:
        """Resolve the manifest's "dynamic_streams" definitions into concrete stream definitions.

        Args:
            manifest: The manifest to read the "dynamic_streams" definitions from.
            config: The user-provided configuration passed to the components resolver.

        Returns:
            The stream definitions produced by the resolvers, each with a unique string name.

        Raises:
            ValueError: if a definition has no components resolver, the resolver has no or an
                unknown "type", or a resolved stream name is not a string.
            AirbyteTracedException: if two resolved streams share the same name.
        """
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition in dynamic_stream_definitions:
            # Use .get() so that a missing key reaches the explicit ValueError below instead of
            # surfacing as a bare KeyError.
            components_resolver_config = dynamic_definition.get("components_resolver")

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            components_resolver = self._constructor.create_component(
                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
            )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                dynamic_stream.setdefault("type", "DeclarativeStream")

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    # Duplicates from this resolver type are reported as a configuration error.
                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """Log a debug message carrying `extra_args` as the log record's extra fields."""
        self.logger.debug("declarative source created from manifest", extra=extra_args)
class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    def __init__(
        self,
        source_config: ConnectionDefinition,
        *,
        config: Mapping[str, Any] | None = None,
        debug: bool = False,
        emit_connector_builder_messages: bool = False,
        component_factory: Optional[ModelToComponentFactory] = None,
    ):
        """
        Args:
            config: The provided config dict.
            source_config: The manifest of low-code components that describe the source connector.
            debug: True if debug mode is enabled.
            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.

        Raises:
            Whatever `_validate_source` raises when the processed manifest is rejected.
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")
        # For ease of use we don't require the type to be specified at the top level manifest,
        # but it should be included during processing. Work on a shallow copy so the caller's
        # top-level mapping does not gain the defaulted key.
        manifest = dict(source_config)
        manifest.setdefault("type", "DeclarativeSource")

        # If custom components are needed, locate and/or register them.
        self.components_module: ModuleType | None = get_registered_components_module(config=config)

        resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
        propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
            "", resolved_source_config, {}
        )
        self._source_config = propagated_source_config
        self._debug = debug
        self._emit_connector_builder_messages = emit_connector_builder_messages
        # A caller-supplied factory wins; otherwise build the default one from the manifest settings.
        self._constructor = (
            component_factory
            if component_factory
            else ModelToComponentFactory(
                emit_connector_builder_messages,
                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
            )
        )
        self._message_repository = self._constructor.get_message_repository()
        self._slice_logger: SliceLogger = (
            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
        )

        self._validate_source()

    @property
    def resolved_manifest(self) -> Mapping[str, Any]:
        """The manifest after reference resolution and type/parameter propagation."""
        return self._source_config

    @property
    def message_repository(self) -> MessageRepository:
        """The message repository obtained from the component factory."""
        return self._message_repository

    @property
    def connection_checker(self) -> ConnectionChecker:
        """Build the connection checker described by the manifest's "check" block.

        The block's "type" defaults to "CheckStream" when it is not specified.

        Raises:
            ValueError: if the created component is not a ConnectionChecker.
        """
        check = self._source_config["check"]
        check.setdefault("type", "CheckStream")
        check_stream = self._constructor.create_component(
            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
            check,
            dict(),
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )
        if isinstance(check_stream, ConnectionChecker):
            return check_stream
        else:
            raise ValueError(
                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
            )

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Create the stream components for the static and dynamic stream definitions.

        Args:
            config: The user-provided configuration passed to every created component.

        Returns:
            The components created for each stream definition, static streams first.
        """
        self._emit_manifest_debug_message(
            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
        )

        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
            self._source_config, config
        )

        api_budget_model = self._source_config.get("api_budget")
        if api_budget_model:
            self._constructor.set_api_budget(api_budget_model, config)

        # The cache flags are applied to a deep copy so the stored manifest is not modified by them.
        source_streams = [
            self._constructor.create_component(
                StateDelegatingStreamModel
                if stream_config.get("type") == StateDelegatingStreamModel.__name__
                else DeclarativeStreamModel,
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
        ]

        return source_streams

    @staticmethod
    def _initialize_cache_for_parent_streams(
        stream_configs: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Set `use_cache` on the requester of every stream that is used as a parent stream.

        The flag is set both on the parent definitions nested inside child streams and on the
        top-level stream definitions that share a name with one of those parents.

        Args:
            stream_configs: Stream definitions; they are modified in place.

        Returns:
            The same list that was passed in.
        """
        parent_streams: Set[str] = set()

        def enable_cache(stream: Dict[str, Any]) -> None:
            # A StateDelegatingStream wraps two inner streams, each with its own requester.
            if stream["type"] == "StateDelegatingStream":
                stream["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = True
                stream["incremental_stream"]["retriever"]["requester"]["use_cache"] = True
            else:
                stream["retriever"]["requester"]["use_cache"] = True

        def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
            for parent_config in parent_configs:
                parent_streams.add(parent_config["stream"]["name"])
                enable_cache(parent_config["stream"])

        for stream_config in stream_configs:
            if stream_config.get("incremental_sync", {}).get("parent_stream"):
                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
                    "use_cache"
                ] = True

            elif stream_config.get("retriever", {}).get("partition_router", {}):
                partition_router = stream_config["retriever"]["partition_router"]

                # The partition router may be a single definition or a list of definitions.
                if isinstance(partition_router, dict) and partition_router.get(
                    "parent_stream_configs"
                ):
                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
                elif isinstance(partition_router, list):
                    for router in partition_router:
                        if router.get("parent_stream_configs"):
                            update_with_cache_parent_configs(router["parent_stream_configs"])

        # Second pass: flag the top-level definitions of the streams collected above.
        for stream_config in stream_configs:
            if stream_config["name"] in parent_streams:
                enable_cache(stream_config)

        return stream_configs

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        self._configure_logger_level(logger)
        self._emit_manifest_debug_message(
            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
        )

        spec = self._source_config.get("spec")
        if spec:
            spec.setdefault("type", "Spec")
            spec_component = self._constructor.create_component(SpecModel, spec, dict())
            return spec_component.generate_spec()
        else:
            return super().spec(logger)

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Apply the debug log level if enabled, then delegate to the parent `check`."""
        self._configure_logger_level(logger)
        return super().check(logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """Apply the debug log level if enabled, then yield the messages of the parent `read`."""
        self._configure_logger_level(logger)
        yield from super().read(logger, config, catalog, state)

    def _configure_logger_level(self, logger: logging.Logger) -> None:
        """
        Set the log level to logging.DEBUG if debug mode is enabled
        """
        if self._debug:
            logger.setLevel(logging.DEBUG)

    def _validate_source(self) -> None:
        """
        Validates the connector manifest against the declarative component schema

        Raises:
            RuntimeError: if the packaged schema data is unavailable, or the manifest has no "version".
            FileNotFoundError: if reading the packaged schema raises FileNotFoundError.
            ValidationError: if the manifest defines no streams, fails schema validation, has an
                unparseable version, or has a version incompatible with the installed airbyte-cdk.
        """
        try:
            raw_component_schema = pkgutil.get_data(
                "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
            )
            if raw_component_schema is not None:
                declarative_component_schema = yaml.load(
                    raw_component_schema, Loader=yaml.SafeLoader
                )
            else:
                raise RuntimeError(
                    "Failed to read manifest component json schema required for validation"
                )
        except FileNotFoundError as e:
            # Chain the original error so its traceback is kept as the explicit cause.
            raise FileNotFoundError(
                f"Failed to read manifest component json schema required for validation: {e}"
            ) from e

        streams = self._source_config.get("streams")
        dynamic_streams = self._source_config.get("dynamic_streams")
        if not (streams or dynamic_streams):
            raise ValidationError(
                f"A valid manifest should have at least one stream defined. Got {streams}"
            )

        try:
            validate(self._source_config, declarative_component_schema)
        except ValidationError as e:
            raise ValidationError(
                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
            ) from e

        cdk_version_str = metadata.version("airbyte_cdk")
        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
        manifest_version_str = self._source_config.get("version")
        if manifest_version_str is None:
            raise RuntimeError(
                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
            )
        manifest_version = self._parse_version(manifest_version_str, "manifest")

        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
            # Skipping version compatibility check on unreleased dev branch
            pass
        elif (cdk_version.major, cdk_version.minor) < (
            manifest_version.major,
            manifest_version.minor,
        ):
            raise ValidationError(
                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
                f"manifest may contain features that are not in the current CDK version."
            )
        elif (manifest_version.major, manifest_version.minor) < (0, 29):
            raise ValidationError(
                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
                f"{cdk_version!s} which contains these breaking changes."
            )

    @staticmethod
    def _parse_version(
        version: str,
        version_type: str,
    ) -> Version:
        """Parse a version string into a `packaging.version.Version` object.

        Args:
            version: The version string to parse.
            version_type: Label naming which version is being parsed; only used in the error message.

        Returns:
            Version: the parsed version object

        Raises:
            ValidationError: if `version` is not a valid version string.
        """
        try:
            parsed_version = Version(version)
        except InvalidVersion as ex:
            raise ValidationError(
                f"The {version_type} version '{version}' is not a valid version format."
            ) from ex
        else:
            # No exception
            return parsed_version

    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
        """Return the manifest's static stream definitions, defaulting each "type" to "DeclarativeStream".

        The definitions are updated in place when the default is applied.
        """
        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
        for s in stream_configs:
            s.setdefault("type", "DeclarativeStream")
        return stream_configs

    def _dynamic_stream_configs(
        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
    ) -> List[Dict[str, Any]]:
        """Resolve the manifest's "dynamic_streams" definitions into concrete stream definitions.

        Args:
            manifest: The manifest to read the "dynamic_streams" definitions from.
            config: The user-provided configuration passed to the components resolver.

        Returns:
            The stream definitions produced by the resolvers, each with a unique string name.

        Raises:
            ValueError: if a definition has no components resolver, the resolver has no or an
                unknown "type", or a resolved stream name is not a string.
            AirbyteTracedException: if two resolved streams share the same name.
        """
        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
        dynamic_stream_configs: List[Dict[str, Any]] = []
        seen_dynamic_streams: Set[str] = set()

        for dynamic_definition in dynamic_stream_definitions:
            # Use .get() so that a missing key reaches the explicit ValueError below instead of
            # surfacing as a bare KeyError.
            components_resolver_config = dynamic_definition.get("components_resolver")

            if not components_resolver_config:
                raise ValueError(
                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
                )

            resolver_type = components_resolver_config.get("type")
            if not resolver_type:
                raise ValueError(
                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
                )

            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
                raise ValueError(
                    f"Invalid components resolver type '{resolver_type}'. "
                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
                )

            if "retriever" in components_resolver_config:
                components_resolver_config["retriever"]["requester"]["use_cache"] = True

            # Create a resolver for dynamic components based on type
            components_resolver = self._constructor.create_component(
                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
            )

            stream_template_config = dynamic_definition["stream_template"]

            for dynamic_stream in components_resolver.resolve_components(
                stream_template_config=stream_template_config
            ):
                dynamic_stream.setdefault("type", "DeclarativeStream")

                # Ensure that each stream is created with a unique name
                name = dynamic_stream.get("name")

                if not isinstance(name, str):
                    raise ValueError(
                        f"Expected stream name {name} to be a string, got {type(name)}."
                    )

                if name in seen_dynamic_streams:
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                    failure_type = FailureType.system_error

                    # Duplicates from this resolver type are reported as a configuration error.
                    if resolver_type == "ConfigComponentsResolver":
                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                        failure_type = FailureType.config_error

                    raise AirbyteTracedException(
                        message=error_message,
                        internal_message=error_message,
                        failure_type=failure_type,
                    )

                seen_dynamic_streams.add(name)
                dynamic_stream_configs.append(dynamic_stream)

        return dynamic_stream_configs

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """Log a debug message carrying `extra_args` as the log record's extra fields."""
        self.logger.debug("declarative source created from manifest", extra=extra_args)
Declarative source defined by a manifest of low-code components that define source connector behavior
def __init__(
    self,
    source_config: ConnectionDefinition,
    *,
    config: Mapping[str, Any] | None = None,
    debug: bool = False,
    emit_connector_builder_messages: bool = False,
    component_factory: Optional[ModelToComponentFactory] = None,
):
    """Build the source from a low-code manifest and validate it.

    Args:
        source_config: The manifest of low-code components that describe the source connector.
        config: The provided config dict.
        debug: True if debug mode is enabled.
        emit_connector_builder_messages: True if messages should be emitted to the connector builder.
        component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
    """
    self.logger = logging.getLogger(f"airbyte.{self.name}")

    # The top-level "type" is optional for manifest authors; default it on a shallow copy
    # before the manifest is processed.
    manifest = dict(source_config)
    manifest.setdefault("type", "DeclarativeSource")

    # If custom components are needed, locate and/or register them.
    self.components_module: ModuleType | None = get_registered_components_module(config=config)

    # Resolve references first, then propagate types and parameters through the result.
    resolved = ManifestReferenceResolver().preprocess_manifest(manifest)
    propagated = ManifestComponentTransformer().propagate_types_and_parameters("", resolved, {})

    self._source_config = propagated
    self._debug = debug
    self._emit_connector_builder_messages = emit_connector_builder_messages

    # A caller-supplied factory wins; otherwise build the default one.
    if component_factory:
        self._constructor = component_factory
    else:
        self._constructor = ModelToComponentFactory(
            emit_connector_builder_messages,
            max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
        )
    self._message_repository = self._constructor.get_message_repository()

    slice_logger: SliceLogger
    if emit_connector_builder_messages:
        slice_logger = AlwaysLogSliceLogger()
    else:
        slice_logger = DebugSliceLogger()
    self._slice_logger = slice_logger

    self._validate_source()
Arguments:
- config: The provided config dict.
- source_config: The manifest of low-code components that describe the source connector.
- debug: True if debug mode is enabled.
- emit_connector_builder_messages: True if messages should be emitted to the connector builder.
- component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
@property
def connection_checker(self) -> ConnectionChecker:
    """
    Create the component described by the manifest's "check" block.

    When the block has no "type" key, "CheckStream" is written into it
    before the component is created.

    Raises:
        ValueError: if the created component is not a ConnectionChecker.
    """
    check_definition = self._source_config["check"]
    if "type" not in check_definition:
        check_definition["type"] = "CheckStream"

    checker_model = COMPONENTS_CHECKER_TYPE_MAPPING[check_definition["type"]]
    checker = self._constructor.create_component(
        checker_model,
        check_definition,
        {},
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )

    if not isinstance(checker, ConnectionChecker):
        raise ValueError(
            f"Expected to generate a ConnectionChecker component, but received {checker.__class__}"
        )
    return checker
Returns the ConnectionChecker to use for the check operation.
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Create the streams declared by the manifest.

    Args:
        config: The user-provided configuration as specified by the source's spec.

    Returns:
        A list of the streams in this source connector.
    """
    debug_payload = {"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
    self._emit_manifest_debug_message(extra_args=debug_payload)

    # Statically declared stream configs come first, followed by the dynamic ones.
    static_configs = self._stream_configs(self._source_config)
    dynamic_configs = self._dynamic_stream_configs(self._source_config, config)
    stream_configs = static_configs + dynamic_configs

    # An "api_budget" entry in the manifest is handed to the factory before any stream is created.
    api_budget_model = self._source_config.get("api_budget")
    if api_budget_model:
        self._constructor.set_api_budget(api_budget_model, config)

    def build_stream(stream_config: Any) -> Any:
        # Configs typed as StateDelegatingStream use that model; all others use DeclarativeStream.
        if stream_config.get("type") == StateDelegatingStreamModel.__name__:
            model = StateDelegatingStreamModel
        else:
            model = DeclarativeStreamModel
        return self._constructor.create_component(
            model,
            stream_config,
            config,
            emit_connector_builder_messages=self._emit_connector_builder_messages,
        )

    # A deep copy is passed to _initialize_cache_for_parent_streams, leaving stream_configs untouched.
    prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
    return [build_stream(stream_config) for stream_config in prepared_configs]
Parameters:
- config: The user-provided configuration as specified by the source's spec. Any operation related to stream construction should happen here.
Returns:
A list of the streams in this source connector.
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
    configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
    will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
    in the project root.
    """
    self._configure_logger_level(logger)
    debug_payload = {"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
    self._emit_manifest_debug_message(extra_args=debug_payload)

    manifest_spec = self._source_config.get("spec")
    if not manifest_spec:
        # No spec block in the manifest: defer to the parent implementation.
        return super().spec(logger)

    # The "type" key is optional in the manifest's spec block; fill it in when absent.
    if "type" not in manifest_spec:
        manifest_spec["type"] = "Spec"
    return self._constructor.create_component(SpecModel, manifest_spec, {}).generate_spec()
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """
    Configure the logger level, then delegate the check operation to the parent class.

    Args:
        logger: logger passed to `_configure_logger_level` and to the parent's check.
        config: the configuration forwarded to the parent's check.

    Returns:
        The connection status produced by the parent implementation.
    """
    self._configure_logger_level(logger)
    connection_status = super().check(logger, config)
    return connection_status
Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """
    Configure the logger level, then yield every message produced by the parent's read.

    Because this is a generator, nothing runs (including the logger configuration)
    until the first message is requested.

    Args:
        logger: logger passed to `_configure_logger_level` and to the parent's read.
        config: the configuration forwarded to the parent's read.
        catalog: the configured catalog forwarded to the parent's read.
        state: optional list of state messages forwarded to the parent's read.
    """
    self._configure_logger_level(logger)
    parent_messages = super().read(logger, config, catalog, state)
    yield from parent_messages
Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.