airbyte_cdk.sources.declarative.manifest_declarative_source

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import json
  6import logging
  7import pkgutil
  8from copy import deepcopy
  9from importlib import metadata
 10from types import ModuleType
 11from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
 12
 13import yaml
 14from jsonschema.exceptions import ValidationError
 15from jsonschema.validators import validate
 16from packaging.version import InvalidVersion, Version
 17
 18from airbyte_cdk.models import (
 19    AirbyteConnectionStatus,
 20    AirbyteMessage,
 21    AirbyteStateMessage,
 22    ConfiguredAirbyteCatalog,
 23    ConnectorSpecification,
 24    FailureType,
 25)
 26from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 27from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 28from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 29from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 30    DeclarativeStream as DeclarativeStreamModel,
 31)
 32from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
 33from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 34    StateDelegatingStream as StateDelegatingStreamModel,
 35)
 36from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 37    get_registered_components_module,
 38)
 39from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 40    ManifestComponentTransformer,
 41)
 42from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 43    ManifestReferenceResolver,
 44)
 45from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 46    ModelToComponentFactory,
 47)
 48from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 49from airbyte_cdk.sources.message import MessageRepository
 50from airbyte_cdk.sources.streams.core import Stream
 51from airbyte_cdk.sources.types import ConnectionDefinition
 52from airbyte_cdk.sources.utils.slice_logger import (
 53    AlwaysLogSliceLogger,
 54    DebugSliceLogger,
 55    SliceLogger,
 56)
 57from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 58
 59
 60class ManifestDeclarativeSource(DeclarativeSource):
 61    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
 62
 63    def __init__(
 64        self,
 65        source_config: ConnectionDefinition,
 66        *,
 67        config: Mapping[str, Any] | None = None,
 68        debug: bool = False,
 69        emit_connector_builder_messages: bool = False,
 70        component_factory: Optional[ModelToComponentFactory] = None,
 71    ):
 72        """
 73        Args:
 74            config: The provided config dict.
 75            source_config: The manifest of low-code components that describe the source connector.
 76            debug: True if debug mode is enabled.
 77            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
 78            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
 79        """
 80        self.logger = logging.getLogger(f"airbyte.{self.name}")
 81        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
 82        manifest = dict(source_config)
 83        if "type" not in manifest:
 84            manifest["type"] = "DeclarativeSource"
 85
 86        # If custom components are needed, locate and/or register them.
 87        self.components_module: ModuleType | None = get_registered_components_module(config=config)
 88
 89        resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
 90        propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
 91            "", resolved_source_config, {}
 92        )
 93        self._source_config = propagated_source_config
 94        self._debug = debug
 95        self._emit_connector_builder_messages = emit_connector_builder_messages
 96        self._constructor = (
 97            component_factory
 98            if component_factory
 99            else ModelToComponentFactory(
100                emit_connector_builder_messages,
101                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
102            )
103        )
104        self._message_repository = self._constructor.get_message_repository()
105        self._slice_logger: SliceLogger = (
106            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
107        )
108
109        self._validate_source()
110
111    @property
112    def resolved_manifest(self) -> Mapping[str, Any]:
113        return self._source_config
114
115    @property
116    def message_repository(self) -> MessageRepository:
117        return self._message_repository
118
119    @property
120    def connection_checker(self) -> ConnectionChecker:
121        check = self._source_config["check"]
122        if "type" not in check:
123            check["type"] = "CheckStream"
124        check_stream = self._constructor.create_component(
125            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
126            check,
127            dict(),
128            emit_connector_builder_messages=self._emit_connector_builder_messages,
129        )
130        if isinstance(check_stream, ConnectionChecker):
131            return check_stream
132        else:
133            raise ValueError(
134                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
135            )
136
137    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
138        self._emit_manifest_debug_message(
139            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
140        )
141
142        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
143            self._source_config, config
144        )
145
146        api_budget_model = self._source_config.get("api_budget")
147        if api_budget_model:
148            self._constructor.set_api_budget(api_budget_model, config)
149
150        source_streams = [
151            self._constructor.create_component(
152                StateDelegatingStreamModel
153                if stream_config.get("type") == StateDelegatingStreamModel.__name__
154                else DeclarativeStreamModel,
155                stream_config,
156                config,
157                emit_connector_builder_messages=self._emit_connector_builder_messages,
158            )
159            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
160        ]
161
162        return source_streams
163
164    @staticmethod
165    def _initialize_cache_for_parent_streams(
166        stream_configs: List[Dict[str, Any]],
167    ) -> List[Dict[str, Any]]:
168        parent_streams = set()
169
170        def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
171            for parent_config in parent_configs:
172                parent_streams.add(parent_config["stream"]["name"])
173                if parent_config["stream"]["type"] == "StateDelegatingStream":
174                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
175                        "use_cache"
176                    ] = True
177                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
178                        "use_cache"
179                    ] = True
180                else:
181                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
182
183        for stream_config in stream_configs:
184            if stream_config.get("incremental_sync", {}).get("parent_stream"):
185                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
186                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
187                    "use_cache"
188                ] = True
189
190            elif stream_config.get("retriever", {}).get("partition_router", {}):
191                partition_router = stream_config["retriever"]["partition_router"]
192
193                if isinstance(partition_router, dict) and partition_router.get(
194                    "parent_stream_configs"
195                ):
196                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
197                elif isinstance(partition_router, list):
198                    for router in partition_router:
199                        if router.get("parent_stream_configs"):
200                            update_with_cache_parent_configs(router["parent_stream_configs"])
201
202        for stream_config in stream_configs:
203            if stream_config["name"] in parent_streams:
204                if stream_config["type"] == "StateDelegatingStream":
205                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
206                        True
207                    )
208                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
209                        True
210                    )
211                else:
212                    stream_config["retriever"]["requester"]["use_cache"] = True
213
214        return stream_configs
215
216    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
217        """
218        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
219        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
220        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
221        in the project root.
222        """
223        self._configure_logger_level(logger)
224        self._emit_manifest_debug_message(
225            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
226        )
227
228        spec = self._source_config.get("spec")
229        if spec:
230            if "type" not in spec:
231                spec["type"] = "Spec"
232            spec_component = self._constructor.create_component(SpecModel, spec, dict())
233            return spec_component.generate_spec()
234        else:
235            return super().spec(logger)
236
237    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
238        self._configure_logger_level(logger)
239        return super().check(logger, config)
240
241    def read(
242        self,
243        logger: logging.Logger,
244        config: Mapping[str, Any],
245        catalog: ConfiguredAirbyteCatalog,
246        state: Optional[List[AirbyteStateMessage]] = None,
247    ) -> Iterator[AirbyteMessage]:
248        self._configure_logger_level(logger)
249        yield from super().read(logger, config, catalog, state)
250
251    def _configure_logger_level(self, logger: logging.Logger) -> None:
252        """
253        Set the log level to logging.DEBUG if debug mode is enabled
254        """
255        if self._debug:
256            logger.setLevel(logging.DEBUG)
257
258    def _validate_source(self) -> None:
259        """
260        Validates the connector manifest against the declarative component schema
261        """
262        try:
263            raw_component_schema = pkgutil.get_data(
264                "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
265            )
266            if raw_component_schema is not None:
267                declarative_component_schema = yaml.load(
268                    raw_component_schema, Loader=yaml.SafeLoader
269                )
270            else:
271                raise RuntimeError(
272                    "Failed to read manifest component json schema required for validation"
273                )
274        except FileNotFoundError as e:
275            raise FileNotFoundError(
276                f"Failed to read manifest component json schema required for validation: {e}"
277            )
278
279        streams = self._source_config.get("streams")
280        dynamic_streams = self._source_config.get("dynamic_streams")
281        if not (streams or dynamic_streams):
282            raise ValidationError(
283                f"A valid manifest should have at least one stream defined. Got {streams}"
284            )
285
286        try:
287            validate(self._source_config, declarative_component_schema)
288        except ValidationError as e:
289            raise ValidationError(
290                "Validation against the json schema defined in declarative_component_schema.yaml failed"
291            ) from e
292
293        cdk_version_str = metadata.version("airbyte_cdk")
294        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
295        manifest_version_str = self._source_config.get("version")
296        if manifest_version_str is None:
297            raise RuntimeError(
298                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
299            )
300        manifest_version = self._parse_version(manifest_version_str, "manifest")
301
302        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
303            # Skipping version compatibility check on unreleased dev branch
304            pass
305        elif (cdk_version.major, cdk_version.minor) < (
306            manifest_version.major,
307            manifest_version.minor,
308        ):
309            raise ValidationError(
310                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
311                f"manifest may contain features that are not in the current CDK version."
312            )
313        elif (manifest_version.major, manifest_version.minor) < (0, 29):
314            raise ValidationError(
315                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
316                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
317                f"{cdk_version!s} which contains these breaking changes."
318            )
319
320    @staticmethod
321    def _parse_version(
322        version: str,
323        version_type: str,
324    ) -> Version:
325        """Parses a semantic version string into a Version object.
326
327        Raises ValidationError if the version string is not in a valid format.
328
329        Returns:
330            Version: the parsed version object
331        """
332        try:
333            parsed_version = Version(version)
334        except InvalidVersion as ex:
335            raise ValidationError(
336                f"The {version_type} version '{version}' is not a valid version format."
337            ) from ex
338        else:
339            # No exception
340            return parsed_version
341
342    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
343        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
344        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
345        for s in stream_configs:
346            if "type" not in s:
347                s["type"] = "DeclarativeStream"
348        return stream_configs
349
350    def _dynamic_stream_configs(
351        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
352    ) -> List[Dict[str, Any]]:
353        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
354        dynamic_stream_configs: List[Dict[str, Any]] = []
355        seen_dynamic_streams: Set[str] = set()
356
357        for dynamic_definition in dynamic_stream_definitions:
358            components_resolver_config = dynamic_definition["components_resolver"]
359
360            if not components_resolver_config:
361                raise ValueError(
362                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
363                )
364
365            resolver_type = components_resolver_config.get("type")
366            if not resolver_type:
367                raise ValueError(
368                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
369                )
370
371            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
372                raise ValueError(
373                    f"Invalid components resolver type '{resolver_type}'. "
374                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
375                )
376
377            if "retriever" in components_resolver_config:
378                components_resolver_config["retriever"]["requester"]["use_cache"] = True
379
380            # Create a resolver for dynamic components based on type
381            components_resolver = self._constructor.create_component(
382                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
383            )
384
385            stream_template_config = dynamic_definition["stream_template"]
386
387            for dynamic_stream in components_resolver.resolve_components(
388                stream_template_config=stream_template_config
389            ):
390                if "type" not in dynamic_stream:
391                    dynamic_stream["type"] = "DeclarativeStream"
392
393                # Ensure that each stream is created with a unique name
394                name = dynamic_stream.get("name")
395
396                if not isinstance(name, str):
397                    raise ValueError(
398                        f"Expected stream name {name} to be a string, got {type(name)}."
399                    )
400
401                if name in seen_dynamic_streams:
402                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
403                    failure_type = FailureType.system_error
404
405                    if resolver_type == "ConfigComponentsResolver":
406                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
407                        failure_type = FailureType.config_error
408
409                    raise AirbyteTracedException(
410                        message=error_message,
411                        internal_message=error_message,
412                        failure_type=failure_type,
413                    )
414
415                seen_dynamic_streams.add(name)
416                dynamic_stream_configs.append(dynamic_stream)
417
418        return dynamic_stream_configs
419
420    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
421        self.logger.debug("declarative source created from manifest", extra=extra_args)
class ManifestDeclarativeSource(DeclarativeSource):

Declarative source defined by a manifest of low-code components that define source connector behavior

ManifestDeclarativeSource( source_config: Mapping[str, Any], *, config: Optional[Mapping[str, Any]] = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[airbyte_cdk.sources.declarative.parsers.model_to_component_factory.ModelToComponentFactory] = None)
Arguments:
  • config: The provided config dict.
  • source_config: The manifest of low-code components that describe the source connector.
  • debug: True if debug mode is enabled.
  • emit_connector_builder_messages: True if messages should be emitted to the connector builder.
  • component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
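
A minimal construction sketch follows. The manifest is hypothetical (the endpoint, stream name, and config keys are made up for illustration) and only shows the expected shape: a real manifest must validate against declarative_component_schema.yaml, and its version must be compatible with the installed airbyte-cdk. Later sketches on this page reuse the `source` and `config` names defined here.

    import logging

    from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource

    # Hypothetical manifest: field names follow the declarative component schema,
    # but the endpoint and stream are invented for illustration.
    manifest = {
        "version": "6.0.0",  # assumed to be no newer than the installed airbyte-cdk
        "check": {"type": "CheckStream", "stream_names": ["widgets"]},
        "streams": [
            {
                "type": "DeclarativeStream",
                "name": "widgets",
                "retriever": {
                    "type": "SimpleRetriever",
                    "requester": {
                        "type": "HttpRequester",
                        "url_base": "https://api.example.com",
                        "path": "/widgets",
                    },
                    "record_selector": {
                        "type": "RecordSelector",
                        "extractor": {"type": "DpathExtractor", "field_path": []},
                    },
                },
            }
        ],
    }

    config = {"api_key": "..."}  # hypothetical user-provided config
    source = ManifestDeclarativeSource(source_config=manifest, config=config)
    # The top-level "type" is injected automatically when omitted from the manifest.
    assert source.resolved_manifest["type"] == "DeclarativeSource"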
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
message_repository: airbyte_cdk.MessageRepository
connection_checker: airbyte_cdk.sources.declarative.checks.connection_checker.ConnectionChecker

Returns the ConnectionChecker to use for the check operation
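
A sketch, reusing `source` and `config` from the construction example above: the checker is built from the manifest's "check" block (its "type" defaults to "CheckStream" when omitted), assuming the ConnectionChecker.check_connection(source, logger, config) interface.

    import logging

    logger = logging.getLogger("airbyte")

    checker = source.connection_checker  # built from manifest["check"]
    succeeded, error = checker.check_connection(source, logger, config)
    if not succeeded:
        logger.error("Connection check failed: %s", error)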

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
Parameters
  • config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns

A list of the streams in this source connector.
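
A short sketch, reusing `source` and `config` from the construction example: the returned objects are fully built Stream instances, including any streams produced from "dynamic_streams" definitions.

    for stream in source.streams(config=config):
        print(stream.name)  # e.g. "widgets"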

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.
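
For illustration, a manifest "spec" block might look like the following (field names follow the low-code Spec model; the URL is hypothetical). When such a block is present in the manifest passed to the constructor, spec() generates the ConnectorSpecification from it rather than from spec.yaml or spec.json.

    import logging

    # Assuming this block was included under the manifest's "spec" key before
    # constructing the source.
    spec_block = {
        "type": "Spec",
        "documentation_url": "https://docs.example.com/hypothetical",
        "connection_specification": {
            "type": "object",
            "required": ["api_key"],
            "properties": {"api_key": {"type": "string", "airbyte_secret": True}},
        },
    }

    connector_spec = source.spec(logging.getLogger("airbyte"))
    print(connector_spec.connectionSpecification)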

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
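
A sketch of the standard check flow, reusing `source` and `config` from above; AirbyteConnectionStatus carries a status enum and an optional message.

    import logging

    from airbyte_cdk.models import Status

    status = source.check(logging.getLogger("airbyte"), config)
    if status.status != Status.SUCCEEDED:
        print(f"check failed: {status.message}")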

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.
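
A minimal read sketch, reusing `source` and `config` from above. The configured catalog is assembled by hand from the protocol models; the stream name matches the hypothetical manifest.

    import logging

    from airbyte_cdk.models import (
        AirbyteStream,
        ConfiguredAirbyteCatalog,
        ConfiguredAirbyteStream,
        DestinationSyncMode,
        SyncMode,
        Type,
    )

    catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=AirbyteStream(
                    name="widgets",
                    json_schema={},
                    supported_sync_modes=[SyncMode.full_refresh],
                ),
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.append,
            )
        ]
    )

    for message in source.read(logging.getLogger("airbyte"), config, catalog, state=None):
        if message.type == Type.RECORD:
            print(message.record.data)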