airbyte.sources.base

Base class implementation for sources.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Base class implementation for sources."""
  3
  4from __future__ import annotations
  5
  6import json
  7import warnings
  8from pathlib import Path
  9from typing import TYPE_CHECKING, Any, Literal
 10
 11import yaml
 12from rich import print  # noqa: A004  # Allow shadowing the built-in
 13from rich.syntax import Syntax
 14
 15from airbyte_protocol.models import (
 16    AirbyteCatalog,
 17    AirbyteMessage,
 18    ConfiguredAirbyteCatalog,
 19    ConfiguredAirbyteStream,
 20    DestinationSyncMode,
 21    SyncMode,
 22    Type,
 23)
 24
 25from airbyte import exceptions as exc
 26from airbyte._connector_base import ConnectorBase
 27from airbyte._message_iterators import AirbyteMessageIterator
 28from airbyte._util.temp_files import as_temp_files
 29from airbyte.caches.util import get_default_cache
 30from airbyte.datasets._lazy import LazyDataset
 31from airbyte.progress import ProgressStyle, ProgressTracker
 32from airbyte.records import StreamRecord, StreamRecordHandler
 33from airbyte.results import ReadResult
 34from airbyte.shared.catalog_providers import CatalogProvider
 35from airbyte.strategies import WriteStrategy
 36
 37
 38if TYPE_CHECKING:
 39    from collections.abc import Generator, Iterable, Iterator
 40
 41    from airbyte_cdk import ConnectorSpecification
 42    from airbyte_protocol.models import AirbyteStream
 43
 44    from airbyte._executors.base import Executor
 45    from airbyte.caches import CacheBase
 46    from airbyte.callbacks import ConfigChangeCallback
 47    from airbyte.documents import Document
 48    from airbyte.shared.state_providers import StateProviderBase
 49    from airbyte.shared.state_writers import StateWriterBase
 50
 51
 52class Source(ConnectorBase):
 53    """A class representing a source that can be called."""
 54
 55    connector_type: Literal["source"] = "source"
 56
 57    def __init__(
 58        self,
 59        executor: Executor,
 60        name: str,
 61        config: dict[str, Any] | None = None,
 62        *,
 63        config_change_callback: ConfigChangeCallback | None = None,
 64        streams: str | list[str] | None = None,
 65        validate: bool = False,
 66    ) -> None:
 67        """Initialize the source.
 68
 69        If config is provided, it will be validated against the spec if validate is True.
 70        """
 71        self._to_be_selected_streams: list[str] | str = []
 72        """Used to hold selection criteria before catalog is known."""
 73
 74        super().__init__(
 75            executor=executor,
 76            name=name,
 77            config=config,
 78            config_change_callback=config_change_callback,
 79            validate=validate,
 80        )
 81        self._config_dict: dict[str, Any] | None = None
 82        self._last_log_messages: list[str] = []
 83        self._discovered_catalog: AirbyteCatalog | None = None
 84        self._selected_stream_names: list[str] = []
 85        if config is not None:
 86            self.set_config(config, validate=validate)
 87        if streams is not None:
 88            self.select_streams(streams)
 89
 90        self._deployed_api_root: str | None = None
 91        self._deployed_workspace_id: str | None = None
 92        self._deployed_source_id: str | None = None
 93
 94    def set_streams(self, streams: list[str]) -> None:
 95        """Deprecated. See select_streams()."""
 96        warnings.warn(
 97            "The 'set_streams' method is deprecated and will be removed in a future version. "
 98            "Please use the 'select_streams' method instead.",
 99            DeprecationWarning,
100            stacklevel=2,
101        )
102        self.select_streams(streams)
103
104    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
105        """Logs a warning message indicating stream selection which are not selected yet."""
106        if streams == "*":
107            print(
108                "Warning: Config is not set yet. All streams will be selected after config is set."
109            )
110        else:
111            print(
112                "Warning: Config is not set yet. "
113                f"Streams to be selected after config is set: {streams}"
114            )
115
116    def select_all_streams(self) -> None:
117        """Select all streams.
118
119        This is a more streamlined equivalent to:
120        > source.select_streams(source.get_available_streams()).
121        """
122        if self._config_dict is None:
123            self._to_be_selected_streams = "*"
124            self._log_warning_preselected_stream(self._to_be_selected_streams)
125            return
126
127        self._selected_stream_names = self.get_available_streams()
128
129    def select_streams(self, streams: str | list[str]) -> None:
130        """Select the stream names that should be read from the connector.
131
132        Args:
133            streams: A list of stream names to select. If set to "*", all streams will be selected.
134
135        Currently, if this is not set, all streams will be read.
136        """
137        if self._config_dict is None:
138            self._to_be_selected_streams = streams
139            self._log_warning_preselected_stream(streams)
140            return
141
142        if streams == "*":
143            self.select_all_streams()
144            return
145
146        if isinstance(streams, str):
147            # If a single stream is provided, convert it to a one-item list
148            streams = [streams]
149
150        available_streams = self.get_available_streams()
151        for stream in streams:
152            if stream not in available_streams:
153                raise exc.AirbyteStreamNotFoundError(
154                    stream_name=stream,
155                    connector_name=self.name,
156                    available_streams=available_streams,
157                )
158        self._selected_stream_names = streams
159
160    def get_selected_streams(self) -> list[str]:
161        """Get the selected streams.
162
163        If no streams are selected, return an empty list.
164        """
165        return self._selected_stream_names
166
167    def set_config(
168        self,
169        config: dict[str, Any],
170        *,
171        validate: bool = True,
172    ) -> None:
173        """Set the config for the connector.
174
175        If validate is True, raise an exception if the config fails validation.
176
177        If validate is False, validation will be deferred until check() or validate_config()
178        is called.
179        """
180        if validate:
181            self.validate_config(config)
182
183        self._config_dict = config
184
185        if self._to_be_selected_streams:
186            self.select_streams(self._to_be_selected_streams)
187            self._to_be_selected_streams = []
188
189    def get_config(self) -> dict[str, Any]:
190        """Get the config for the connector."""
191        return self._config
192
193    @property
194    def _config(self) -> dict[str, Any]:
195        if self._config_dict is None:
196            raise exc.AirbyteConnectorConfigurationMissingError(
197                connector_name=self.name,
198                guidance="Provide via get_source() or set_config()",
199            )
200        return self._config_dict
201
202    def _discover(self) -> AirbyteCatalog:
203        """Call discover on the connector.
204
205        This involves the following steps:
206        - Write the config to a temporary file
207        - execute the connector with discover --config <config_file>
208        - Listen to the messages and return the first AirbyteCatalog that comes along.
209        - Make sure the subprocess is killed when the function returns.
210        """
211        with as_temp_files([self._config]) as [config_file]:
212            for msg in self._execute(["discover", "--config", config_file]):
213                if msg.type == Type.CATALOG and msg.catalog:
214                    return msg.catalog
215            raise exc.AirbyteConnectorMissingCatalogError(
216                connector_name=self.name,
217                log_text=self._last_log_messages,
218            )
219
220    def get_available_streams(self) -> list[str]:
221        """Get the available streams from the spec."""
222        return [s.name for s in self.discovered_catalog.streams]
223
224    def _get_incremental_stream_names(self) -> list[str]:
225        """Get the name of streams that support incremental sync."""
226        return [
227            stream.name
228            for stream in self.discovered_catalog.streams
229            if SyncMode.incremental in stream.supported_sync_modes
230        ]
231
232    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
233        """Call spec on the connector.
234
235        This involves the following steps:
236        * execute the connector with spec
237        * Listen to the messages and return the first AirbyteCatalog that comes along.
238        * Make sure the subprocess is killed when the function returns.
239        """
240        if force_refresh or self._spec is None:
241            for msg in self._execute(["spec"]):
242                if msg.type == Type.SPEC and msg.spec:
243                    self._spec = msg.spec
244                    break
245
246        if self._spec:
247            return self._spec
248
249        raise exc.AirbyteConnectorMissingSpecError(
250            connector_name=self.name,
251            log_text=self._last_log_messages,
252        )
253
254    @property
255    def config_spec(self) -> dict[str, Any]:
256        """Generate a configuration spec for this connector, as a JSON Schema definition.
257
258        This function generates a JSON Schema dictionary with configuration specs for the
259        current connector, as a dictionary.
260
261        Returns:
262            dict: The JSON Schema configuration spec as a dictionary.
263        """
264        return self._get_spec(force_refresh=True).connectionSpecification
265
266    def print_config_spec(
267        self,
268        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
269        *,
270        output_file: Path | str | None = None,
271    ) -> None:
272        """Print the configuration spec for this connector.
273
274        Args:
275            format: The format to print the spec in. Must be "yaml" or "json".
276            output_file: Optional. If set, the spec will be written to the given file path.
277                Otherwise, it will be printed to the console.
278        """
279        if format not in {"yaml", "json"}:
280            raise exc.PyAirbyteInputError(
281                message="Invalid format. Expected 'yaml' or 'json'",
282                input_value=format,
283            )
284        if isinstance(output_file, str):
285            output_file = Path(output_file)
286
287        if format == "yaml":
288            content = yaml.dump(self.config_spec, indent=2)
289        elif format == "json":
290            content = json.dumps(self.config_spec, indent=2)
291
292        if output_file:
293            output_file.write_text(content)
294            return
295
296        syntax_highlighted = Syntax(content, format)
297        print(syntax_highlighted)
298
299    @property
300    def _yaml_spec(self) -> str:
301        """Get the spec as a yaml string.
302
303        For now, the primary use case is for writing and debugging a valid config for a source.
304
305        This is private for now because we probably want better polish before exposing this
306        as a stable interface. This will also get easier when we have docs links with this info
307        for each connector.
308        """
309        spec_obj: ConnectorSpecification = self._get_spec()
310        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
311        # convert to a yaml string
312        return yaml.dump(spec_dict)
313
314    @property
315    def docs_url(self) -> str:
316        """Get the URL to the connector's documentation."""
317        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
318            "source-", ""
319        )
320
321    @property
322    def discovered_catalog(self) -> AirbyteCatalog:
323        """Get the raw catalog for the given streams.
324
325        If the catalog is not yet known, we call discover to get it.
326        """
327        if self._discovered_catalog is None:
328            self._discovered_catalog = self._discover()
329
330        return self._discovered_catalog
331
332    @property
333    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
334        """Get the configured catalog for the given streams.
335
336        If the raw catalog is not yet known, we call discover to get it.
337
338        If no specific streams are selected, we return a catalog that syncs all available streams.
339
340        TODO: We should consider disabling by default the streams that the connector would
341        disable by default. (For instance, streams that require a premium license are sometimes
342        disabled by default within the connector.)
343        """
344        # Ensure discovered catalog is cached before we start
345        _ = self.discovered_catalog
346
347        # Filter for selected streams if set, otherwise use all available streams:
348        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
349        return self.get_configured_catalog(streams=streams_filter)
350
351    def get_configured_catalog(
352        self,
353        streams: Literal["*"] | list[str] | None = None,
354    ) -> ConfiguredAirbyteCatalog:
355        """Get a configured catalog for the given streams.
356
357        If no streams are provided, the selected streams will be used. If no streams are selected,
358        all available streams will be used.
359
360        If '*' is provided, all available streams will be used.
361        """
362        selected_streams: list[str] = []
363        if streams is None:
364            selected_streams = self._selected_stream_names or self.get_available_streams()
365        elif streams == "*":
366            selected_streams = self.get_available_streams()
367        elif isinstance(streams, list):
368            selected_streams = streams
369        else:
370            raise exc.PyAirbyteInputError(
371                message="Invalid streams argument.",
372                input_value=streams,
373            )
374
375        return ConfiguredAirbyteCatalog(
376            streams=[
377                ConfiguredAirbyteStream(
378                    stream=stream,
379                    destination_sync_mode=DestinationSyncMode.overwrite,
380                    primary_key=stream.source_defined_primary_key,
381                    sync_mode=SyncMode.incremental,
382                )
383                for stream in self.discovered_catalog.streams
384                if stream.name in selected_streams
385            ],
386        )
387
388    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
389        """Return the JSON Schema spec for the specified stream name."""
390        catalog: AirbyteCatalog = self.discovered_catalog
391        found: list[AirbyteStream] = [
392            stream for stream in catalog.streams if stream.name == stream_name
393        ]
394
395        if len(found) == 0:
396            raise exc.PyAirbyteInputError(
397                message="Stream name does not exist in catalog.",
398                input_value=stream_name,
399            )
400
401        if len(found) > 1:
402            raise exc.PyAirbyteInternalError(
403                message="Duplicate streams found with the same name.",
404                context={
405                    "found_streams": found,
406                },
407            )
408
409        return found[0].json_schema
410
411    def get_records(
412        self,
413        stream: str,
414        *,
415        normalize_field_names: bool = False,
416        prune_undeclared_fields: bool = True,
417    ) -> LazyDataset:
418        """Read a stream from the connector.
419
420        Args:
421            stream: The name of the stream to read.
422            normalize_field_names: When `True`, field names will be normalized to lower case, with
423                special characters removed. This matches the behavior of PyAirbyte caches and most
424                Airbyte destinations.
425            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
426                which generally matches the behavior of PyAirbyte caches and most Airbyte
427                destinations, specifically when you expect the catalog may be stale. You can disable
428                this to keep all fields in the records.
429
430        This involves the following steps:
431        * Call discover to get the catalog
432        * Generate a configured catalog that syncs the given stream in full_refresh mode
433        * Write the configured catalog and the config to a temporary file
434        * execute the connector with read --config <config_file> --catalog <catalog_file>
435        * Listen to the messages and return the first AirbyteRecordMessages that come along.
436        * Make sure the subprocess is killed when the function returns.
437        """
438        discovered_catalog: AirbyteCatalog = self.discovered_catalog
439        configured_catalog = ConfiguredAirbyteCatalog(
440            streams=[
441                ConfiguredAirbyteStream(
442                    stream=s,
443                    sync_mode=SyncMode.full_refresh,
444                    destination_sync_mode=DestinationSyncMode.overwrite,
445                )
446                for s in discovered_catalog.streams
447                if s.name == stream
448            ],
449        )
450        if len(configured_catalog.streams) == 0:
451            raise exc.PyAirbyteInputError(
452                message="Requested stream does not exist.",
453                context={
454                    "stream": stream,
455                    "available_streams": self.get_available_streams(),
456                    "connector_name": self.name,
457                },
458            ) from KeyError(stream)
459
460        configured_stream = configured_catalog.streams[0]
461
462        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
463            yield from records
464
465        stream_record_handler = StreamRecordHandler(
466            json_schema=self.get_stream_json_schema(stream),
467            prune_extra_fields=prune_undeclared_fields,
468            normalize_keys=normalize_field_names,
469        )
470
471        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
472        progress_tracker = ProgressTracker(
473            ProgressStyle.PLAIN,
474            source=self,
475            cache=None,
476            destination=None,
477            expected_streams=[stream],
478        )
479
480        iterator: Iterator[dict[str, Any]] = (
481            StreamRecord.from_record_message(
482                record_message=record.record,
483                stream_record_handler=stream_record_handler,
484            )
485            for record in self._read_with_catalog(
486                catalog=configured_catalog,
487                progress_tracker=progress_tracker,
488            )
489            if record.record
490        )
491        progress_tracker.log_success()
492        return LazyDataset(
493            iterator,
494            stream_metadata=configured_stream,
495        )
496
497    def get_documents(
498        self,
499        stream: str,
500        title_property: str | None = None,
501        content_properties: list[str] | None = None,
502        metadata_properties: list[str] | None = None,
503        *,
504        render_metadata: bool = False,
505    ) -> Iterable[Document]:
506        """Read a stream from the connector and return the records as documents.
507
508        If metadata_properties is not set, all properties that are not content will be added to
509        the metadata.
510
511        If render_metadata is True, metadata will be rendered in the document, as well as the
512        the main content.
513        """
514        return self.get_records(stream).to_documents(
515            title_property=title_property,
516            content_properties=content_properties,
517            metadata_properties=metadata_properties,
518            render_metadata=render_metadata,
519        )
520
521    def _get_airbyte_message_iterator(
522        self,
523        *,
524        streams: Literal["*"] | list[str] | None = None,
525        state_provider: StateProviderBase | None = None,
526        progress_tracker: ProgressTracker,
527        force_full_refresh: bool = False,
528    ) -> AirbyteMessageIterator:
529        """Get an AirbyteMessageIterator for this source."""
530        return AirbyteMessageIterator(
531            self._read_with_catalog(
532                catalog=self.get_configured_catalog(streams=streams),
533                state=state_provider if not force_full_refresh else None,
534                progress_tracker=progress_tracker,
535            )
536        )
537
538    def _read_with_catalog(
539        self,
540        catalog: ConfiguredAirbyteCatalog,
541        progress_tracker: ProgressTracker,
542        state: StateProviderBase | None = None,
543    ) -> Generator[AirbyteMessage, None, None]:
544        """Call read on the connector.
545
546        This involves the following steps:
547        * Write the config to a temporary file
548        * execute the connector with read --config <config_file> --catalog <catalog_file>
549        * Listen to the messages and return the AirbyteRecordMessages that come along.
550        * Send out telemetry on the performed sync (with information about which source was used and
551          the type of the cache)
552        """
553        with as_temp_files(
554            [
555                self._config,
556                catalog.model_dump_json(),
557                state.to_state_input_file_text() if state else "[]",
558            ]
559        ) as [
560            config_file,
561            catalog_file,
562            state_file,
563        ]:
564            message_generator = self._execute(
565                [
566                    "read",
567                    "--config",
568                    config_file,
569                    "--catalog",
570                    catalog_file,
571                    "--state",
572                    state_file,
573                ],
574                progress_tracker=progress_tracker,
575            )
576            yield from progress_tracker.tally_records_read(message_generator)
577        progress_tracker.log_read_complete()
578
579    def _peek_airbyte_message(
580        self,
581        message: AirbyteMessage,
582        *,
583        raise_on_error: bool = True,
584    ) -> None:
585        """Process an Airbyte message.
586
587        This method handles reading Airbyte messages and taking action, if needed, based on the
588        message type. For instance, log messages are logged, records are tallied, and errors are
589        raised as exceptions if `raise_on_error` is True.
590
591        Raises:
592            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
593        """
594        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
595
596    def _log_incremental_streams(
597        self,
598        *,
599        incremental_streams: set[str] | None = None,
600    ) -> None:
601        """Log the streams which are using incremental sync mode."""
602        log_message = (
603            "The following streams are currently using incremental sync:\n"
604            f"{incremental_streams}\n"
605            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
606        )
607        print(log_message)
608
609    def read(
610        self,
611        cache: CacheBase | None = None,
612        *,
613        streams: str | list[str] | None = None,
614        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
615        force_full_refresh: bool = False,
616        skip_validation: bool = False,
617    ) -> ReadResult:
618        """Read from the connector and write to the cache.
619
620        Args:
621            cache: The cache to write to. If not set, a default cache will be used.
622            streams: Optional if already set. A list of stream names to select for reading. If set
623                to "*", all streams will be selected.
624            write_strategy: The strategy to use when writing to the cache. If a string, it must be
625                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
626                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
627                WriteStrategy.AUTO.
628            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
629                streams will be read in incremental mode if supported by the connector. This option
630                must be True when using the "replace" strategy.
631            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
632                running the connector. This can be helpful in debugging, when you want to send
633                configurations to the connector that otherwise might be rejected by JSON Schema
634                validation rules.
635        """
636        cache = cache or get_default_cache()
637        progress_tracker = ProgressTracker(
638            source=self,
639            cache=cache,
640            destination=None,
641            expected_streams=None,  # Will be set later
642        )
643
644        # Set up state provider if not in full refresh mode
645        if force_full_refresh:
646            state_provider: StateProviderBase | None = None
647        else:
648            state_provider = cache.get_state_provider(
649                source_name=self._name,
650            )
651        state_writer = cache.get_state_writer(source_name=self._name)
652
653        if streams:
654            self.select_streams(streams)
655
656        if not self._selected_stream_names:
657            raise exc.PyAirbyteNoStreamsSelectedError(
658                connector_name=self.name,
659                available_streams=self.get_available_streams(),
660            )
661
662        try:
663            result = self._read_to_cache(
664                cache=cache,
665                catalog_provider=CatalogProvider(self.configured_catalog),
666                stream_names=self._selected_stream_names,
667                state_provider=state_provider,
668                state_writer=state_writer,
669                write_strategy=write_strategy,
670                force_full_refresh=force_full_refresh,
671                skip_validation=skip_validation,
672                progress_tracker=progress_tracker,
673            )
674        except exc.PyAirbyteInternalError as ex:
675            progress_tracker.log_failure(exception=ex)
676            raise exc.AirbyteConnectorFailedError(
677                connector_name=self.name,
678                log_text=self._last_log_messages,
679            ) from ex
680        except Exception as ex:
681            progress_tracker.log_failure(exception=ex)
682            raise
683
684        progress_tracker.log_success()
685        return result
686
687    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
688        self,
689        cache: CacheBase,
690        *,
691        catalog_provider: CatalogProvider,
692        stream_names: list[str],
693        state_provider: StateProviderBase | None,
694        state_writer: StateWriterBase | None,
695        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
696        force_full_refresh: bool = False,
697        skip_validation: bool = False,
698        progress_tracker: ProgressTracker,
699    ) -> ReadResult:
700        """Internal read method."""
701        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
702            warnings.warn(
703                message=(
704                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
705                    "could result in data loss. "
706                    "To silence this warning, use the following: "
707                    'warnings.filterwarnings("ignore", '
708                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
709                ),
710                category=exc.PyAirbyteDataLossWarning,
711                stacklevel=1,
712            )
713        if isinstance(write_strategy, str):
714            try:
715                write_strategy = WriteStrategy(write_strategy)
716            except ValueError:
717                raise exc.PyAirbyteInputError(
718                    message="Invalid strategy",
719                    context={
720                        "write_strategy": write_strategy,
721                        "available_strategies": [s.value for s in WriteStrategy],
722                    },
723                ) from None
724
725        # Run optional validation step
726        if not skip_validation:
727            self.validate_config()
728
729        # Log incremental stream if incremental streams are known
730        if state_provider and state_provider.known_stream_names:
731            # Retrieve set of the known streams support which support incremental sync
732            incremental_streams = (
733                set(self._get_incremental_stream_names())
734                & state_provider.known_stream_names
735                & set(self.get_selected_streams())
736            )
737            if incremental_streams:
738                self._log_incremental_streams(incremental_streams=incremental_streams)
739
740        airbyte_message_iterator = AirbyteMessageIterator(
741            self._read_with_catalog(
742                catalog=catalog_provider.configured_catalog,
743                state=state_provider,
744                progress_tracker=progress_tracker,
745            )
746        )
747        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
748            stdin=airbyte_message_iterator,
749            catalog_provider=catalog_provider,
750            write_strategy=write_strategy,
751            state_writer=state_writer,
752            progress_tracker=progress_tracker,
753        )
754
755        # Flush the WAL, if applicable
756        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
757
758        return ReadResult(
759            source_name=self.name,
760            progress_tracker=progress_tracker,
761            processed_streams=stream_names,
762            cache=cache,
763        )
764
765
# Public API of this module: only the Source class is exported.
__all__ = ["Source"]
class Source(airbyte._connector_base.ConnectorBase):
 53class Source(ConnectorBase):
 54    """A class representing a source that can be called."""
 55
 56    connector_type: Literal["source"] = "source"
 57
 58    def __init__(
 59        self,
 60        executor: Executor,
 61        name: str,
 62        config: dict[str, Any] | None = None,
 63        *,
 64        config_change_callback: ConfigChangeCallback | None = None,
 65        streams: str | list[str] | None = None,
 66        validate: bool = False,
 67    ) -> None:
 68        """Initialize the source.
 69
 70        If config is provided, it will be validated against the spec if validate is True.
 71        """
 72        self._to_be_selected_streams: list[str] | str = []
 73        """Used to hold selection criteria before catalog is known."""
 74
 75        super().__init__(
 76            executor=executor,
 77            name=name,
 78            config=config,
 79            config_change_callback=config_change_callback,
 80            validate=validate,
 81        )
 82        self._config_dict: dict[str, Any] | None = None
 83        self._last_log_messages: list[str] = []
 84        self._discovered_catalog: AirbyteCatalog | None = None
 85        self._selected_stream_names: list[str] = []
 86        if config is not None:
 87            self.set_config(config, validate=validate)
 88        if streams is not None:
 89            self.select_streams(streams)
 90
 91        self._deployed_api_root: str | None = None
 92        self._deployed_workspace_id: str | None = None
 93        self._deployed_source_id: str | None = None
 94
 95    def set_streams(self, streams: list[str]) -> None:
 96        """Deprecated. See select_streams()."""
 97        warnings.warn(
 98            "The 'set_streams' method is deprecated and will be removed in a future version. "
 99            "Please use the 'select_streams' method instead.",
100            DeprecationWarning,
101            stacklevel=2,
102        )
103        self.select_streams(streams)
104
105    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
106        """Logs a warning message indicating stream selection which are not selected yet."""
107        if streams == "*":
108            print(
109                "Warning: Config is not set yet. All streams will be selected after config is set."
110            )
111        else:
112            print(
113                "Warning: Config is not set yet. "
114                f"Streams to be selected after config is set: {streams}"
115            )
116
117    def select_all_streams(self) -> None:
118        """Select all streams.
119
120        This is a more streamlined equivalent to:
121        > source.select_streams(source.get_available_streams()).
122        """
123        if self._config_dict is None:
124            self._to_be_selected_streams = "*"
125            self._log_warning_preselected_stream(self._to_be_selected_streams)
126            return
127
128        self._selected_stream_names = self.get_available_streams()
129
130    def select_streams(self, streams: str | list[str]) -> None:
131        """Select the stream names that should be read from the connector.
132
133        Args:
134            streams: A list of stream names to select. If set to "*", all streams will be selected.
135
136        Currently, if this is not set, all streams will be read.
137        """
138        if self._config_dict is None:
139            self._to_be_selected_streams = streams
140            self._log_warning_preselected_stream(streams)
141            return
142
143        if streams == "*":
144            self.select_all_streams()
145            return
146
147        if isinstance(streams, str):
148            # If a single stream is provided, convert it to a one-item list
149            streams = [streams]
150
151        available_streams = self.get_available_streams()
152        for stream in streams:
153            if stream not in available_streams:
154                raise exc.AirbyteStreamNotFoundError(
155                    stream_name=stream,
156                    connector_name=self.name,
157                    available_streams=available_streams,
158                )
159        self._selected_stream_names = streams
160
161    def get_selected_streams(self) -> list[str]:
162        """Get the selected streams.
163
164        If no streams are selected, return an empty list.
165        """
166        return self._selected_stream_names
167
168    def set_config(
169        self,
170        config: dict[str, Any],
171        *,
172        validate: bool = True,
173    ) -> None:
174        """Set the config for the connector.
175
176        If validate is True, raise an exception if the config fails validation.
177
178        If validate is False, validation will be deferred until check() or validate_config()
179        is called.
180        """
181        if validate:
182            self.validate_config(config)
183
184        self._config_dict = config
185
186        if self._to_be_selected_streams:
187            self.select_streams(self._to_be_selected_streams)
188            self._to_be_selected_streams = []
189
190    def get_config(self) -> dict[str, Any]:
191        """Get the config for the connector."""
192        return self._config
193
194    @property
195    def _config(self) -> dict[str, Any]:
196        if self._config_dict is None:
197            raise exc.AirbyteConnectorConfigurationMissingError(
198                connector_name=self.name,
199                guidance="Provide via get_source() or set_config()",
200            )
201        return self._config_dict
202
203    def _discover(self) -> AirbyteCatalog:
204        """Call discover on the connector.
205
206        This involves the following steps:
207        - Write the config to a temporary file
208        - execute the connector with discover --config <config_file>
209        - Listen to the messages and return the first AirbyteCatalog that comes along.
210        - Make sure the subprocess is killed when the function returns.
211        """
212        with as_temp_files([self._config]) as [config_file]:
213            for msg in self._execute(["discover", "--config", config_file]):
214                if msg.type == Type.CATALOG and msg.catalog:
215                    return msg.catalog
216            raise exc.AirbyteConnectorMissingCatalogError(
217                connector_name=self.name,
218                log_text=self._last_log_messages,
219            )
220
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]
224
225    def _get_incremental_stream_names(self) -> list[str]:
226        """Get the name of streams that support incremental sync."""
227        return [
228            stream.name
229            for stream in self.discovered_catalog.streams
230            if SyncMode.incremental in stream.supported_sync_modes
231        ]
232
233    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
234        """Call spec on the connector.
235
236        This involves the following steps:
237        * execute the connector with spec
238        * Listen to the messages and return the first AirbyteCatalog that comes along.
239        * Make sure the subprocess is killed when the function returns.
240        """
241        if force_refresh or self._spec is None:
242            for msg in self._execute(["spec"]):
243                if msg.type == Type.SPEC and msg.spec:
244                    self._spec = msg.spec
245                    break
246
247        if self._spec:
248            return self._spec
249
250        raise exc.AirbyteConnectorMissingSpecError(
251            connector_name=self.name,
252            log_text=self._last_log_messages,
253        )
254
255    @property
256    def config_spec(self) -> dict[str, Any]:
257        """Generate a configuration spec for this connector, as a JSON Schema definition.
258
259        This function generates a JSON Schema dictionary with configuration specs for the
260        current connector, as a dictionary.
261
262        Returns:
263            dict: The JSON Schema configuration spec as a dictionary.
264        """
265        return self._get_spec(force_refresh=True).connectionSpecification
266
267    def print_config_spec(
268        self,
269        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
270        *,
271        output_file: Path | str | None = None,
272    ) -> None:
273        """Print the configuration spec for this connector.
274
275        Args:
276            format: The format to print the spec in. Must be "yaml" or "json".
277            output_file: Optional. If set, the spec will be written to the given file path.
278                Otherwise, it will be printed to the console.
279        """
280        if format not in {"yaml", "json"}:
281            raise exc.PyAirbyteInputError(
282                message="Invalid format. Expected 'yaml' or 'json'",
283                input_value=format,
284            )
285        if isinstance(output_file, str):
286            output_file = Path(output_file)
287
288        if format == "yaml":
289            content = yaml.dump(self.config_spec, indent=2)
290        elif format == "json":
291            content = json.dumps(self.config_spec, indent=2)
292
293        if output_file:
294            output_file.write_text(content)
295            return
296
297        syntax_highlighted = Syntax(content, format)
298        print(syntax_highlighted)
299
300    @property
301    def _yaml_spec(self) -> str:
302        """Get the spec as a yaml string.
303
304        For now, the primary use case is for writing and debugging a valid config for a source.
305
306        This is private for now because we probably want better polish before exposing this
307        as a stable interface. This will also get easier when we have docs links with this info
308        for each connector.
309        """
310        spec_obj: ConnectorSpecification = self._get_spec()
311        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
312        # convert to a yaml string
313        return yaml.dump(spec_dict)
314
315    @property
316    def docs_url(self) -> str:
317        """Get the URL to the connector's documentation."""
318        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
319            "source-", ""
320        )
321
322    @property
323    def discovered_catalog(self) -> AirbyteCatalog:
324        """Get the raw catalog for the given streams.
325
326        If the catalog is not yet known, we call discover to get it.
327        """
328        if self._discovered_catalog is None:
329            self._discovered_catalog = self._discover()
330
331        return self._discovered_catalog
332
333    @property
334    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
335        """Get the configured catalog for the given streams.
336
337        If the raw catalog is not yet known, we call discover to get it.
338
339        If no specific streams are selected, we return a catalog that syncs all available streams.
340
341        TODO: We should consider disabling by default the streams that the connector would
342        disable by default. (For instance, streams that require a premium license are sometimes
343        disabled by default within the connector.)
344        """
345        # Ensure discovered catalog is cached before we start
346        _ = self.discovered_catalog
347
348        # Filter for selected streams if set, otherwise use all available streams:
349        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
350        return self.get_configured_catalog(streams=streams_filter)
351
352    def get_configured_catalog(
353        self,
354        streams: Literal["*"] | list[str] | None = None,
355    ) -> ConfiguredAirbyteCatalog:
356        """Get a configured catalog for the given streams.
357
358        If no streams are provided, the selected streams will be used. If no streams are selected,
359        all available streams will be used.
360
361        If '*' is provided, all available streams will be used.
362        """
363        selected_streams: list[str] = []
364        if streams is None:
365            selected_streams = self._selected_stream_names or self.get_available_streams()
366        elif streams == "*":
367            selected_streams = self.get_available_streams()
368        elif isinstance(streams, list):
369            selected_streams = streams
370        else:
371            raise exc.PyAirbyteInputError(
372                message="Invalid streams argument.",
373                input_value=streams,
374            )
375
376        return ConfiguredAirbyteCatalog(
377            streams=[
378                ConfiguredAirbyteStream(
379                    stream=stream,
380                    destination_sync_mode=DestinationSyncMode.overwrite,
381                    primary_key=stream.source_defined_primary_key,
382                    sync_mode=SyncMode.incremental,
383                )
384                for stream in self.discovered_catalog.streams
385                if stream.name in selected_streams
386            ],
387        )
388
389    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
390        """Return the JSON Schema spec for the specified stream name."""
391        catalog: AirbyteCatalog = self.discovered_catalog
392        found: list[AirbyteStream] = [
393            stream for stream in catalog.streams if stream.name == stream_name
394        ]
395
396        if len(found) == 0:
397            raise exc.PyAirbyteInputError(
398                message="Stream name does not exist in catalog.",
399                input_value=stream_name,
400            )
401
402        if len(found) > 1:
403            raise exc.PyAirbyteInternalError(
404                message="Duplicate streams found with the same name.",
405                context={
406                    "found_streams": found,
407                },
408            )
409
410        return found[0].json_schema
411
412    def get_records(
413        self,
414        stream: str,
415        *,
416        normalize_field_names: bool = False,
417        prune_undeclared_fields: bool = True,
418    ) -> LazyDataset:
419        """Read a stream from the connector.
420
421        Args:
422            stream: The name of the stream to read.
423            normalize_field_names: When `True`, field names will be normalized to lower case, with
424                special characters removed. This matches the behavior of PyAirbyte caches and most
425                Airbyte destinations.
426            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
427                which generally matches the behavior of PyAirbyte caches and most Airbyte
428                destinations, specifically when you expect the catalog may be stale. You can disable
429                this to keep all fields in the records.
430
431        This involves the following steps:
432        * Call discover to get the catalog
433        * Generate a configured catalog that syncs the given stream in full_refresh mode
434        * Write the configured catalog and the config to a temporary file
435        * execute the connector with read --config <config_file> --catalog <catalog_file>
436        * Listen to the messages and return the first AirbyteRecordMessages that come along.
437        * Make sure the subprocess is killed when the function returns.
438        """
439        discovered_catalog: AirbyteCatalog = self.discovered_catalog
440        configured_catalog = ConfiguredAirbyteCatalog(
441            streams=[
442                ConfiguredAirbyteStream(
443                    stream=s,
444                    sync_mode=SyncMode.full_refresh,
445                    destination_sync_mode=DestinationSyncMode.overwrite,
446                )
447                for s in discovered_catalog.streams
448                if s.name == stream
449            ],
450        )
451        if len(configured_catalog.streams) == 0:
452            raise exc.PyAirbyteInputError(
453                message="Requested stream does not exist.",
454                context={
455                    "stream": stream,
456                    "available_streams": self.get_available_streams(),
457                    "connector_name": self.name,
458                },
459            ) from KeyError(stream)
460
461        configured_stream = configured_catalog.streams[0]
462
463        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
464            yield from records
465
466        stream_record_handler = StreamRecordHandler(
467            json_schema=self.get_stream_json_schema(stream),
468            prune_extra_fields=prune_undeclared_fields,
469            normalize_keys=normalize_field_names,
470        )
471
472        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
473        progress_tracker = ProgressTracker(
474            ProgressStyle.PLAIN,
475            source=self,
476            cache=None,
477            destination=None,
478            expected_streams=[stream],
479        )
480
481        iterator: Iterator[dict[str, Any]] = (
482            StreamRecord.from_record_message(
483                record_message=record.record,
484                stream_record_handler=stream_record_handler,
485            )
486            for record in self._read_with_catalog(
487                catalog=configured_catalog,
488                progress_tracker=progress_tracker,
489            )
490            if record.record
491        )
492        progress_tracker.log_success()
493        return LazyDataset(
494            iterator,
495            stream_metadata=configured_stream,
496        )
497
498    def get_documents(
499        self,
500        stream: str,
501        title_property: str | None = None,
502        content_properties: list[str] | None = None,
503        metadata_properties: list[str] | None = None,
504        *,
505        render_metadata: bool = False,
506    ) -> Iterable[Document]:
507        """Read a stream from the connector and return the records as documents.
508
509        If metadata_properties is not set, all properties that are not content will be added to
510        the metadata.
511
512        If render_metadata is True, metadata will be rendered in the document, as well as the
513        the main content.
514        """
515        return self.get_records(stream).to_documents(
516            title_property=title_property,
517            content_properties=content_properties,
518            metadata_properties=metadata_properties,
519            render_metadata=render_metadata,
520        )
521
522    def _get_airbyte_message_iterator(
523        self,
524        *,
525        streams: Literal["*"] | list[str] | None = None,
526        state_provider: StateProviderBase | None = None,
527        progress_tracker: ProgressTracker,
528        force_full_refresh: bool = False,
529    ) -> AirbyteMessageIterator:
530        """Get an AirbyteMessageIterator for this source."""
531        return AirbyteMessageIterator(
532            self._read_with_catalog(
533                catalog=self.get_configured_catalog(streams=streams),
534                state=state_provider if not force_full_refresh else None,
535                progress_tracker=progress_tracker,
536            )
537        )
538
539    def _read_with_catalog(
540        self,
541        catalog: ConfiguredAirbyteCatalog,
542        progress_tracker: ProgressTracker,
543        state: StateProviderBase | None = None,
544    ) -> Generator[AirbyteMessage, None, None]:
545        """Call read on the connector.
546
547        This involves the following steps:
548        * Write the config to a temporary file
549        * execute the connector with read --config <config_file> --catalog <catalog_file>
550        * Listen to the messages and return the AirbyteRecordMessages that come along.
551        * Send out telemetry on the performed sync (with information about which source was used and
552          the type of the cache)
553        """
554        with as_temp_files(
555            [
556                self._config,
557                catalog.model_dump_json(),
558                state.to_state_input_file_text() if state else "[]",
559            ]
560        ) as [
561            config_file,
562            catalog_file,
563            state_file,
564        ]:
565            message_generator = self._execute(
566                [
567                    "read",
568                    "--config",
569                    config_file,
570                    "--catalog",
571                    catalog_file,
572                    "--state",
573                    state_file,
574                ],
575                progress_tracker=progress_tracker,
576            )
577            yield from progress_tracker.tally_records_read(message_generator)
578        progress_tracker.log_read_complete()
579
580    def _peek_airbyte_message(
581        self,
582        message: AirbyteMessage,
583        *,
584        raise_on_error: bool = True,
585    ) -> None:
586        """Process an Airbyte message.
587
588        This method handles reading Airbyte messages and taking action, if needed, based on the
589        message type. For instance, log messages are logged, records are tallied, and errors are
590        raised as exceptions if `raise_on_error` is True.
591
592        Raises:
593            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
594        """
595        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
596
597    def _log_incremental_streams(
598        self,
599        *,
600        incremental_streams: set[str] | None = None,
601    ) -> None:
602        """Log the streams which are using incremental sync mode."""
603        log_message = (
604            "The following streams are currently using incremental sync:\n"
605            f"{incremental_streams}\n"
606            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
607        )
608        print(log_message)
609
610    def read(
611        self,
612        cache: CacheBase | None = None,
613        *,
614        streams: str | list[str] | None = None,
615        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
616        force_full_refresh: bool = False,
617        skip_validation: bool = False,
618    ) -> ReadResult:
619        """Read from the connector and write to the cache.
620
621        Args:
622            cache: The cache to write to. If not set, a default cache will be used.
623            streams: Optional if already set. A list of stream names to select for reading. If set
624                to "*", all streams will be selected.
625            write_strategy: The strategy to use when writing to the cache. If a string, it must be
626                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
627                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
628                WriteStrategy.AUTO.
629            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
630                streams will be read in incremental mode if supported by the connector. This option
631                must be True when using the "replace" strategy.
632            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
633                running the connector. This can be helpful in debugging, when you want to send
634                configurations to the connector that otherwise might be rejected by JSON Schema
635                validation rules.
636        """
637        cache = cache or get_default_cache()
638        progress_tracker = ProgressTracker(
639            source=self,
640            cache=cache,
641            destination=None,
642            expected_streams=None,  # Will be set later
643        )
644
645        # Set up state provider if not in full refresh mode
646        if force_full_refresh:
647            state_provider: StateProviderBase | None = None
648        else:
649            state_provider = cache.get_state_provider(
650                source_name=self._name,
651            )
652        state_writer = cache.get_state_writer(source_name=self._name)
653
654        if streams:
655            self.select_streams(streams)
656
657        if not self._selected_stream_names:
658            raise exc.PyAirbyteNoStreamsSelectedError(
659                connector_name=self.name,
660                available_streams=self.get_available_streams(),
661            )
662
663        try:
664            result = self._read_to_cache(
665                cache=cache,
666                catalog_provider=CatalogProvider(self.configured_catalog),
667                stream_names=self._selected_stream_names,
668                state_provider=state_provider,
669                state_writer=state_writer,
670                write_strategy=write_strategy,
671                force_full_refresh=force_full_refresh,
672                skip_validation=skip_validation,
673                progress_tracker=progress_tracker,
674            )
675        except exc.PyAirbyteInternalError as ex:
676            progress_tracker.log_failure(exception=ex)
677            raise exc.AirbyteConnectorFailedError(
678                connector_name=self.name,
679                log_text=self._last_log_messages,
680            ) from ex
681        except Exception as ex:
682            progress_tracker.log_failure(exception=ex)
683            raise
684
685        progress_tracker.log_success()
686        return result
687
688    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
689        self,
690        cache: CacheBase,
691        *,
692        catalog_provider: CatalogProvider,
693        stream_names: list[str],
694        state_provider: StateProviderBase | None,
695        state_writer: StateWriterBase | None,
696        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
697        force_full_refresh: bool = False,
698        skip_validation: bool = False,
699        progress_tracker: ProgressTracker,
700    ) -> ReadResult:
701        """Internal read method."""
702        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
703            warnings.warn(
704                message=(
705                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
706                    "could result in data loss. "
707                    "To silence this warning, use the following: "
708                    'warnings.filterwarnings("ignore", '
709                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
710                ),
711                category=exc.PyAirbyteDataLossWarning,
712                stacklevel=1,
713            )
714        if isinstance(write_strategy, str):
715            try:
716                write_strategy = WriteStrategy(write_strategy)
717            except ValueError:
718                raise exc.PyAirbyteInputError(
719                    message="Invalid strategy",
720                    context={
721                        "write_strategy": write_strategy,
722                        "available_strategies": [s.value for s in WriteStrategy],
723                    },
724                ) from None
725
726        # Run optional validation step
727        if not skip_validation:
728            self.validate_config()
729
730        # Log incremental stream if incremental streams are known
731        if state_provider and state_provider.known_stream_names:
732            # Retrieve set of the known streams support which support incremental sync
733            incremental_streams = (
734                set(self._get_incremental_stream_names())
735                & state_provider.known_stream_names
736                & set(self.get_selected_streams())
737            )
738            if incremental_streams:
739                self._log_incremental_streams(incremental_streams=incremental_streams)
740
741        airbyte_message_iterator = AirbyteMessageIterator(
742            self._read_with_catalog(
743                catalog=catalog_provider.configured_catalog,
744                state=state_provider,
745                progress_tracker=progress_tracker,
746            )
747        )
748        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
749            stdin=airbyte_message_iterator,
750            catalog_provider=catalog_provider,
751            write_strategy=write_strategy,
752            state_writer=state_writer,
753            progress_tracker=progress_tracker,
754        )
755
756        # Flush the WAL, if applicable
757        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
758
759        return ReadResult(
760            source_name=self.name,
761            progress_tracker=progress_tracker,
762            processed_streams=stream_names,
763            cache=cache,
764        )

A class representing a source connector that can be invoked, for example to run its `spec`, `discover`, and `read` commands.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False)
58    def __init__(
59        self,
60        executor: Executor,
61        name: str,
62        config: dict[str, Any] | None = None,
63        *,
64        config_change_callback: ConfigChangeCallback | None = None,
65        streams: str | list[str] | None = None,
66        validate: bool = False,
67    ) -> None:
68        """Initialize the source.
69
70        If config is provided, it will be validated against the spec if validate is True.
71        """
72        self._to_be_selected_streams: list[str] | str = []
73        """Used to hold selection criteria before catalog is known."""
74
75        super().__init__(
76            executor=executor,
77            name=name,
78            config=config,
79            config_change_callback=config_change_callback,
80            validate=validate,
81        )
82        self._config_dict: dict[str, Any] | None = None
83        self._last_log_messages: list[str] = []
84        self._discovered_catalog: AirbyteCatalog | None = None
85        self._selected_stream_names: list[str] = []
86        if config is not None:
87            self.set_config(config, validate=validate)
88        if streams is not None:
89            self.select_streams(streams)
90
91        self._deployed_api_root: str | None = None
92        self._deployed_workspace_id: str | None = None
93        self._deployed_source_id: str | None = None

Initialize the source.

If `config` is provided, it is validated against the connector's spec when `validate` is True. Otherwise, validation is deferred until `check()` or `validate_config()` is called.

connector_type: Literal['source'] = 'source'
def set_streams(self, streams: list[str]) -> None:
 95    def set_streams(self, streams: list[str]) -> None:
 96        """Deprecated. See select_streams()."""
 97        warnings.warn(
 98            "The 'set_streams' method is deprecated and will be removed in a future version. "
 99            "Please use the 'select_streams' method instead.",
100            DeprecationWarning,
101            stacklevel=2,
102        )
103        self.select_streams(streams)

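Deprecated: use `select_streams()` instead. Calling this method emits a `DeprecationWarning` and then delegates to `select_streams()`.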

def select_all_streams(self) -> None:
117    def select_all_streams(self) -> None:
118        """Select all streams.
119
120        This is a more streamlined equivalent to:
121        > source.select_streams(source.get_available_streams()).
122        """
123        if self._config_dict is None:
124            self._to_be_selected_streams = "*"
125            self._log_warning_preselected_stream(self._to_be_selected_streams)
126            return
127
128        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

`source.select_streams(source.get_available_streams())`

If no config has been set yet, the selection is deferred and applied once set_config() is called.

def select_streams(self, streams: str | list[str]) -> None:
130    def select_streams(self, streams: str | list[str]) -> None:
131        """Select the stream names that should be read from the connector.
132
133        Args:
134            streams: A list of stream names to select. If set to "*", all streams will be selected.
135
136        Currently, if this is not set, all streams will be read.
137        """
138        if self._config_dict is None:
139            self._to_be_selected_streams = streams
140            self._log_warning_preselected_stream(streams)
141            return
142
143        if streams == "*":
144            self.select_all_streams()
145            return
146
147        if isinstance(streams, str):
148            # If a single stream is provided, convert it to a one-item list
149            streams = [streams]
150
151        available_streams = self.get_available_streams()
152        for stream in streams:
153            if stream not in available_streams:
154                raise exc.AirbyteStreamNotFoundError(
155                    stream_name=stream,
156                    connector_name=self.name,
157                    available_streams=available_streams,
158                )
159        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
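  • streams: A single stream name or a list of stream names to select. If set to "*", all streams will be selected.

If no config has been set yet, the selection is deferred and applied (and validated) once set_config() is called. Otherwise, any name that is not an available stream raises AirbyteStreamNotFoundError.

If no streams are selected, configured_catalog includes all available streams. read(), however, raises an error unless streams are selected here or passed through its streams argument.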

def get_selected_streams(self) -> list[str]:
161    def get_selected_streams(self) -> list[str]:
162        """Get the selected streams.
163
164        If no streams are selected, return an empty list.
165        """
166        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
168    def set_config(
169        self,
170        config: dict[str, Any],
171        *,
172        validate: bool = True,
173    ) -> None:
174        """Set the config for the connector.
175
176        If validate is True, raise an exception if the config fails validation.
177
178        If validate is False, validation will be deferred until check() or validate_config()
179        is called.
180        """
181        if validate:
182            self.validate_config(config)
183
184        self._config_dict = config
185
186        if self._to_be_selected_streams:
187            self.select_streams(self._to_be_selected_streams)
188            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.

Any stream selection made before the config was set is applied at this point.

def get_config(self) -> dict[str, typing.Any]:
190    def get_config(self) -> dict[str, Any]:
191        """Get the config for the connector."""
192        return self._config

Get the config for the connector.

def get_available_streams(self) -> list[str]:
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]

Get the names of the available streams from the discovered catalog.

config_spec: dict[str, typing.Any]
255    @property
256    def config_spec(self) -> dict[str, Any]:
257        """Generate a configuration spec for this connector, as a JSON Schema definition.
258
259        This function generates a JSON Schema dictionary with configuration specs for the
260        current connector, as a dictionary.
261
262        Returns:
263            dict: The JSON Schema configuration spec as a dictionary.
264        """
265        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
267    def print_config_spec(
268        self,
269        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
270        *,
271        output_file: Path | str | None = None,
272    ) -> None:
273        """Print the configuration spec for this connector.
274
275        Args:
276            format: The format to print the spec in. Must be "yaml" or "json".
277            output_file: Optional. If set, the spec will be written to the given file path.
278                Otherwise, it will be printed to the console.
279        """
280        if format not in {"yaml", "json"}:
281            raise exc.PyAirbyteInputError(
282                message="Invalid format. Expected 'yaml' or 'json'",
283                input_value=format,
284            )
285        if isinstance(output_file, str):
286            output_file = Path(output_file)
287
288        if format == "yaml":
289            content = yaml.dump(self.config_spec, indent=2)
290        elif format == "json":
291            content = json.dumps(self.config_spec, indent=2)
292
293        if output_file:
294            output_file.write_text(content)
295            return
296
297        syntax_highlighted = Syntax(content, format)
298        print(syntax_highlighted)

Print the configuration spec for this connector.

Arguments:
  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
docs_url: str
315    @property
316    def docs_url(self) -> str:
317        """Get the URL to the connector's documentation."""
318        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
319            "source-", ""
320        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
322    @property
323    def discovered_catalog(self) -> AirbyteCatalog:
324        """Get the raw catalog for the given streams.
325
326        If the catalog is not yet known, we call discover to get it.
327        """
328        if self._discovered_catalog is None:
329            self._discovered_catalog = self._discover()
330
331        return self._discovered_catalog

Get the raw catalog discovered from the connector.

If the catalog is not yet known, we call discover to get it and cache the result for later calls.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
333    @property
334    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
335        """Get the configured catalog for the given streams.
336
337        If the raw catalog is not yet known, we call discover to get it.
338
339        If no specific streams are selected, we return a catalog that syncs all available streams.
340
341        TODO: We should consider disabling by default the streams that the connector would
342        disable by default. (For instance, streams that require a premium license are sometimes
343        disabled by default within the connector.)
344        """
345        # Ensure discovered catalog is cached before we start
346        _ = self.discovered_catalog
347
348        # Filter for selected streams if set, otherwise use all available streams:
349        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
350        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the selected streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
352    def get_configured_catalog(
353        self,
354        streams: Literal["*"] | list[str] | None = None,
355    ) -> ConfiguredAirbyteCatalog:
356        """Get a configured catalog for the given streams.
357
358        If no streams are provided, the selected streams will be used. If no streams are selected,
359        all available streams will be used.
360
361        If '*' is provided, all available streams will be used.
362        """
363        selected_streams: list[str] = []
364        if streams is None:
365            selected_streams = self._selected_stream_names or self.get_available_streams()
366        elif streams == "*":
367            selected_streams = self.get_available_streams()
368        elif isinstance(streams, list):
369            selected_streams = streams
370        else:
371            raise exc.PyAirbyteInputError(
372                message="Invalid streams argument.",
373                input_value=streams,
374            )
375
376        return ConfiguredAirbyteCatalog(
377            streams=[
378                ConfiguredAirbyteStream(
379                    stream=stream,
380                    destination_sync_mode=DestinationSyncMode.overwrite,
381                    primary_key=stream.source_defined_primary_key,
382                    sync_mode=SyncMode.incremental,
383                )
384                for stream in self.discovered_catalog.streams
385                if stream.name in selected_streams
386            ],
387        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

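If '*' is provided, all available streams will be used. Stream names that are not in the discovered catalog are silently left out of the result, and any other kind of argument raises PyAirbyteInputError.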

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
389    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
390        """Return the JSON Schema spec for the specified stream name."""
391        catalog: AirbyteCatalog = self.discovered_catalog
392        found: list[AirbyteStream] = [
393            stream for stream in catalog.streams if stream.name == stream_name
394        ]
395
396        if len(found) == 0:
397            raise exc.PyAirbyteInputError(
398                message="Stream name does not exist in catalog.",
399                input_value=stream_name,
400            )
401
402        if len(found) > 1:
403            raise exc.PyAirbyteInternalError(
404                message="Duplicate streams found with the same name.",
405                context={
406                    "found_streams": found,
407                },
408            )
409
410        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.

def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
412    def get_records(
413        self,
414        stream: str,
415        *,
416        normalize_field_names: bool = False,
417        prune_undeclared_fields: bool = True,
418    ) -> LazyDataset:
419        """Read a stream from the connector.
420
421        Args:
422            stream: The name of the stream to read.
423            normalize_field_names: When `True`, field names will be normalized to lower case, with
424                special characters removed. This matches the behavior of PyAirbyte caches and most
425                Airbyte destinations.
426            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
427                which generally matches the behavior of PyAirbyte caches and most Airbyte
428                destinations, specifically when you expect the catalog may be stale. You can disable
429                this to keep all fields in the records.
430
431        This involves the following steps:
432        * Call discover to get the catalog
433        * Generate a configured catalog that syncs the given stream in full_refresh mode
434        * Write the configured catalog and the config to a temporary file
435        * execute the connector with read --config <config_file> --catalog <catalog_file>
436        * Listen to the messages and return the first AirbyteRecordMessages that come along.
437        * Make sure the subprocess is killed when the function returns.
438        """
439        discovered_catalog: AirbyteCatalog = self.discovered_catalog
440        configured_catalog = ConfiguredAirbyteCatalog(
441            streams=[
442                ConfiguredAirbyteStream(
443                    stream=s,
444                    sync_mode=SyncMode.full_refresh,
445                    destination_sync_mode=DestinationSyncMode.overwrite,
446                )
447                for s in discovered_catalog.streams
448                if s.name == stream
449            ],
450        )
451        if len(configured_catalog.streams) == 0:
452            raise exc.PyAirbyteInputError(
453                message="Requested stream does not exist.",
454                context={
455                    "stream": stream,
456                    "available_streams": self.get_available_streams(),
457                    "connector_name": self.name,
458                },
459            ) from KeyError(stream)
460
461        configured_stream = configured_catalog.streams[0]
462
463        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
464            yield from records
465
466        stream_record_handler = StreamRecordHandler(
467            json_schema=self.get_stream_json_schema(stream),
468            prune_extra_fields=prune_undeclared_fields,
469            normalize_keys=normalize_field_names,
470        )
471
472        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
473        progress_tracker = ProgressTracker(
474            ProgressStyle.PLAIN,
475            source=self,
476            cache=None,
477            destination=None,
478            expected_streams=[stream],
479        )
480
481        iterator: Iterator[dict[str, Any]] = (
482            StreamRecord.from_record_message(
483                record_message=record.record,
484                stream_record_handler=stream_record_handler,
485            )
486            for record in self._read_with_catalog(
487                catalog=configured_catalog,
488                progress_tracker=progress_tracker,
489            )
490            if record.record
491        )
492        progress_tracker.log_success()
493        return LazyDataset(
494            iterator,
495            stream_metadata=configured_stream,
496        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with `read --config <config_file> --catalog <catalog_file>`.
  • Wrap the resulting record messages in a LazyDataset, which yields the records as they are read.
  • Make sure the subprocess is killed when the function returns.
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
498    def get_documents(
499        self,
500        stream: str,
501        title_property: str | None = None,
502        content_properties: list[str] | None = None,
503        metadata_properties: list[str] | None = None,
504        *,
505        render_metadata: bool = False,
506    ) -> Iterable[Document]:
507        """Read a stream from the connector and return the records as documents.
508
509        If metadata_properties is not set, all properties that are not content will be added to
510        the metadata.
511
512        If render_metadata is True, metadata will be rendered in the document, as well as the
513        the main content.
514        """
515        return self.get_records(stream).to_documents(
516            title_property=title_property,
517            content_properties=content_properties,
518            metadata_properties=metadata_properties,
519            render_metadata=render_metadata,
520        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
610    def read(
611        self,
612        cache: CacheBase | None = None,
613        *,
614        streams: str | list[str] | None = None,
615        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
616        force_full_refresh: bool = False,
617        skip_validation: bool = False,
618    ) -> ReadResult:
619        """Read from the connector and write to the cache.
620
621        Args:
622            cache: The cache to write to. If not set, a default cache will be used.
623            streams: Optional if already set. A list of stream names to select for reading. If set
624                to "*", all streams will be selected.
625            write_strategy: The strategy to use when writing to the cache. If a string, it must be
626                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
627                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
628                WriteStrategy.AUTO.
629            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
630                streams will be read in incremental mode if supported by the connector. This option
631                must be True when using the "replace" strategy.
632            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
633                running the connector. This can be helpful in debugging, when you want to send
634                configurations to the connector that otherwise might be rejected by JSON Schema
635                validation rules.
636        """
637        cache = cache or get_default_cache()
638        progress_tracker = ProgressTracker(
639            source=self,
640            cache=cache,
641            destination=None,
642            expected_streams=None,  # Will be set later
643        )
644
645        # Set up state provider if not in full refresh mode
646        if force_full_refresh:
647            state_provider: StateProviderBase | None = None
648        else:
649            state_provider = cache.get_state_provider(
650                source_name=self._name,
651            )
652        state_writer = cache.get_state_writer(source_name=self._name)
653
654        if streams:
655            self.select_streams(streams)
656
657        if not self._selected_stream_names:
658            raise exc.PyAirbyteNoStreamsSelectedError(
659                connector_name=self.name,
660                available_streams=self.get_available_streams(),
661            )
662
663        try:
664            result = self._read_to_cache(
665                cache=cache,
666                catalog_provider=CatalogProvider(self.configured_catalog),
667                stream_names=self._selected_stream_names,
668                state_provider=state_provider,
669                state_writer=state_writer,
670                write_strategy=write_strategy,
671                force_full_refresh=force_full_refresh,
672                skip_validation=skip_validation,
673                progress_tracker=progress_tracker,
674            )
675        except exc.PyAirbyteInternalError as ex:
676            progress_tracker.log_failure(exception=ex)
677            raise exc.AirbyteConnectorFailedError(
678                connector_name=self.name,
679                log_text=self._last_log_messages,
680            ) from ex
681        except Exception as ex:
682            progress_tracker.log_failure(exception=ex)
683            raise
684
685        progress_tracker.log_success()
686        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
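  • streams: Optional if streams were already selected. A stream name or a list of stream names to select for reading. If set to "*", all streams will be selected. If no streams end up selected, PyAirbyteNoStreamsSelectedError is raised.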
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
config_hash
validate_config
connector_version
check
install
uninstall