airbyte_cdk.sources.declarative.manifest_declarative_source

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import json
  6import logging
  7import pkgutil
  8from copy import deepcopy
  9from importlib import metadata
 10from types import ModuleType
 11from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
 12
 13import yaml
 14from jsonschema.exceptions import ValidationError
 15from jsonschema.validators import validate
 16from packaging.version import InvalidVersion, Version
 17
 18from airbyte_cdk.connector_builder.models import (
 19    LogMessage as ConnectorBuilderLogMessage,
 20)
 21from airbyte_cdk.manifest_migrations.migration_handler import (
 22    ManifestMigrationHandler,
 23)
 24from airbyte_cdk.models import (
 25    AirbyteConnectionStatus,
 26    AirbyteMessage,
 27    AirbyteStateMessage,
 28    ConfiguredAirbyteCatalog,
 29    ConnectorSpecification,
 30    FailureType,
 31)
 32from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 33from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 34from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 35from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 36    DeclarativeStream as DeclarativeStreamModel,
 37)
 38from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 39    Spec as SpecModel,
 40)
 41from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 42    StateDelegatingStream as StateDelegatingStreamModel,
 43)
 44from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 45    get_registered_components_module,
 46)
 47from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 48    ManifestComponentTransformer,
 49)
 50from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
 51    ManifestNormalizer,
 52)
 53from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 54    ManifestReferenceResolver,
 55)
 56from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 57    ModelToComponentFactory,
 58)
 59from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 60from airbyte_cdk.sources.message import MessageRepository
 61from airbyte_cdk.sources.streams.core import Stream
 62from airbyte_cdk.sources.types import ConnectionDefinition
 63from airbyte_cdk.sources.utils.slice_logger import (
 64    AlwaysLogSliceLogger,
 65    DebugSliceLogger,
 66    SliceLogger,
 67)
 68from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 69
 70
 71def _get_declarative_component_schema() -> Dict[str, Any]:
 72    try:
 73        raw_component_schema = pkgutil.get_data(
 74            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
 75        )
 76        if raw_component_schema is not None:
 77            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
 78            return declarative_component_schema  # type: ignore
 79        else:
 80            raise RuntimeError(
 81                "Failed to read manifest component json schema required for deduplication"
 82            )
 83    except FileNotFoundError as e:
 84        raise FileNotFoundError(
 85            f"Failed to read manifest component json schema required for deduplication: {e}"
 86        )
 87
 88
 89class ManifestDeclarativeSource(DeclarativeSource):
 90    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
 91
 92    def __init__(
 93        self,
 94        source_config: ConnectionDefinition,
 95        *,
 96        config: Mapping[str, Any] | None = None,
 97        debug: bool = False,
 98        emit_connector_builder_messages: bool = False,
 99        component_factory: Optional[ModelToComponentFactory] = None,
100        migrate_manifest: Optional[bool] = False,
101        normalize_manifest: Optional[bool] = False,
102    ) -> None:
103        """
104        Args:
105            config: The provided config dict.
106            source_config: The manifest of low-code components that describe the source connector.
107            debug: True if debug mode is enabled.
108            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
109            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
110            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
111        """
112        self.logger = logging.getLogger(f"airbyte.{self.name}")
113        self._should_normalize = normalize_manifest
114        self._should_migrate = migrate_manifest
115        self._declarative_component_schema = _get_declarative_component_schema()
116        # If custom components are needed, locate and/or register them.
117        self.components_module: ModuleType | None = get_registered_components_module(config=config)
118        # set additional attributes
119        self._debug = debug
120        self._emit_connector_builder_messages = emit_connector_builder_messages
121        self._constructor = (
122            component_factory
123            if component_factory
124            else ModelToComponentFactory(
125                emit_connector_builder_messages,
126                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
127            )
128        )
129        self._message_repository = self._constructor.get_message_repository()
130        self._slice_logger: SliceLogger = (
131            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
132        )
133        self._config = config or {}
134
135        # resolve all components in the manifest
136        self._source_config = self._pre_process_manifest(dict(source_config))
137        # validate resolved manifest against the declarative component schema
138        self._validate_source()
139        # apply additional post-processing to the manifest
140        self._post_process_manifest()
141
142    @property
143    def resolved_manifest(self) -> Mapping[str, Any]:
144        """
145        Returns the resolved manifest configuration for the source.
146
147        This property provides access to the internal source configuration as a mapping,
148        which contains all settings and parameters required to define the source's behavior.
149
150        Returns:
151            Mapping[str, Any]: The resolved source configuration manifest.
152        """
153        return self._source_config
154
155    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
156        """
157        Preprocesses the provided manifest dictionary by resolving any manifest references.
158
159        This method modifies the input manifest in place, resolving references using the
160        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
161
162        Args:
163            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
164
165        Returns:
166            None
167        """
168        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
169        manifest = self._fix_source_type(manifest)
170        # Resolve references in the manifest
171        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
172        # Propagate types and parameters throughout the manifest
173        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
174            "", resolved_manifest, {}
175        )
176
177        return propagated_manifest
178
179    def _post_process_manifest(self) -> None:
180        """
181        Post-processes the manifest after validation.
182        This method is responsible for any additional modifications or transformations needed
183        after the manifest has been validated and before it is used in the source.
184        """
185        # apply manifest migration, if required
186        self._migrate_manifest()
187        # apply manifest normalization, if required
188        self._normalize_manifest()
189
190    def _normalize_manifest(self) -> None:
191        """
192        This method is used to normalize the manifest. It should be called after the manifest has been validated.
193
194        Connector Builder UI rendering requires the manifest to be in a specific format.
195         - references have been resolved
196         - the commonly used definitions are extracted to the `definitions.linked.*`
197        """
198        if self._should_normalize:
199            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
200            self._source_config = normalizer.normalize()
201
202    def _migrate_manifest(self) -> None:
203        """
204        This method is used to migrate the manifest. It should be called after the manifest has been validated.
205        The migration is done in place, so the original manifest is modified.
206
207        The original manifest is returned if any error occurs during migration.
208        """
209        if self._should_migrate:
210            manifest_migrator = ManifestMigrationHandler(self._source_config)
211            self._source_config = manifest_migrator.apply_migrations()
212            # validate migrated manifest against the declarative component schema
213            self._validate_source()
214
215    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
216        """
217        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
218        """
219        if "type" not in manifest:
220            manifest["type"] = "DeclarativeSource"
221
222        return manifest
223
224    @property
225    def message_repository(self) -> MessageRepository:
226        return self._message_repository
227
228    @property
229    def dynamic_streams(self) -> List[Dict[str, Any]]:
230        return self._dynamic_stream_configs(
231            manifest=self._source_config,
232            config=self._config,
233            with_dynamic_stream_name=True,
234        )
235
236    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
237        return self._constructor.get_model_deprecations()
238
239    @property
240    def connection_checker(self) -> ConnectionChecker:
241        check = self._source_config["check"]
242        if "type" not in check:
243            check["type"] = "CheckStream"
244        check_stream = self._constructor.create_component(
245            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
246            check,
247            dict(),
248            emit_connector_builder_messages=self._emit_connector_builder_messages,
249        )
250        if isinstance(check_stream, ConnectionChecker):
251            return check_stream
252        else:
253            raise ValueError(
254                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
255            )
256
257    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
258        self._emit_manifest_debug_message(
259            extra_args={
260                "source_name": self.name,
261                "parsed_config": json.dumps(self._source_config),
262            }
263        )
264
265        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
266            self._source_config, config
267        )
268
269        api_budget_model = self._source_config.get("api_budget")
270        if api_budget_model:
271            self._constructor.set_api_budget(api_budget_model, config)
272
273        source_streams = [
274            self._constructor.create_component(
275                (
276                    StateDelegatingStreamModel
277                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
278                    else DeclarativeStreamModel
279                ),
280                stream_config,
281                config,
282                emit_connector_builder_messages=self._emit_connector_builder_messages,
283            )
284            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
285        ]
286
287        return source_streams
288
289    @staticmethod
290    def _initialize_cache_for_parent_streams(
291        stream_configs: List[Dict[str, Any]],
292    ) -> List[Dict[str, Any]]:
293        parent_streams = set()
294
295        def update_with_cache_parent_configs(
296            parent_configs: list[dict[str, Any]],
297        ) -> None:
298            for parent_config in parent_configs:
299                parent_streams.add(parent_config["stream"]["name"])
300                if parent_config["stream"]["type"] == "StateDelegatingStream":
301                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
302                        "use_cache"
303                    ] = True
304                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
305                        "use_cache"
306                    ] = True
307                else:
308                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
309
310        for stream_config in stream_configs:
311            if stream_config.get("incremental_sync", {}).get("parent_stream"):
312                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
313                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
314                    "use_cache"
315                ] = True
316
317            elif stream_config.get("retriever", {}).get("partition_router", {}):
318                partition_router = stream_config["retriever"]["partition_router"]
319
320                if isinstance(partition_router, dict) and partition_router.get(
321                    "parent_stream_configs"
322                ):
323                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
324                elif isinstance(partition_router, list):
325                    for router in partition_router:
326                        if router.get("parent_stream_configs"):
327                            update_with_cache_parent_configs(router["parent_stream_configs"])
328
329        for stream_config in stream_configs:
330            if stream_config["name"] in parent_streams:
331                if stream_config["type"] == "StateDelegatingStream":
332                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
333                        True
334                    )
335                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
336                        True
337                    )
338                else:
339                    stream_config["retriever"]["requester"]["use_cache"] = True
340
341        return stream_configs
342
343    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
344        """
345        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
346        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
347        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
348        in the project root.
349        """
350        self._configure_logger_level(logger)
351        self._emit_manifest_debug_message(
352            extra_args={
353                "source_name": self.name,
354                "parsed_config": json.dumps(self._source_config),
355            }
356        )
357
358        spec = self._source_config.get("spec")
359        if spec:
360            if "type" not in spec:
361                spec["type"] = "Spec"
362            spec_component = self._constructor.create_component(SpecModel, spec, dict())
363            return spec_component.generate_spec()
364        else:
365            return super().spec(logger)
366
367    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
368        self._configure_logger_level(logger)
369        return super().check(logger, config)
370
371    def read(
372        self,
373        logger: logging.Logger,
374        config: Mapping[str, Any],
375        catalog: ConfiguredAirbyteCatalog,
376        state: Optional[List[AirbyteStateMessage]] = None,
377    ) -> Iterator[AirbyteMessage]:
378        self._configure_logger_level(logger)
379        yield from super().read(logger, config, catalog, state)
380
381    def _configure_logger_level(self, logger: logging.Logger) -> None:
382        """
383        Set the log level to logging.DEBUG if debug mode is enabled
384        """
385        if self._debug:
386            logger.setLevel(logging.DEBUG)
387
388    def _validate_source(self) -> None:
389        """
390        Validates the connector manifest against the declarative component schema
391        """
392
393        try:
394            validate(self._source_config, self._declarative_component_schema)
395        except ValidationError as e:
396            raise ValidationError(
397                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
398            ) from e
399
400        cdk_version_str = metadata.version("airbyte_cdk")
401        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
402        manifest_version_str = self._source_config.get("version")
403        if manifest_version_str is None:
404            raise RuntimeError(
405                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
406            )
407        manifest_version = self._parse_version(manifest_version_str, "manifest")
408
409        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
410            # Skipping version compatibility check on unreleased dev branch
411            pass
412        elif (cdk_version.major, cdk_version.minor) < (
413            manifest_version.major,
414            manifest_version.minor,
415        ):
416            raise ValidationError(
417                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
418                f"manifest may contain features that are not in the current CDK version."
419            )
420        elif (manifest_version.major, manifest_version.minor) < (0, 29):
421            raise ValidationError(
422                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
423                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
424                f"{cdk_version!s} which contains these breaking changes."
425            )
426
427    @staticmethod
428    def _parse_version(
429        version: str,
430        version_type: str,
431    ) -> Version:
432        """Takes a semantic version represented as a string and splits it into a tuple.
433
434        The fourth part (prerelease) is not returned in the tuple.
435
436        Returns:
437            Version: the parsed version object
438        """
439        try:
440            parsed_version = Version(version)
441        except InvalidVersion as ex:
442            raise ValidationError(
443                f"The {version_type} version '{version}' is not a valid version format."
444            ) from ex
445        else:
446            # No exception
447            return parsed_version
448
449    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
450        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
451        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
452        for s in stream_configs:
453            if "type" not in s:
454                s["type"] = "DeclarativeStream"
455        return stream_configs
456
457    def _dynamic_stream_configs(
458        self,
459        manifest: Mapping[str, Any],
460        config: Mapping[str, Any],
461        with_dynamic_stream_name: Optional[bool] = None,
462    ) -> List[Dict[str, Any]]:
463        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
464        dynamic_stream_configs: List[Dict[str, Any]] = []
465        seen_dynamic_streams: Set[str] = set()
466
467        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
468            components_resolver_config = dynamic_definition["components_resolver"]
469
470            if not components_resolver_config:
471                raise ValueError(
472                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
473                )
474
475            resolver_type = components_resolver_config.get("type")
476            if not resolver_type:
477                raise ValueError(
478                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
479                )
480
481            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
482                raise ValueError(
483                    f"Invalid components resolver type '{resolver_type}'. "
484                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
485                )
486
487            if "retriever" in components_resolver_config:
488                components_resolver_config["retriever"]["requester"]["use_cache"] = True
489
490            # Create a resolver for dynamic components based on type
491            components_resolver = self._constructor.create_component(
492                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
493                components_resolver_config,
494                config,
495            )
496
497            stream_template_config = dynamic_definition["stream_template"]
498
499            for dynamic_stream in components_resolver.resolve_components(
500                stream_template_config=stream_template_config
501            ):
502                dynamic_stream = {
503                    **ManifestComponentTransformer().propagate_types_and_parameters(
504                        "", dynamic_stream, {}, use_parent_parameters=True
505                    )
506                }
507
508                if "type" not in dynamic_stream:
509                    dynamic_stream["type"] = "DeclarativeStream"
510
511                # Ensure that each stream is created with a unique name
512                name = dynamic_stream.get("name")
513
514                if with_dynamic_stream_name:
515                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
516                        "name", f"dynamic_stream_{dynamic_definition_index}"
517                    )
518
519                if not isinstance(name, str):
520                    raise ValueError(
521                        f"Expected stream name {name} to be a string, got {type(name)}."
522                    )
523
524                if name in seen_dynamic_streams:
525                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
526                    failure_type = FailureType.system_error
527
528                    if resolver_type == "ConfigComponentsResolver":
529                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
530                        failure_type = FailureType.config_error
531
532                    raise AirbyteTracedException(
533                        message=error_message,
534                        internal_message=error_message,
535                        failure_type=failure_type,
536                    )
537
538                seen_dynamic_streams.add(name)
539                dynamic_stream_configs.append(dynamic_stream)
540
541        return dynamic_stream_configs
542
543    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
544        self.logger.debug("declarative source created from manifest", extra=extra_args)
 90class ManifestDeclarativeSource(DeclarativeSource):
 91    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
 92
 93    def __init__(
 94        self,
 95        source_config: ConnectionDefinition,
 96        *,
 97        config: Mapping[str, Any] | None = None,
 98        debug: bool = False,
 99        emit_connector_builder_messages: bool = False,
100        component_factory: Optional[ModelToComponentFactory] = None,
101        migrate_manifest: Optional[bool] = False,
102        normalize_manifest: Optional[bool] = False,
103    ) -> None:
104        """
105        Args:
106            config: The provided config dict.
107            source_config: The manifest of low-code components that describe the source connector.
108            debug: True if debug mode is enabled.
109            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
110            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
111            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
112        """
113        self.logger = logging.getLogger(f"airbyte.{self.name}")
114        self._should_normalize = normalize_manifest
115        self._should_migrate = migrate_manifest
116        self._declarative_component_schema = _get_declarative_component_schema()
117        # If custom components are needed, locate and/or register them.
118        self.components_module: ModuleType | None = get_registered_components_module(config=config)
119        # set additional attributes
120        self._debug = debug
121        self._emit_connector_builder_messages = emit_connector_builder_messages
122        self._constructor = (
123            component_factory
124            if component_factory
125            else ModelToComponentFactory(
126                emit_connector_builder_messages,
127                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
128            )
129        )
130        self._message_repository = self._constructor.get_message_repository()
131        self._slice_logger: SliceLogger = (
132            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
133        )
134        self._config = config or {}
135
136        # resolve all components in the manifest
137        self._source_config = self._pre_process_manifest(dict(source_config))
138        # validate resolved manifest against the declarative component schema
139        self._validate_source()
140        # apply additional post-processing to the manifest
141        self._post_process_manifest()
142
143    @property
144    def resolved_manifest(self) -> Mapping[str, Any]:
145        """
146        Returns the resolved manifest configuration for the source.
147
148        This property provides access to the internal source configuration as a mapping,
149        which contains all settings and parameters required to define the source's behavior.
150
151        Returns:
152            Mapping[str, Any]: The resolved source configuration manifest.
153        """
154        return self._source_config
155
156    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
157        """
158        Preprocesses the provided manifest dictionary by resolving any manifest references.
159
160        This method modifies the input manifest in place, resolving references using the
161        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
162
163        Args:
164            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
165
166        Returns:
167            None
168        """
169        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
170        manifest = self._fix_source_type(manifest)
171        # Resolve references in the manifest
172        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
173        # Propagate types and parameters throughout the manifest
174        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
175            "", resolved_manifest, {}
176        )
177
178        return propagated_manifest
179
180    def _post_process_manifest(self) -> None:
181        """
182        Post-processes the manifest after validation.
183        This method is responsible for any additional modifications or transformations needed
184        after the manifest has been validated and before it is used in the source.
185        """
186        # apply manifest migration, if required
187        self._migrate_manifest()
188        # apply manifest normalization, if required
189        self._normalize_manifest()
190
191    def _normalize_manifest(self) -> None:
192        """
193        This method is used to normalize the manifest. It should be called after the manifest has been validated.
194
195        Connector Builder UI rendering requires the manifest to be in a specific format.
196         - references have been resolved
197         - the commonly used definitions are extracted to the `definitions.linked.*`
198        """
199        if self._should_normalize:
200            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
201            self._source_config = normalizer.normalize()
202
203    def _migrate_manifest(self) -> None:
204        """
205        This method is used to migrate the manifest. It should be called after the manifest has been validated.
206        The migration is done in place, so the original manifest is modified.
207
208        The original manifest is returned if any error occurs during migration.
209        """
210        if self._should_migrate:
211            manifest_migrator = ManifestMigrationHandler(self._source_config)
212            self._source_config = manifest_migrator.apply_migrations()
213            # validate migrated manifest against the declarative component schema
214            self._validate_source()
215
216    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
217        """
218        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
219        """
220        if "type" not in manifest:
221            manifest["type"] = "DeclarativeSource"
222
223        return manifest
224
225    @property
226    def message_repository(self) -> MessageRepository:
227        return self._message_repository
228
229    @property
230    def dynamic_streams(self) -> List[Dict[str, Any]]:
231        return self._dynamic_stream_configs(
232            manifest=self._source_config,
233            config=self._config,
234            with_dynamic_stream_name=True,
235        )
236
237    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
238        return self._constructor.get_model_deprecations()
239
@property
def connection_checker(self) -> ConnectionChecker:
    """
    Build the ConnectionChecker described by the manifest's `check` block.

    If the block has no `type`, it defaults to "CheckStream", and the block is updated in place.

    Raises:
        ValueError: if the factory produces something that is not a ConnectionChecker.
    """
    check_definition = self._source_config["check"]
    if "type" not in check_definition:
        check_definition["type"] = "CheckStream"

    checker_model = COMPONENTS_CHECKER_TYPE_MAPPING[check_definition["type"]]
    checker = self._constructor.create_component(
        checker_model,
        check_definition,
        {},
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )
    if not isinstance(checker, ConnectionChecker):
        raise ValueError(
            f"Expected to generate a ConnectionChecker component, but received {checker.__class__}"
        )
    return checker
