airbyte_cdk.sources.declarative.manifest_declarative_source

  1#
  2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  3#
  4
  5import json
  6import logging
  7import pkgutil
  8from copy import deepcopy
  9from importlib import metadata
 10from types import ModuleType
 11from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
 12
 13import orjson
 14import yaml
 15from jsonschema.exceptions import ValidationError
 16from jsonschema.validators import validate
 17from packaging.version import InvalidVersion, Version
 18
 19from airbyte_cdk.config_observation import create_connector_config_control_message
 20from airbyte_cdk.connector_builder.models import (
 21    LogMessage as ConnectorBuilderLogMessage,
 22)
 23from airbyte_cdk.manifest_migrations.migration_handler import (
 24    ManifestMigrationHandler,
 25)
 26from airbyte_cdk.models import (
 27    AirbyteConnectionStatus,
 28    AirbyteMessage,
 29    AirbyteStateMessage,
 30    ConfiguredAirbyteCatalog,
 31    ConnectorSpecification,
 32    FailureType,
 33)
 34from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
 35from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 36from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 37from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
 38from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
 39from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 40    ConditionalStreams as ConditionalStreamsModel,
 41)
 42from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 43    DeclarativeStream as DeclarativeStreamModel,
 44)
 45from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 46    Spec as SpecModel,
 47)
 48from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 49    StateDelegatingStream as StateDelegatingStreamModel,
 50)
 51from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 52    get_registered_components_module,
 53)
 54from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 55    ManifestComponentTransformer,
 56)
 57from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
 58    ManifestNormalizer,
 59)
 60from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 61    ManifestReferenceResolver,
 62)
 63from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 64    ModelToComponentFactory,
 65)
 66from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 67from airbyte_cdk.sources.declarative.spec.spec import Spec
 68from airbyte_cdk.sources.message import MessageRepository
 69from airbyte_cdk.sources.streams.core import Stream
 70from airbyte_cdk.sources.types import Config, ConnectionDefinition
 71from airbyte_cdk.sources.utils.slice_logger import (
 72    AlwaysLogSliceLogger,
 73    DebugSliceLogger,
 74    SliceLogger,
 75)
 76from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 77
 78
 79def _get_declarative_component_schema() -> Dict[str, Any]:
 80    try:
 81        raw_component_schema = pkgutil.get_data(
 82            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
 83        )
 84        if raw_component_schema is not None:
 85            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
 86            return declarative_component_schema  # type: ignore
 87        else:
 88            raise RuntimeError(
 89                "Failed to read manifest component json schema required for deduplication"
 90            )
 91    except FileNotFoundError as e:
 92        raise FileNotFoundError(
 93            f"Failed to read manifest component json schema required for deduplication: {e}"
 94        )
 95
 96
 97class ManifestDeclarativeSource(DeclarativeSource):
 98    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
 99
