airbyte_cdk.sources.declarative.manifest_declarative_source

  1#
  2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  3#
  4
  5import json
  6import logging
  7import pkgutil
  8from copy import deepcopy
  9from importlib import metadata
 10from types import ModuleType
 11from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set
 12
 13import orjson
 14import yaml
 15from jsonschema.exceptions import ValidationError
 16from jsonschema.validators import validate
 17from packaging.version import InvalidVersion, Version
 18
 19from airbyte_cdk.config_observation import create_connector_config_control_message
 20from airbyte_cdk.connector_builder.models import (
 21    LogMessage as ConnectorBuilderLogMessage,
 22)
 23from airbyte_cdk.manifest_migrations.migration_handler import (
 24    ManifestMigrationHandler,
 25)
 26from airbyte_cdk.models import (
 27    AirbyteConnectionStatus,
 28    AirbyteMessage,
 29    AirbyteStateMessage,
 30    ConfiguredAirbyteCatalog,
 31    ConnectorSpecification,
 32    FailureType,
 33)
 34from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
 35from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 36from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 37from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 38from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 39    DeclarativeStream as DeclarativeStreamModel,
 40)
 41from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 42    Spec as SpecModel,
 43)
 44from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 45    StateDelegatingStream as StateDelegatingStreamModel,
 46)
 47from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 48    get_registered_components_module,
 49)
 50from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 51    ManifestComponentTransformer,
 52)
 53from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
 54    ManifestNormalizer,
 55)
 56from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 57    ManifestReferenceResolver,
 58)
 59from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 60    ModelToComponentFactory,
 61)
 62from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 63from airbyte_cdk.sources.declarative.spec.spec import Spec
 64from airbyte_cdk.sources.message import MessageRepository
 65from airbyte_cdk.sources.streams.core import Stream
 66from airbyte_cdk.sources.types import Config, ConnectionDefinition
 67from airbyte_cdk.sources.utils.slice_logger import (
 68    AlwaysLogSliceLogger,
 69    DebugSliceLogger,
 70    SliceLogger,
 71)
 72from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 73
 74
 75def _get_declarative_component_schema() -> Dict[str, Any]:
 76    try:
 77        raw_component_schema = pkgutil.get_data(
 78            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
 79        )
 80        if raw_component_schema is not None:
 81            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
 82            return declarative_component_schema  # type: ignore
 83        else:
 84            raise RuntimeError(
 85                "Failed to read manifest component json schema required for deduplication"
 86            )
 87    except FileNotFoundError as e:
 88        raise FileNotFoundError(
 89            f"Failed to read manifest component json schema required for deduplication: {e}"
 90        )
 91
 92
 93class ManifestDeclarativeSource(DeclarativeSource):
 94    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
 95
 96    def __init__(
 97        self,
 98        source_config: ConnectionDefinition,
 99        *,
100        config: Mapping[str, Any] | None = None,
101        debug: bool = False,
102        emit_connector_builder_messages: bool = False,
103        component_factory: Optional[ModelToComponentFactory] = None,
104        migrate_manifest: Optional[bool] = False,
105        normalize_manifest: Optional[bool] = False,
106        config_path: Optional[str] = None,
107    ) -> None:
108        """
109        Args:
110            config: The provided config dict.
111            source_config: The manifest of low-code components that describe the source connector.
112            debug: True if debug mode is enabled.
113            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
114            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
115            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
116            config_path: Optional path to the config file.
117        """
118        self.logger = logging.getLogger(f"airbyte.{self.name}")
119        self._should_normalize = normalize_manifest
120        self._should_migrate = migrate_manifest
121        self._declarative_component_schema = _get_declarative_component_schema()
122        # If custom components are needed, locate and/or register them.
123        self.components_module: ModuleType | None = get_registered_components_module(config=config)
124        # set additional attributes
125        self._debug = debug
126        self._emit_connector_builder_messages = emit_connector_builder_messages
127        self._constructor = (
128            component_factory
129            if component_factory
130            else ModelToComponentFactory(
131                emit_connector_builder_messages,
132                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
133            )
134        )
135        self._message_repository = self._constructor.get_message_repository()
136        self._slice_logger: SliceLogger = (
137            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
138        )
139
140        # resolve all components in the manifest
141        self._source_config = self._pre_process_manifest(dict(source_config))
142        # validate resolved manifest against the declarative component schema
143        self._validate_source()
144        # apply additional post-processing to the manifest
145        self._post_process_manifest()
146
147        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
148        self._spec_component: Optional[Spec] = (
149            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
150        )
151        self._config = self._migrate_and_transform_config(config_path, config) or {}
152
153    @property
154    def resolved_manifest(self) -> Mapping[str, Any]:
155        """
156        Returns the resolved manifest configuration for the source.
157
158        This property provides access to the internal source configuration as a mapping,
159        which contains all settings and parameters required to define the source's behavior.
160
161        Returns:
162            Mapping[str, Any]: The resolved source configuration manifest.
163        """
164        return self._source_config
165
166    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
167        """
168        Preprocesses the provided manifest dictionary by resolving any manifest references.
169
170        This method modifies the input manifest in place, resolving references using the
171        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
172
173        Args:
174            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
175
176        Returns:
177            None
178        """
179        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
180        manifest = self._fix_source_type(manifest)
181        # Resolve references in the manifest
182        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
183        # Propagate types and parameters throughout the manifest
184        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
185            "", resolved_manifest, {}
186        )
187
188        return propagated_manifest
189
190    def _post_process_manifest(self) -> None:
191        """
192        Post-processes the manifest after validation.
193        This method is responsible for any additional modifications or transformations needed
194        after the manifest has been validated and before it is used in the source.
195        """
196        # apply manifest migration, if required
197        self._migrate_manifest()
198        # apply manifest normalization, if required
199        self._normalize_manifest()
200
201    def _normalize_manifest(self) -> None:
202        """
203        This method is used to normalize the manifest. It should be called after the manifest has been validated.
204
205        Connector Builder UI rendering requires the manifest to be in a specific format.
206         - references have been resolved
207         - the commonly used definitions are extracted to the `definitions.linked.*`
208        """
209        if self._should_normalize:
210            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
211            self._source_config = normalizer.normalize()
212
213    def _migrate_and_transform_config(
214        self,
215        config_path: Optional[str],
216        config: Optional[Config],
217    ) -> Optional[Config]:
218        if not config:
219            return None
220        if not self._spec_component:
221            return config
222        mutable_config = dict(config)
223        self._spec_component.migrate_config(mutable_config)
224        if mutable_config != config:
225            if config_path:
226                with open(config_path, "w") as f:
227                    json.dump(mutable_config, f)
228            self.message_repository.emit_message(
229                create_connector_config_control_message(mutable_config)
230            )
231            # We have no mechanism for consuming the queue, so we print the messages to stdout
232            for message in self.message_repository.consume_queue():
233                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
234        self._spec_component.transform_config(mutable_config)
235        return mutable_config
236
237    def _migrate_manifest(self) -> None:
238        """
239        This method is used to migrate the manifest. It should be called after the manifest has been validated.
240        The migration is done in place, so the original manifest is modified.
241
242        The original manifest is returned if any error occurs during migration.
243        """
244        if self._should_migrate:
245            manifest_migrator = ManifestMigrationHandler(self._source_config)
246            self._source_config = manifest_migrator.apply_migrations()
247            # validate migrated manifest against the declarative component schema
248            self._validate_source()
249
250    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
251        """
252        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
253        """
254        if "type" not in manifest:
255            manifest["type"] = "DeclarativeSource"
256
257        return manifest
258
259    @property
260    def message_repository(self) -> MessageRepository:
261        return self._message_repository
262
263    @property
264    def dynamic_streams(self) -> List[Dict[str, Any]]:
265        return self._dynamic_stream_configs(
266            manifest=self._source_config,
267            config=self._config,
268            with_dynamic_stream_name=True,
269        )
270
271    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
272        return self._constructor.get_model_deprecations()
273
274    @property
275    def connection_checker(self) -> ConnectionChecker:
276        check = self._source_config["check"]
277        if "type" not in check:
278            check["type"] = "CheckStream"
279        check_stream = self._constructor.create_component(
280            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
281            check,
282            dict(),
283            emit_connector_builder_messages=self._emit_connector_builder_messages,
284        )
285        if isinstance(check_stream, ConnectionChecker):
286            return check_stream
287        else:
288            raise ValueError(
289                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
290            )
291
292    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
293        if self._spec_component:
294            self._spec_component.validate_config(config)
295
296        self._emit_manifest_debug_message(
297            extra_args={
298                "source_name": self.name,
299                "parsed_config": json.dumps(self._source_config),
300            }
301        )
302
303        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
304
305        api_budget_model = self._source_config.get("api_budget")
306        if api_budget_model:
307            self._constructor.set_api_budget(api_budget_model, config)
308
309        source_streams = [
310            self._constructor.create_component(
311                (
312                    StateDelegatingStreamModel
313                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
314                    else DeclarativeStreamModel
315                ),
316                stream_config,
317                config,
318                emit_connector_builder_messages=self._emit_connector_builder_messages,
319            )
320            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
321        ]
322
323        return source_streams
324
325    @staticmethod
326    def _initialize_cache_for_parent_streams(
327        stream_configs: List[Dict[str, Any]],
328    ) -> List[Dict[str, Any]]:
329        parent_streams = set()
330
331        def update_with_cache_parent_configs(
332            parent_configs: list[dict[str, Any]],
333        ) -> None:
334            for parent_config in parent_configs:
335                parent_streams.add(parent_config["stream"]["name"])
336                if parent_config["stream"]["type"] == "StateDelegatingStream":
337                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
338                        "use_cache"
339                    ] = True
340                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
341                        "use_cache"
342                    ] = True
343                else:
344                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
345
346        for stream_config in stream_configs:
347            if stream_config.get("incremental_sync", {}).get("parent_stream"):
348                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
349                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
350                    "use_cache"
351                ] = True
352
353            elif stream_config.get("retriever", {}).get("partition_router", {}):
354                partition_router = stream_config["retriever"]["partition_router"]
355
356                if isinstance(partition_router, dict) and partition_router.get(
357                    "parent_stream_configs"
358                ):
359                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
360                elif isinstance(partition_router, list):
361                    for router in partition_router:
362                        if router.get("parent_stream_configs"):
363                            update_with_cache_parent_configs(router["parent_stream_configs"])
364
365        for stream_config in stream_configs:
366            if stream_config["name"] in parent_streams:
367                if stream_config["type"] == "StateDelegatingStream":
368                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
369                        True
370                    )
371                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
372                        True
373                    )
374                else:
375                    stream_config["retriever"]["requester"]["use_cache"] = True
376
377        return stream_configs
378
379    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
380        """
381        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
382        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
383        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
384        in the project root.
385        """
386        self._configure_logger_level(logger)
387        self._emit_manifest_debug_message(
388            extra_args={
389                "source_name": self.name,
390                "parsed_config": json.dumps(self._source_config),
391            }
392        )
393
394        return (
395            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
396        )
397
398    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
399        self._configure_logger_level(logger)
400        return super().check(logger, config)
401
402    def read(
403        self,
404        logger: logging.Logger,
405        config: Mapping[str, Any],
406        catalog: ConfiguredAirbyteCatalog,
407        state: Optional[List[AirbyteStateMessage]] = None,
408    ) -> Iterator[AirbyteMessage]:
409        self._configure_logger_level(logger)
410        yield from super().read(logger, config, catalog, state)
411
412    def _configure_logger_level(self, logger: logging.Logger) -> None:
413        """
414        Set the log level to logging.DEBUG if debug mode is enabled
415        """
416        if self._debug:
417            logger.setLevel(logging.DEBUG)
418
419    def _validate_source(self) -> None:
420        """
421        Validates the connector manifest against the declarative component schema
422        """
423
424        try:
425            validate(self._source_config, self._declarative_component_schema)
426        except ValidationError as e:
427            raise ValidationError(
428                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
429            ) from e
430
431        cdk_version_str = metadata.version("airbyte_cdk")
432        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
433        manifest_version_str = self._source_config.get("version")
434        if manifest_version_str is None:
435            raise RuntimeError(
436                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
437            )
438        manifest_version = self._parse_version(manifest_version_str, "manifest")
439
440        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
441            # Skipping version compatibility check on unreleased dev branch
442            pass
443        elif (cdk_version.major, cdk_version.minor) < (
444            manifest_version.major,
445            manifest_version.minor,
446        ):
447            raise ValidationError(
448                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
449                f"manifest may contain features that are not in the current CDK version."
450            )
451        elif (manifest_version.major, manifest_version.minor) < (0, 29):
452            raise ValidationError(
453                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
454                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
455                f"{cdk_version!s} which contains these breaking changes."
456            )
457
458    @staticmethod
459    def _parse_version(
460        version: str,
461        version_type: str,
462    ) -> Version:
463        """Takes a semantic version represented as a string and splits it into a tuple.
464
465        The fourth part (prerelease) is not returned in the tuple.
466
467        Returns:
468            Version: the parsed version object
469        """
470        try:
471            parsed_version = Version(version)
472        except InvalidVersion as ex:
473            raise ValidationError(
474                f"The {version_type} version '{version}' is not a valid version format."
475            ) from ex
476        else:
477            # No exception
478            return parsed_version
479
480    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
481        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
482        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
483        for s in stream_configs:
484            if "type" not in s:
485                s["type"] = "DeclarativeStream"
486        return stream_configs
487
488    def _dynamic_stream_configs(
489        self,
490        manifest: Mapping[str, Any],
491        config: Mapping[str, Any],
492        with_dynamic_stream_name: Optional[bool] = None,
493    ) -> List[Dict[str, Any]]:
494        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
495        dynamic_stream_configs: List[Dict[str, Any]] = []
496        seen_dynamic_streams: Set[str] = set()
497
498        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
499            components_resolver_config = dynamic_definition["components_resolver"]
500
501            if not components_resolver_config:
502                raise ValueError(
503                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
504                )
505
506            resolver_type = components_resolver_config.get("type")
507            if not resolver_type:
508                raise ValueError(
509                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
510                )
511
512            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
513                raise ValueError(
514                    f"Invalid components resolver type '{resolver_type}'. "
515                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
516                )
517
518            if "retriever" in components_resolver_config:
519                components_resolver_config["retriever"]["requester"]["use_cache"] = True
520
521            # Create a resolver for dynamic components based on type
522            components_resolver = self._constructor.create_component(
523                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
524                components_resolver_config,
525                config,
526            )
527
528            stream_template_config = dynamic_definition["stream_template"]
529
530            for dynamic_stream in components_resolver.resolve_components(
531                stream_template_config=stream_template_config
532            ):
533                dynamic_stream = {
534                    **ManifestComponentTransformer().propagate_types_and_parameters(
535                        "", dynamic_stream, {}, use_parent_parameters=True
536                    )
537                }
538
539                if "type" not in dynamic_stream:
540                    dynamic_stream["type"] = "DeclarativeStream"
541
542                # Ensure that each stream is created with a unique name
543                name = dynamic_stream.get("name")
544
545                if with_dynamic_stream_name:
546                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
547                        "name", f"dynamic_stream_{dynamic_definition_index}"
548                    )
549
550                if not isinstance(name, str):
551                    raise ValueError(
552                        f"Expected stream name {name} to be a string, got {type(name)}."
553                    )
554
555                if name in seen_dynamic_streams:
556                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
557                    failure_type = FailureType.system_error
558
559                    if resolver_type == "ConfigComponentsResolver":
560                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
561                        failure_type = FailureType.config_error
562
563                    raise AirbyteTracedException(
564                        message=error_message,
565                        internal_message=error_message,
566                        failure_type=failure_type,
567                    )
568
569                seen_dynamic_streams.add(name)
570                dynamic_stream_configs.append(dynamic_stream)
571
572        return dynamic_stream_configs
573
574    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
575        self.logger.debug("declarative source created from manifest", extra=extra_args)
 94class ManifestDeclarativeSource(DeclarativeSource):
 95    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
 96
 97    def __init__(
 98        self,
 99        source_config: ConnectionDefinition,
100        *,
101        config: Mapping[str, Any] | None = None,
102        debug: bool = False,
103        emit_connector_builder_messages: bool = False,
104        component_factory: Optional[ModelToComponentFactory] = None,
105        migrate_manifest: Optional[bool] = False,
106        normalize_manifest: Optional[bool] = False,
107        config_path: Optional[str] = None,
108    ) -> None:
109        """
110        Args:
111            config: The provided config dict.
112            source_config: The manifest of low-code components that describe the source connector.
113            debug: True if debug mode is enabled.
114            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
115            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
116            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
117            config_path: Optional path to the config file.
118        """
119        self.logger = logging.getLogger(f"airbyte.{self.name}")
120        self._should_normalize = normalize_manifest
121        self._should_migrate = migrate_manifest
122        self._declarative_component_schema = _get_declarative_component_schema()
123        # If custom components are needed, locate and/or register them.
124        self.components_module: ModuleType | None = get_registered_components_module(config=config)
125        # set additional attributes
126        self._debug = debug
127        self._emit_connector_builder_messages = emit_connector_builder_messages
128        self._constructor = (
129            component_factory
130            if component_factory
131            else ModelToComponentFactory(
132                emit_connector_builder_messages,
133                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
134            )
135        )
136        self._message_repository = self._constructor.get_message_repository()
137        self._slice_logger: SliceLogger = (
138            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
139        )
140
141        # resolve all components in the manifest
142        self._source_config = self._pre_process_manifest(dict(source_config))
143        # validate resolved manifest against the declarative component schema
144        self._validate_source()
145        # apply additional post-processing to the manifest
146        self._post_process_manifest()
147
148        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
149        self._spec_component: Optional[Spec] = (
150            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
151        )
152        self._config = self._migrate_and_transform_config(config_path, config) or {}
153
154    @property
155    def resolved_manifest(self) -> Mapping[str, Any]:
156        """
157        Returns the resolved manifest configuration for the source.
158
159        This property provides access to the internal source configuration as a mapping,
160        which contains all settings and parameters required to define the source's behavior.
161
162        Returns:
163            Mapping[str, Any]: The resolved source configuration manifest.
164        """
165        return self._source_config
166
167    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
168        """
169        Preprocesses the provided manifest dictionary by resolving any manifest references.
170
171        This method modifies the input manifest in place, resolving references using the
172        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
173
174        Args:
175            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
176
177        Returns:
178            None
179        """
180        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
181        manifest = self._fix_source_type(manifest)
182        # Resolve references in the manifest
183        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
184        # Propagate types and parameters throughout the manifest
185        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
186            "", resolved_manifest, {}
187        )
188
189        return propagated_manifest
190
191    def _post_process_manifest(self) -> None:
192        """
193        Post-processes the manifest after validation.
194        This method is responsible for any additional modifications or transformations needed
195        after the manifest has been validated and before it is used in the source.
196        """
197        # apply manifest migration, if required
198        self._migrate_manifest()
199        # apply manifest normalization, if required
200        self._normalize_manifest()
201
202    def _normalize_manifest(self) -> None:
203        """
204        This method is used to normalize the manifest. It should be called after the manifest has been validated.
205
206        Connector Builder UI rendering requires the manifest to be in a specific format.
207         - references have been resolved
208         - the commonly used definitions are extracted to the `definitions.linked.*`
209        """
210        if self._should_normalize:
211            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
212            self._source_config = normalizer.normalize()
213
214    def _migrate_and_transform_config(
215        self,
216        config_path: Optional[str],
217        config: Optional[Config],
218    ) -> Optional[Config]:
219        if not config:
220            return None
221        if not self._spec_component:
222            return config
223        mutable_config = dict(config)
224        self._spec_component.migrate_config(mutable_config)
225        if mutable_config != config:
226            if config_path:
227                with open(config_path, "w") as f:
228                    json.dump(mutable_config, f)
229            self.message_repository.emit_message(
230                create_connector_config_control_message(mutable_config)
231            )
232            # We have no mechanism for consuming the queue, so we print the messages to stdout
233            for message in self.message_repository.consume_queue():
234                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
235        self._spec_component.transform_config(mutable_config)
236        return mutable_config
237
238    def _migrate_manifest(self) -> None:
239        """
240        This method is used to migrate the manifest. It should be called after the manifest has been validated.
241        The migration is done in place, so the original manifest is modified.
242
243        The original manifest is returned if any error occurs during migration.
244        """
245        if self._should_migrate:
246            manifest_migrator = ManifestMigrationHandler(self._source_config)
247            self._source_config = manifest_migrator.apply_migrations()
248            # validate migrated manifest against the declarative component schema
249            self._validate_source()
250
251    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
252        """
253        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
254        """
255        if "type" not in manifest:
256            manifest["type"] = "DeclarativeSource"
257
258        return manifest
259
260    @property
261    def message_repository(self) -> MessageRepository:
262        return self._message_repository
263
264    @property
265    def dynamic_streams(self) -> List[Dict[str, Any]]:
266        return self._dynamic_stream_configs(
267            manifest=self._source_config,
268            config=self._config,
269            with_dynamic_stream_name=True,
270        )
271
272    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
273        return self._constructor.get_model_deprecations()
274
275    @property
276    def connection_checker(self) -> ConnectionChecker:
277        check = self._source_config["check"]
278        if "type" not in check:
279            check["type"] = "CheckStream"
280        check_stream = self._constructor.create_component(
281            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
282            check,
283            dict(),
284            emit_connector_builder_messages=self._emit_connector_builder_messages,
285        )
286        if isinstance(check_stream, ConnectionChecker):
287            return check_stream
288        else:
289            raise ValueError(
290                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
291            )
292
293    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
294        if self._spec_component:
295            self._spec_component.validate_config(config)
296
297        self._emit_manifest_debug_message(
298            extra_args={
299                "source_name": self.name,
300                "parsed_config": json.dumps(self._source_config),
301            }
302        )
303
304        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
305
306        api_budget_model = self._source_config.get("api_budget")
307        if api_budget_model:
308            self._constructor.set_api_budget(api_budget_model, config)
309
310        source_streams = [
311            self._constructor.create_component(
312                (
313                    StateDelegatingStreamModel
314                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
315                    else DeclarativeStreamModel
316                ),
317                stream_config,
318                config,
319                emit_connector_builder_messages=self._emit_connector_builder_messages,
320            )
321            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
322        ]
323
324        return source_streams
325
326    @staticmethod
327    def _initialize_cache_for_parent_streams(
328        stream_configs: List[Dict[str, Any]],
329    ) -> List[Dict[str, Any]]:
330        parent_streams = set()
331
332        def update_with_cache_parent_configs(
333            parent_configs: list[dict[str, Any]],
334        ) -> None:
335            for parent_config in parent_configs:
336                parent_streams.add(parent_config["stream"]["name"])
337                if parent_config["stream"]["type"] == "StateDelegatingStream":
338                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
339                        "use_cache"
340                    ] = True
341                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
342                        "use_cache"
343                    ] = True
344                else:
345                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
346
347        for stream_config in stream_configs:
348            if stream_config.get("incremental_sync", {}).get("parent_stream"):
349                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
350                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
351                    "use_cache"
352                ] = True
353
354            elif stream_config.get("retriever", {}).get("partition_router", {}):
355                partition_router = stream_config["retriever"]["partition_router"]
356
357                if isinstance(partition_router, dict) and partition_router.get(
358                    "parent_stream_configs"
359                ):
360                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
361                elif isinstance(partition_router, list):
362                    for router in partition_router:
363                        if router.get("parent_stream_configs"):
364                            update_with_cache_parent_configs(router["parent_stream_configs"])
365
366        for stream_config in stream_configs:
367            if stream_config["name"] in parent_streams:
368                if stream_config["type"] == "StateDelegatingStream":
369                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
370                        True
371                    )
372                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
373                        True
374                    )
375                else:
376                    stream_config["retriever"]["requester"]["use_cache"] = True
377
378        return stream_configs
379
380    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
381        """
382        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
383        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
384        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
385        in the project root.
386        """
387        self._configure_logger_level(logger)
388        self._emit_manifest_debug_message(
389            extra_args={
390                "source_name": self.name,
391                "parsed_config": json.dumps(self._source_config),
392            }
393        )
394
395        return (
396            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
397        )
398
399    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
400        self._configure_logger_level(logger)
401        return super().check(logger, config)
402
403    def read(
404        self,
405        logger: logging.Logger,
406        config: Mapping[str, Any],
407        catalog: ConfiguredAirbyteCatalog,
408        state: Optional[List[AirbyteStateMessage]] = None,
409    ) -> Iterator[AirbyteMessage]:
410        self._configure_logger_level(logger)
411        yield from super().read(logger, config, catalog, state)
412
413    def _configure_logger_level(self, logger: logging.Logger) -> None:
414        """
415        Set the log level to logging.DEBUG if debug mode is enabled
416        """
417        if self._debug:
418            logger.setLevel(logging.DEBUG)
419
420    def _validate_source(self) -> None:
421        """
422        Validates the connector manifest against the declarative component schema
423        """
424
425        try:
426            validate(self._source_config, self._declarative_component_schema)
427        except ValidationError as e:
428            raise ValidationError(
429                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
430            ) from e
431
432        cdk_version_str = metadata.version("airbyte_cdk")
433        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
434        manifest_version_str = self._source_config.get("version")
435        if manifest_version_str is None:
436            raise RuntimeError(
437                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
438            )
439        manifest_version = self._parse_version(manifest_version_str, "manifest")
440
441        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
442            # Skipping version compatibility check on unreleased dev branch
443            pass
444        elif (cdk_version.major, cdk_version.minor) < (
445            manifest_version.major,
446            manifest_version.minor,
447        ):
448            raise ValidationError(
449                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
450                f"manifest may contain features that are not in the current CDK version."
451            )
452        elif (manifest_version.major, manifest_version.minor) < (0, 29):
453            raise ValidationError(
454                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
455                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
456                f"{cdk_version!s} which contains these breaking changes."
457            )
458
459    @staticmethod
460    def _parse_version(
461        version: str,
462        version_type: str,
463    ) -> Version:
464        """Takes a semantic version represented as a string and splits it into a tuple.
465
466        The fourth part (prerelease) is not returned in the tuple.
467
468        Returns:
469            Version: the parsed version object
470        """
471        try:
472            parsed_version = Version(version)
473        except InvalidVersion as ex:
474            raise ValidationError(
475                f"The {version_type} version '{version}' is not a valid version format."
476            ) from ex
477        else:
478            # No exception
479            return parsed_version
480
481    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
482        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
483        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
484        for s in stream_configs:
485            if "type" not in s:
486                s["type"] = "DeclarativeStream"
487        return stream_configs
488
489    def _dynamic_stream_configs(
490        self,
491        manifest: Mapping[str, Any],
492        config: Mapping[str, Any],
493        with_dynamic_stream_name: Optional[bool] = None,
494    ) -> List[Dict[str, Any]]:
495        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
496        dynamic_stream_configs: List[Dict[str, Any]] = []
497        seen_dynamic_streams: Set[str] = set()
498
499        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
500            components_resolver_config = dynamic_definition["components_resolver"]
501
502            if not components_resolver_config:
503                raise ValueError(
504                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
505                )
506
507            resolver_type = components_resolver_config.get("type")
508            if not resolver_type:
509                raise ValueError(
510                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
511                )
512
513            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
514                raise ValueError(
515                    f"Invalid components resolver type '{resolver_type}'. "
516                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
517                )
518
519            if "retriever" in components_resolver_config:
520                components_resolver_config["retriever"]["requester"]["use_cache"] = True
521
522            # Create a resolver for dynamic components based on type
523            components_resolver = self._constructor.create_component(
524                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
525                components_resolver_config,
526                config,
527            )
528
529            stream_template_config = dynamic_definition["stream_template"]
530
531            for dynamic_stream in components_resolver.resolve_components(
532                stream_template_config=stream_template_config
533            ):
534                dynamic_stream = {
535                    **ManifestComponentTransformer().propagate_types_and_parameters(
536                        "", dynamic_stream, {}, use_parent_parameters=True
537                    )
538                }
539
540                if "type" not in dynamic_stream:
541                    dynamic_stream["type"] = "DeclarativeStream"
542
543                # Ensure that each stream is created with a unique name
544                name = dynamic_stream.get("name")
545
546                if with_dynamic_stream_name:
547                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
548                        "name", f"dynamic_stream_{dynamic_definition_index}"
549                    )
550
551                if not isinstance(name, str):
552                    raise ValueError(
553                        f"Expected stream name {name} to be a string, got {type(name)}."
554                    )
555
556                if name in seen_dynamic_streams:
557                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
558                    failure_type = FailureType.system_error
559
560                    if resolver_type == "ConfigComponentsResolver":
561                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
562                        failure_type = FailureType.config_error
563
564                    raise AirbyteTracedException(
565                        message=error_message,
566                        internal_message=error_message,
567                        failure_type=failure_type,
568                    )
569
570                seen_dynamic_streams.add(name)
571                dynamic_stream_configs.append(dynamic_stream)
572
573        return dynamic_stream_configs
574
575    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
576        self.logger.debug("declarative source created from manifest", extra=extra_args)

