airbyte_cdk.legacy.sources.declarative.manifest_declarative_source

  1#
  2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  3#
  4
  5import json
  6import logging
  7import pkgutil
  8from copy import deepcopy
  9from importlib import metadata
 10from types import ModuleType
 11from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union
 12
 13import orjson
 14import yaml
 15from jsonschema.exceptions import ValidationError
 16from jsonschema.validators import validate
 17from packaging.version import InvalidVersion, Version
 18
 19from airbyte_cdk.config_observation import create_connector_config_control_message
 20from airbyte_cdk.connector_builder.models import (
 21    LogMessage as ConnectorBuilderLogMessage,
 22)
 23from airbyte_cdk.legacy.sources.declarative.declarative_source import DeclarativeSource
 24from airbyte_cdk.manifest_migrations.migration_handler import (
 25    ManifestMigrationHandler,
 26)
 27from airbyte_cdk.models import (
 28    AirbyteConnectionStatus,
 29    AirbyteMessage,
 30    AirbyteStateMessage,
 31    ConfiguredAirbyteCatalog,
 32    ConnectorSpecification,
 33    FailureType,
 34)
 35from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
 36from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 37from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 38from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
 39from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 40    ConditionalStreams as ConditionalStreamsModel,
 41)
 42from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 43    DeclarativeStream as DeclarativeStreamModel,
 44)
 45from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 46    Spec as SpecModel,
 47)
 48from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 49    StateDelegatingStream as StateDelegatingStreamModel,
 50)
 51from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 52    get_registered_components_module,
 53)
 54from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 55    ManifestComponentTransformer,
 56)
 57from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
 58    ManifestNormalizer,
 59)
 60from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 61    ManifestReferenceResolver,
 62)
 63from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 64    ModelToComponentFactory,
 65)
 66from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 67from airbyte_cdk.sources.declarative.spec.spec import Spec
 68from airbyte_cdk.sources.message import MessageRepository
 69from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 70from airbyte_cdk.sources.streams.core import Stream
 71from airbyte_cdk.sources.types import Config, ConnectionDefinition
 72from airbyte_cdk.sources.utils.slice_logger import (
 73    AlwaysLogSliceLogger,
 74    DebugSliceLogger,
 75    SliceLogger,
 76)
 77from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 78
 79
 80def _get_declarative_component_schema() -> Dict[str, Any]:
 81    try:
 82        raw_component_schema = pkgutil.get_data(
 83            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
 84        )
 85        if raw_component_schema is not None:
 86            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
 87            return declarative_component_schema  # type: ignore
 88        else:
 89            raise RuntimeError(
 90                "Failed to read manifest component json schema required for deduplication"
 91            )
 92    except FileNotFoundError as e:
 93        raise FileNotFoundError(
 94            f"Failed to read manifest component json schema required for deduplication: {e}"
 95        )
 96
 97
 98class ManifestDeclarativeSource(DeclarativeSource):
 99    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