100    def __init__(
101        self,
102        source_config: ConnectionDefinition,
103        *,
104        config: Mapping[str, Any] | None = None,
105        debug: bool = False,
106        emit_connector_builder_messages: bool = False,
107        component_factory: Optional[ModelToComponentFactory] = None,
108        migrate_manifest: Optional[bool] = False,
109        normalize_manifest: Optional[bool] = False,
110        config_path: Optional[str] = None,
111    ) -> None:
112        """
113        Args:
114            config: The provided config dict.
115            source_config: The manifest of low-code components that describe the source connector.
116            debug: True if debug mode is enabled.
117            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
118            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
119            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
120            config_path: Optional path to the config file.
121        """
122        self.logger = logging.getLogger(f"airbyte.{self.name}")
123        self._should_normalize = normalize_manifest
124        self._should_migrate = migrate_manifest
125        self._declarative_component_schema = _get_declarative_component_schema()
126        # If custom components are needed, locate and/or register them.
127        self.components_module: ModuleType | None = get_registered_components_module(config=config)
128        # set additional attributes
129        self._debug = debug
130        self._emit_connector_builder_messages = emit_connector_builder_messages
131        self._constructor = (
132            component_factory
133            if component_factory
134            else ModelToComponentFactory(
135                emit_connector_builder_messages=emit_connector_builder_messages,
136                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
137            )
138        )
139        self._message_repository = self._constructor.get_message_repository()
140        self._slice_logger: SliceLogger = (
141            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
142        )
143
144        # resolve all components in the manifest
145        self._source_config = self._pre_process_manifest(dict(source_config))
146        # validate resolved manifest against the declarative component schema
147        self._validate_source()
148        # apply additional post-processing to the manifest
149        self._post_process_manifest()
150
151        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
152        self._spec_component: Optional[Spec] = (
153            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
154        )
155        self._config = self._migrate_and_transform_config(config_path, config) or {}
156
157    @property
158    def resolved_manifest(self) -> Mapping[str, Any]:
159        """
160        Returns the resolved manifest configuration for the source.
161
162        This property provides access to the internal source configuration as a mapping,
163        which contains all settings and parameters required to define the source's behavior.
164
165        Returns:
166            Mapping[str, Any]: The resolved source configuration manifest.
167        """
168        return self._source_config
169
170    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
171        """
172        Preprocesses the provided manifest dictionary by resolving any manifest references.
173
174        This method modifies the input manifest in place, resolving references using the
175        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
176
177        Args:
178            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
179
180        Returns:
181            None
182        """
183        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
184        manifest = self._fix_source_type(manifest)
185        # Resolve references in the manifest
186        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
187        # Propagate types and parameters throughout the manifest
188        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
189            "", resolved_manifest, {}
190        )
191
192        return propagated_manifest
193
194    def _post_process_manifest(self) -> None:
195        """
196        Post-processes the manifest after validation.
197        This method is responsible for any additional modifications or transformations needed
198        after the manifest has been validated and before it is used in the source.
199        """
200        # apply manifest migration, if required
201        self._migrate_manifest()
202        # apply manifest normalization, if required
203        self._normalize_manifest()
204
205    def _normalize_manifest(self) -> None:
206        """
207        This method is used to normalize the manifest. It should be called after the manifest has been validated.
208
209        Connector Builder UI rendering requires the manifest to be in a specific format.
210         - references have been resolved
211         - the commonly used definitions are extracted to the `definitions.linked.*`
212        """
213        if self._should_normalize:
214            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
215            self._source_config = normalizer.normalize()
216
217    def _migrate_and_transform_config(
218        self,
219        config_path: Optional[str],
220        config: Optional[Config],
221    ) -> Optional[Config]:
222        if not config:
223            return None
224        if not self._spec_component:
225            return config
226        mutable_config = dict(config)
227        self._spec_component.migrate_config(mutable_config)
228        if mutable_config != config:
229            if config_path:
230                with open(config_path, "w") as f:
231                    json.dump(mutable_config, f)
232            self.message_repository.emit_message(
233                create_connector_config_control_message(mutable_config)
234            )
235            # We have no mechanism for consuming the queue, so we print the messages to stdout
236            for message in self.message_repository.consume_queue():
237                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
238        self._spec_component.transform_config(mutable_config)
239        return mutable_config
240
241    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
242        config = self._config or config
243        return super().configure(config, temp_dir)
244
245    def _migrate_manifest(self) -> None:
246        """
247        This method is used to migrate the manifest. It should be called after the manifest has been validated.
248        The migration is done in place, so the original manifest is modified.
249
250        The original manifest is returned if any error occurs during migration.
251        """
252        if self._should_migrate:
253            manifest_migrator = ManifestMigrationHandler(self._source_config)
254            self._source_config = manifest_migrator.apply_migrations()
255            # validate migrated manifest against the declarative component schema
256            self._validate_source()
257
258    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
259        """
260        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
261        """
262        if "type" not in manifest:
263            manifest["type"] = "DeclarativeSource"
264
265        return manifest
266
267    @property
268    def message_repository(self) -> MessageRepository:
269        return self._message_repository
270
271    @property
272    def dynamic_streams(self) -> List[Dict[str, Any]]:
273        return self._dynamic_stream_configs(
274            manifest=self._source_config,
275            config=self._config,
276            with_dynamic_stream_name=True,
277        )
278
279    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
280        return self._constructor.get_model_deprecations()
281
282    @property
283    def connection_checker(self) -> ConnectionChecker:
284        check = self._source_config["check"]
285        if "type" not in check:
286            check["type"] = "CheckStream"
287        check_stream = self._constructor.create_component(
288            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
289            check,
290            dict(),
291            emit_connector_builder_messages=self._emit_connector_builder_messages,
292        )
293        if isinstance(check_stream, ConnectionChecker):
294            return check_stream
295        else:
296            raise ValueError(
297                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
298            )
299
300    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
301        if self._spec_component:
302            self._spec_component.validate_config(config)
303
304        self._emit_manifest_debug_message(
305            extra_args={
306                "source_name": self.name,
307                "parsed_config": json.dumps(self._source_config),
308            }
309        )
310
311        stream_configs = (
312            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
313        )
314
315        api_budget_model = self._source_config.get("api_budget")
316        if api_budget_model:
317            self._constructor.set_api_budget(api_budget_model, config)
318
319        source_streams = [
320            self._constructor.create_component(
321                (
322                    StateDelegatingStreamModel
323                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
324                    else DeclarativeStreamModel
325                ),
326                stream_config,
327                config,
328                emit_connector_builder_messages=self._emit_connector_builder_messages,
329            )
330            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
331        ]
332        return source_streams
333
334    @staticmethod
335    def _initialize_cache_for_parent_streams(
336        stream_configs: List[Dict[str, Any]],
337    ) -> List[Dict[str, Any]]:
338        parent_streams = set()
339
340        def update_with_cache_parent_configs(
341            parent_configs: list[dict[str, Any]],
342        ) -> None:
343            for parent_config in parent_configs:
344                parent_streams.add(parent_config["stream"]["name"])
345                if parent_config["stream"]["type"] == "StateDelegatingStream":
346                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
347                        "use_cache"
348                    ] = True
349                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
350                        "use_cache"
351                    ] = True
352                else:
353                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
354
355        for stream_config in stream_configs:
356            if stream_config.get("incremental_sync", {}).get("parent_stream"):
357                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
358                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
359                    "use_cache"
360                ] = True
361
362            elif stream_config.get("retriever", {}).get("partition_router", {}):
363                partition_router = stream_config["retriever"]["partition_router"]
364
365                if isinstance(partition_router, dict) and partition_router.get(
366                    "parent_stream_configs"
367                ):
368                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
369                elif isinstance(partition_router, list):
370                    for router in partition_router:
371                        if router.get("parent_stream_configs"):
372                            update_with_cache_parent_configs(router["parent_stream_configs"])
373
374        for stream_config in stream_configs:
375            if stream_config["name"] in parent_streams:
376                if stream_config["type"] == "StateDelegatingStream":
377                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
378                        True
379                    )
380                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
381                        True
382                    )
383                else:
384                    stream_config["retriever"]["requester"]["use_cache"] = True
385        return stream_configs
386
387    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
388        """
389        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
390        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
391        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
392        in the project root.
393        """
394        self._configure_logger_level(logger)
395        self._emit_manifest_debug_message(
396            extra_args={
397                "source_name": self.name,
398                "parsed_config": json.dumps(self._source_config),
399            }
400        )
401
402        return (
403            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
404        )
405
406    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
407        self._configure_logger_level(logger)
408        return super().check(logger, config)
409
410    def read(
411        self,
412        logger: logging.Logger,
413        config: Mapping[str, Any],
414        catalog: ConfiguredAirbyteCatalog,
415        state: Optional[List[AirbyteStateMessage]] = None,
416    ) -> Iterator[AirbyteMessage]:
417        self._configure_logger_level(logger)
418        yield from super().read(logger, config, catalog, state)
419
420    def _configure_logger_level(self, logger: logging.Logger) -> None:
421        """
422        Set the log level to logging.DEBUG if debug mode is enabled
423        """
424        if self._debug:
425            logger.setLevel(logging.DEBUG)
426
427    def _validate_source(self) -> None:
428        """
429        Validates the connector manifest against the declarative component schema
430        """
431
432        try:
433            validate(self._source_config, self._declarative_component_schema)
434        except ValidationError as e:
435            raise ValidationError(
436                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
437            ) from e
438
439        cdk_version_str = metadata.version("airbyte_cdk")
440        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
441        manifest_version_str = self._source_config.get("version")
442        if manifest_version_str is None:
443            raise RuntimeError(
444                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
445            )
446        manifest_version = self._parse_version(manifest_version_str, "manifest")
447
448        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
449            # Skipping version compatibility check on unreleased dev branch
450            pass
451        elif (cdk_version.major, cdk_version.minor) < (
452            manifest_version.major,
453            manifest_version.minor,
454        ):
455            raise ValidationError(
456                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
457                f"manifest may contain features that are not in the current CDK version."
458            )
459        elif (manifest_version.major, manifest_version.minor) < (0, 29):
460            raise ValidationError(
461                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
462                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
463                f"{cdk_version!s} which contains these breaking changes."
464            )
465
466    @staticmethod
467    def _parse_version(
468        version: str,
469        version_type: str,
470    ) -> Version:
471        """Takes a semantic version represented as a string and splits it into a tuple.
472
473        The fourth part (prerelease) is not returned in the tuple.
474
475        Returns:
476            Version: the parsed version object
477        """
478        try:
479            parsed_version = Version(version)
480        except InvalidVersion as ex:
481            raise ValidationError(
482                f"The {version_type} version '{version}' is not a valid version format."
483            ) from ex
484        else:
485            # No exception
486            return parsed_version
487
488    def _stream_configs(
489        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
490    ) -> List[Dict[str, Any]]:
491        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
492        stream_configs = []
493        for current_stream_config in manifest.get("streams", []):
494            if (
495                "type" in current_stream_config
496                and current_stream_config["type"] == "ConditionalStreams"
497            ):
498                interpolated_boolean = InterpolatedBoolean(
499                    condition=current_stream_config.get("condition"),
500                    parameters={},
501                )
502
503                if interpolated_boolean.eval(config=config):
504                    stream_configs.extend(current_stream_config.get("streams", []))
505            else:
506                if "type" not in current_stream_config:
507                    current_stream_config["type"] = "DeclarativeStream"
508                stream_configs.append(current_stream_config)
509        return stream_configs
510
511    def _dynamic_stream_configs(
512        self,
513        manifest: Mapping[str, Any],
514        config: Mapping[str, Any],
515        with_dynamic_stream_name: Optional[bool] = None,
516    ) -> List[Dict[str, Any]]:
517        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
518        dynamic_stream_configs: List[Dict[str, Any]] = []
519        seen_dynamic_streams: Set[str] = set()
520
521        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
522            components_resolver_config = dynamic_definition["components_resolver"]
523
524            if not components_resolver_config:
525                raise ValueError(
526                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
527                )
528
529            resolver_type = components_resolver_config.get("type")
530            if not resolver_type:
531                raise ValueError(
532                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
533                )
534
535            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
536                raise ValueError(
537                    f"Invalid components resolver type '{resolver_type}'. "
538                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
539                )
540
541            if "retriever" in components_resolver_config:
542                components_resolver_config["retriever"]["requester"]["use_cache"] = True
543
544            # Create a resolver for dynamic components based on type
545            components_resolver = self._constructor.create_component(
546                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
547                components_resolver_config,
548                config,
549            )
550
551            stream_template_config = dynamic_definition["stream_template"]
552
553            for dynamic_stream in components_resolver.resolve_components(
554                stream_template_config=stream_template_config
555            ):
556                dynamic_stream = {
557                    **ManifestComponentTransformer().propagate_types_and_parameters(
558                        "", dynamic_stream, {}, use_parent_parameters=True
559                    )
560                }
561
562                if "type" not in dynamic_stream:
563                    dynamic_stream["type"] = "DeclarativeStream"
564
565                # Ensure that each stream is created with a unique name
566                name = dynamic_stream.get("name")
567
568                if with_dynamic_stream_name:
569                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
570                        "name", f"dynamic_stream_{dynamic_definition_index}"
571                    )
572
573                if not isinstance(name, str):
574                    raise ValueError(
575                        f"Expected stream name {name} to be a string, got {type(name)}."
576                    )
577
578                if name in seen_dynamic_streams:
579                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
580                    failure_type = FailureType.system_error
581
582                    if resolver_type == "ConfigComponentsResolver":
583                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
584                        failure_type = FailureType.config_error
585
586                    raise AirbyteTracedException(
587                        message=error_message,
588                        internal_message=error_message,
589                        failure_type=failure_type,
590                    )
591
592                seen_dynamic_streams.add(name)
593                dynamic_stream_configs.append(dynamic_stream)
594
595        return dynamic_stream_configs
596
597    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
598        self.logger.debug("declarative source created from manifest", extra=extra_args)
 98class ManifestDeclarativeSource(DeclarativeSource):
 99    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