257
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Instantiate every stream the manifest defines for the given user config.

    Static `streams` come first, followed by streams expanded from `dynamic_streams`.
    If the manifest declares an `api_budget`, it is registered with the factory before any
    stream is built. The definitions are deep-copied and parent streams are marked for
    caching. Each definition then becomes a StateDelegatingStream or a DeclarativeStream
    component, depending on its `type`.
    """
    self._emit_manifest_debug_message(
        extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
    )

    static_configs = self._stream_configs(self._source_config)
    dynamic_configs = self._dynamic_stream_configs(self._source_config, config)

    api_budget_model = self._source_config.get("api_budget")
    if api_budget_model:
        self._constructor.set_api_budget(api_budget_model, config)

    # Work on a deep copy so caching flags do not leak back into the stored manifest.
    prepared_configs = self._initialize_cache_for_parent_streams(
        deepcopy(static_configs + dynamic_configs)
    )

    built_streams: List[Stream] = []
    for stream_config in prepared_configs:
        if stream_config.get("type") == StateDelegatingStreamModel.__name__:
            model = StateDelegatingStreamModel
        else:
            model = DeclarativeStreamModel
        built_streams.append(
            self._constructor.create_component(
                model,
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
        )
    return built_streams
289
@staticmethod
def _initialize_cache_for_parent_streams(
    stream_configs: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """
    Turn on `use_cache` for every stream that another stream depends on as a parent.

    Parents are discovered in two places:
    - a child's `incremental_sync.parent_stream`;
    - the `parent_stream_configs` of its partition router, which may be a single router
      or a list of routers.

    Both the embedded parent definitions and any top-level stream whose name matches a
    parent get `retriever.requester.use_cache = True`. For a StateDelegatingStream, the flag
    is set on both its `full_refresh_stream` and `incremental_stream` retrievers.

    Args:
        stream_configs: Stream definitions to update; they are mutated in place.

    Returns:
        The same `stream_configs` list.
    """
    parent_streams: Set[str] = set()

    def enable_cache(stream: Dict[str, Any]) -> None:
        # A StateDelegatingStream has no retriever of its own: it delegates to two sub-streams.
        if stream.get("type") == "StateDelegatingStream":
            delegates = (stream["full_refresh_stream"], stream["incremental_stream"])
        else:
            delegates = (stream,)
        for delegate in delegates:
            delegate["retriever"]["requester"]["use_cache"] = True

    def mark_parents(parent_configs: List[Dict[str, Any]]) -> None:
        # Record each parent's name and enable caching on its embedded definition.
        for parent_config in parent_configs:
            parent_stream = parent_config["stream"]
            parent_streams.add(parent_stream["name"])
            enable_cache(parent_stream)

    for stream_config in stream_configs:
        # `or {}` also tolerates keys explicitly set to None (e.g. an empty YAML value).
        incremental_parent = (stream_config.get("incremental_sync") or {}).get("parent_stream")
        if incremental_parent:
            parent_streams.add(incremental_parent["name"])
            enable_cache(incremental_parent)
            continue

        partition_router = (stream_config.get("retriever") or {}).get("partition_router")
        if isinstance(partition_router, dict):
            routers: List[Any] = [partition_router]
        elif isinstance(partition_router, list):
            routers = partition_router
        else:
            routers = []
        for router in routers:
            if isinstance(router, dict) and router.get("parent_stream_configs"):
                mark_parents(router["parent_stream_configs"])

    # Top-level definitions of the parents must be cached too, not just the embedded copies.
    for stream_config in stream_configs:
        if stream_config.get("name") in parent_streams:
            enable_cache(stream_config)

    return stream_configs
343
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Return the connector specification (spec) as defined in the Airbyte Protocol.

    The spec describes the configuration a user can provide (e.g. username and password).
    An inline `spec` block in the manifest takes precedence; without one, the parent
    implementation is used. According to the original documentation, the parent loads
    "spec.yaml" or "spec.json" from the project root.
    """
    self._configure_logger_level(logger)
    self._emit_manifest_debug_message(
        extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
    )

    spec_definition = self._source_config.get("spec")
    if not spec_definition:
        return super().spec(logger)

    if "type" not in spec_definition:
        spec_definition["type"] = "Spec"
    spec_component = self._constructor.create_component(SpecModel, spec_definition, {})
    return spec_component.generate_spec()
