airbyte_cdk.sources.declarative.manifest_declarative_source

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5import json
  6import logging
  7import pkgutil
  8from copy import deepcopy
  9from importlib import metadata
 10from types import ModuleType
 11from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
 12
 13import yaml
 14from jsonschema.exceptions import ValidationError
 15from jsonschema.validators import validate
 16from packaging.version import InvalidVersion, Version
 17
 18from airbyte_cdk.models import (
 19    AirbyteConnectionStatus,
 20    AirbyteMessage,
 21    AirbyteStateMessage,
 22    ConfiguredAirbyteCatalog,
 23    ConnectorSpecification,
 24    FailureType,
 25)
 26from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 27from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 28from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 29from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 30    DeclarativeStream as DeclarativeStreamModel,
 31)
 32from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
 33from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 34    StateDelegatingStream as StateDelegatingStreamModel,
 35)
 36from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 37    get_registered_components_module,
 38)
 39from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 40    ManifestComponentTransformer,
 41)
 42from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 43    ManifestReferenceResolver,
 44)
 45from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 46    ModelToComponentFactory,
 47)
 48from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 49from airbyte_cdk.sources.message import MessageRepository
 50from airbyte_cdk.sources.streams.core import Stream
 51from airbyte_cdk.sources.types import ConnectionDefinition
 52from airbyte_cdk.sources.utils.slice_logger import (
 53    AlwaysLogSliceLogger,
 54    DebugSliceLogger,
 55    SliceLogger,
 56)
 57from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 58
 59
 60class ManifestDeclarativeSource(DeclarativeSource):
 61    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
 62
 63    def __init__(
 64        self,
 65        source_config: ConnectionDefinition,
 66        *,
 67        config: Mapping[str, Any] | None = None,
 68        debug: bool = False,
 69        emit_connector_builder_messages: bool = False,
 70        component_factory: Optional[ModelToComponentFactory] = None,
 71    ):
 72        """
 73        Args:
 74            config: The provided config dict.
 75            source_config: The manifest of low-code components that describe the source connector.
 76            debug: True if debug mode is enabled.
 77            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
 78            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
 79        """
 80        self.logger = logging.getLogger(f"airbyte.{self.name}")
 81        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
 82        manifest = dict(source_config)
 83        if "type" not in manifest:
 84            manifest["type"] = "DeclarativeSource"
 85
 86        # If custom components are needed, locate and/or register them.
 87        self.components_module: ModuleType | None = get_registered_components_module(config=config)
 88
 89        resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
 90        propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
 91            "", resolved_source_config, {}
 92        )
 93        self._source_config = propagated_source_config
 94        self._debug = debug
 95        self._emit_connector_builder_messages = emit_connector_builder_messages
 96        self._constructor = (
 97            component_factory
 98            if component_factory
 99            else ModelToComponentFactory(
100                emit_connector_builder_messages,
101                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
102            )
103        )
104        self._message_repository = self._constructor.get_message_repository()
105        self._slice_logger: SliceLogger = (
106            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
107        )
108
109        self._config = config or {}
110        self._validate_source()
111
112    @property
113    def resolved_manifest(self) -> Mapping[str, Any]:
114        return self._source_config
115
116    @property
117    def message_repository(self) -> MessageRepository:
118        return self._message_repository
119
120    @property
121    def dynamic_streams(self) -> List[Dict[str, Any]]:
122        return self._dynamic_stream_configs(
123            manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
124        )
125
126    @property
127    def connection_checker(self) -> ConnectionChecker:
128        check = self._source_config["check"]
129        if "type" not in check:
130            check["type"] = "CheckStream"
131        check_stream = self._constructor.create_component(
132            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
133            check,
134            dict(),
135            emit_connector_builder_messages=self._emit_connector_builder_messages,
136        )
137        if isinstance(check_stream, ConnectionChecker):
138            return check_stream
139        else:
140            raise ValueError(
141                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
142            )
143
144    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
145        self._emit_manifest_debug_message(
146            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
147        )
148
149        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
150            self._source_config, config
151        )
152
153        api_budget_model = self._source_config.get("api_budget")
154        if api_budget_model:
155            self._constructor.set_api_budget(api_budget_model, config)
156
157        source_streams = [
158            self._constructor.create_component(
159                StateDelegatingStreamModel
160                if stream_config.get("type") == StateDelegatingStreamModel.__name__
161                else DeclarativeStreamModel,
162                stream_config,
163                config,
164                emit_connector_builder_messages=self._emit_connector_builder_messages,
165            )
166            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
167        ]
168
169        return source_streams
170
171    @staticmethod
172    def _initialize_cache_for_parent_streams(
173        stream_configs: List[Dict[str, Any]],
174    ) -> List[Dict[str, Any]]:
175        parent_streams = set()
176
177        def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
178            for parent_config in parent_configs:
179                parent_streams.add(parent_config["stream"]["name"])
180                if parent_config["stream"]["type"] == "StateDelegatingStream":
181                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
182                        "use_cache"
183                    ] = True
184                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
185                        "use_cache"
186                    ] = True
187                else:
188                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
189
190        for stream_config in stream_configs:
191            if stream_config.get("incremental_sync", {}).get("parent_stream"):
192                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
193                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
194                    "use_cache"
195                ] = True
196
197            elif stream_config.get("retriever", {}).get("partition_router", {}):
198                partition_router = stream_config["retriever"]["partition_router"]
199
200                if isinstance(partition_router, dict) and partition_router.get(
201                    "parent_stream_configs"
202                ):
203                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
204                elif isinstance(partition_router, list):
205                    for router in partition_router:
206                        if router.get("parent_stream_configs"):
207                            update_with_cache_parent_configs(router["parent_stream_configs"])
208
209        for stream_config in stream_configs:
210            if stream_config["name"] in parent_streams:
211                if stream_config["type"] == "StateDelegatingStream":
212                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
213                        True
214                    )
215                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
216                        True
217                    )
218                else:
219                    stream_config["retriever"]["requester"]["use_cache"] = True
220
221        return stream_configs
222
223    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
224        """
225        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
226        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
227        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
228        in the project root.
229        """
230        self._configure_logger_level(logger)
231        self._emit_manifest_debug_message(
232            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
233        )
234
235        spec = self._source_config.get("spec")
236        if spec:
237            if "type" not in spec:
238                spec["type"] = "Spec"
239            spec_component = self._constructor.create_component(SpecModel, spec, dict())
240            return spec_component.generate_spec()
241        else:
242            return super().spec(logger)
243
244    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
245        self._configure_logger_level(logger)
246        return super().check(logger, config)
247
248    def read(
249        self,
250        logger: logging.Logger,
251        config: Mapping[str, Any],
252        catalog: ConfiguredAirbyteCatalog,
253        state: Optional[List[AirbyteStateMessage]] = None,
254    ) -> Iterator[AirbyteMessage]:
255        self._configure_logger_level(logger)
256        yield from super().read(logger, config, catalog, state)
257
258    def _configure_logger_level(self, logger: logging.Logger) -> None:
259        """
260        Set the log level to logging.DEBUG if debug mode is enabled
261        """
262        if self._debug:
263            logger.setLevel(logging.DEBUG)
264
265    def _validate_source(self) -> None:
266        """
267        Validates the connector manifest against the declarative component schema
268        """
269        try:
270            raw_component_schema = pkgutil.get_data(
271                "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
272            )
273            if raw_component_schema is not None:
274                declarative_component_schema = yaml.load(
275                    raw_component_schema, Loader=yaml.SafeLoader
276                )
277            else:
278                raise RuntimeError(
279                    "Failed to read manifest component json schema required for validation"
280                )
281        except FileNotFoundError as e:
282            raise FileNotFoundError(
283                f"Failed to read manifest component json schema required for validation: {e}"
284            )
285
286        streams = self._source_config.get("streams")
287        dynamic_streams = self._source_config.get("dynamic_streams")
288        if not (streams or dynamic_streams):
289            raise ValidationError(
290                f"A valid manifest should have at least one stream defined. Got {streams}"
291            )
292
293        try:
294            validate(self._source_config, declarative_component_schema)
295        except ValidationError as e:
296            raise ValidationError(
297                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
298            ) from e
299
300        cdk_version_str = metadata.version("airbyte_cdk")
301        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
302        manifest_version_str = self._source_config.get("version")
303        if manifest_version_str is None:
304            raise RuntimeError(
305                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
306            )
307        manifest_version = self._parse_version(manifest_version_str, "manifest")
308
309        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
310            # Skipping version compatibility check on unreleased dev branch
311            pass
312        elif (cdk_version.major, cdk_version.minor) < (
313            manifest_version.major,
314            manifest_version.minor,
315        ):
316            raise ValidationError(
317                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
318                f"manifest may contain features that are not in the current CDK version."
319            )
320        elif (manifest_version.major, manifest_version.minor) < (0, 29):
321            raise ValidationError(
322                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
323                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
324                f"{cdk_version!s} which contains these breaking changes."
325            )
326
327    @staticmethod
328    def _parse_version(
329        version: str,
330        version_type: str,
331    ) -> Version:
332        """Takes a semantic version represented as a string and splits it into a tuple.
333
334        The fourth part (prerelease) is not returned in the tuple.
335
336        Returns:
337            Version: the parsed version object
338        """
339        try:
340            parsed_version = Version(version)
341        except InvalidVersion as ex:
342            raise ValidationError(
343                f"The {version_type} version '{version}' is not a valid version format."
344            ) from ex
345        else:
346            # No exception
347            return parsed_version
348
349    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
350        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
351        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
352        for s in stream_configs:
353            if "type" not in s:
354                s["type"] = "DeclarativeStream"
355        return stream_configs
356
357    def _dynamic_stream_configs(
358        self,
359        manifest: Mapping[str, Any],
360        config: Mapping[str, Any],
361        with_dynamic_stream_name: Optional[bool] = None,
362    ) -> List[Dict[str, Any]]:
363        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
364        dynamic_stream_configs: List[Dict[str, Any]] = []
365        seen_dynamic_streams: Set[str] = set()
366
367        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
368            components_resolver_config = dynamic_definition["components_resolver"]
369
370            if not components_resolver_config:
371                raise ValueError(
372                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
373                )
374
375            resolver_type = components_resolver_config.get("type")
376            if not resolver_type:
377                raise ValueError(
378                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
379                )
380
381            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
382                raise ValueError(
383                    f"Invalid components resolver type '{resolver_type}'. "
384                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
385                )
386
387            if "retriever" in components_resolver_config:
388                components_resolver_config["retriever"]["requester"]["use_cache"] = True
389
390            # Create a resolver for dynamic components based on type
391            components_resolver = self._constructor.create_component(
392                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
393            )
394
395            stream_template_config = dynamic_definition["stream_template"]
396
397            for dynamic_stream in components_resolver.resolve_components(
398                stream_template_config=stream_template_config
399            ):
400                dynamic_stream = {
401                    **ManifestComponentTransformer().propagate_types_and_parameters(
402                        "", dynamic_stream, {}, use_parent_parameters=True
403                    )
404                }
405
406                if "type" not in dynamic_stream:
407                    dynamic_stream["type"] = "DeclarativeStream"
408
409                # Ensure that each stream is created with a unique name
410                name = dynamic_stream.get("name")
411
412                if with_dynamic_stream_name:
413                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
414                        "name", f"dynamic_stream_{dynamic_definition_index}"
415                    )
416
417                if not isinstance(name, str):
418                    raise ValueError(
419                        f"Expected stream name {name} to be a string, got {type(name)}."
420                    )
421
422                if name in seen_dynamic_streams:
423                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
424                    failure_type = FailureType.system_error
425
426                    if resolver_type == "ConfigComponentsResolver":
427                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
428                        failure_type = FailureType.config_error
429
430                    raise AirbyteTracedException(
431                        message=error_message,
432                        internal_message=error_message,
433                        failure_type=failure_type,
434                    )
435
436                seen_dynamic_streams.add(name)
437                dynamic_stream_configs.append(dynamic_stream)
438
439        return dynamic_stream_configs
440
441    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
442        self.logger.debug("declarative source created from manifest", extra=extra_args)
 61class ManifestDeclarativeSource(DeclarativeSource):
 62    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
 63
 64    def __init__(
 65        self,
 66        source_config: ConnectionDefinition,
 67        *,
 68        config: Mapping[str, Any] | None = None,
 69        debug: bool = False,
 70        emit_connector_builder_messages: bool = False,
 71        component_factory: Optional[ModelToComponentFactory] = None,
 72    ):
 73        """
 74        Args:
 75            config: The provided config dict.
 76            source_config: The manifest of low-code components that describe the source connector.
 77            debug: True if debug mode is enabled.
 78            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
 79            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
 80        """
 81        self.logger = logging.getLogger(f"airbyte.{self.name}")
 82        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
 83        manifest = dict(source_config)
 84        if "type" not in manifest:
 85            manifest["type"] = "DeclarativeSource"
 86
 87        # If custom components are needed, locate and/or register them.
 88        self.components_module: ModuleType | None = get_registered_components_module(config=config)
 89
 90        resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
 91        propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
 92            "", resolved_source_config, {}
 93        )
 94        self._source_config = propagated_source_config
 95        self._debug = debug
 96        self._emit_connector_builder_messages = emit_connector_builder_messages
 97        self._constructor = (
 98            component_factory
 99            if component_factory
100            else ModelToComponentFactory(
101                emit_connector_builder_messages,
102                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
103            )
104        )
105        self._message_repository = self._constructor.get_message_repository()
106        self._slice_logger: SliceLogger = (
107            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
108        )
109
110        self._config = config or {}
111        self._validate_source()
112
113    @property
114    def resolved_manifest(self) -> Mapping[str, Any]:
115        return self._source_config
116
117    @property
118    def message_repository(self) -> MessageRepository:
119        return self._message_repository
120
121    @property
122    def dynamic_streams(self) -> List[Dict[str, Any]]:
123        return self._dynamic_stream_configs(
124            manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
125        )
126
127    @property
128    def connection_checker(self) -> ConnectionChecker:
129        check = self._source_config["check"]
130        if "type" not in check:
131            check["type"] = "CheckStream"
132        check_stream = self._constructor.create_component(
133            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
134            check,
135            dict(),
136            emit_connector_builder_messages=self._emit_connector_builder_messages,
137        )
138        if isinstance(check_stream, ConnectionChecker):
139            return check_stream
140        else:
141            raise ValueError(
142                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
143            )
144
145    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
146        self._emit_manifest_debug_message(
147            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
148        )
149
150        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
151            self._source_config, config
152        )
153
154        api_budget_model = self._source_config.get("api_budget")
155        if api_budget_model:
156            self._constructor.set_api_budget(api_budget_model, config)
157
158        source_streams = [
159            self._constructor.create_component(
160                StateDelegatingStreamModel
161                if stream_config.get("type") == StateDelegatingStreamModel.__name__
162                else DeclarativeStreamModel,
163                stream_config,
164                config,
165                emit_connector_builder_messages=self._emit_connector_builder_messages,
166            )
167            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
168        ]
169
170        return source_streams
171
172    @staticmethod
173    def _initialize_cache_for_parent_streams(
174        stream_configs: List[Dict[str, Any]],
175    ) -> List[Dict[str, Any]]:
176        parent_streams = set()
177
178        def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
179            for parent_config in parent_configs:
180                parent_streams.add(parent_config["stream"]["name"])
181                if parent_config["stream"]["type"] == "StateDelegatingStream":
182                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
183                        "use_cache"
184                    ] = True
185                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
186                        "use_cache"
187                    ] = True
188                else:
189                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
190
191        for stream_config in stream_configs:
192            if stream_config.get("incremental_sync", {}).get("parent_stream"):
193                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
194                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
195                    "use_cache"
196                ] = True
197
198            elif stream_config.get("retriever", {}).get("partition_router", {}):
199                partition_router = stream_config["retriever"]["partition_router"]
200
201                if isinstance(partition_router, dict) and partition_router.get(
202                    "parent_stream_configs"
203                ):
204                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
205                elif isinstance(partition_router, list):
206                    for router in partition_router:
207                        if router.get("parent_stream_configs"):
208                            update_with_cache_parent_configs(router["parent_stream_configs"])
209
210        for stream_config in stream_configs:
211            if stream_config["name"] in parent_streams:
212                if stream_config["type"] == "StateDelegatingStream":
213                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
214                        True
215                    )
216                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
217                        True
218                    )
219                else:
220                    stream_config["retriever"]["requester"]["use_cache"] = True
221
222        return stream_configs
223
224    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
225        """
226        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
227        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
228        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
229        in the project root.
230        """
231        self._configure_logger_level(logger)
232        self._emit_manifest_debug_message(
233            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
234        )
235
236        spec = self._source_config.get("spec")
237        if spec:
238            if "type" not in spec:
239                spec["type"] = "Spec"
240            spec_component = self._constructor.create_component(SpecModel, spec, dict())
241            return spec_component.generate_spec()
242        else:
243            return super().spec(logger)
244
245    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
246        self._configure_logger_level(logger)
247        return super().check(logger, config)
248
249    def read(
250        self,
251        logger: logging.Logger,
252        config: Mapping[str, Any],
253        catalog: ConfiguredAirbyteCatalog,
254        state: Optional[List[AirbyteStateMessage]] = None,
255    ) -> Iterator[AirbyteMessage]:
256        self._configure_logger_level(logger)
257        yield from super().read(logger, config, catalog, state)
258
259    def _configure_logger_level(self, logger: logging.Logger) -> None:
260        """
261        Set the log level to logging.DEBUG if debug mode is enabled
262        """
263        if self._debug:
264            logger.setLevel(logging.DEBUG)
265
266    def _validate_source(self) -> None:
267        """
268        Validates the connector manifest against the declarative component schema
269        """
270        try:
271            raw_component_schema = pkgutil.get_data(
272                "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
273            )
274            if raw_component_schema is not None:
275                declarative_component_schema = yaml.load(
276                    raw_component_schema, Loader=yaml.SafeLoader
277                )
278            else:
279                raise RuntimeError(
280                    "Failed to read manifest component json schema required for validation"
281                )
282        except FileNotFoundError as e:
283            raise FileNotFoundError(
284                f"Failed to read manifest component json schema required for validation: {e}"
285            )
286
287        streams = self._source_config.get("streams")
288        dynamic_streams = self._source_config.get("dynamic_streams")
289        if not (streams or dynamic_streams):
290            raise ValidationError(
291                f"A valid manifest should have at least one stream defined. Got {streams}"
292            )
293
294        try:
295            validate(self._source_config, declarative_component_schema)
296        except ValidationError as e:
297            raise ValidationError(
298                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
299            ) from e
300
301        cdk_version_str = metadata.version("airbyte_cdk")
302        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
303        manifest_version_str = self._source_config.get("version")
304        if manifest_version_str is None:
305            raise RuntimeError(
306                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
307            )
308        manifest_version = self._parse_version(manifest_version_str, "manifest")
309
310        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
311            # Skipping version compatibility check on unreleased dev branch
312            pass
313        elif (cdk_version.major, cdk_version.minor) < (
314            manifest_version.major,
315            manifest_version.minor,
316        ):
317            raise ValidationError(
318                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
319                f"manifest may contain features that are not in the current CDK version."
320            )
321        elif (manifest_version.major, manifest_version.minor) < (0, 29):
322            raise ValidationError(
323                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
324                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
325                f"{cdk_version!s} which contains these breaking changes."
326            )
327
328    @staticmethod
329    def _parse_version(
330        version: str,
331        version_type: str,
332    ) -> Version:
333        """Takes a semantic version represented as a string and splits it into a tuple.
334
335        The fourth part (prerelease) is not returned in the tuple.
336
337        Returns:
338            Version: the parsed version object
339        """
340        try:
341            parsed_version = Version(version)
342        except InvalidVersion as ex:
343            raise ValidationError(
344                f"The {version_type} version '{version}' is not a valid version format."
345            ) from ex
346        else:
347            # No exception
348            return parsed_version
349
350    def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
351        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
352        stream_configs: List[Dict[str, Any]] = manifest.get("streams", [])
353        for s in stream_configs:
354            if "type" not in s:
355                s["type"] = "DeclarativeStream"
356        return stream_configs
357
358    def _dynamic_stream_configs(
359        self,
360        manifest: Mapping[str, Any],
361        config: Mapping[str, Any],
362        with_dynamic_stream_name: Optional[bool] = None,
363    ) -> List[Dict[str, Any]]:
364        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
365        dynamic_stream_configs: List[Dict[str, Any]] = []
366        seen_dynamic_streams: Set[str] = set()
367
368        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
369            components_resolver_config = dynamic_definition["components_resolver"]
370
371            if not components_resolver_config:
372                raise ValueError(
373                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
374                )
375
376            resolver_type = components_resolver_config.get("type")
377            if not resolver_type:
378                raise ValueError(
379                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
380                )
381
382            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
383                raise ValueError(
384                    f"Invalid components resolver type '{resolver_type}'. "
385                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
386                )
387
388            if "retriever" in components_resolver_config:
389                components_resolver_config["retriever"]["requester"]["use_cache"] = True
390
391            # Create a resolver for dynamic components based on type
392            components_resolver = self._constructor.create_component(
393                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
394            )
395
396            stream_template_config = dynamic_definition["stream_template"]
397
398            for dynamic_stream in components_resolver.resolve_components(
399                stream_template_config=stream_template_config
400            ):
401                dynamic_stream = {
402                    **ManifestComponentTransformer().propagate_types_and_parameters(
403                        "", dynamic_stream, {}, use_parent_parameters=True
404                    )
405                }
406
407                if "type" not in dynamic_stream:
408                    dynamic_stream["type"] = "DeclarativeStream"
409
410                # Ensure that each stream is created with a unique name
411                name = dynamic_stream.get("name")
412
413                if with_dynamic_stream_name:
414                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
415                        "name", f"dynamic_stream_{dynamic_definition_index}"
416                    )
417
418                if not isinstance(name, str):
419                    raise ValueError(
420                        f"Expected stream name {name} to be a string, got {type(name)}."
421                    )
422
423                if name in seen_dynamic_streams:
424                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
425                    failure_type = FailureType.system_error
426
427                    if resolver_type == "ConfigComponentsResolver":
428                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
429                        failure_type = FailureType.config_error
430
431                    raise AirbyteTracedException(
432                        message=error_message,
433                        internal_message=error_message,
434                        failure_type=failure_type,
435                    )
436
437                seen_dynamic_streams.add(name)
438                dynamic_stream_configs.append(dynamic_stream)
439
440        return dynamic_stream_configs
441
442    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
443        self.logger.debug("declarative source created from manifest", extra=extra_args)