100
101    def __init__(
102        self,
103        source_config: ConnectionDefinition,
104        *,
105        config: Mapping[str, Any] | None = None,
106        debug: bool = False,
107        emit_connector_builder_messages: bool = False,
108        component_factory: Optional[ModelToComponentFactory] = None,
109        migrate_manifest: Optional[bool] = False,
110        normalize_manifest: Optional[bool] = False,
111        config_path: Optional[str] = None,
112    ) -> None:
113        """
114        Args:
115            config: The provided config dict.
116            source_config: The manifest of low-code components that describe the source connector.
117            debug: True if debug mode is enabled.
118            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
119            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
120            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
121            config_path: Optional path to the config file.
122        """
123        self.logger = logging.getLogger(f"airbyte.{self.name}")
124        self._should_normalize = normalize_manifest
125        self._should_migrate = migrate_manifest
126        self._declarative_component_schema = _get_declarative_component_schema()
127        # If custom components are needed, locate and/or register them.
128        self.components_module: ModuleType | None = get_registered_components_module(config=config)
129        # set additional attributes
130        self._debug = debug
131        self._emit_connector_builder_messages = emit_connector_builder_messages
132        self._constructor = (
133            component_factory
134            if component_factory
135            else ModelToComponentFactory(
136                emit_connector_builder_messages=emit_connector_builder_messages,
137                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
138            )
139        )
140        self._message_repository = self._constructor.get_message_repository()
141        self._slice_logger: SliceLogger = (
142            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
143        )
144
145        # resolve all components in the manifest
146        self._source_config = self._pre_process_manifest(dict(source_config))
147        # validate resolved manifest against the declarative component schema
148        self._validate_source()
149        # apply additional post-processing to the manifest
150        self._post_process_manifest()
151
152        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
153        self._spec_component: Optional[Spec] = (
154            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
155        )
156        self._config = self._migrate_and_transform_config(config_path, config) or {}
157
158    @property
159    def resolved_manifest(self) -> Mapping[str, Any]:
160        """
161        Returns the resolved manifest configuration for the source.
162
163        This property provides access to the internal source configuration as a mapping,
164        which contains all settings and parameters required to define the source's behavior.
165
166        Returns:
167            Mapping[str, Any]: The resolved source configuration manifest.
168        """
169        return self._source_config
170
171    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
172        """
173        Preprocesses the provided manifest dictionary by resolving any manifest references.
174
175        This method modifies the input manifest in place, resolving references using the
176        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
177
178        Args:
179            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
180
181        Returns:
182            None
183        """
184        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
185        manifest = self._fix_source_type(manifest)
186        # Resolve references in the manifest
187        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
188        # Propagate types and parameters throughout the manifest
189        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
190            "", resolved_manifest, {}
191        )
192
193        return propagated_manifest
194
195    def _post_process_manifest(self) -> None:
196        """
197        Post-processes the manifest after validation.
198        This method is responsible for any additional modifications or transformations needed
199        after the manifest has been validated and before it is used in the source.
200        """
201        # apply manifest migration, if required
202        self._migrate_manifest()
203        # apply manifest normalization, if required
204        self._normalize_manifest()
205
206    def _normalize_manifest(self) -> None:
207        """
208        This method is used to normalize the manifest. It should be called after the manifest has been validated.
209
210        Connector Builder UI rendering requires the manifest to be in a specific format.
211         - references have been resolved
212         - the commonly used definitions are extracted to the `definitions.linked.*`
213        """
214        if self._should_normalize:
215            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
216            self._source_config = normalizer.normalize()
217
218    def _migrate_and_transform_config(
219        self,
220        config_path: Optional[str],
221        config: Optional[Config],
222    ) -> Optional[Config]:
223        if not config:
224            return None
225        if not self._spec_component:
226            return config
227        mutable_config = dict(config)
228        self._spec_component.migrate_config(mutable_config)
229        if mutable_config != config:
230            if config_path:
231                with open(config_path, "w") as f:
232                    json.dump(mutable_config, f)
233            self.message_repository.emit_message(
234                create_connector_config_control_message(mutable_config)
235            )
236            # We have no mechanism for consuming the queue, so we print the messages to stdout
237            for message in self.message_repository.consume_queue():
238                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
239        self._spec_component.transform_config(mutable_config)
240        return mutable_config
241
242    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
243        config = self._config or config
244        return super().configure(config, temp_dir)
245
246    def _migrate_manifest(self) -> None:
247        """
248        This method is used to migrate the manifest. It should be called after the manifest has been validated.
249        The migration is done in place, so the original manifest is modified.
250
251        The original manifest is returned if any error occurs during migration.
252        """
253        if self._should_migrate:
254            manifest_migrator = ManifestMigrationHandler(self._source_config)
255            self._source_config = manifest_migrator.apply_migrations()
256            # validate migrated manifest against the declarative component schema
257            self._validate_source()
258
259    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
260        """
261        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
262        """
263        if "type" not in manifest:
264            manifest["type"] = "DeclarativeSource"
265
266        return manifest
267
268    @property
269    def message_repository(self) -> MessageRepository:
270        return self._message_repository
271
272    @property
273    def dynamic_streams(self) -> List[Dict[str, Any]]:
274        return self._dynamic_stream_configs(
275            manifest=self._source_config,
276            config=self._config,
277            with_dynamic_stream_name=True,
278        )
279
280    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
281        return self._constructor.get_model_deprecations()
282
283    @property
284    def connection_checker(self) -> ConnectionChecker:
285        check = self._source_config["check"]
286        if "type" not in check:
287            check["type"] = "CheckStream"
288        check_stream = self._constructor.create_component(
289            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
290            check,
291            dict(),
292            emit_connector_builder_messages=self._emit_connector_builder_messages,
293        )
294        if isinstance(check_stream, ConnectionChecker):
295            return check_stream
296        else:
297            raise ValueError(
298                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
299            )
300
301    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
302        """
303        As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
304        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
305        fully decouple this from the AbstractSource.
306        """
307        if self._spec_component:
308            self._spec_component.validate_config(config)
309
310        self._emit_manifest_debug_message(
311            extra_args={
312                "source_name": self.name,
313                "parsed_config": json.dumps(self._source_config),
314            }
315        )
316
317        stream_configs = (
318            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
319        )
320
321        api_budget_model = self._source_config.get("api_budget")
322        if api_budget_model:
323            self._constructor.set_api_budget(api_budget_model, config)
324
325        source_streams = [
326            self._constructor.create_component(
327                (
328                    StateDelegatingStreamModel
329                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
330                    else DeclarativeStreamModel
331                ),
332                stream_config,
333                config,
334                emit_connector_builder_messages=self._emit_connector_builder_messages,
335            )
336            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
337        ]
338        return source_streams
339
340    @staticmethod
341    def _initialize_cache_for_parent_streams(
342        stream_configs: List[Dict[str, Any]],
343    ) -> List[Dict[str, Any]]:
344        """Enable caching for parent streams unless explicitly disabled.
345
346        Caching is enabled by default for parent streams to optimize performance when the same
347        parent data is needed by multiple child streams. However, explicit `use_cache: false`
348        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
349        APIs where caching causes duplicate records).
350        """
351        parent_streams = set()
352
353        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
354            """Set use_cache to True only if not explicitly disabled."""
355            if requester.get("use_cache") is not False:
356                requester["use_cache"] = True
357
358        def update_with_cache_parent_configs(
359            parent_configs: list[dict[str, Any]],
360        ) -> None:
361            for parent_config in parent_configs:
362                parent_streams.add(parent_config["stream"]["name"])
363                if parent_config["stream"]["type"] == "StateDelegatingStream":
364                    _set_cache_if_not_disabled(
365                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
366                    )
367                    _set_cache_if_not_disabled(
368                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
369                    )
370                else:
371                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
372
373        for stream_config in stream_configs:
374            if stream_config.get("incremental_sync", {}).get("parent_stream"):
375                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
376                _set_cache_if_not_disabled(
377                    stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
378                )
379
380            elif stream_config.get("retriever", {}).get("partition_router", {}):
381                partition_router = stream_config["retriever"]["partition_router"]
382
383                if isinstance(partition_router, dict) and partition_router.get(
384                    "parent_stream_configs"
385                ):
386                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
387                elif isinstance(partition_router, list):
388                    for router in partition_router:
389                        if router.get("parent_stream_configs"):
390                            update_with_cache_parent_configs(router["parent_stream_configs"])
391
392        for stream_config in stream_configs:
393            if stream_config["name"] in parent_streams:
394                if stream_config["type"] == "StateDelegatingStream":
395                    _set_cache_if_not_disabled(
396                        stream_config["full_refresh_stream"]["retriever"]["requester"]
397                    )
398                    _set_cache_if_not_disabled(
399                        stream_config["incremental_stream"]["retriever"]["requester"]
400                    )
401                else:
402                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
403        return stream_configs
404
405    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
406        """
407        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
408        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
409        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
410        in the project root.
411        """
412        self._configure_logger_level(logger)
413        self._emit_manifest_debug_message(
414            extra_args={
415                "source_name": self.name,
416                "parsed_config": json.dumps(self._source_config),
417            }
418        )
419
420        return (
421            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
422        )
423
424    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
425        self._configure_logger_level(logger)
426        return super().check(logger, config)
427
428    def read(
429        self,
430        logger: logging.Logger,
431        config: Mapping[str, Any],
432        catalog: ConfiguredAirbyteCatalog,
433        state: Optional[List[AirbyteStateMessage]] = None,
434    ) -> Iterator[AirbyteMessage]:
435        self._configure_logger_level(logger)
436        yield from super().read(logger, config, catalog, state)
437
438    def _configure_logger_level(self, logger: logging.Logger) -> None:
439        """
440        Set the log level to logging.DEBUG if debug mode is enabled
441        """
442        if self._debug:
443            logger.setLevel(logging.DEBUG)
444
445    def _validate_source(self) -> None:
446        """
447        Validates the connector manifest against the declarative component schema
448        """
449
450        try:
451            validate(self._source_config, self._declarative_component_schema)
452        except ValidationError as e:
453            raise ValidationError(
454                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
455            ) from e
456
457        cdk_version_str = metadata.version("airbyte_cdk")
458        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
459        manifest_version_str = self._source_config.get("version")
460        if manifest_version_str is None:
461            raise RuntimeError(
462                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
463            )
464        manifest_version = self._parse_version(manifest_version_str, "manifest")
465
466        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
467            # Skipping version compatibility check on unreleased dev branch
468            pass
469        elif (cdk_version.major, cdk_version.minor) < (
470            manifest_version.major,
471            manifest_version.minor,
472        ):
473            raise ValidationError(
474                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
475                f"manifest may contain features that are not in the current CDK version."
476            )
477        elif (manifest_version.major, manifest_version.minor) < (0, 29):
478            raise ValidationError(
479                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
480                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
481                f"{cdk_version!s} which contains these breaking changes."
482            )
483
484    @staticmethod
485    def _parse_version(
486        version: str,
487        version_type: str,
488    ) -> Version:
489        """Takes a semantic version represented as a string and splits it into a tuple.
490
491        The fourth part (prerelease) is not returned in the tuple.
492
493        Returns:
494            Version: the parsed version object
495        """
496        try:
497            parsed_version = Version(version)
498        except InvalidVersion as ex:
499            raise ValidationError(
500                f"The {version_type} version '{version}' is not a valid version format."
501            ) from ex
502        else:
503            # No exception
504            return parsed_version
505
506    def _stream_configs(
507        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
508    ) -> List[Dict[str, Any]]:
509        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
510        stream_configs = []
511        for current_stream_config in manifest.get("streams", []):
512            if (
513                "type" in current_stream_config
514                and current_stream_config["type"] == "ConditionalStreams"
515            ):
516                interpolated_boolean = InterpolatedBoolean(
517                    condition=current_stream_config.get("condition"),
518                    parameters={},
519                )
520
521                if interpolated_boolean.eval(config=config):
522                    stream_configs.extend(current_stream_config.get("streams", []))
523            else:
524                if "type" not in current_stream_config:
525                    current_stream_config["type"] = "DeclarativeStream"
526                stream_configs.append(current_stream_config)
527        return stream_configs
528
529    def _dynamic_stream_configs(
530        self,
531        manifest: Mapping[str, Any],
532        config: Mapping[str, Any],
533        with_dynamic_stream_name: Optional[bool] = None,
534    ) -> List[Dict[str, Any]]:
535        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
536        dynamic_stream_configs: List[Dict[str, Any]] = []
537        seen_dynamic_streams: Set[str] = set()
538
539        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
540            components_resolver_config = dynamic_definition["components_resolver"]
541
542            if not components_resolver_config:
543                raise ValueError(
544                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
545                )
546
547            resolver_type = components_resolver_config.get("type")
548            if not resolver_type:
549                raise ValueError(
550                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
551                )
552
553            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
554                raise ValueError(
555                    f"Invalid components resolver type '{resolver_type}'. "
556                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
557                )
558
559            if "retriever" in components_resolver_config:
560                components_resolver_config["retriever"]["requester"]["use_cache"] = True
561
562            # Create a resolver for dynamic components based on type
563            if resolver_type == "HttpComponentsResolver":
564                components_resolver = self._constructor.create_component(
565                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
566                    component_definition=components_resolver_config,
567                    config=config,
568                    stream_name=dynamic_definition.get("name"),
569                )
570            else:
571                components_resolver = self._constructor.create_component(
572                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
573                    component_definition=components_resolver_config,
574                    config=config,
575                )
576
577            stream_template_config = dynamic_definition["stream_template"]
578
579            for dynamic_stream in components_resolver.resolve_components(
580                stream_template_config=stream_template_config
581            ):
582                # Get the use_parent_parameters configuration from the dynamic definition
583                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
584                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)
585
586                dynamic_stream = {
587                    **ManifestComponentTransformer().propagate_types_and_parameters(
588                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
589                    )
590                }
591
592                if "type" not in dynamic_stream:
593                    dynamic_stream["type"] = "DeclarativeStream"
594
595                # Ensure that each stream is created with a unique name
596                name = dynamic_stream.get("name")
597
598                if with_dynamic_stream_name:
599                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
600                        "name", f"dynamic_stream_{dynamic_definition_index}"
601                    )
602
603                if not isinstance(name, str):
604                    raise ValueError(
605                        f"Expected stream name {name} to be a string, got {type(name)}."
606                    )
607
608                if name in seen_dynamic_streams:
609                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
610                    failure_type = FailureType.system_error
611
612                    if resolver_type == "ConfigComponentsResolver":
613                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
614                        failure_type = FailureType.config_error
615
616                    raise AirbyteTracedException(
617                        message=error_message,
618                        internal_message=error_message,
619                        failure_type=failure_type,
620                    )
621
622                seen_dynamic_streams.add(name)
623                dynamic_stream_configs.append(dynamic_stream)
624
625        return dynamic_stream_configs
626
627    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
628        self.logger.debug("declarative source created from manifest", extra=extra_args)
 99class ManifestDeclarativeSource(DeclarativeSource):