Declarative source defined by a manifest of low-code components that define source connector behavior

ManifestDeclarativeSource( source_config: Mapping[str, Any], *, config: Optional[Mapping[str, Any]] = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[airbyte_cdk.sources.declarative.parsers.model_to_component_factory.ModelToComponentFactory] = None, migrate_manifest: Optional[bool] = False, normalize_manifest: Optional[bool] = False, config_path: Optional[str] = None)
 97    def __init__(
 98        self,
 99        source_config: ConnectionDefinition,
100        *,
101        config: Mapping[str, Any] | None = None,
102        debug: bool = False,
103        emit_connector_builder_messages: bool = False,
104        component_factory: Optional[ModelToComponentFactory] = None,
105        migrate_manifest: Optional[bool] = False,
106        normalize_manifest: Optional[bool] = False,
107        config_path: Optional[str] = None,
108    ) -> None:
109        """
110        Args:
111            config: The provided config dict.
112            source_config: The manifest of low-code components that describe the source connector.
113            debug: True if debug mode is enabled.
114            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
115            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
116            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
117            config_path: Optional path to the config file.
118        """
119        self.logger = logging.getLogger(f"airbyte.{self.name}")
120        self._should_normalize = normalize_manifest
121        self._should_migrate = migrate_manifest
122        self._declarative_component_schema = _get_declarative_component_schema()
123        # If custom components are needed, locate and/or register them.
124        self.components_module: ModuleType | None = get_registered_components_module(config=config)
125        # set additional attributes
126        self._debug = debug
127        self._emit_connector_builder_messages = emit_connector_builder_messages
128        self._constructor = (
129            component_factory
130            if component_factory
131            else ModelToComponentFactory(
132                emit_connector_builder_messages,
133                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
134            )
135        )
136        self._message_repository = self._constructor.get_message_repository()
137        self._slice_logger: SliceLogger = (
138            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
139        )
140
141        # resolve all components in the manifest
142        self._source_config = self._pre_process_manifest(dict(source_config))
143        # validate resolved manifest against the declarative component schema
144        self._validate_source()
145        # apply additional post-processing to the manifest
146        self._post_process_manifest()
147
148        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
149        self._spec_component: Optional[Spec] = (
150            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
151        )
152        self._config = self._migrate_and_transform_config(config_path, config) or {}
Arguments:
  • config: The provided config dict.
  • source_config: The manifest of low-code components that describe the source connector.
  • debug: True if debug mode is enabled.
  • emit_connector_builder_messages: True if messages should be emitted to the connector builder.
  • component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
  • migrate_manifest: Optional flag to indicate if the manifest should be migrated.
  • normalize_manifest: Optional flag to indicate if the manifest should be normalized.
  • config_path: Optional path to the config file.
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
154    @property
155    def resolved_manifest(self) -> Mapping[str, Any]:
156        """
157        Returns the resolved manifest configuration for the source.
158
159        This property provides access to the internal source configuration as a mapping,
160        which contains all settings and parameters required to define the source's behavior.
161
162        Returns:
163            Mapping[str, Any]: The resolved source configuration manifest.
164        """
165        return self._source_config

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:

Mapping[str, Any]: The resolved source configuration manifest.

message_repository: airbyte_cdk.MessageRepository
260    @property
261    def message_repository(self) -> MessageRepository:
262        return self._message_repository
dynamic_streams: List[Dict[str, Any]]
264    @property
265    def dynamic_streams(self) -> List[Dict[str, Any]]:
266        return self._dynamic_stream_configs(
267            manifest=self._source_config,
268            config=self._config,
269            with_dynamic_stream_name=True,
270        )
def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
272    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
273        return self._constructor.get_model_deprecations()

Returns a list of deprecation warnings for the source.

275    @property
276    def connection_checker(self) -> ConnectionChecker:
277        check = self._source_config["check"]
278        if "type" not in check:
279            check["type"] = "CheckStream"
280        check_stream = self._constructor.create_component(
281            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
282            check,
283            dict(),
284            emit_connector_builder_messages=self._emit_connector_builder_messages,
285        )
286        if isinstance(check_stream, ConnectionChecker):
287            return check_stream
288        else:
289            raise ValueError(
290                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
291            )

Returns the ConnectionChecker to use for the check operation

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
293    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
294        if self._spec_component:
295            self._spec_component.validate_config(config)
296
297        self._emit_manifest_debug_message(
298            extra_args={
299                "source_name": self.name,
300                "parsed_config": json.dumps(self._source_config),
301            }
302        )
303
304        stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
305
306        api_budget_model = self._source_config.get("api_budget")
307        if api_budget_model:
308            self._constructor.set_api_budget(api_budget_model, config)
309
310        source_streams = [
311            self._constructor.create_component(
312                (
313                    StateDelegatingStreamModel
314                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
315                    else DeclarativeStreamModel
316                ),
317                stream_config,
318                config,
319                emit_connector_builder_messages=self._emit_connector_builder_messages,
320            )
321            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
322        ]
323
324        return source_streams
Parameters
  • config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns

A list of the streams in this source connector.

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
380    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
381        """
382        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
383        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
384        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
385        in the project root.
386        """
387        self._configure_logger_level(logger)
388        self._emit_manifest_debug_message(
389            extra_args={
390                "source_name": self.name,
391                "parsed_config": json.dumps(self._source_config),
392            }
393        )
394
395        return (
396            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
397        )

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
399    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
400        self._configure_logger_level(logger)
401        return super().check(logger, config)

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
403    def read(
404        self,
405        logger: logging.Logger,
406        config: Mapping[str, Any],
407        catalog: ConfiguredAirbyteCatalog,
408        state: Optional[List[AirbyteStateMessage]] = None,
409    ) -> Iterator[AirbyteMessage]:
410        self._configure_logger_level(logger)
411        yield from super().read(logger, config, catalog, state)

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.