Declarative source defined by a manifest of low-code components that define the source connector's behavior.

ManifestDeclarativeSource( source_config: Mapping[str, Any], *, config: Optional[Mapping[str, Any]] = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[airbyte_cdk.sources.declarative.parsers.model_to_component_factory.ModelToComponentFactory] = None)
 64    def __init__(
 65        self,
 66        source_config: ConnectionDefinition,
 67        *,
 68        config: Mapping[str, Any] | None = None,
 69        debug: bool = False,
 70        emit_connector_builder_messages: bool = False,
 71        component_factory: Optional[ModelToComponentFactory] = None,
 72    ):
 73        """
 74        Args:
 75            config: The provided config dict.
 76            source_config: The manifest of low-code components that describe the source connector.
 77            debug: True if debug mode is enabled.
 78            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
 79            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
 80        """
 81        self.logger = logging.getLogger(f"airbyte.{self.name}")
 82        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
 83        manifest = dict(source_config)
 84        if "type" not in manifest:
 85            manifest["type"] = "DeclarativeSource"
 86
 87        # If custom components are needed, locate and/or register them.
 88        self.components_module: ModuleType | None = get_registered_components_module(config=config)
 89
 90        resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
 91        propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
 92            "", resolved_source_config, {}
 93        )
 94        self._source_config = propagated_source_config
 95        self._debug = debug
 96        self._emit_connector_builder_messages = emit_connector_builder_messages
 97        self._constructor = (
 98            component_factory
 99            if component_factory
100            else ModelToComponentFactory(
101                emit_connector_builder_messages,
102                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
103            )
104        )
105        self._message_repository = self._constructor.get_message_repository()
106        self._slice_logger: SliceLogger = (
107            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
108        )
109
110        self._config = config or {}
111        self._validate_source()
Arguments:
  • config: The provided config dict.
  • source_config: The manifest of low-code components that describe the source connector.
  • debug: True if debug mode is enabled.
  • emit_connector_builder_messages: True if messages should be emitted to the connector builder.
  • component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