367
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """Run the parent connection check after applying this source's debug log level to `logger`."""
    self._configure_logger_level(logger)
    status = super().check(logger, config)
    return status
371
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """
    Delegate to the parent `read`, honoring this source's debug setting.

    `logger` is switched to DEBUG first when debug mode is enabled. Every message the
    parent implementation produces is then re-yielded unchanged.
    """
    self._configure_logger_level(logger)
    upstream_messages = super().read(logger, config, catalog, state)
    yield from upstream_messages
381
def _configure_logger_level(self, logger: logging.Logger) -> None:
    """Lower `logger` to DEBUG when debug mode is on; otherwise leave its level untouched."""
    if not self._debug:
        return
    logger.setLevel(logging.DEBUG)
388
def _validate_source(self) -> None:
    """
    Validate the manifest against the declarative component schema and check version compatibility.

    Raises:
        ValidationError: if schema validation fails, if a version string cannot be parsed,
            if the manifest's major.minor is newer than the installed airbyte-cdk, or if
            the manifest's major.minor is older than 0.29.
        RuntimeError: if the manifest has no `version` field.
    """
    try:
        validate(self._source_config, self._declarative_component_schema)
    except ValidationError as schema_error:
        raise ValidationError(
            "Validation against json schema defined in declarative_component_schema.yaml schema failed"
        ) from schema_error

    cdk_version = self._parse_version(metadata.version("airbyte_cdk"), "airbyte-cdk")

    manifest_version_str = self._source_config.get("version")
    if manifest_version_str is None:
        raise RuntimeError(
            "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
        )
    manifest_version = self._parse_version(manifest_version_str, "manifest")

    # An unreleased dev build of the CDK reports 0.0.0; skip the compatibility check for it.
    if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
        return

    cdk_major_minor = (cdk_version.major, cdk_version.minor)
    manifest_major_minor = (manifest_version.major, manifest_version.minor)

    if cdk_major_minor < manifest_major_minor:
        raise ValidationError(
            f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
            f"manifest may contain features that are not in the current CDK version."
        )
    if manifest_major_minor < (0, 29):
        raise ValidationError(
            f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
            f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
            f"{cdk_version!s} which contains these breaking changes."
        )