100    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
101
102    def __init__(
103        self,
104        source_config: ConnectionDefinition,
105        *,
106        config: Mapping[str, Any] | None = None,
107        debug: bool = False,
108        emit_connector_builder_messages: bool = False,
109        component_factory: Optional[ModelToComponentFactory] = None,
110        migrate_manifest: Optional[bool] = False,
111        normalize_manifest: Optional[bool] = False,
112        config_path: Optional[str] = None,
113    ) -> None:
114        """
115        Args:
116            config: The provided config dict.
117            source_config: The manifest of low-code components that describe the source connector.
118            debug: True if debug mode is enabled.
119            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
120            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
121            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
122            config_path: Optional path to the config file.
123        """
124        self.logger = logging.getLogger(f"airbyte.{self.name}")
125        self._should_normalize = normalize_manifest
126        self._should_migrate = migrate_manifest
127        self._declarative_component_schema = _get_declarative_component_schema()
128        # If custom components are needed, locate and/or register them.
129        self.components_module: ModuleType | None = get_registered_components_module(config=config)
130        # set additional attributes
131        self._debug = debug
132        self._emit_connector_builder_messages = emit_connector_builder_messages
133        self._constructor = (
134            component_factory
135            if component_factory
136            else ModelToComponentFactory(
137                emit_connector_builder_messages=emit_connector_builder_messages,
138                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
139            )
140        )
141        self._message_repository = self._constructor.get_message_repository()
142        self._slice_logger: SliceLogger = (
143            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
144        )
145
146        # resolve all components in the manifest
147        self._source_config = self._pre_process_manifest(dict(source_config))
148        # validate resolved manifest against the declarative component schema
149        self._validate_source()
150        # apply additional post-processing to the manifest
151        self._post_process_manifest()
152
153        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
154        self._spec_component: Optional[Spec] = (
155            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
156        )
157        self._config = self._migrate_and_transform_config(config_path, config) or {}
158
159    @property
160    def resolved_manifest(self) -> Mapping[str, Any]:
161        """
162        Returns the resolved manifest configuration for the source.
163
164        This property provides access to the internal source configuration as a mapping,
165        which contains all settings and parameters required to define the source's behavior.
166
167        Returns:
168            Mapping[str, Any]: The resolved source configuration manifest.
169        """
170        return self._source_config
171
172    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
173        """
174        Preprocesses the provided manifest dictionary by resolving any manifest references.
175
176        This method modifies the input manifest in place, resolving references using the
177        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
178
179        Args:
180            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
181
182        Returns:
183            None
184        """
185        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
186        manifest = self._fix_source_type(manifest)
187        # Resolve references in the manifest
188        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
189        # Propagate types and parameters throughout the manifest
190        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
191            "", resolved_manifest, {}
192        )
193
194        return propagated_manifest
195
196    def _post_process_manifest(self) -> None:
197        """
198        Post-processes the manifest after validation.
199        This method is responsible for any additional modifications or transformations needed
200        after the manifest has been validated and before it is used in the source.
201        """
202        # apply manifest migration, if required
203        self._migrate_manifest()
204        # apply manifest normalization, if required
205        self._normalize_manifest()
206
207    def _normalize_manifest(self) -> None:
208        """
209        This method is used to normalize the manifest. It should be called after the manifest has been validated.
210
211        Connector Builder UI rendering requires the manifest to be in a specific format.
212         - references have been resolved
213         - the commonly used definitions are extracted to the `definitions.linked.*`
214        """
215        if self._should_normalize:
216            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
217            self._source_config = normalizer.normalize()
218
219    def _migrate_and_transform_config(
220        self,
221        config_path: Optional[str],
222        config: Optional[Config],
223    ) -> Optional[Config]:
224        if not config:
225            return None
226        if not self._spec_component:
227            return config
228        mutable_config = dict(config)
229        self._spec_component.migrate_config(mutable_config)
230        if mutable_config != config:
231            if config_path:
232                with open(config_path, "w") as f:
233                    json.dump(mutable_config, f)
234            self.message_repository.emit_message(
235                create_connector_config_control_message(mutable_config)
236            )
237            # We have no mechanism for consuming the queue, so we print the messages to stdout
238            for message in self.message_repository.consume_queue():
239                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
240        self._spec_component.transform_config(mutable_config)
241        return mutable_config
242
243    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
244        config = self._config or config
245        return super().configure(config, temp_dir)
246
247    def _migrate_manifest(self) -> None:
248        """
249        This method is used to migrate the manifest. It should be called after the manifest has been validated.
250        The migration is done in place, so the original manifest is modified.
251
252        The original manifest is returned if any error occurs during migration.
253        """
254        if self._should_migrate:
255            manifest_migrator = ManifestMigrationHandler(self._source_config)
256            self._source_config = manifest_migrator.apply_migrations()
257            # validate migrated manifest against the declarative component schema
258            self._validate_source()
259
260    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
261        """
262        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
263        """
264        if "type" not in manifest:
265            manifest["type"] = "DeclarativeSource"
266
267        return manifest
268
269    @property
270    def message_repository(self) -> MessageRepository:
271        return self._message_repository
272
273    @property
274    def dynamic_streams(self) -> List[Dict[str, Any]]:
275        return self._dynamic_stream_configs(
276            manifest=self._source_config,
277            config=self._config,
278            with_dynamic_stream_name=True,
279        )
280
281    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
282        return self._constructor.get_model_deprecations()
283
284    @property
285    def connection_checker(self) -> ConnectionChecker:
286        check = self._source_config["check"]
287        if "type" not in check:
288            check["type"] = "CheckStream"
289        check_stream = self._constructor.create_component(
290            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
291            check,
292            dict(),
293            emit_connector_builder_messages=self._emit_connector_builder_messages,
294        )
295        if isinstance(check_stream, ConnectionChecker):
296            return check_stream
297        else:
298            raise ValueError(
299                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
300            )
301
302    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
303        """
304        As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
305        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
306        fully decouple this from the AbstractSource.
307        """
308        if self._spec_component:
309            self._spec_component.validate_config(config)
310
311        self._emit_manifest_debug_message(
312            extra_args={
313                "source_name": self.name,
314                "parsed_config": json.dumps(self._source_config),
315            }
316        )
317
318        stream_configs = (
319            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
320        )
321
322        api_budget_model = self._source_config.get("api_budget")
323        if api_budget_model:
324            self._constructor.set_api_budget(api_budget_model, config)
325
326        source_streams = [
327            self._constructor.create_component(
328                (
329                    StateDelegatingStreamModel
330                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
331                    else DeclarativeStreamModel
332                ),
333                stream_config,
334                config,
335                emit_connector_builder_messages=self._emit_connector_builder_messages,
336            )
337            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
338        ]
339        return source_streams
340
341    @staticmethod
342    def _initialize_cache_for_parent_streams(
343        stream_configs: List[Dict[str, Any]],
344    ) -> List[Dict[str, Any]]:
345        """Enable caching for parent streams unless explicitly disabled.
346
347        Caching is enabled by default for parent streams to optimize performance when the same
348        parent data is needed by multiple child streams. However, explicit `use_cache: false`
349        settings are respected for streams that cannot use caching (e.g., scroll-based pagination
350        APIs where caching causes duplicate records).
351        """
352        parent_streams = set()
353
354        def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
355            """Set use_cache to True only if not explicitly disabled."""
356            if requester.get("use_cache") is not False:
357                requester["use_cache"] = True
358
359        def update_with_cache_parent_configs(
360            parent_configs: list[dict[str, Any]],
361        ) -> None:
362            for parent_config in parent_configs:
363                parent_streams.add(parent_config["stream"]["name"])
364                if parent_config["stream"]["type"] == "StateDelegatingStream":
365                    _set_cache_if_not_disabled(
366                        parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
367                    )
368                    _set_cache_if_not_disabled(
369                        parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
370                    )
371                else:
372                    _set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
373
374        for stream_config in stream_configs:
375            if stream_config.get("incremental_sync", {}).get("parent_stream"):
376                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
377                _set_cache_if_not_disabled(
378                    stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
379                )
380
381            elif stream_config.get("retriever", {}).get("partition_router", {}):
382                partition_router = stream_config["retriever"]["partition_router"]
383
384                if isinstance(partition_router, dict) and partition_router.get(
385                    "parent_stream_configs"
386                ):
387                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
388                elif isinstance(partition_router, list):
389                    for router in partition_router:
390                        if router.get("parent_stream_configs"):
391                            update_with_cache_parent_configs(router["parent_stream_configs"])
392
393        for stream_config in stream_configs:
394            if stream_config["name"] in parent_streams:
395                if stream_config["type"] == "StateDelegatingStream":
396                    _set_cache_if_not_disabled(
397                        stream_config["full_refresh_stream"]["retriever"]["requester"]
398                    )
399                    _set_cache_if_not_disabled(
400                        stream_config["incremental_stream"]["retriever"]["requester"]
401                    )
402                else:
403                    _set_cache_if_not_disabled(stream_config["retriever"]["requester"])
404        return stream_configs
405
406    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
407        """
408        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
409        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
410        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
411        in the project root.
412        """
413        self._configure_logger_level(logger)
414        self._emit_manifest_debug_message(
415            extra_args={
416                "source_name": self.name,
417                "parsed_config": json.dumps(self._source_config),
418            }
419        )
420
421        return (
422            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
423        )
424
425    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
426        self._configure_logger_level(logger)
427        return super().check(logger, config)
428
429    def read(
430        self,
431        logger: logging.Logger,
432        config: Mapping[str, Any],
433        catalog: ConfiguredAirbyteCatalog,
434        state: Optional[List[AirbyteStateMessage]] = None,
435    ) -> Iterator[AirbyteMessage]:
436        self._configure_logger_level(logger)
437        yield from super().read(logger, config, catalog, state)
438
439    def _configure_logger_level(self, logger: logging.Logger) -> None:
440        """
441        Set the log level to logging.DEBUG if debug mode is enabled
442        """
443        if self._debug:
444            logger.setLevel(logging.DEBUG)
445
446    def _validate_source(self) -> None:
447        """
448        Validates the connector manifest against the declarative component schema
449        """
450
451        try:
452            validate(self._source_config, self._declarative_component_schema)
453        except ValidationError as e:
454            raise ValidationError(
455                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
456            ) from e
457
458        cdk_version_str = metadata.version("airbyte_cdk")
459        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
460        manifest_version_str = self._source_config.get("version")
461        if manifest_version_str is None:
462            raise RuntimeError(
463                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
464            )
465        manifest_version = self._parse_version(manifest_version_str, "manifest")
466
467        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
468            # Skipping version compatibility check on unreleased dev branch
469            pass
470        elif (cdk_version.major, cdk_version.minor) < (
471            manifest_version.major,
472            manifest_version.minor,
473        ):
474            raise ValidationError(
475                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
476                f"manifest may contain features that are not in the current CDK version."
477            )
478        elif (manifest_version.major, manifest_version.minor) < (0, 29):
479            raise ValidationError(
480                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
481                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
482                f"{cdk_version!s} which contains these breaking changes."
483            )
484
485    @staticmethod
486    def _parse_version(
487        version: str,
488        version_type: str,
489    ) -> Version:
490        """Takes a semantic version represented as a string and splits it into a tuple.
491
492        The fourth part (prerelease) is not returned in the tuple.
493
494        Returns:
495            Version: the parsed version object
496        """
497        try:
498            parsed_version = Version(version)
499        except InvalidVersion as ex:
500            raise ValidationError(
501                f"The {version_type} version '{version}' is not a valid version format."
502            ) from ex
503        else:
504            # No exception
505            return parsed_version
506
507    def _stream_configs(
508        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
509    ) -> List[Dict[str, Any]]:
510        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
511        stream_configs = []
512        for current_stream_config in manifest.get("streams", []):
513            if (
514                "type" in current_stream_config
515                and current_stream_config["type"] == "ConditionalStreams"
516            ):
517                interpolated_boolean = InterpolatedBoolean(
518                    condition=current_stream_config.get("condition"),
519                    parameters={},
520                )
521
522                if interpolated_boolean.eval(config=config):
523                    stream_configs.extend(current_stream_config.get("streams", []))
524            else:
525                if "type" not in current_stream_config:
526                    current_stream_config["type"] = "DeclarativeStream"
527                stream_configs.append(current_stream_config)
528        return stream_configs
529
530    def _dynamic_stream_configs(
531        self,
532        manifest: Mapping[str, Any],
533        config: Mapping[str, Any],
534        with_dynamic_stream_name: Optional[bool] = None,
535    ) -> List[Dict[str, Any]]:
536        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
537        dynamic_stream_configs: List[Dict[str, Any]] = []
538        seen_dynamic_streams: Set[str] = set()
539
540        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
541            components_resolver_config = dynamic_definition["components_resolver"]
542
543            if not components_resolver_config:
544                raise ValueError(
545                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
546                )
547
548            resolver_type = components_resolver_config.get("type")
549            if not resolver_type:
550                raise ValueError(
551                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
552                )
553
554            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
555                raise ValueError(
556                    f"Invalid components resolver type '{resolver_type}'. "
557                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
558                )
559
560            if "retriever" in components_resolver_config:
561                components_resolver_config["retriever"]["requester"]["use_cache"] = True
562
563            # Create a resolver for dynamic components based on type
564            if resolver_type == "HttpComponentsResolver":
565                components_resolver = self._constructor.create_component(
566                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
567                    component_definition=components_resolver_config,
568                    config=config,
569                    stream_name=dynamic_definition.get("name"),
570                )
571            else:
572                components_resolver = self._constructor.create_component(
573                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
574                    component_definition=components_resolver_config,
575                    config=config,
576                )
577
578            stream_template_config = dynamic_definition["stream_template"]
579
580            for dynamic_stream in components_resolver.resolve_components(
581                stream_template_config=stream_template_config
582            ):
583                # Get the use_parent_parameters configuration from the dynamic definition
584                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
585                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)
586
587                dynamic_stream = {
588                    **ManifestComponentTransformer().propagate_types_and_parameters(
589                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
590                    )
591                }
592
593                if "type" not in dynamic_stream:
594                    dynamic_stream["type"] = "DeclarativeStream"
595
596                # Ensure that each stream is created with a unique name
597                name = dynamic_stream.get("name")
598
599                if with_dynamic_stream_name:
600                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
601                        "name", f"dynamic_stream_{dynamic_definition_index}"
602                    )
603
604                if not isinstance(name, str):
605                    raise ValueError(
606                        f"Expected stream name {name} to be a string, got {type(name)}."
607                    )
608
609                if name in seen_dynamic_streams:
610                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
611                    failure_type = FailureType.system_error
612
613                    if resolver_type == "ConfigComponentsResolver":
614                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
615                        failure_type = FailureType.config_error
616
617                    raise AirbyteTracedException(
618                        message=error_message,
619                        internal_message=error_message,
620                        failure_type=failure_type,
621                    )
622
623                seen_dynamic_streams.add(name)
624                dynamic_stream_configs.append(dynamic_stream)
625
626        return dynamic_stream_configs
627
628    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
629        self.logger.debug("declarative source created from manifest", extra=extra_args)