100
101    def __init__(
102        self,
103        source_config: ConnectionDefinition,
104        *,
105        config: Mapping[str, Any] | None = None,
106        debug: bool = False,
107        emit_connector_builder_messages: bool = False,
108        component_factory: Optional[ModelToComponentFactory] = None,
109        migrate_manifest: Optional[bool] = False,
110        normalize_manifest: Optional[bool] = False,
111        config_path: Optional[str] = None,
112    ) -> None:
113        """
114        Args:
115            config: The provided config dict.
116            source_config: The manifest of low-code components that describe the source connector.
117            debug: True if debug mode is enabled.
118            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
119            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
120            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
121            config_path: Optional path to the config file.
122        """
123        self.logger = logging.getLogger(f"airbyte.{self.name}")
124        self._should_normalize = normalize_manifest
125        self._should_migrate = migrate_manifest
126        self._declarative_component_schema = _get_declarative_component_schema()
127        # If custom components are needed, locate and/or register them.
128        self.components_module: ModuleType | None = get_registered_components_module(config=config)
129        # set additional attributes
130        self._debug = debug
131        self._emit_connector_builder_messages = emit_connector_builder_messages
132        self._constructor = (
133            component_factory
134            if component_factory
135            else ModelToComponentFactory(
136                emit_connector_builder_messages=emit_connector_builder_messages,
137                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
138            )
139        )
140        self._message_repository = self._constructor.get_message_repository()
141        self._slice_logger: SliceLogger = (
142            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
143        )
144
145        # resolve all components in the manifest
146        self._source_config = self._pre_process_manifest(dict(source_config))
147        # validate resolved manifest against the declarative component schema
148        self._validate_source()
149        # apply additional post-processing to the manifest
150        self._post_process_manifest()
151
152        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
153        self._spec_component: Optional[Spec] = (
154            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
155        )
156        self._config = self._migrate_and_transform_config(config_path, config) or {}
157
158    @property
159    def resolved_manifest(self) -> Mapping[str, Any]:
160        """
161        Returns the resolved manifest configuration for the source.
162
163        This property provides access to the internal source configuration as a mapping,
164        which contains all settings and parameters required to define the source's behavior.
165
166        Returns:
167            Mapping[str, Any]: The resolved source configuration manifest.
168        """
169        return self._source_config
170
171    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
172        """
173        Preprocesses the provided manifest dictionary by resolving any manifest references.
174
175        This method modifies the input manifest in place, resolving references using the
176        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
177
178        Args:
179            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
180
181        Returns:
182            None
183        """
184        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
185        manifest = self._fix_source_type(manifest)
186        # Resolve references in the manifest
187        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
188        # Propagate types and parameters throughout the manifest
189        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
190            "", resolved_manifest, {}
191        )
192
193        return propagated_manifest
194
195    def _post_process_manifest(self) -> None:
196        """
197        Post-processes the manifest after validation.
198        This method is responsible for any additional modifications or transformations needed
199        after the manifest has been validated and before it is used in the source.
200        """
201        # apply manifest migration, if required
202        self._migrate_manifest()
203        # apply manifest normalization, if required
204        self._normalize_manifest()
205
206    def _normalize_manifest(self) -> None:
207        """
208        This method is used to normalize the manifest. It should be called after the manifest has been validated.
209
210        Connector Builder UI rendering requires the manifest to be in a specific format.
211         - references have been resolved
212         - the commonly used definitions are extracted to the `definitions.linked.*`
213        """
214        if self._should_normalize:
215            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
216            self._source_config = normalizer.normalize()
217
218    def _migrate_and_transform_config(
219        self,
220        config_path: Optional[str],
221        config: Optional[Config],
222    ) -> Optional[Config]:
223        if not config:
224            return None
225        if not self._spec_component:
226            return config
227        mutable_config = dict(config)
228        self._spec_component.migrate_config(mutable_config)
229        if mutable_config != config:
230            if config_path:
231                with open(config_path, "w") as f:
232                    json.dump(mutable_config, f)
233            self.message_repository.emit_message(
234                create_connector_config_control_message(mutable_config)
235            )
236            # We have no mechanism for consuming the queue, so we print the messages to stdout
237            for message in self.message_repository.consume_queue():
238                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
239        self._spec_component.transform_config(mutable_config)
240        return mutable_config
241
242    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
243        config = self._config or config
244        return super().configure(config, temp_dir)
245
246    def _migrate_manifest(self) -> None:
247        """
248        This method is used to migrate the manifest. It should be called after the manifest has been validated.
249        The migration is done in place, so the original manifest is modified.
250
251        The original manifest is returned if any error occurs during migration.
252        """
253        if self._should_migrate:
254            manifest_migrator = ManifestMigrationHandler(self._source_config)
255            self._source_config = manifest_migrator.apply_migrations()
256            # validate migrated manifest against the declarative component schema
257            self._validate_source()
258
259    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
260        """
261        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
262        """
263        if "type" not in manifest:
264            manifest["type"] = "DeclarativeSource"
265
266        return manifest
267
268    @property
269    def message_repository(self) -> MessageRepository:
270        return self._message_repository
271
272    @property
273    def dynamic_streams(self) -> List[Dict[str, Any]]:
274        return self._dynamic_stream_configs(
275            manifest=self._source_config,
276            config=self._config,
277            with_dynamic_stream_name=True,
278        )
279
280    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
281        return self._constructor.get_model_deprecations()
282
283    @property
284    def connection_checker(self) -> ConnectionChecker:
285        check = self._source_config["check"]
286        if "type" not in check:
287            check["type"] = "CheckStream"
288        check_stream = self._constructor.create_component(
289            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
290            check,
291            dict(),
292            emit_connector_builder_messages=self._emit_connector_builder_messages,
293        )
294        if isinstance(check_stream, ConnectionChecker):
295            return check_stream
296        else:
297            raise ValueError(
298                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
299            )
300
301    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
302        if self._spec_component:
303            self._spec_component.validate_config(config)
304
305        self._emit_manifest_debug_message(
306            extra_args={
307                "source_name": self.name,
308                "parsed_config": json.dumps(self._source_config),
309            }
310        )
311
312        stream_configs = (
313            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
314        )
315
316        api_budget_model = self._source_config.get("api_budget")
317        if api_budget_model:
318            self._constructor.set_api_budget(api_budget_model, config)
319
320        source_streams = [
321            self._constructor.create_component(
322                (
323                    StateDelegatingStreamModel
324                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
325                    else DeclarativeStreamModel
326                ),
327                stream_config,
328                config,
329                emit_connector_builder_messages=self._emit_connector_builder_messages,
330            )
331            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
332        ]
333        return source_streams
334
335    @staticmethod
336    def _initialize_cache_for_parent_streams(
337        stream_configs: List[Dict[str, Any]],
338    ) -> List[Dict[str, Any]]:
339        parent_streams = set()
340
341        def update_with_cache_parent_configs(
342            parent_configs: list[dict[str, Any]],
343        ) -> None:
344            for parent_config in parent_configs:
345                parent_streams.add(parent_config["stream"]["name"])
346                if parent_config["stream"]["type"] == "StateDelegatingStream":
347                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
348                        "use_cache"
349                    ] = True
350                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
351                        "use_cache"
352                    ] = True
353                else:
354                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
355
356        for stream_config in stream_configs:
357            if stream_config.get("incremental_sync", {}).get("parent_stream"):
358                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
359                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
360                    "use_cache"
361                ] = True
362
363            elif stream_config.get("retriever", {}).get("partition_router", {}):
364                partition_router = stream_config["retriever"]["partition_router"]
365
366                if isinstance(partition_router, dict) and partition_router.get(
367                    "parent_stream_configs"
368                ):
369                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
370                elif isinstance(partition_router, list):
371                    for router in partition_router:
372                        if router.get("parent_stream_configs"):
373                            update_with_cache_parent_configs(router["parent_stream_configs"])
374
375        for stream_config in stream_configs:
376            if stream_config["name"] in parent_streams:
377                if stream_config["type"] == "StateDelegatingStream":
378                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
379                        True
380                    )
381                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
382                        True
383                    )
384                else:
385                    stream_config["retriever"]["requester"]["use_cache"] = True
386        return stream_configs
387
388    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
389        """
390        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
391        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
392        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
393        in the project root.
394        """
395        self._configure_logger_level(logger)
396        self._emit_manifest_debug_message(
397            extra_args={
398                "source_name": self.name,
399                "parsed_config": json.dumps(self._source_config),
400            }
401        )
402
403        return (
404            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
405        )
406
407    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
408        self._configure_logger_level(logger)
409        return super().check(logger, config)
410
411    def read(
412        self,
413        logger: logging.Logger,
414        config: Mapping[str, Any],
415        catalog: ConfiguredAirbyteCatalog,
416        state: Optional[List[AirbyteStateMessage]] = None,
417    ) -> Iterator[AirbyteMessage]:
418        self._configure_logger_level(logger)
419        yield from super().read(logger, config, catalog, state)
420
421    def _configure_logger_level(self, logger: logging.Logger) -> None:
422        """
423        Set the log level to logging.DEBUG if debug mode is enabled
424        """
425        if self._debug:
426            logger.setLevel(logging.DEBUG)
427
428    def _validate_source(self) -> None:
429        """
430        Validates the connector manifest against the declarative component schema
431        """
432
433        try:
434            validate(self._source_config, self._declarative_component_schema)
435        except ValidationError as e:
436            raise ValidationError(
437                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
438            ) from e
439
440        cdk_version_str = metadata.version("airbyte_cdk")
441        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
442        manifest_version_str = self._source_config.get("version")
443        if manifest_version_str is None:
444            raise RuntimeError(
445                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
446            )
447        manifest_version = self._parse_version(manifest_version_str, "manifest")
448
449        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
450            # Skipping version compatibility check on unreleased dev branch
451            pass
452        elif (cdk_version.major, cdk_version.minor) < (
453            manifest_version.major,
454            manifest_version.minor,
455        ):
456            raise ValidationError(
457                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
458                f"manifest may contain features that are not in the current CDK version."
459            )
460        elif (manifest_version.major, manifest_version.minor) < (0, 29):
461            raise ValidationError(
462                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
463                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
464                f"{cdk_version!s} which contains these breaking changes."
465            )
466
467    @staticmethod
468    def _parse_version(
469        version: str,
470        version_type: str,
471    ) -> Version:
472        """Parses a version string into a `packaging.version.Version` object.
473
474        Raises ValidationError if the string is not a valid version format.
475
476        Returns:
477            Version: the parsed version object
478        """
479        try:
480            parsed_version = Version(version)
481        except InvalidVersion as ex:
482            raise ValidationError(
483                f"The {version_type} version '{version}' is not a valid version format."
484            ) from ex
485        else:
486            # No exception
487            return parsed_version
488
489    def _stream_configs(
490        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
491    ) -> List[Dict[str, Any]]:
492        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
493        stream_configs = []
494        for current_stream_config in manifest.get("streams", []):
495            if (
496                "type" in current_stream_config
497                and current_stream_config["type"] == "ConditionalStreams"
498            ):
499                interpolated_boolean = InterpolatedBoolean(
500                    condition=current_stream_config.get("condition"),
501                    parameters={},
502                )
503
504                if interpolated_boolean.eval(config=config):
505                    stream_configs.extend(current_stream_config.get("streams", []))
506            else:
507                if "type" not in current_stream_config:
508                    current_stream_config["type"] = "DeclarativeStream"
509                stream_configs.append(current_stream_config)
510        return stream_configs
511
512    def _dynamic_stream_configs(
513        self,
514        manifest: Mapping[str, Any],
515        config: Mapping[str, Any],
516        with_dynamic_stream_name: Optional[bool] = None,
517    ) -> List[Dict[str, Any]]:
518        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
519        dynamic_stream_configs: List[Dict[str, Any]] = []
520        seen_dynamic_streams: Set[str] = set()
521
522        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
523            components_resolver_config = dynamic_definition["components_resolver"]
524
525            if not components_resolver_config:
526                raise ValueError(
527                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
528                )
529
530            resolver_type = components_resolver_config.get("type")
531            if not resolver_type:
532                raise ValueError(
533                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
534                )
535
536            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
537                raise ValueError(
538                    f"Invalid components resolver type '{resolver_type}'. "
539                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
540                )
541
542            if "retriever" in components_resolver_config:
543                components_resolver_config["retriever"]["requester"]["use_cache"] = True
544
545            # Create a resolver for dynamic components based on type
546            components_resolver = self._constructor.create_component(
547                COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
548                components_resolver_config,
549                config,
550            )
551
552            stream_template_config = dynamic_definition["stream_template"]
553
554            for dynamic_stream in components_resolver.resolve_components(
555                stream_template_config=stream_template_config
556            ):
557                dynamic_stream = {
558                    **ManifestComponentTransformer().propagate_types_and_parameters(
559                        "", dynamic_stream, {}, use_parent_parameters=True
560                    )
561                }
562
563                if "type" not in dynamic_stream:
564                    dynamic_stream["type"] = "DeclarativeStream"
565
566                # Ensure that each stream is created with a unique name
567                name = dynamic_stream.get("name")
568
569                if with_dynamic_stream_name:
570                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
571                        "name", f"dynamic_stream_{dynamic_definition_index}"
572                    )
573
574                if not isinstance(name, str):
575                    raise ValueError(
576                        f"Expected stream name {name} to be a string, got {type(name)}."
577                    )
578
579                if name in seen_dynamic_streams:
580                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
581                    failure_type = FailureType.system_error
582
583                    if resolver_type == "ConfigComponentsResolver":
584                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
585                        failure_type = FailureType.config_error
586
587                    raise AirbyteTracedException(
588                        message=error_message,
589                        internal_message=error_message,
590                        failure_type=failure_type,
591                    )
592
593                seen_dynamic_streams.add(name)
594                dynamic_stream_configs.append(dynamic_stream)
595
596        return dynamic_stream_configs
597
598    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
599        self.logger.debug("declarative source created from manifest", extra=extra_args)