427
@staticmethod
def _parse_version(
    version: str,
    version_type: str,
) -> Version:
    """
    Parse a version string into a `packaging.version.Version`.

    Args:
        version: The version string to parse.
        version_type: Label for where the version comes from (e.g. "manifest" or
            "airbyte-cdk"); it is used only in the error message.

    Returns:
        Version: The parsed version object. It is not a tuple; callers read its
        `major`/`minor`/`micro` attributes.

    Raises:
        ValidationError: If `version` is not a valid version string.
    """
    try:
        return Version(version)
    except InvalidVersion as ex:
        raise ValidationError(
            f"The {version_type} version '{version}' is not a valid version format."
        ) from ex
449
def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
    """
    Return the manifest's static `streams` list.

    Each entry without a `type` is given the type "DeclarativeStream". Entries are updated
    in place, and the very same list object is returned. A missing `streams` key yields an
    empty list.
    """
    static_streams: List[Dict[str, Any]] = manifest.get("streams", [])
    for stream_definition in static_streams:
        stream_definition.setdefault("type", "DeclarativeStream")
    return static_streams
457
def _dynamic_stream_configs(
    self,
    manifest: Mapping[str, Any],
    config: Mapping[str, Any],
    with_dynamic_stream_name: Optional[bool] = None,
) -> List[Dict[str, Any]]:
    """
    Expand every entry of the manifest's `dynamic_streams` list into concrete stream definitions.

    For each dynamic definition, a components resolver is built from its
    `components_resolver` block, and its `stream_template` is resolved into streams. Each
    resolved stream has types and parameters propagated, and it defaults to type
    "DeclarativeStream".

    Args:
        manifest: The manifest to read `dynamic_streams` from.
        config: The user config passed to the factory when building the resolver.
        with_dynamic_stream_name: When truthy, each generated stream is tagged with a
            `dynamic_stream_name`. The tag is the definition's `name`, or
            `dynamic_stream_<index>` when the definition has no name.

    Returns:
        The generated stream definitions, in resolution order.

    Raises:
        ValueError: If a definition lacks `components_resolver`, if the resolver has no
            `type` or an unknown one, or if a generated stream name is not a string.
        AirbyteTracedException: If two generated streams share a name. This is reported as
            a config error for ConfigComponentsResolver and as a system error otherwise.
    """
    dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
    dynamic_stream_configs: List[Dict[str, Any]] = []
    seen_dynamic_streams: Set[str] = set()

    for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
        # Use .get so a missing key reaches the descriptive ValueError below instead of
        # surfacing as a bare KeyError.
        components_resolver_config = dynamic_definition.get("components_resolver")

        if not components_resolver_config:
            raise ValueError(
                f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
            )

        resolver_type = components_resolver_config.get("type")
        if not resolver_type:
            raise ValueError(
                f"Missing 'type' in components resolver configuration: {components_resolver_config}"
            )

        if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
            raise ValueError(
                f"Invalid components resolver type '{resolver_type}'. "
                f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
            )

        # Enable use_cache on the resolver's requester (mutates the manifest in place).
        if "retriever" in components_resolver_config:
            components_resolver_config["retriever"]["requester"]["use_cache"] = True

        # Create a resolver for dynamic components based on type
        components_resolver = self._constructor.create_component(
            COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
            components_resolver_config,
            config,
        )

        stream_template_config = dynamic_definition["stream_template"]

        for dynamic_stream in components_resolver.resolve_components(
            stream_template_config=stream_template_config
        ):
            dynamic_stream = {
                **ManifestComponentTransformer().propagate_types_and_parameters(
                    "", dynamic_stream, {}, use_parent_parameters=True
                )
            }

            if "type" not in dynamic_stream:
                dynamic_stream["type"] = "DeclarativeStream"

            # Ensure that each stream is created with a unique name
            name = dynamic_stream.get("name")

            if with_dynamic_stream_name:
                dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
                    "name", f"dynamic_stream_{dynamic_definition_index}"
                )

            if not isinstance(name, str):
                raise ValueError(
                    f"Expected stream name {name} to be a string, got {type(name)}."
                )

            if name in seen_dynamic_streams:
                error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
                failure_type = FailureType.system_error

                # Duplicates produced from user config are the user's to fix.
                if resolver_type == "ConfigComponentsResolver":
                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
                    failure_type = FailureType.config_error

                raise AirbyteTracedException(
                    message=error_message,
                    internal_message=error_message,
                    failure_type=failure_type,
                )

            seen_dynamic_streams.add(name)
            dynamic_stream_configs.append(dynamic_stream)

    return dynamic_stream_configs