113    @property
114    def resolved_manifest(self) -> Mapping[str, Any]:
115        return self._source_config
message_repository: airbyte_cdk.MessageRepository
117    @property
118    def message_repository(self) -> MessageRepository:
119        return self._message_repository
dynamic_streams: List[Dict[str, Any]]
121    @property
122    def dynamic_streams(self) -> List[Dict[str, Any]]:
123        return self._dynamic_stream_configs(
124            manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
125        )
127    @property
128    def connection_checker(self) -> ConnectionChecker:
129        check = self._source_config["check"]
130        if "type" not in check:
131            check["type"] = "CheckStream"
132        check_stream = self._constructor.create_component(
133            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
134            check,
135            dict(),
136            emit_connector_builder_messages=self._emit_connector_builder_messages,
137        )
138        if isinstance(check_stream, ConnectionChecker):
139            return check_stream
140        else:
141            raise ValueError(
142                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
143            )

Returns the ConnectionChecker to use for the check operation

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
145    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
146        self._emit_manifest_debug_message(
147            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
148        )
149
150        stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
151            self._source_config, config
152        )
153
154        api_budget_model = self._source_config.get("api_budget")
155        if api_budget_model:
156            self._constructor.set_api_budget(api_budget_model, config)
157
158        source_streams = [
159            self._constructor.create_component(
160                StateDelegatingStreamModel
161                if stream_config.get("type") == StateDelegatingStreamModel.__name__
162                else DeclarativeStreamModel,
163                stream_config,
164                config,
165                emit_connector_builder_messages=self._emit_connector_builder_messages,
166            )
167            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
168        ]
169
170        return source_streams
Parameters
  • config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns

A list of the streams in this source connector.

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
224    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
225        """
226        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
227        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
228        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
229        in the project root.
230        """
231        self._configure_logger_level(logger)
232        self._emit_manifest_debug_message(
233            extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
234        )
235
236        spec = self._source_config.get("spec")
237        if spec:
238            if "type" not in spec:
239                spec["type"] = "Spec"
240            spec_component = self._constructor.create_component(SpecModel, spec, dict())
241            return spec_component.generate_spec()
242        else:
243            return super().spec(logger)

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
245    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
246        self._configure_logger_level(logger)
247        return super().check(logger, config)

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
249    def read(
250        self,
251        logger: logging.Logger,
252        config: Mapping[str, Any],
253        catalog: ConfiguredAirbyteCatalog,
254        state: Optional[List[AirbyteStateMessage]] = None,
255    ) -> Iterator[AirbyteMessage]:
256        self._configure_logger_level(logger)
257        yield from super().read(logger, config, catalog, state)

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.