Declarative source defined by a manifest of low-code components that define source connector behavior.

ManifestDeclarativeSource( source_config: Mapping[str, Any], *, config: Optional[Mapping[str, Any]] = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[airbyte_cdk.sources.declarative.parsers.model_to_component_factory.ModelToComponentFactory] = None, migrate_manifest: Optional[bool] = False, normalize_manifest: Optional[bool] = False, config_path: Optional[str] = None)
101    def __init__(
102        self,
103        source_config: ConnectionDefinition,
104        *,
105        config: Mapping[str, Any] | None = None,
106        debug: bool = False,
107        emit_connector_builder_messages: bool = False,
108        component_factory: Optional[ModelToComponentFactory] = None,
109        migrate_manifest: Optional[bool] = False,
110        normalize_manifest: Optional[bool] = False,
111        config_path: Optional[str] = None,
112    ) -> None:
113        """
114        Args:
115            config: The provided config dict.
116            source_config: The manifest of low-code components that describe the source connector.
117            debug: True if debug mode is enabled.
118            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
119            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
120            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
121            config_path: Optional path to the config file.
122        """
123        self.logger = logging.getLogger(f"airbyte.{self.name}")
124        self._should_normalize = normalize_manifest
125        self._should_migrate = migrate_manifest
126        self._declarative_component_schema = _get_declarative_component_schema()
127        # If custom components are needed, locate and/or register them.
128        self.components_module: ModuleType | None = get_registered_components_module(config=config)
129        # set additional attributes
130        self._debug = debug
131        self._emit_connector_builder_messages = emit_connector_builder_messages
132        self._constructor = (
133            component_factory
134            if component_factory
135            else ModelToComponentFactory(
136                emit_connector_builder_messages=emit_connector_builder_messages,
137                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
138            )
139        )
140        self._message_repository = self._constructor.get_message_repository()
141        self._slice_logger: SliceLogger = (
142            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
143        )
144
145        # resolve all components in the manifest
146        self._source_config = self._pre_process_manifest(dict(source_config))
147        # validate resolved manifest against the declarative component schema
148        self._validate_source()
149        # apply additional post-processing to the manifest
150        self._post_process_manifest()
151
152        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
153        self._spec_component: Optional[Spec] = (
154            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
155        )
156        self._config = self._migrate_and_transform_config(config_path, config) or {}
Arguments:
  • config: The provided config dict.
  • source_config: The manifest of low-code components that describe the source connector.
  • debug: True if debug mode is enabled.
  • emit_connector_builder_messages: True if messages should be emitted to the connector builder.
  • component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
  • migrate_manifest: Optional flag to indicate if manifest migrations should be applied.
  • normalize_manifest: Optional flag to indicate if the manifest should be normalized.
  • config_path: Optional path to the config file.
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
158    @property
159    def resolved_manifest(self) -> Mapping[str, Any]:
160        """
161        Returns the resolved manifest configuration for the source.
162
163        This property provides access to the internal source configuration as a mapping,
164        which contains all settings and parameters required to define the source's behavior.
165
166        Returns:
167            Mapping[str, Any]: The resolved source configuration manifest.
168        """
169        return self._source_config

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:

Mapping[str, Any]: The resolved source configuration manifest.

message_repository: airbyte_cdk.MessageRepository
268    @property
269    def message_repository(self) -> MessageRepository:
270        return self._message_repository
dynamic_streams: List[Dict[str, Any]]
272    @property
273    def dynamic_streams(self) -> List[Dict[str, Any]]:
274        return self._dynamic_stream_configs(
275            manifest=self._source_config,
276            config=self._config,
277            with_dynamic_stream_name=True,
278        )
def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
280    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
281        return self._constructor.get_model_deprecations()

Returns a list of deprecation warnings for the source.

283    @property
284    def connection_checker(self) -> ConnectionChecker:
285        check = self._source_config["check"]
286        if "type" not in check:
287            check["type"] = "CheckStream"
288        check_stream = self._constructor.create_component(
289            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
290            check,
291            dict(),
292            emit_connector_builder_messages=self._emit_connector_builder_messages,
293        )
294        if isinstance(check_stream, ConnectionChecker):
295            return check_stream
296        else:
297            raise ValueError(
298                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
299            )

Returns the ConnectionChecker to use for the check operation.

def streams( self, config: Mapping[str, Any]) -> List[airbyte_cdk.Stream]:
301    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
302        if self._spec_component:
303            self._spec_component.validate_config(config)
304
305        self._emit_manifest_debug_message(
306            extra_args={
307                "source_name": self.name,
308                "parsed_config": json.dumps(self._source_config),
309            }
310        )
311
312        stream_configs = (
313            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
314        )
315
316        api_budget_model = self._source_config.get("api_budget")
317        if api_budget_model:
318            self._constructor.set_api_budget(api_budget_model, config)
319
320        source_streams = [
321            self._constructor.create_component(
322                (
323                    StateDelegatingStreamModel
324                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
325                    else DeclarativeStreamModel
326                ),
327                stream_config,
328                config,
329                emit_connector_builder_messages=self._emit_connector_builder_messages,
330            )
331            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
332        ]
333        return source_streams
Parameters
  • config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
Returns

A list of the streams in this source connector.

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
388    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
389        """
390        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
391        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
392        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
393        in the project root.
394        """
395        self._configure_logger_level(logger)
396        self._emit_manifest_debug_message(
397            extra_args={
398                "source_name": self.name,
399                "parsed_config": json.dumps(self._source_config),
400            }
401        )
402
403        return (
404            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
405        )

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
407    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
408        self._configure_logger_level(logger)
409        return super().check(logger, config)

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
411    def read(
412        self,
413        logger: logging.Logger,
414        config: Mapping[str, Any],
415        catalog: ConfiguredAirbyteCatalog,
416        state: Optional[List[AirbyteStateMessage]] = None,
417    ) -> Iterator[AirbyteMessage]:
418        self._configure_logger_level(logger)
419        yield from super().read(logger, config, catalog, state)

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.