543
def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
    """Log a DEBUG record about manifest-based construction, attaching `extra_args` as record attributes."""
    message = "declarative source created from manifest"
    self.logger.debug(message, extra=extra_args)

Declarative source defined by a manifest of low-code components that define source connector behavior.

ManifestDeclarativeSource( source_config: Mapping[str, Any], *, config: Optional[Mapping[str, Any]] = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[airbyte_cdk.sources.declarative.parsers.model_to_component_factory.ModelToComponentFactory] = None, migrate_manifest: Optional[bool] = False, normalize_manifest: Optional[bool] = False)
 93    def __init__(
 94        self,
 95        source_config: ConnectionDefinition,
 96        *,
 97        config: Mapping[str, Any] | None = None,
 98        debug: bool = False,
 99        emit_connector_builder_messages: bool = False,
100        component_factory: Optional[ModelToComponentFactory] = None,
101        migrate_manifest: Optional[bool] = False,
102        normalize_manifest: Optional[bool] = False,
103    ) -> None:
104        """
105        Args:
106            config: The provided config dict.
107            source_config: The manifest of low-code components that describe the source connector.
108            debug: True if debug mode is enabled.
109            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
110            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
111            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
112        """
113        self.logger = logging.getLogger(f"airbyte.{self.name}")
114        self._should_normalize = normalize_manifest
115        self._should_migrate = migrate_manifest
116        self._declarative_component_schema = _get_declarative_component_schema()
117        # If custom components are needed, locate and/or register them.
118        self.components_module: ModuleType | None = get_registered_components_module(config=config)
119        # set additional attributes
120        self._debug = debug
121        self._emit_connector_builder_messages = emit_connector_builder_messages
122        self._constructor = (
123            component_factory
124            if component_factory
125            else ModelToComponentFactory(
126                emit_connector_builder_messages,
127                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
128            )
129        )
130        self._message_repository = self._constructor.get_message_repository()
131        self._slice_logger: SliceLogger = (
132            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
133        )
134        self._config = config or {}
135
136        # resolve all components in the manifest
137        self._source_config = self._pre_process_manifest(dict(source_config))
138        # validate resolved manifest against the declarative component schema
139        self._validate_source()
140        # apply additional post-processing to the manifest
141        self._post_process_manifest()
Arguments:
  • config: The provided config dict.
  • source_config: The manifest of low-code components that describe the source connector.
  • debug: True if debug mode is enabled.
  • emit_connector_builder_messages: True if messages should be emitted to the connector builder.
  • component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
  • migrate_manifest: Optional flag to indicate if manifest migrations should be applied.
  • normalize_manifest: Optional flag to indicate if the manifest should be normalized.
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
@property
def resolved_manifest(self) -> Mapping[str, Any]:
    """
    The source's manifest after reference resolution and any post-processing.

    Returns:
        Mapping[str, Any]: The internal source configuration (not a copy).
    """
    manifest = self._source_config
    return manifest

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:

Mapping[str, Any]: The resolved source configuration manifest.

message_repository: airbyte_cdk.MessageRepository
@property
def message_repository(self) -> MessageRepository:
    """The message repository obtained from the component factory when the source was built."""
    repository = self._message_repository
    return repository
dynamic_streams: List[Dict[str, Any]]
@property
def dynamic_streams(self) -> List[Dict[str, Any]]:
    """
    Stream definitions generated from the manifest's `dynamic_streams` section.

    Resolution uses the config captured at construction time, and each generated
    stream is tagged with its `dynamic_stream_name`.
    """
    generated = self._dynamic_stream_configs(
        manifest=self._source_config,
        config=self._config,
        with_dynamic_stream_name=True,
    )
    return generated
def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
    """Return the model deprecation warnings reported by the component factory."""
    warnings = self._constructor.get_model_deprecations()
    return warnings

Returns a list of deprecation warnings for the source.

@property
def connection_checker(self) -> ConnectionChecker:
    """
    Build the ConnectionChecker described by the manifest's `check` block.

    If the block has no `type`, it defaults to "CheckStream", and the block is updated in place.

    Raises:
        ValueError: if the factory produces something that is not a ConnectionChecker.
    """
    check_definition = self._source_config["check"]
    if "type" not in check_definition:
        check_definition["type"] = "CheckStream"

    checker_model = COMPONENTS_CHECKER_TYPE_MAPPING[check_definition["type"]]
    checker = self._constructor.create_component(
        checker_model,
        check_definition,
        {},
        emit_connector_builder_messages=self._emit_connector_builder_messages,
    )
    if not isinstance(checker, ConnectionChecker):
        raise ValueError(
            f"Expected to generate a ConnectionChecker component, but received {checker.__class__}"
        )
    return checker