Declarative source defined by a manifest of low-code components that define source connector behavior

ManifestDeclarativeSource( source_config: Mapping[str, Any], *, config: Optional[Mapping[str, Any]] = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[airbyte_cdk.sources.declarative.parsers.model_to_component_factory.ModelToComponentFactory] = None, migrate_manifest: Optional[bool] = False, normalize_manifest: Optional[bool] = False, config_path: Optional[str] = None)
102    def __init__(
103        self,
104        source_config: ConnectionDefinition,
105        *,
106        config: Mapping[str, Any] | None = None,
107        debug: bool = False,
108        emit_connector_builder_messages: bool = False,
109        component_factory: Optional[ModelToComponentFactory] = None,
110        migrate_manifest: Optional[bool] = False,
111        normalize_manifest: Optional[bool] = False,
112        config_path: Optional[str] = None,
113    ) -> None:
114        """
115        Args:
116            config: The provided config dict.
117            source_config: The manifest of low-code components that describe the source connector.
118            debug: True if debug mode is enabled.
119            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
120            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
121            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
122            config_path: Optional path to the config file.
123        """
124        self.logger = logging.getLogger(f"airbyte.{self.name}")
125        self._should_normalize = normalize_manifest
126        self._should_migrate = migrate_manifest
127        self._declarative_component_schema = _get_declarative_component_schema()
128        # If custom components are needed, locate and/or register them.
129        self.components_module: ModuleType | None = get_registered_components_module(config=config)
130        # set additional attributes
131        self._debug = debug
132        self._emit_connector_builder_messages = emit_connector_builder_messages
133        self._constructor = (
134            component_factory
135            if component_factory
136            else ModelToComponentFactory(
137                emit_connector_builder_messages=emit_connector_builder_messages,
138                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
139            )
140        )
141        self._message_repository = self._constructor.get_message_repository()
142        self._slice_logger: SliceLogger = (
143            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
144        )
145
146        # resolve all components in the manifest
147        self._source_config = self._pre_process_manifest(dict(source_config))
148        # validate resolved manifest against the declarative component schema
149        self._validate_source()
150        # apply additional post-processing to the manifest
151        self._post_process_manifest()
152
153        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
154        self._spec_component: Optional[Spec] = (
155            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
156        )
157        self._config = self._migrate_and_transform_config(config_path, config) or {}
Arguments:
  • config: The provided config dict.
  • source_config: The manifest of low-code components that describe the source connector.
  • debug: True if debug mode is enabled.
  • emit_connector_builder_messages: True if messages should be emitted to the connector builder.
  • component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
  • migrate_manifest: Optional flag to indicate if the manifest should be migrated.
  • normalize_manifest: Optional flag to indicate if the manifest should be normalized.
  • config_path: Optional path to the config file.
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
159    @property
160    def resolved_manifest(self) -> Mapping[str, Any]:
161        """
162        Returns the resolved manifest configuration for the source.
163
164        This property provides access to the internal source configuration as a mapping,
165        which contains all settings and parameters required to define the source's behavior.
166
167        Returns:
168            Mapping[str, Any]: The resolved source configuration manifest.
169        """
170        return self._source_config

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:

Mapping[str, Any]: The resolved source configuration manifest.

message_repository: airbyte_cdk.MessageRepository
269    @property
270    def message_repository(self) -> MessageRepository:
271        return self._message_repository
dynamic_streams: List[Dict[str, Any]]
273    @property
274    def dynamic_streams(self) -> List[Dict[str, Any]]:
275        return self._dynamic_stream_configs(
276            manifest=self._source_config,
277            config=self._config,
278            with_dynamic_stream_name=True,
279        )
def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
281    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
282        return self._constructor.get_model_deprecations()

Returns a list of deprecation warnings for the source.

284    @property
285    def connection_checker(self) -> ConnectionChecker:
286        check = self._source_config["check"]
287        if "type" not in check:
288            check["type"] = "CheckStream"
289        check_stream = self._constructor.create_component(
290            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
291            check,
292            dict(),
293            emit_connector_builder_messages=self._emit_connector_builder_messages,
294        )
295        if isinstance(check_stream, ConnectionChecker):
296            return check_stream
297        else:
298            raise ValueError(
299                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
300            )

Returns the ConnectionChecker to use for the check operation

def streams( self, config: Mapping[str, Any]) -> List[Union[airbyte_cdk.Stream, airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream]]:
302    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
303        """
304        As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
305        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
306        fully decouple this from the AbstractSource.
307        """
308        if self._spec_component:
309            self._spec_component.validate_config(config)
310
311        self._emit_manifest_debug_message(
312            extra_args={
313                "source_name": self.name,
314                "parsed_config": json.dumps(self._source_config),
315            }
316        )
317
318        stream_configs = (
319            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
320        )
321
322        api_budget_model = self._source_config.get("api_budget")
323        if api_budget_model:
324            self._constructor.set_api_budget(api_budget_model, config)
325
326        source_streams = [
327            self._constructor.create_component(
328                (
329                    StateDelegatingStreamModel
330                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
331                    else DeclarativeStreamModel
332                ),
333                stream_config,
334                config,
335                emit_connector_builder_messages=self._emit_connector_builder_messages,
336            )
337            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
338        ]
339        return source_streams

