airbyte_cdk.legacy.sources.declarative.manifest_declarative_source

  1#
  2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
  3#
  4
  5import json
  6import logging
  7import pkgutil
  8from copy import deepcopy
  9from importlib import metadata
 10from types import ModuleType
 11from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Union
 12
 13import orjson
 14import yaml
 15from jsonschema.exceptions import ValidationError
 16from jsonschema.validators import validate
 17from packaging.version import InvalidVersion, Version
 18
 19from airbyte_cdk.config_observation import create_connector_config_control_message
 20from airbyte_cdk.connector_builder.models import (
 21    LogMessage as ConnectorBuilderLogMessage,
 22)
 23from airbyte_cdk.legacy.sources.declarative.declarative_source import DeclarativeSource
 24from airbyte_cdk.manifest_migrations.migration_handler import (
 25    ManifestMigrationHandler,
 26)
 27from airbyte_cdk.models import (
 28    AirbyteConnectionStatus,
 29    AirbyteMessage,
 30    AirbyteStateMessage,
 31    ConfiguredAirbyteCatalog,
 32    ConnectorSpecification,
 33    FailureType,
 34)
 35from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
 36from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 37from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 38from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
 39from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 40    ConditionalStreams as ConditionalStreamsModel,
 41)
 42from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 43    DeclarativeStream as DeclarativeStreamModel,
 44)
 45from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 46    Spec as SpecModel,
 47)
 48from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
 49    StateDelegatingStream as StateDelegatingStreamModel,
 50)
 51from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
 52    get_registered_components_module,
 53)
 54from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
 55    ManifestComponentTransformer,
 56)
 57from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
 58    ManifestNormalizer,
 59)
 60from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
 61    ManifestReferenceResolver,
 62)
 63from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
 64    ModelToComponentFactory,
 65)
 66from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
 67from airbyte_cdk.sources.declarative.spec.spec import Spec
 68from airbyte_cdk.sources.message import MessageRepository
 69from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
 70from airbyte_cdk.sources.streams.core import Stream
 71from airbyte_cdk.sources.types import Config, ConnectionDefinition
 72from airbyte_cdk.sources.utils.slice_logger import (
 73    AlwaysLogSliceLogger,
 74    DebugSliceLogger,
 75    SliceLogger,
 76)
 77from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 78
 79
 80def _get_declarative_component_schema() -> Dict[str, Any]:
 81    try:
 82        raw_component_schema = pkgutil.get_data(
 83            "airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
 84        )
 85        if raw_component_schema is not None:
 86            declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
 87            return declarative_component_schema  # type: ignore
 88        else:
 89            raise RuntimeError(
 90                "Failed to read manifest component json schema required for deduplication"
 91            )
 92    except FileNotFoundError as e:
 93        raise FileNotFoundError(
 94            f"Failed to read manifest component json schema required for deduplication: {e}"
 95        )
 96
 97
 98class ManifestDeclarativeSource(DeclarativeSource):
 99    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
100
101    def __init__(
102        self,
103        source_config: ConnectionDefinition,
104        *,
105        config: Mapping[str, Any] | None = None,
106        debug: bool = False,
107        emit_connector_builder_messages: bool = False,
108        component_factory: Optional[ModelToComponentFactory] = None,
109        migrate_manifest: Optional[bool] = False,
110        normalize_manifest: Optional[bool] = False,
111        config_path: Optional[str] = None,
112    ) -> None:
113        """
114        Args:
115            config: The provided config dict.
116            source_config: The manifest of low-code components that describe the source connector.
117            debug: True if debug mode is enabled.
118            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
119            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
120            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
121            config_path: Optional path to the config file.
122        """
123        self.logger = logging.getLogger(f"airbyte.{self.name}")
124        self._should_normalize = normalize_manifest
125        self._should_migrate = migrate_manifest
126        self._declarative_component_schema = _get_declarative_component_schema()
127        # If custom components are needed, locate and/or register them.
128        self.components_module: ModuleType | None = get_registered_components_module(config=config)
129        # set additional attributes
130        self._debug = debug
131        self._emit_connector_builder_messages = emit_connector_builder_messages
132        self._constructor = (
133            component_factory
134            if component_factory
135            else ModelToComponentFactory(
136                emit_connector_builder_messages=emit_connector_builder_messages,
137                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
138            )
139        )
140        self._message_repository = self._constructor.get_message_repository()
141        self._slice_logger: SliceLogger = (
142            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
143        )
144
145        # resolve all components in the manifest
146        self._source_config = self._pre_process_manifest(dict(source_config))
147        # validate resolved manifest against the declarative component schema
148        self._validate_source()
149        # apply additional post-processing to the manifest
150        self._post_process_manifest()
151
152        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
153        self._spec_component: Optional[Spec] = (
154            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
155        )
156        self._config = self._migrate_and_transform_config(config_path, config) or {}
157
158    @property
159    def resolved_manifest(self) -> Mapping[str, Any]:
160        """
161        Returns the resolved manifest configuration for the source.
162
163        This property provides access to the internal source configuration as a mapping,
164        which contains all settings and parameters required to define the source's behavior.
165
166        Returns:
167            Mapping[str, Any]: The resolved source configuration manifest.
168        """
169        return self._source_config
170
171    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
172        """
173        Preprocesses the provided manifest dictionary by resolving any manifest references.
174
175        This method modifies the input manifest in place, resolving references using the
176        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
177
178        Args:
179            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
180
181        Returns:
182            None
183        """
184        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
185        manifest = self._fix_source_type(manifest)
186        # Resolve references in the manifest
187        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
188        # Propagate types and parameters throughout the manifest
189        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
190            "", resolved_manifest, {}
191        )
192
193        return propagated_manifest
194
195    def _post_process_manifest(self) -> None:
196        """
197        Post-processes the manifest after validation.
198        This method is responsible for any additional modifications or transformations needed
199        after the manifest has been validated and before it is used in the source.
200        """
201        # apply manifest migration, if required
202        self._migrate_manifest()
203        # apply manifest normalization, if required
204        self._normalize_manifest()
205
206    def _normalize_manifest(self) -> None:
207        """
208        This method is used to normalize the manifest. It should be called after the manifest has been validated.
209
210        Connector Builder UI rendering requires the manifest to be in a specific format.
211         - references have been resolved
212         - the commonly used definitions are extracted to the `definitions.linked.*`
213        """
214        if self._should_normalize:
215            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
216            self._source_config = normalizer.normalize()
217
218    def _migrate_and_transform_config(
219        self,
220        config_path: Optional[str],
221        config: Optional[Config],
222    ) -> Optional[Config]:
223        if not config:
224            return None
225        if not self._spec_component:
226            return config
227        mutable_config = dict(config)
228        self._spec_component.migrate_config(mutable_config)
229        if mutable_config != config:
230            if config_path:
231                with open(config_path, "w") as f:
232                    json.dump(mutable_config, f)
233            self.message_repository.emit_message(
234                create_connector_config_control_message(mutable_config)
235            )
236            # We have no mechanism for consuming the queue, so we print the messages to stdout
237            for message in self.message_repository.consume_queue():
238                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
239        self._spec_component.transform_config(mutable_config)
240        return mutable_config
241
242    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
243        config = self._config or config
244        return super().configure(config, temp_dir)
245
246    def _migrate_manifest(self) -> None:
247        """
248        This method is used to migrate the manifest. It should be called after the manifest has been validated.
249        The migration is done in place, so the original manifest is modified.
250
251        The original manifest is returned if any error occurs during migration.
252        """
253        if self._should_migrate:
254            manifest_migrator = ManifestMigrationHandler(self._source_config)
255            self._source_config = manifest_migrator.apply_migrations()
256            # validate migrated manifest against the declarative component schema
257            self._validate_source()
258
259    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
260        """
261        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
262        """
263        if "type" not in manifest:
264            manifest["type"] = "DeclarativeSource"
265
266        return manifest
267
268    @property
269    def message_repository(self) -> MessageRepository:
270        return self._message_repository
271
272    @property
273    def dynamic_streams(self) -> List[Dict[str, Any]]:
274        return self._dynamic_stream_configs(
275            manifest=self._source_config,
276            config=self._config,
277            with_dynamic_stream_name=True,
278        )
279
280    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
281        return self._constructor.get_model_deprecations()
282
283    @property
284    def connection_checker(self) -> ConnectionChecker:
285        check = self._source_config["check"]
286        if "type" not in check:
287            check["type"] = "CheckStream"
288        check_stream = self._constructor.create_component(
289            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
290            check,
291            dict(),
292            emit_connector_builder_messages=self._emit_connector_builder_messages,
293        )
294        if isinstance(check_stream, ConnectionChecker):
295            return check_stream
296        else:
297            raise ValueError(
298                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
299            )
300
301    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
302        """
303        As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
304        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
305        fully decouple this from the AbstractSource.
306        """
307        if self._spec_component:
308            self._spec_component.validate_config(config)
309
310        self._emit_manifest_debug_message(
311            extra_args={
312                "source_name": self.name,
313                "parsed_config": json.dumps(self._source_config),
314            }
315        )
316
317        stream_configs = (
318            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
319        )
320
321        api_budget_model = self._source_config.get("api_budget")
322        if api_budget_model:
323            self._constructor.set_api_budget(api_budget_model, config)
324
325        source_streams = [
326            self._constructor.create_component(
327                (
328                    StateDelegatingStreamModel
329                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
330                    else DeclarativeStreamModel
331                ),
332                stream_config,
333                config,
334                emit_connector_builder_messages=self._emit_connector_builder_messages,
335            )
336            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
337        ]
338        return source_streams
339
340    @staticmethod
341    def _initialize_cache_for_parent_streams(
342        stream_configs: List[Dict[str, Any]],
343    ) -> List[Dict[str, Any]]:
344        parent_streams = set()
345
346        def update_with_cache_parent_configs(
347            parent_configs: list[dict[str, Any]],
348        ) -> None:
349            for parent_config in parent_configs:
350                parent_streams.add(parent_config["stream"]["name"])
351                if parent_config["stream"]["type"] == "StateDelegatingStream":
352                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
353                        "use_cache"
354                    ] = True
355                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
356                        "use_cache"
357                    ] = True
358                else:
359                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
360
361        for stream_config in stream_configs:
362            if stream_config.get("incremental_sync", {}).get("parent_stream"):
363                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
364                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
365                    "use_cache"
366                ] = True
367
368            elif stream_config.get("retriever", {}).get("partition_router", {}):
369                partition_router = stream_config["retriever"]["partition_router"]
370
371                if isinstance(partition_router, dict) and partition_router.get(
372                    "parent_stream_configs"
373                ):
374                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
375                elif isinstance(partition_router, list):
376                    for router in partition_router:
377                        if router.get("parent_stream_configs"):
378                            update_with_cache_parent_configs(router["parent_stream_configs"])
379
380        for stream_config in stream_configs:
381            if stream_config["name"] in parent_streams:
382                if stream_config["type"] == "StateDelegatingStream":
383                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
384                        True
385                    )
386                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
387                        True
388                    )
389                else:
390                    stream_config["retriever"]["requester"]["use_cache"] = True
391        return stream_configs
392
393    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
394        """
395        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
396        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
397        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
398        in the project root.
399        """
400        self._configure_logger_level(logger)
401        self._emit_manifest_debug_message(
402            extra_args={
403                "source_name": self.name,
404                "parsed_config": json.dumps(self._source_config),
405            }
406        )
407
408        return (
409            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
410        )
411
412    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
413        self._configure_logger_level(logger)
414        return super().check(logger, config)
415
416    def read(
417        self,
418        logger: logging.Logger,
419        config: Mapping[str, Any],
420        catalog: ConfiguredAirbyteCatalog,
421        state: Optional[List[AirbyteStateMessage]] = None,
422    ) -> Iterator[AirbyteMessage]:
423        self._configure_logger_level(logger)
424        yield from super().read(logger, config, catalog, state)
425
426    def _configure_logger_level(self, logger: logging.Logger) -> None:
427        """
428        Set the log level to logging.DEBUG if debug mode is enabled
429        """
430        if self._debug:
431            logger.setLevel(logging.DEBUG)
432
433    def _validate_source(self) -> None:
434        """
435        Validates the connector manifest against the declarative component schema
436        """
437
438        try:
439            validate(self._source_config, self._declarative_component_schema)
440        except ValidationError as e:
441            raise ValidationError(
442                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
443            ) from e
444
445        cdk_version_str = metadata.version("airbyte_cdk")
446        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
447        manifest_version_str = self._source_config.get("version")
448        if manifest_version_str is None:
449            raise RuntimeError(
450                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
451            )
452        manifest_version = self._parse_version(manifest_version_str, "manifest")
453
454        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
455            # Skipping version compatibility check on unreleased dev branch
456            pass
457        elif (cdk_version.major, cdk_version.minor) < (
458            manifest_version.major,
459            manifest_version.minor,
460        ):
461            raise ValidationError(
462                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
463                f"manifest may contain features that are not in the current CDK version."
464            )
465        elif (manifest_version.major, manifest_version.minor) < (0, 29):
466            raise ValidationError(
467                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
468                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
469                f"{cdk_version!s} which contains these breaking changes."
470            )
471
472    @staticmethod
473    def _parse_version(
474        version: str,
475        version_type: str,
476    ) -> Version:
477        """Takes a semantic version represented as a string and splits it into a tuple.
478
479        The fourth part (prerelease) is not returned in the tuple.
480
481        Returns:
482            Version: the parsed version object
483        """
484        try:
485            parsed_version = Version(version)
486        except InvalidVersion as ex:
487            raise ValidationError(
488                f"The {version_type} version '{version}' is not a valid version format."
489            ) from ex
490        else:
491            # No exception
492            return parsed_version
493
494    def _stream_configs(
495        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
496    ) -> List[Dict[str, Any]]:
497        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
498        stream_configs = []
499        for current_stream_config in manifest.get("streams", []):
500            if (
501                "type" in current_stream_config
502                and current_stream_config["type"] == "ConditionalStreams"
503            ):
504                interpolated_boolean = InterpolatedBoolean(
505                    condition=current_stream_config.get("condition"),
506                    parameters={},
507                )
508
509                if interpolated_boolean.eval(config=config):
510                    stream_configs.extend(current_stream_config.get("streams", []))
511            else:
512                if "type" not in current_stream_config:
513                    current_stream_config["type"] = "DeclarativeStream"
514                stream_configs.append(current_stream_config)
515        return stream_configs
516
517    def _dynamic_stream_configs(
518        self,
519        manifest: Mapping[str, Any],
520        config: Mapping[str, Any],
521        with_dynamic_stream_name: Optional[bool] = None,
522    ) -> List[Dict[str, Any]]:
523        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
524        dynamic_stream_configs: List[Dict[str, Any]] = []
525        seen_dynamic_streams: Set[str] = set()
526
527        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
528            components_resolver_config = dynamic_definition["components_resolver"]
529
530            if not components_resolver_config:
531                raise ValueError(
532                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
533                )
534
535            resolver_type = components_resolver_config.get("type")
536            if not resolver_type:
537                raise ValueError(
538                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
539                )
540
541            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
542                raise ValueError(
543                    f"Invalid components resolver type '{resolver_type}'. "
544                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
545                )
546
547            if "retriever" in components_resolver_config:
548                components_resolver_config["retriever"]["requester"]["use_cache"] = True
549
550            # Create a resolver for dynamic components based on type
551            if resolver_type == "HttpComponentsResolver":
552                components_resolver = self._constructor.create_component(
553                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
554                    component_definition=components_resolver_config,
555                    config=config,
556                    stream_name=dynamic_definition.get("name"),
557                )
558            else:
559                components_resolver = self._constructor.create_component(
560                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
561                    component_definition=components_resolver_config,
562                    config=config,
563                )
564
565            stream_template_config = dynamic_definition["stream_template"]
566
567            for dynamic_stream in components_resolver.resolve_components(
568                stream_template_config=stream_template_config
569            ):
570                # Get the use_parent_parameters configuration from the dynamic definition
571                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
572                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)
573
574                dynamic_stream = {
575                    **ManifestComponentTransformer().propagate_types_and_parameters(
576                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
577                    )
578                }
579
580                if "type" not in dynamic_stream:
581                    dynamic_stream["type"] = "DeclarativeStream"
582
583                # Ensure that each stream is created with a unique name
584                name = dynamic_stream.get("name")
585
586                if with_dynamic_stream_name:
587                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
588                        "name", f"dynamic_stream_{dynamic_definition_index}"
589                    )
590
591                if not isinstance(name, str):
592                    raise ValueError(
593                        f"Expected stream name {name} to be a string, got {type(name)}."
594                    )
595
596                if name in seen_dynamic_streams:
597                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
598                    failure_type = FailureType.system_error
599
600                    if resolver_type == "ConfigComponentsResolver":
601                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
602                        failure_type = FailureType.config_error
603
604                    raise AirbyteTracedException(
605                        message=error_message,
606                        internal_message=error_message,
607                        failure_type=failure_type,
608                    )
609
610                seen_dynamic_streams.add(name)
611                dynamic_stream_configs.append(dynamic_stream)
612
613        return dynamic_stream_configs
614
615    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
616        self.logger.debug("declarative source created from manifest", extra=extra_args)
 99class ManifestDeclarativeSource(DeclarativeSource):
100    """Declarative source defined by a manifest of low-code components that define source connector behavior"""
101
102    def __init__(
103        self,
104        source_config: ConnectionDefinition,
105        *,
106        config: Mapping[str, Any] | None = None,
107        debug: bool = False,
108        emit_connector_builder_messages: bool = False,
109        component_factory: Optional[ModelToComponentFactory] = None,
110        migrate_manifest: Optional[bool] = False,
111        normalize_manifest: Optional[bool] = False,
112        config_path: Optional[str] = None,
113    ) -> None:
114        """
115        Args:
116            config: The provided config dict.
117            source_config: The manifest of low-code components that describe the source connector.
118            debug: True if debug mode is enabled.
119            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
120            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
121            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
122            config_path: Optional path to the config file.
123        """
124        self.logger = logging.getLogger(f"airbyte.{self.name}")
125        self._should_normalize = normalize_manifest
126        self._should_migrate = migrate_manifest
127        self._declarative_component_schema = _get_declarative_component_schema()
128        # If custom components are needed, locate and/or register them.
129        self.components_module: ModuleType | None = get_registered_components_module(config=config)
130        # set additional attributes
131        self._debug = debug
132        self._emit_connector_builder_messages = emit_connector_builder_messages
133        self._constructor = (
134            component_factory
135            if component_factory
136            else ModelToComponentFactory(
137                emit_connector_builder_messages=emit_connector_builder_messages,
138                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
139            )
140        )
141        self._message_repository = self._constructor.get_message_repository()
142        self._slice_logger: SliceLogger = (
143            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
144        )
145
146        # resolve all components in the manifest
147        self._source_config = self._pre_process_manifest(dict(source_config))
148        # validate resolved manifest against the declarative component schema
149        self._validate_source()
150        # apply additional post-processing to the manifest
151        self._post_process_manifest()
152
153        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
154        self._spec_component: Optional[Spec] = (
155            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
156        )
157        self._config = self._migrate_and_transform_config(config_path, config) or {}
158
159    @property
160    def resolved_manifest(self) -> Mapping[str, Any]:
161        """
162        Returns the resolved manifest configuration for the source.
163
164        This property provides access to the internal source configuration as a mapping,
165        which contains all settings and parameters required to define the source's behavior.
166
167        Returns:
168            Mapping[str, Any]: The resolved source configuration manifest.
169        """
170        return self._source_config
171
172    def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
173        """
174        Preprocesses the provided manifest dictionary by resolving any manifest references.
175
176        This method modifies the input manifest in place, resolving references using the
177        ManifestReferenceResolver to ensure all references within the manifest are properly handled.
178
179        Args:
180            manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
181
182        Returns:
183            None
184        """
185        # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
186        manifest = self._fix_source_type(manifest)
187        # Resolve references in the manifest
188        resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
189        # Propagate types and parameters throughout the manifest
190        propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
191            "", resolved_manifest, {}
192        )
193
194        return propagated_manifest
195
196    def _post_process_manifest(self) -> None:
197        """
198        Post-processes the manifest after validation.
199        This method is responsible for any additional modifications or transformations needed
200        after the manifest has been validated and before it is used in the source.
201        """
202        # apply manifest migration, if required
203        self._migrate_manifest()
204        # apply manifest normalization, if required
205        self._normalize_manifest()
206
207    def _normalize_manifest(self) -> None:
208        """
209        This method is used to normalize the manifest. It should be called after the manifest has been validated.
210
211        Connector Builder UI rendering requires the manifest to be in a specific format.
212         - references have been resolved
213         - the commonly used definitions are extracted to the `definitions.linked.*`
214        """
215        if self._should_normalize:
216            normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
217            self._source_config = normalizer.normalize()
218
219    def _migrate_and_transform_config(
220        self,
221        config_path: Optional[str],
222        config: Optional[Config],
223    ) -> Optional[Config]:
224        if not config:
225            return None
226        if not self._spec_component:
227            return config
228        mutable_config = dict(config)
229        self._spec_component.migrate_config(mutable_config)
230        if mutable_config != config:
231            if config_path:
232                with open(config_path, "w") as f:
233                    json.dump(mutable_config, f)
234            self.message_repository.emit_message(
235                create_connector_config_control_message(mutable_config)
236            )
237            # We have no mechanism for consuming the queue, so we print the messages to stdout
238            for message in self.message_repository.consume_queue():
239                print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
240        self._spec_component.transform_config(mutable_config)
241        return mutable_config
242
243    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
244        config = self._config or config
245        return super().configure(config, temp_dir)
246
247    def _migrate_manifest(self) -> None:
248        """
249        This method is used to migrate the manifest. It should be called after the manifest has been validated.
250        The migration is done in place, so the original manifest is modified.
251
252        The original manifest is returned if any error occurs during migration.
253        """
254        if self._should_migrate:
255            manifest_migrator = ManifestMigrationHandler(self._source_config)
256            self._source_config = manifest_migrator.apply_migrations()
257            # validate migrated manifest against the declarative component schema
258            self._validate_source()
259
260    def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
261        """
262        Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
263        """
264        if "type" not in manifest:
265            manifest["type"] = "DeclarativeSource"
266
267        return manifest
268
269    @property
270    def message_repository(self) -> MessageRepository:
271        return self._message_repository
272
273    @property
274    def dynamic_streams(self) -> List[Dict[str, Any]]:
275        return self._dynamic_stream_configs(
276            manifest=self._source_config,
277            config=self._config,
278            with_dynamic_stream_name=True,
279        )
280
281    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
282        return self._constructor.get_model_deprecations()
283
284    @property
285    def connection_checker(self) -> ConnectionChecker:
286        check = self._source_config["check"]
287        if "type" not in check:
288            check["type"] = "CheckStream"
289        check_stream = self._constructor.create_component(
290            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
291            check,
292            dict(),
293            emit_connector_builder_messages=self._emit_connector_builder_messages,
294        )
295        if isinstance(check_stream, ConnectionChecker):
296            return check_stream
297        else:
298            raise ValueError(
299                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
300            )
301
302    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
303        """
304        As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
305        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
306        fully decouple this from the AbstractSource.
307        """
308        if self._spec_component:
309            self._spec_component.validate_config(config)
310
311        self._emit_manifest_debug_message(
312            extra_args={
313                "source_name": self.name,
314                "parsed_config": json.dumps(self._source_config),
315            }
316        )
317
318        stream_configs = (
319            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
320        )
321
322        api_budget_model = self._source_config.get("api_budget")
323        if api_budget_model:
324            self._constructor.set_api_budget(api_budget_model, config)
325
326        source_streams = [
327            self._constructor.create_component(
328                (
329                    StateDelegatingStreamModel
330                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
331                    else DeclarativeStreamModel
332                ),
333                stream_config,
334                config,
335                emit_connector_builder_messages=self._emit_connector_builder_messages,
336            )
337            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
338        ]
339        return source_streams
340
341    @staticmethod
342    def _initialize_cache_for_parent_streams(
343        stream_configs: List[Dict[str, Any]],
344    ) -> List[Dict[str, Any]]:
345        parent_streams = set()
346
347        def update_with_cache_parent_configs(
348            parent_configs: list[dict[str, Any]],
349        ) -> None:
350            for parent_config in parent_configs:
351                parent_streams.add(parent_config["stream"]["name"])
352                if parent_config["stream"]["type"] == "StateDelegatingStream":
353                    parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
354                        "use_cache"
355                    ] = True
356                    parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
357                        "use_cache"
358                    ] = True
359                else:
360                    parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
361
362        for stream_config in stream_configs:
363            if stream_config.get("incremental_sync", {}).get("parent_stream"):
364                parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
365                stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
366                    "use_cache"
367                ] = True
368
369            elif stream_config.get("retriever", {}).get("partition_router", {}):
370                partition_router = stream_config["retriever"]["partition_router"]
371
372                if isinstance(partition_router, dict) and partition_router.get(
373                    "parent_stream_configs"
374                ):
375                    update_with_cache_parent_configs(partition_router["parent_stream_configs"])
376                elif isinstance(partition_router, list):
377                    for router in partition_router:
378                        if router.get("parent_stream_configs"):
379                            update_with_cache_parent_configs(router["parent_stream_configs"])
380
381        for stream_config in stream_configs:
382            if stream_config["name"] in parent_streams:
383                if stream_config["type"] == "StateDelegatingStream":
384                    stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
385                        True
386                    )
387                    stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
388                        True
389                    )
390                else:
391                    stream_config["retriever"]["requester"]["use_cache"] = True
392        return stream_configs
393
394    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
395        """
396        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
397        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
398        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
399        in the project root.
400        """
401        self._configure_logger_level(logger)
402        self._emit_manifest_debug_message(
403            extra_args={
404                "source_name": self.name,
405                "parsed_config": json.dumps(self._source_config),
406            }
407        )
408
409        return (
410            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
411        )
412
413    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
414        self._configure_logger_level(logger)
415        return super().check(logger, config)
416
417    def read(
418        self,
419        logger: logging.Logger,
420        config: Mapping[str, Any],
421        catalog: ConfiguredAirbyteCatalog,
422        state: Optional[List[AirbyteStateMessage]] = None,
423    ) -> Iterator[AirbyteMessage]:
424        self._configure_logger_level(logger)
425        yield from super().read(logger, config, catalog, state)
426
427    def _configure_logger_level(self, logger: logging.Logger) -> None:
428        """
429        Set the log level to logging.DEBUG if debug mode is enabled
430        """
431        if self._debug:
432            logger.setLevel(logging.DEBUG)
433
434    def _validate_source(self) -> None:
435        """
436        Validates the connector manifest against the declarative component schema
437        """
438
439        try:
440            validate(self._source_config, self._declarative_component_schema)
441        except ValidationError as e:
442            raise ValidationError(
443                "Validation against json schema defined in declarative_component_schema.yaml schema failed"
444            ) from e
445
446        cdk_version_str = metadata.version("airbyte_cdk")
447        cdk_version = self._parse_version(cdk_version_str, "airbyte-cdk")
448        manifest_version_str = self._source_config.get("version")
449        if manifest_version_str is None:
450            raise RuntimeError(
451                "Manifest version is not defined in the manifest. This is unexpected since it should be a required field. Please contact support."
452            )
453        manifest_version = self._parse_version(manifest_version_str, "manifest")
454
455        if (cdk_version.major, cdk_version.minor, cdk_version.micro) == (0, 0, 0):
456            # Skipping version compatibility check on unreleased dev branch
457            pass
458        elif (cdk_version.major, cdk_version.minor) < (
459            manifest_version.major,
460            manifest_version.minor,
461        ):
462            raise ValidationError(
463                f"The manifest version {manifest_version!s} is greater than the airbyte-cdk package version ({cdk_version!s}). Your "
464                f"manifest may contain features that are not in the current CDK version."
465            )
466        elif (manifest_version.major, manifest_version.minor) < (0, 29):
467            raise ValidationError(
468                f"The low-code framework was promoted to Beta in airbyte-cdk version 0.29.0 and contains many breaking changes to the "
469                f"language. The manifest version {manifest_version!s} is incompatible with the airbyte-cdk package version "
470                f"{cdk_version!s} which contains these breaking changes."
471            )
472
473    @staticmethod
474    def _parse_version(
475        version: str,
476        version_type: str,
477    ) -> Version:
478        """Takes a semantic version represented as a string and splits it into a tuple.
479
480        The fourth part (prerelease) is not returned in the tuple.
481
482        Returns:
483            Version: the parsed version object
484        """
485        try:
486            parsed_version = Version(version)
487        except InvalidVersion as ex:
488            raise ValidationError(
489                f"The {version_type} version '{version}' is not a valid version format."
490            ) from ex
491        else:
492            # No exception
493            return parsed_version
494
495    def _stream_configs(
496        self, manifest: Mapping[str, Any], config: Mapping[str, Any]
497    ) -> List[Dict[str, Any]]:
498        # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
499        stream_configs = []
500        for current_stream_config in manifest.get("streams", []):
501            if (
502                "type" in current_stream_config
503                and current_stream_config["type"] == "ConditionalStreams"
504            ):
505                interpolated_boolean = InterpolatedBoolean(
506                    condition=current_stream_config.get("condition"),
507                    parameters={},
508                )
509
510                if interpolated_boolean.eval(config=config):
511                    stream_configs.extend(current_stream_config.get("streams", []))
512            else:
513                if "type" not in current_stream_config:
514                    current_stream_config["type"] = "DeclarativeStream"
515                stream_configs.append(current_stream_config)
516        return stream_configs
517
518    def _dynamic_stream_configs(
519        self,
520        manifest: Mapping[str, Any],
521        config: Mapping[str, Any],
522        with_dynamic_stream_name: Optional[bool] = None,
523    ) -> List[Dict[str, Any]]:
524        dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", [])
525        dynamic_stream_configs: List[Dict[str, Any]] = []
526        seen_dynamic_streams: Set[str] = set()
527
528        for dynamic_definition_index, dynamic_definition in enumerate(dynamic_stream_definitions):
529            components_resolver_config = dynamic_definition["components_resolver"]
530
531            if not components_resolver_config:
532                raise ValueError(
533                    f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}"
534                )
535
536            resolver_type = components_resolver_config.get("type")
537            if not resolver_type:
538                raise ValueError(
539                    f"Missing 'type' in components resolver configuration: {components_resolver_config}"
540                )
541
542            if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING:
543                raise ValueError(
544                    f"Invalid components resolver type '{resolver_type}'. "
545                    f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}."
546                )
547
548            if "retriever" in components_resolver_config:
549                components_resolver_config["retriever"]["requester"]["use_cache"] = True
550
551            # Create a resolver for dynamic components based on type
552            if resolver_type == "HttpComponentsResolver":
553                components_resolver = self._constructor.create_component(
554                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
555                    component_definition=components_resolver_config,
556                    config=config,
557                    stream_name=dynamic_definition.get("name"),
558                )
559            else:
560                components_resolver = self._constructor.create_component(
561                    model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
562                    component_definition=components_resolver_config,
563                    config=config,
564                )
565
566            stream_template_config = dynamic_definition["stream_template"]
567
568            for dynamic_stream in components_resolver.resolve_components(
569                stream_template_config=stream_template_config
570            ):
571                # Get the use_parent_parameters configuration from the dynamic definition
572                # Default to True for backward compatibility, since connectors were already using it by default when this param was added
573                use_parent_parameters = dynamic_definition.get("use_parent_parameters", True)
574
575                dynamic_stream = {
576                    **ManifestComponentTransformer().propagate_types_and_parameters(
577                        "", dynamic_stream, {}, use_parent_parameters=use_parent_parameters
578                    )
579                }
580
581                if "type" not in dynamic_stream:
582                    dynamic_stream["type"] = "DeclarativeStream"
583
584                # Ensure that each stream is created with a unique name
585                name = dynamic_stream.get("name")
586
587                if with_dynamic_stream_name:
588                    dynamic_stream["dynamic_stream_name"] = dynamic_definition.get(
589                        "name", f"dynamic_stream_{dynamic_definition_index}"
590                    )
591
592                if not isinstance(name, str):
593                    raise ValueError(
594                        f"Expected stream name {name} to be a string, got {type(name)}."
595                    )
596
597                if name in seen_dynamic_streams:
598                    error_message = f"Dynamic streams list contains a duplicate name: {name}. Please contact Airbyte Support."
599                    failure_type = FailureType.system_error
600
601                    if resolver_type == "ConfigComponentsResolver":
602                        error_message = f"Dynamic streams list contains a duplicate name: {name}. Please check your configuration."
603                        failure_type = FailureType.config_error
604
605                    raise AirbyteTracedException(
606                        message=error_message,
607                        internal_message=error_message,
608                        failure_type=failure_type,
609                    )
610
611                seen_dynamic_streams.add(name)
612                dynamic_stream_configs.append(dynamic_stream)
613
614        return dynamic_stream_configs
615
616    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
617        self.logger.debug("declarative source created from manifest", extra=extra_args)

Declarative source defined by a manifest of low-code components that define source connector behavior.

ManifestDeclarativeSource( source_config: Mapping[str, Any], *, config: Optional[Mapping[str, Any]] = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[airbyte_cdk.sources.declarative.parsers.model_to_component_factory.ModelToComponentFactory] = None, migrate_manifest: Optional[bool] = False, normalize_manifest: Optional[bool] = False, config_path: Optional[str] = None)
102    def __init__(
103        self,
104        source_config: ConnectionDefinition,
105        *,
106        config: Mapping[str, Any] | None = None,
107        debug: bool = False,
108        emit_connector_builder_messages: bool = False,
109        component_factory: Optional[ModelToComponentFactory] = None,
110        migrate_manifest: Optional[bool] = False,
111        normalize_manifest: Optional[bool] = False,
112        config_path: Optional[str] = None,
113    ) -> None:
114        """
115        Args:
116            config: The provided config dict.
117            source_config: The manifest of low-code components that describe the source connector.
118            debug: True if debug mode is enabled.
119            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
120            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
121            normalize_manifest: Optional flag to indicate if the manifest should be normalized.
122            config_path: Optional path to the config file.
123        """
124        self.logger = logging.getLogger(f"airbyte.{self.name}")
125        self._should_normalize = normalize_manifest
126        self._should_migrate = migrate_manifest
127        self._declarative_component_schema = _get_declarative_component_schema()
128        # If custom components are needed, locate and/or register them.
129        self.components_module: ModuleType | None = get_registered_components_module(config=config)
130        # set additional attributes
131        self._debug = debug
132        self._emit_connector_builder_messages = emit_connector_builder_messages
133        self._constructor = (
134            component_factory
135            if component_factory
136            else ModelToComponentFactory(
137                emit_connector_builder_messages=emit_connector_builder_messages,
138                max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
139            )
140        )
141        self._message_repository = self._constructor.get_message_repository()
142        self._slice_logger: SliceLogger = (
143            AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
144        )
145
146        # resolve all components in the manifest
147        self._source_config = self._pre_process_manifest(dict(source_config))
148        # validate resolved manifest against the declarative component schema
149        self._validate_source()
150        # apply additional post-processing to the manifest
151        self._post_process_manifest()
152
153        spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
154        self._spec_component: Optional[Spec] = (
155            self._constructor.create_component(SpecModel, spec, dict()) if spec else None
156        )
157        self._config = self._migrate_and_transform_config(config_path, config) or {}
Arguments:
  • config: The provided config dict.
  • source_config: The manifest of low-code components that describe the source connector.
  • debug: True if debug mode is enabled.
  • emit_connector_builder_messages: True if messages should be emitted to the connector builder.
  • component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
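  • migrate_manifest: Optional flag to indicate if the manifest should be migrated. When set, manifest migrations are applied after schema validation and the migrated manifest is validated again.
  • normalize_manifest: Optional flag to indicate if the manifest should be normalized.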
  • config_path: Optional path to the config file.
logger
components_module: module | None
resolved_manifest: Mapping[str, Any]
159    @property
160    def resolved_manifest(self) -> Mapping[str, Any]:
161        """
162        Returns the resolved manifest configuration for the source.
163
164        This property provides access to the internal source configuration as a mapping,
165        which contains all settings and parameters required to define the source's behavior.
166
167        Returns:
168            Mapping[str, Any]: The resolved source configuration manifest.
169        """
170        return self._source_config

Returns the resolved manifest configuration for the source.

This property provides access to the internal source configuration as a mapping, which contains all settings and parameters required to define the source's behavior.

Returns:

Mapping[str, Any]: The resolved source configuration manifest.

message_repository: airbyte_cdk.MessageRepository
269    @property
270    def message_repository(self) -> MessageRepository:
271        return self._message_repository
dynamic_streams: List[Dict[str, Any]]
273    @property
274    def dynamic_streams(self) -> List[Dict[str, Any]]:
275        return self._dynamic_stream_configs(
276            manifest=self._source_config,
277            config=self._config,
278            with_dynamic_stream_name=True,
279        )
def deprecation_warnings(self) -> List[airbyte_cdk.connector_builder.models.LogMessage]:
281    def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]:
282        return self._constructor.get_model_deprecations()

Returns a list of deprecation warnings for the source.

284    @property
285    def connection_checker(self) -> ConnectionChecker:
286        check = self._source_config["check"]
287        if "type" not in check:
288            check["type"] = "CheckStream"
289        check_stream = self._constructor.create_component(
290            COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]],
291            check,
292            dict(),
293            emit_connector_builder_messages=self._emit_connector_builder_messages,
294        )
295        if isinstance(check_stream, ConnectionChecker):
296            return check_stream
297        else:
298            raise ValueError(
299                f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
300            )

Returns the ConnectionChecker to use for the check operation.

def streams( self, config: Mapping[str, Any]) -> List[Union[airbyte_cdk.Stream, airbyte_cdk.sources.streams.concurrent.abstract_stream.AbstractStream]]:
302    def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]:  # type: ignore  # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder
303        """
304        As a migration step, this method will return both legacy stream (Stream) and concurrent stream (AbstractStream).
305        Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to
306        fully decouple this from the AbstractSource.
307        """
308        if self._spec_component:
309            self._spec_component.validate_config(config)
310
311        self._emit_manifest_debug_message(
312            extra_args={
313                "source_name": self.name,
314                "parsed_config": json.dumps(self._source_config),
315            }
316        )
317
318        stream_configs = (
319            self._stream_configs(self._source_config, config=config) + self.dynamic_streams
320        )
321
322        api_budget_model = self._source_config.get("api_budget")
323        if api_budget_model:
324            self._constructor.set_api_budget(api_budget_model, config)
325
326        source_streams = [
327            self._constructor.create_component(
328                (
329                    StateDelegatingStreamModel
330                    if stream_config.get("type") == StateDelegatingStreamModel.__name__
331                    else DeclarativeStreamModel
332                ),
333                stream_config,
334                config,
335                emit_connector_builder_messages=self._emit_connector_builder_messages,
336            )
337            for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
338        ]
339        return source_streams

As a migration step, this method will return both legacy streams (Stream) and concurrent streams (AbstractStream). Once the migration is done, we can probably have this method throw "not implemented" as we figure out how to fully decouple this from the AbstractSource.

def spec( self, logger: logging.Logger) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConnectorSpecification:
394    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
395        """
396        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
397        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
398        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
399        in the project root.
400        """
401        self._configure_logger_level(logger)
402        self._emit_manifest_debug_message(
403            extra_args={
404                "source_name": self.name,
405                "parsed_config": json.dumps(self._source_config),
406            }
407        )
408
409        return (
410            self._spec_component.generate_spec() if self._spec_component else super().spec(logger)
411        )

Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible configurations (e.g., username and password) which can be configured when running this connector. For low-code connectors, this will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" in the project root.

def check( self, logger: logging.Logger, config: Mapping[str, Any]) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteConnectionStatus:
413    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
414        self._configure_logger_level(logger)
415        return super().check(logger, config)

Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.

def read( self, logger: logging.Logger, config: Mapping[str, Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None) -> Iterator[airbyte_cdk.AirbyteMessage]:
417    def read(
418        self,
419        logger: logging.Logger,
420        config: Mapping[str, Any],
421        catalog: ConfiguredAirbyteCatalog,
422        state: Optional[List[AirbyteStateMessage]] = None,
423    ) -> Iterator[AirbyteMessage]:
424        self._configure_logger_level(logger)
425        yield from super().read(logger, config, catalog, state)

Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.