Returns the ConnectionChecker to use for the check operation

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
    """
    Instantiate every stream the manifest defines for the given user config.

    Static `streams` come first, followed by streams expanded from `dynamic_streams`.
    If the manifest declares an `api_budget`, it is registered with the factory before any
    stream is built. The definitions are deep-copied and parent streams are marked for
    caching. Each definition then becomes a StateDelegatingStream or a DeclarativeStream
    component, depending on its `type`.
    """
    self._emit_manifest_debug_message(
        extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
    )

    static_configs = self._stream_configs(self._source_config)
    dynamic_configs = self._dynamic_stream_configs(self._source_config, config)

    api_budget_model = self._source_config.get("api_budget")
    if api_budget_model:
        self._constructor.set_api_budget(api_budget_model, config)

    # Work on a deep copy so caching flags do not leak back into the stored manifest.
    prepared_configs = self._initialize_cache_for_parent_streams(
        deepcopy(static_configs + dynamic_configs)
    )

    built_streams: List[Stream] = []
    for stream_config in prepared_configs:
        if stream_config.get("type") == StateDelegatingStreamModel.__name__:
            model = StateDelegatingStreamModel
        else:
            model = DeclarativeStreamModel
        built_streams.append(
            self._constructor.create_component(
                model,
                stream_config,
                config,
                emit_connector_builder_messages=self._emit_connector_builder_messages,
            )
        )
    return built_streams
Parameters
  • config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns

A list of the streams in this source connector.

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Return the connector specification (spec) as defined in the Airbyte Protocol.

    The spec describes the configuration a user can provide (e.g. username and password).
    An inline `spec` block in the manifest takes precedence; without one, the parent
    implementation is used. According to the original documentation, the parent loads
    "spec.yaml" or "spec.json" from the project root.
    """
    self._configure_logger_level(logger)
    self._emit_manifest_debug_message(
        extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
    )

    spec_definition = self._source_config.get("spec")
    if not spec_definition:
        return super().spec(logger)

    if "type" not in spec_definition:
        spec_definition["type"] = "Spec"
    spec_component = self._constructor.create_component(SpecModel, spec_definition, {})
    return spec_component.generate_spec()

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """Run the parent connection check after applying this source's debug log level to `logger`."""
    self._configure_logger_level(logger)
    status = super().check(logger, config)
    return status

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """
    Delegate to the parent `read`, honoring this source's debug setting.

    `logger` is switched to DEBUG first when debug mode is enabled. Every message the
    parent implementation produces is then re-yielded unchanged.
    """
    self._configure_logger_level(logger)
    upstream_messages = super().read(logger, config, catalog, state)
    yield from upstream_messages

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.