As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream). Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to fully decouple this from the AbstractSource.

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
    """
    Return the connector specification (spec) defined by the Airbyte Protocol.

    The spec describes the configuration options (e.g., username and password) a user
    supplies to run this connector. The manifest's spec block is preferred when present;
    otherwise the parent implementation is used, which loads "spec.yaml" or "spec.json"
    from the project root.

    :param logger: logger whose level is configured before any work is done.
    :return: the generated or loaded ConnectorSpecification.
    """
    self._configure_logger_level(logger)

    debug_args = {
        "source_name": self.name,
        "parsed_config": json.dumps(self._source_config),
    }
    self._emit_manifest_debug_message(extra_args=debug_args)

    if not self._spec_component:
        # No spec block in the manifest: fall back to the file-based lookup of the parent.
        return super().spec(logger)
    return self._spec_component.generate_spec()

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the configuration options (e.g., username and password) that can be set when running this connector. For low-code connectors, this first attempts to load the spec from the manifest's spec block; otherwise, it loads it from "spec.yaml" or "spec.json" in the project root.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
    """
    Implement the Check Connection operation from the Airbyte Specification.

    The logger level is configured first; the connection check itself is delegated
    unchanged to the parent implementation.

    :param logger: logger whose level is configured before checking.
    :param config: user-provided connector configuration, passed through as-is.
    :return: the AirbyteConnectionStatus produced by the parent implementation.
    """
    self._configure_logger_level(logger)
    connection_status = super().check(logger, config)
    return connection_status

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
    """
    Implement the Read operation from the Airbyte Specification.

    Every message produced by the parent implementation is re-yielded unchanged.
    Because this is a generator, nothing runs (including the logger-level setup)
    until the caller starts iterating.

    :param logger: logger whose level is configured before reading.
    :param config: user-provided connector configuration.
    :param catalog: configured catalog selecting the streams to read.
    :param state: optional list of state messages to resume from.
    :return: iterator over the AirbyteMessages emitted by the read.
    """
    self._configure_logger_level(logger)
    parent_messages = super().read(logger, config, catalog, state)
    # `yield from` (rather than a for-loop) keeps send/throw/close delegated to the parent generator.
    yield from parent_messages

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.