airbyte.sources.base

Base class implementation for sources.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Base class implementation for sources."""
  3
  4from __future__ import annotations
  5
  6import json
  7import warnings
  8from pathlib import Path
  9from typing import TYPE_CHECKING, Any, Literal
 10
 11import yaml
 12from rich import print  # noqa: A004  # Allow shadowing the built-in
 13from rich.syntax import Syntax
 14
 15from airbyte_protocol.models import (
 16    AirbyteCatalog,
 17    AirbyteMessage,
 18    ConfiguredAirbyteCatalog,
 19    ConfiguredAirbyteStream,
 20    DestinationSyncMode,
 21    SyncMode,
 22    Type,
 23)
 24
 25from airbyte import exceptions as exc
 26from airbyte._connector_base import ConnectorBase
 27from airbyte._message_iterators import AirbyteMessageIterator
 28from airbyte._util.temp_files import as_temp_files
 29from airbyte.caches.util import get_default_cache
 30from airbyte.datasets._lazy import LazyDataset
 31from airbyte.progress import ProgressStyle, ProgressTracker
 32from airbyte.records import StreamRecord, StreamRecordHandler
 33from airbyte.results import ReadResult
 34from airbyte.shared.catalog_providers import CatalogProvider
 35from airbyte.strategies import WriteStrategy
 36
 37
 38if TYPE_CHECKING:
 39    from collections.abc import Generator, Iterable, Iterator
 40
 41    from airbyte_cdk import ConnectorSpecification
 42    from airbyte_protocol.models import AirbyteStream
 43
 44    from airbyte._executors.base import Executor
 45    from airbyte.caches import CacheBase
 46    from airbyte.callbacks import ConfigChangeCallback
 47    from airbyte.documents import Document
 48    from airbyte.shared.state_providers import StateProviderBase
 49    from airbyte.shared.state_writers import StateWriterBase
 50
 51
 52class Source(ConnectorBase):
 53    """A class representing a source that can be called."""
 54
 55    connector_type = "source"
 56
 57    def __init__(
 58        self,
 59        executor: Executor,
 60        name: str,
 61        config: dict[str, Any] | None = None,
 62        *,
 63        config_change_callback: ConfigChangeCallback | None = None,
 64        streams: str | list[str] | None = None,
 65        validate: bool = False,
 66    ) -> None:
 67        """Initialize the source.
 68
 69        If config is provided, it will be validated against the spec if validate is True.
 70        """
 71        self._to_be_selected_streams: list[str] | str = []
 72        """Used to hold selection criteria before catalog is known."""
 73
 74        super().__init__(
 75            executor=executor,
 76            name=name,
 77            config=config,
 78            config_change_callback=config_change_callback,
 79            validate=validate,
 80        )
 81        self._config_dict: dict[str, Any] | None = None
 82        self._last_log_messages: list[str] = []
 83        self._discovered_catalog: AirbyteCatalog | None = None
 84        self._selected_stream_names: list[str] = []
 85        if config is not None:
 86            self.set_config(config, validate=validate)
 87        if streams is not None:
 88            self.select_streams(streams)
 89
 90    def set_streams(self, streams: list[str]) -> None:
 91        """Deprecated. See select_streams()."""
 92        warnings.warn(
 93            "The 'set_streams' method is deprecated and will be removed in a future version. "
 94            "Please use the 'select_streams' method instead.",
 95            DeprecationWarning,
 96            stacklevel=2,
 97        )
 98        self.select_streams(streams)
 99
100    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
101        """Logs a warning message indicating stream selection which are not selected yet."""
102        if streams == "*":
103            print(
104                "Warning: Config is not set yet. All streams will be selected after config is set."
105            )
106        else:
107            print(
108                "Warning: Config is not set yet. "
109                f"Streams to be selected after config is set: {streams}"
110            )
111
112    def select_all_streams(self) -> None:
113        """Select all streams.
114
115        This is a more streamlined equivalent to:
116        > source.select_streams(source.get_available_streams()).
117        """
118        if self._config_dict is None:
119            self._to_be_selected_streams = "*"
120            self._log_warning_preselected_stream(self._to_be_selected_streams)
121            return
122
123        self._selected_stream_names = self.get_available_streams()
124
125    def select_streams(self, streams: str | list[str]) -> None:
126        """Select the stream names that should be read from the connector.
127
128        Args:
129            streams: A list of stream names to select. If set to "*", all streams will be selected.
130
131        Currently, if this is not set, all streams will be read.
132        """
133        if self._config_dict is None:
134            self._to_be_selected_streams = streams
135            self._log_warning_preselected_stream(streams)
136            return
137
138        if streams == "*":
139            self.select_all_streams()
140            return
141
142        if isinstance(streams, str):
143            # If a single stream is provided, convert it to a one-item list
144            streams = [streams]
145
146        available_streams = self.get_available_streams()
147        for stream in streams:
148            if stream not in available_streams:
149                raise exc.AirbyteStreamNotFoundError(
150                    stream_name=stream,
151                    connector_name=self.name,
152                    available_streams=available_streams,
153                )
154        self._selected_stream_names = streams
155
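For example, a minimal selection flow might look like the following sketch. It assumes
PyAirbyte's top-level `get_source()` helper and the `source-faker` connector; substitute
your own connector name and config.

    import airbyte as ab

    # Hypothetical connector and config values; adjust for your source.
    source = ab.get_source("source-faker", config={"count": 100})

    print(source.get_available_streams())  # e.g. ["users", "products", "purchases"]
    source.select_streams(["users"])       # or "*" to select all streams
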
156    def get_selected_streams(self) -> list[str]:
157        """Get the selected streams.
158
159        If no streams are selected, return an empty list.
160        """
161        return self._selected_stream_names
162
163    def set_config(
164        self,
165        config: dict[str, Any],
166        *,
167        validate: bool = True,
168    ) -> None:
169        """Set the config for the connector.
170
171        If validate is True, raise an exception if the config fails validation.
172
173        If validate is False, validation will be deferred until check() or validate_config()
174        is called.
175        """
176        if validate:
177            self.validate_config(config)
178
179        self._config_dict = config
180
181        if self._to_be_selected_streams:
182            self.select_streams(self._to_be_selected_streams)
183            self._to_be_selected_streams = []
184
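A short sketch of deferred validation (assumes a `source` obtained via `get_source()`;
the config keys are hypothetical):

    # Set the config without validating it yet...
    source.set_config({"count": 100}, validate=False)

    # ...then validate explicitly when ready:
    source.validate_config()
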
185    def get_config(self) -> dict[str, Any]:
186        """Get the config for the connector."""
187        return self._config
188
189    @property
190    def _config(self) -> dict[str, Any]:
191        if self._config_dict is None:
192            raise exc.AirbyteConnectorConfigurationMissingError(
193                connector_name=self.name,
194                guidance="Provide via get_source() or set_config()",
195            )
196        return self._config_dict
197
198    def _discover(self) -> AirbyteCatalog:
199        """Call discover on the connector.
200
201        This involves the following steps:
202        * Write the config to a temporary file.
203        * Execute the connector with discover --config <config_file>.
204        * Listen to the messages and return the first AirbyteCatalog that comes along.
205        * Make sure the subprocess is killed when the function returns.
206        """
207        with as_temp_files([self._config]) as [config_file]:
208            for msg in self._execute(["discover", "--config", config_file]):
209                if msg.type == Type.CATALOG and msg.catalog:
210                    return msg.catalog
211            raise exc.AirbyteConnectorMissingCatalogError(
212                connector_name=self.name,
213                log_text=self._last_log_messages,
214            )
215
216    def get_available_streams(self) -> list[str]:
217        """Get the available streams from the spec."""
218        return [s.name for s in self.discovered_catalog.streams]
219
220    def _get_incremental_stream_names(self) -> list[str]:
221        """Get the name of streams that support incremental sync."""
222        return [
223            stream.name
224            for stream in self.discovered_catalog.streams
225            if SyncMode.incremental in stream.supported_sync_modes
226        ]
227
228    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
229        """Call spec on the connector.
230
231        This involves the following steps:
232        * Execute the connector with the spec command.
233        * Listen to the messages and return the first ConnectorSpecification that comes along.
234        * Make sure the subprocess is killed when the function returns.
235        """
236        if force_refresh or self._spec is None:
237            for msg in self._execute(["spec"]):
238                if msg.type == Type.SPEC and msg.spec:
239                    self._spec = msg.spec
240                    break
241
242        if self._spec:
243            return self._spec
244
245        raise exc.AirbyteConnectorMissingSpecError(
246            connector_name=self.name,
247            log_text=self._last_log_messages,
248        )
249
250    @property
251    def config_spec(self) -> dict[str, Any]:
252        """Generate a configuration spec for this connector, as a JSON Schema definition.
253
254        This property returns the configuration spec for the current connector as a
255        JSON Schema dictionary.
256
257        Returns:
258            dict: The JSON Schema configuration spec as a dictionary.
259        """
260        return self._get_spec(force_refresh=True).connectionSpecification
261
262    def print_config_spec(
263        self,
264        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
265        *,
266        output_file: Path | str | None = None,
267    ) -> None:
268        """Print the configuration spec for this connector.
269
270        Args:
271            format: The format to print the spec in. Must be "yaml" or "json".
272            output_file: Optional. If set, the spec will be written to the given file path.
273                Otherwise, it will be printed to the console.
274        """
275        if format not in {"yaml", "json"}:
276            raise exc.PyAirbyteInputError(
277                message="Invalid format. Expected 'yaml' or 'json'",
278                input_value=format,
279            )
280        if isinstance(output_file, str):
281            output_file = Path(output_file)
282
283        if format == "yaml":
284            content = yaml.dump(self.config_spec, indent=2)
285        elif format == "json":
286            content = json.dumps(self.config_spec, indent=2)
287
288        if output_file:
289            output_file.write_text(content)
290            return
291
292        syntax_highlighted = Syntax(content, format)
293        print(syntax_highlighted)
294
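For instance, to inspect or save the spec (a sketch; `source` is any Source instance,
configured or not):

    source.print_config_spec()                                         # YAML to the console
    source.print_config_spec(format="json", output_file="spec.json")   # JSON to a file
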
295    @property
296    def _yaml_spec(self) -> str:
297        """Get the spec as a yaml string.
298
299        For now, the primary use case is for writing and debugging a valid config for a source.
300
301        This is private for now because we probably want better polish before exposing this
302        as a stable interface. This will also get easier when we have docs links with this info
303        for each connector.
304        """
305        spec_obj: ConnectorSpecification = self._get_spec()
306        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
307        # convert to a yaml string
308        return yaml.dump(spec_dict)
309
310    @property
311    def docs_url(self) -> str:
312        """Get the URL to the connector's documentation."""
313        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
314            "source-", ""
315        )
316
317    @property
318    def discovered_catalog(self) -> AirbyteCatalog:
319        """Get the raw catalog for the given streams.
320
321        If the catalog is not yet known, we call discover to get it.
322        """
323        if self._discovered_catalog is None:
324            self._discovered_catalog = self._discover()
325
326        return self._discovered_catalog
327
328    @property
329    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
330        """Get the configured catalog for the given streams.
331
332        If the raw catalog is not yet known, we call discover to get it.
333
334        If no specific streams are selected, we return a catalog that syncs all available streams.
335
336        TODO: We should consider disabling by default the streams that the connector would
337        disable by default. (For instance, streams that require a premium license are sometimes
338        disabled by default within the connector.)
339        """
340        # Ensure discovered catalog is cached before we start
341        _ = self.discovered_catalog
342
343        # Filter for selected streams if set, otherwise use all available streams:
344        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
345        return self.get_configured_catalog(streams=streams_filter)
346
347    def get_configured_catalog(
348        self,
349        streams: Literal["*"] | list[str] | None = None,
350    ) -> ConfiguredAirbyteCatalog:
351        """Get a configured catalog for the given streams.
352
353        If no streams are provided, the selected streams will be used. If no streams are selected,
354        all available streams will be used.
355
356        If '*' is provided, all available streams will be used.
357        """
358        selected_streams: list[str] = []
359        if streams is None:
360            selected_streams = self._selected_stream_names or self.get_available_streams()
361        elif streams == "*":
362            selected_streams = self.get_available_streams()
363        elif isinstance(streams, list):
364            selected_streams = streams
365        else:
366            raise exc.PyAirbyteInputError(
367                message="Invalid streams argument.",
368                input_value=streams,
369            )
370
371        return ConfiguredAirbyteCatalog(
372            streams=[
373                ConfiguredAirbyteStream(
374                    stream=stream,
375                    destination_sync_mode=DestinationSyncMode.overwrite,
376                    primary_key=stream.source_defined_primary_key,
377                    sync_mode=SyncMode.incremental,
378                )
379                for stream in self.discovered_catalog.streams
380                if stream.name in selected_streams
381            ],
382        )
383
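A quick sketch of requesting a catalog for a subset of streams (the stream name is
hypothetical):

    catalog = source.get_configured_catalog(streams=["users"])
    for configured_stream in catalog.streams:
        print(configured_stream.stream.name, configured_stream.sync_mode)
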
384    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
385        """Return the JSON Schema spec for the specified stream name."""
386        catalog: AirbyteCatalog = self.discovered_catalog
387        found: list[AirbyteStream] = [
388            stream for stream in catalog.streams if stream.name == stream_name
389        ]
390
391        if len(found) == 0:
392            raise exc.PyAirbyteInputError(
393                message="Stream name does not exist in catalog.",
394                input_value=stream_name,
395            )
396
397        if len(found) > 1:
398            raise exc.PyAirbyteInternalError(
399                message="Duplicate streams found with the same name.",
400                context={
401                    "found_streams": found,
402                },
403            )
404
405        return found[0].json_schema
406
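For example, to inspect a stream's declared fields (assumes an object-typed JSON Schema
with a top-level "properties" key, which is the usual shape):

    schema = source.get_stream_json_schema("users")
    print(sorted(schema.get("properties", {}).keys()))
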
407    def get_records(
408        self,
409        stream: str,
410        *,
411        normalize_field_names: bool = False,
412        prune_undeclared_fields: bool = True,
413    ) -> LazyDataset:
414        """Read a stream from the connector.
415
416        Args:
417            stream: The name of the stream to read.
418            normalize_field_names: When `True`, field names will be normalized to lower case, with
419                special characters removed. This matches the behavior of PyAirbyte caches and most
420                Airbyte destinations.
421            prune_undeclared_fields: When `True`, fields not declared in the catalog will be
422                pruned from the records. This generally matches the behavior of PyAirbyte caches
423                and most Airbyte destinations, and is especially useful when the catalog may be
424                stale. You can disable this to keep all fields in the records.
425
426        This involves the following steps:
427        * Call discover to get the catalog
428        * Generate a configured catalog that syncs the given stream in full_refresh mode
429        * Write the configured catalog and the config to temporary files.
430        * Execute the connector with read --config <config_file> --catalog <catalog_file>.
431        * Listen to the messages and yield the AirbyteRecordMessages that come along.
432        * Make sure the subprocess is killed when the function returns.
433        """
434        discovered_catalog: AirbyteCatalog = self.discovered_catalog
435        configured_catalog = ConfiguredAirbyteCatalog(
436            streams=[
437                ConfiguredAirbyteStream(
438                    stream=s,
439                    sync_mode=SyncMode.full_refresh,
440                    destination_sync_mode=DestinationSyncMode.overwrite,
441                )
442                for s in discovered_catalog.streams
443                if s.name == stream
444            ],
445        )
446        if len(configured_catalog.streams) == 0:
447            raise exc.PyAirbyteInputError(
448                message="Requested stream does not exist.",
449                context={
450                    "stream": stream,
451                    "available_streams": self.get_available_streams(),
452                    "connector_name": self.name,
453                },
454            ) from KeyError(stream)
455
456        configured_stream = configured_catalog.streams[0]
457
458        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
459            yield from records
460
461        stream_record_handler = StreamRecordHandler(
462            json_schema=self.get_stream_json_schema(stream),
463            prune_extra_fields=prune_undeclared_fields,
464            normalize_keys=normalize_field_names,
465        )
466
467        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
468        progress_tracker = ProgressTracker(
469            ProgressStyle.PLAIN,
470            source=self,
471            cache=None,
472            destination=None,
473            expected_streams=[stream],
474        )
475
476        iterator: Iterator[dict[str, Any]] = (
477            StreamRecord.from_record_message(
478                record_message=record.record,
479                stream_record_handler=stream_record_handler,
480            )
481            for record in self._read_with_catalog(
482                catalog=configured_catalog,
483                progress_tracker=progress_tracker,
484            )
485            if record.record
486        )
487        progress_tracker.log_success()
488        return LazyDataset(
489            iterator,
490            stream_metadata=configured_stream,
491        )
492
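A minimal read sketch (the stream and field names are hypothetical; records are only
fetched as the dataset is consumed):

    dataset = source.get_records("users")
    for record in dataset:
        print(record["id"])  # assumes an "id" field in the stream
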
493    def get_documents(
494        self,
495        stream: str,
496        title_property: str | None = None,
497        content_properties: list[str] | None = None,
498        metadata_properties: list[str] | None = None,
499        *,
500        render_metadata: bool = False,
501    ) -> Iterable[Document]:
502        """Read a stream from the connector and return the records as documents.
503
504        If metadata_properties is not set, all properties that are not content will be added to
505        the metadata.
506
507        If render_metadata is True, metadata will be rendered in the document, as well as
508        the main content.
509        """
510        return self.get_records(stream).to_documents(
511            title_property=title_property,
512            content_properties=content_properties,
513            metadata_properties=metadata_properties,
514            render_metadata=render_metadata,
515        )
516
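A short sketch of converting a stream to documents, e.g. for LLM workloads (the stream
and property names are hypothetical):

    for doc in source.get_documents(
        stream="users",
        title_property="name",
        content_properties=["bio"],
        metadata_properties=["id"],
    ):
        print(doc)
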
517    def _get_airbyte_message_iterator(
518        self,
519        *,
520        streams: Literal["*"] | list[str] | None = None,
521        state_provider: StateProviderBase | None = None,
522        progress_tracker: ProgressTracker,
523        force_full_refresh: bool = False,
524    ) -> AirbyteMessageIterator:
525        """Get an AirbyteMessageIterator for this source."""
526        return AirbyteMessageIterator(
527            self._read_with_catalog(
528                catalog=self.get_configured_catalog(streams=streams),
529                state=state_provider if not force_full_refresh else None,
530                progress_tracker=progress_tracker,
531            )
532        )
533
534    def _read_with_catalog(
535        self,
536        catalog: ConfiguredAirbyteCatalog,
537        progress_tracker: ProgressTracker,
538        state: StateProviderBase | None = None,
539    ) -> Generator[AirbyteMessage, None, None]:
540        """Call read on the connector.
541
542        This involves the following steps:
543        * Write the config, catalog, and state to temporary files.
544        * Execute the connector with read --config <config_file> --catalog <catalog_file> --state <state_file>.
545        * Listen to the messages and yield the AirbyteRecordMessages that come along.
546        * Send out telemetry on the performed sync (with information about which source was used and
547          the type of the cache)
548        """
549        with as_temp_files(
550            [
551                self._config,
552                catalog.model_dump_json(),
553                state.to_state_input_file_text() if state else "[]",
554            ]
555        ) as [
556            config_file,
557            catalog_file,
558            state_file,
559        ]:
560            message_generator = self._execute(
561                [
562                    "read",
563                    "--config",
564                    config_file,
565                    "--catalog",
566                    catalog_file,
567                    "--state",
568                    state_file,
569                ],
570                progress_tracker=progress_tracker,
571            )
572            yield from progress_tracker.tally_records_read(message_generator)
573        progress_tracker.log_read_complete()
574
575    def _peek_airbyte_message(
576        self,
577        message: AirbyteMessage,
578        *,
579        raise_on_error: bool = True,
580    ) -> None:
581        """Process an Airbyte message.
582
583        This method handles reading Airbyte messages and taking action, if needed, based on the
584        message type. For instance, log messages are logged, records are tallied, and errors are
585        raised as exceptions if `raise_on_error` is True.
586
587        Raises:
588            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
589        """
590        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
591
592    def _log_incremental_streams(
593        self,
594        *,
595        incremental_streams: set[str] | None = None,
596    ) -> None:
597        """Log the streams which are using incremental sync mode."""
598        log_message = (
599            "The following streams are currently using incremental sync:\n"
600            f"{incremental_streams}\n"
601            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
602        )
603        print(log_message)
604
605    def read(
606        self,
607        cache: CacheBase | None = None,
608        *,
609        streams: str | list[str] | None = None,
610        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
611        force_full_refresh: bool = False,
612        skip_validation: bool = False,
613    ) -> ReadResult:
614        """Read from the connector and write to the cache.
615
616        Args:
617            cache: The cache to write to. If not set, a default cache will be used.
618            streams: A list of stream names to select for reading, or "*" to select all
619                streams. Optional if streams were already selected on the source.
620            write_strategy: The strategy to use when writing to the cache. If a string, it must be
621                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
622                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
623                WriteStrategy.AUTO.
624            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
625                streams will be read in incremental mode if supported by the connector. This option
626                must be True when using the "replace" strategy.
627            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
628                running the connector. This can be helpful in debugging, when you want to send
629                configurations to the connector that otherwise might be rejected by JSON Schema
630                validation rules.
631        """
632        cache = cache or get_default_cache()
633        progress_tracker = ProgressTracker(
634            source=self,
635            cache=cache,
636            destination=None,
637            expected_streams=None,  # Will be set later
638        )
639
640        # Set up state provider if not in full refresh mode
641        if force_full_refresh:
642            state_provider: StateProviderBase | None = None
643        else:
644            state_provider = cache.get_state_provider(
645                source_name=self._name,
646            )
647        state_writer = cache.get_state_writer(source_name=self._name)
648
649        if streams:
650            self.select_streams(streams)
651
652        if not self._selected_stream_names:
653            raise exc.PyAirbyteNoStreamsSelectedError(
654                connector_name=self.name,
655                available_streams=self.get_available_streams(),
656            )
657
658        try:
659            result = self._read_to_cache(
660                cache=cache,
661                catalog_provider=CatalogProvider(self.configured_catalog),
662                stream_names=self._selected_stream_names,
663                state_provider=state_provider,
664                state_writer=state_writer,
665                write_strategy=write_strategy,
666                force_full_refresh=force_full_refresh,
667                skip_validation=skip_validation,
668                progress_tracker=progress_tracker,
669            )
670        except exc.PyAirbyteInternalError as ex:
671            progress_tracker.log_failure(exception=ex)
672            raise exc.AirbyteConnectorFailedError(
673                connector_name=self.name,
674                log_text=self._last_log_messages,
675            ) from ex
676        except Exception as ex:
677            progress_tracker.log_failure(exception=ex)
678            raise
679
680        progress_tracker.log_success()
681        return result
682
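End to end, a typical read into the default local cache might look like this sketch
(assumes `get_source()` and the `source-faker` connector; substitute your own):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_all_streams()

    result = source.read()  # uses the default cache when none is given
    for name, dataset in result.streams.items():  # `streams` mapping on the result assumed
        print(name, len(list(dataset)))
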
683    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
684        self,
685        cache: CacheBase,
686        *,
687        catalog_provider: CatalogProvider,
688        stream_names: list[str],
689        state_provider: StateProviderBase | None,
690        state_writer: StateWriterBase | None,
691        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
692        force_full_refresh: bool = False,
693        skip_validation: bool = False,
694        progress_tracker: ProgressTracker,
695    ) -> ReadResult:
696        """Internal read method."""
697        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
698            warnings.warn(
699                message=(
700                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
701                    "could result in data loss. "
702                    "To silence this warning, use the following: "
703                    'warnings.filterwarnings("ignore", '
704                    'category=airbyte.exceptions.PyAirbyteDataLossWarning)'
705                ),
706                category=exc.PyAirbyteDataLossWarning,
707                stacklevel=1,
708            )
709        if isinstance(write_strategy, str):
710            try:
711                write_strategy = WriteStrategy(write_strategy)
712            except ValueError:
713                raise exc.PyAirbyteInputError(
714                    message="Invalid strategy",
715                    context={
716                        "write_strategy": write_strategy,
717                        "available_strategies": [s.value for s in WriteStrategy],
718                    },
719                ) from None
720
721        # Run optional validation step
722        if not skip_validation:
723            self.validate_config()
724
725        # Log incremental streams if any are known
726        if state_provider and state_provider.known_stream_names:
727            # Retrieve the set of known streams that support incremental sync
728            incremental_streams = (
729                set(self._get_incremental_stream_names())
730                & state_provider.known_stream_names
731                & set(self.get_selected_streams())
732            )
733            if incremental_streams:
734                self._log_incremental_streams(incremental_streams=incremental_streams)
735
736        airbyte_message_iterator = AirbyteMessageIterator(
737            self._read_with_catalog(
738                catalog=catalog_provider.configured_catalog,
739                state=state_provider,
740                progress_tracker=progress_tracker,
741            )
742        )
743        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
744            stdin=airbyte_message_iterator,
745            catalog_provider=catalog_provider,
746            write_strategy=write_strategy,
747            state_writer=state_writer,
748            progress_tracker=progress_tracker,
749        )
750
751        # Flush the WAL, if applicable
752        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
753
754        return ReadResult(
755            source_name=self.name,
756            progress_tracker=progress_tracker,
757            processed_streams=stream_names,
758            cache=cache,
759        )
760
761
762__all__ = [
763    "Source",
764]
class Source(airbyte._connector_base.ConnectorBase):
 53class Source(ConnectorBase):
 54    """A class representing a source that can be called."""
 55
 56    connector_type = "source"
 57
 58    def __init__(
 59        self,
 60        executor: Executor,
 61        name: str,
 62        config: dict[str, Any] | None = None,
 63        *,
 64        config_change_callback: ConfigChangeCallback | None = None,
 65        streams: str | list[str] | None = None,
 66        validate: bool = False,
 67    ) -> None:
 68        """Initialize the source.
 69
 70        If config is provided, it will be validated against the spec if validate is True.
 71        """
 72        self._to_be_selected_streams: list[str] | str = []
 73        """Used to hold selection criteria before catalog is known."""
 74
 75        super().__init__(
 76            executor=executor,
 77            name=name,
 78            config=config,
 79            config_change_callback=config_change_callback,
 80            validate=validate,
 81        )
 82        self._config_dict: dict[str, Any] | None = None
 83        self._last_log_messages: list[str] = []
 84        self._discovered_catalog: AirbyteCatalog | None = None
 85        self._selected_stream_names: list[str] = []
 86        if config is not None:
 87            self.set_config(config, validate=validate)
 88        if streams is not None:
 89            self.select_streams(streams)
 90
 91    def set_streams(self, streams: list[str]) -> None:
 92        """Deprecated. See select_streams()."""
 93        warnings.warn(
 94            "The 'set_streams' method is deprecated and will be removed in a future version. "
 95            "Please use the 'select_streams' method instead.",
 96            DeprecationWarning,
 97            stacklevel=2,
 98        )
 99        self.select_streams(streams)
100
101    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
102        """Logs a warning message indicating stream selection which are not selected yet."""
103        if streams == "*":
104            print(
105                "Warning: Config is not set yet. All streams will be selected after config is set."
106            )
107        else:
108            print(
109                "Warning: Config is not set yet. "
110                f"Streams to be selected after config is set: {streams}"
111            )
112
113    def select_all_streams(self) -> None:
114        """Select all streams.
115
116        This is a more streamlined equivalent to:
117        > source.select_streams(source.get_available_streams()).
118        """
119        if self._config_dict is None:
120            self._to_be_selected_streams = "*"
121            self._log_warning_preselected_stream(self._to_be_selected_streams)
122            return
123
124        self._selected_stream_names = self.get_available_streams()
125
126    def select_streams(self, streams: str | list[str]) -> None:
127        """Select the stream names that should be read from the connector.
128
129        Args:
130            streams: A list of stream names to select. If set to "*", all streams will be selected.
131
132        Currently, if this is not set, all streams will be read.
133        """
134        if self._config_dict is None:
135            self._to_be_selected_streams = streams
136            self._log_warning_preselected_stream(streams)
137            return
138
139        if streams == "*":
140            self.select_all_streams()
141            return
142
143        if isinstance(streams, str):
144            # If a single stream is provided, convert it to a one-item list
145            streams = [streams]
146
147        available_streams = self.get_available_streams()
148        for stream in streams:
149            if stream not in available_streams:
150                raise exc.AirbyteStreamNotFoundError(
151                    stream_name=stream,
152                    connector_name=self.name,
153                    available_streams=available_streams,
154                )
155        self._selected_stream_names = streams
156
157    def get_selected_streams(self) -> list[str]:
158        """Get the selected streams.
159
160        If no streams are selected, return an empty list.
161        """
162        return self._selected_stream_names
163
164    def set_config(
165        self,
166        config: dict[str, Any],
167        *,
168        validate: bool = True,
169    ) -> None:
170        """Set the config for the connector.
171
172        If validate is True, raise an exception if the config fails validation.
173
174        If validate is False, validation will be deferred until check() or validate_config()
175        is called.
176        """
177        if validate:
178            self.validate_config(config)
179
180        self._config_dict = config
181
182        if self._to_be_selected_streams:
183            self.select_streams(self._to_be_selected_streams)
184            self._to_be_selected_streams = []
185
186    def get_config(self) -> dict[str, Any]:
187        """Get the config for the connector."""
188        return self._config
189
190    @property
191    def _config(self) -> dict[str, Any]:
192        if self._config_dict is None:
193            raise exc.AirbyteConnectorConfigurationMissingError(
194                connector_name=self.name,
195                guidance="Provide via get_source() or set_config()",
196            )
197        return self._config_dict
198
199    def _discover(self) -> AirbyteCatalog:
200        """Call discover on the connector.
201
202        This involves the following steps:
203        - Write the config to a temporary file
204        - execute the connector with discover --config <config_file>
205        - Listen to the messages and return the first AirbyteCatalog that comes along.
206        - Make sure the subprocess is killed when the function returns.
207        """
208        with as_temp_files([self._config]) as [config_file]:
209            for msg in self._execute(["discover", "--config", config_file]):
210                if msg.type == Type.CATALOG and msg.catalog:
211                    return msg.catalog
212            raise exc.AirbyteConnectorMissingCatalogError(
213                connector_name=self.name,
214                log_text=self._last_log_messages,
215            )
216
217    def get_available_streams(self) -> list[str]:
218        """Get the available streams from the spec."""
219        return [s.name for s in self.discovered_catalog.streams]
220
221    def _get_incremental_stream_names(self) -> list[str]:
222        """Get the name of streams that support incremental sync."""
223        return [
224            stream.name
225            for stream in self.discovered_catalog.streams
226            if SyncMode.incremental in stream.supported_sync_modes
227        ]
228
229    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
230        """Call spec on the connector.
231
232        This involves the following steps:
233        * execute the connector with spec
234        * Listen to the messages and return the first AirbyteCatalog that comes along.
235        * Make sure the subprocess is killed when the function returns.
236        """
237        if force_refresh or self._spec is None:
238            for msg in self._execute(["spec"]):
239                if msg.type == Type.SPEC and msg.spec:
240                    self._spec = msg.spec
241                    break
242
243        if self._spec:
244            return self._spec
245
246        raise exc.AirbyteConnectorMissingSpecError(
247            connector_name=self.name,
248            log_text=self._last_log_messages,
249        )
250
251    @property
252    def config_spec(self) -> dict[str, Any]:
253        """Generate a configuration spec for this connector, as a JSON Schema definition.
254
255        This function generates a JSON Schema dictionary with configuration specs for the
256        current connector, as a dictionary.
257
258        Returns:
259            dict: The JSON Schema configuration spec as a dictionary.
260        """
261        return self._get_spec(force_refresh=True).connectionSpecification
262
263    def print_config_spec(
264        self,
265        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
266        *,
267        output_file: Path | str | None = None,
268    ) -> None:
269        """Print the configuration spec for this connector.
270
271        Args:
272            format: The format to print the spec in. Must be "yaml" or "json".
273            output_file: Optional. If set, the spec will be written to the given file path.
274                Otherwise, it will be printed to the console.
275        """
276        if format not in {"yaml", "json"}:
277            raise exc.PyAirbyteInputError(
278                message="Invalid format. Expected 'yaml' or 'json'",
279                input_value=format,
280            )
281        if isinstance(output_file, str):
282            output_file = Path(output_file)
283
284        if format == "yaml":
285            content = yaml.dump(self.config_spec, indent=2)
286        elif format == "json":
287            content = json.dumps(self.config_spec, indent=2)
288
289        if output_file:
290            output_file.write_text(content)
291            return
292
293        syntax_highlighted = Syntax(content, format)
294        print(syntax_highlighted)
295
296    @property
297    def _yaml_spec(self) -> str:
298        """Get the spec as a yaml string.
299
300        For now, the primary use case is for writing and debugging a valid config for a source.
301
302        This is private for now because we probably want better polish before exposing this
303        as a stable interface. This will also get easier when we have docs links with this info
304        for each connector.
305        """
306        spec_obj: ConnectorSpecification = self._get_spec()
307        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
308        # convert to a yaml string
309        return yaml.dump(spec_dict)
310
311    @property
312    def docs_url(self) -> str:
313        """Get the URL to the connector's documentation."""
314        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
315            "source-", ""
316        )
317
318    @property
319    def discovered_catalog(self) -> AirbyteCatalog:
320        """Get the raw catalog for the given streams.
321
322        If the catalog is not yet known, we call discover to get it.
323        """
324        if self._discovered_catalog is None:
325            self._discovered_catalog = self._discover()
326
327        return self._discovered_catalog
328
329    @property
330    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
331        """Get the configured catalog for the given streams.
332
333        If the raw catalog is not yet known, we call discover to get it.
334
335        If no specific streams are selected, we return a catalog that syncs all available streams.
336
337        TODO: We should consider disabling by default the streams that the connector would
338        disable by default. (For instance, streams that require a premium license are sometimes
339        disabled by default within the connector.)
340        """
341        # Ensure discovered catalog is cached before we start
342        _ = self.discovered_catalog
343
344        # Filter for selected streams if set, otherwise use all available streams:
345        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
346        return self.get_configured_catalog(streams=streams_filter)
347
348    def get_configured_catalog(
349        self,
350        streams: Literal["*"] | list[str] | None = None,
351    ) -> ConfiguredAirbyteCatalog:
352        """Get a configured catalog for the given streams.
353
354        If no streams are provided, the selected streams will be used. If no streams are selected,
355        all available streams will be used.
356
357        If '*' is provided, all available streams will be used.
358        """
359        selected_streams: list[str] = []
360        if streams is None:
361            selected_streams = self._selected_stream_names or self.get_available_streams()
362        elif streams == "*":
363            selected_streams = self.get_available_streams()
364        elif isinstance(streams, list):
365            selected_streams = streams
366        else:
367            raise exc.PyAirbyteInputError(
368                message="Invalid streams argument.",
369                input_value=streams,
370            )
371
372        return ConfiguredAirbyteCatalog(
373            streams=[
374                ConfiguredAirbyteStream(
375                    stream=stream,
376                    destination_sync_mode=DestinationSyncMode.overwrite,
377                    primary_key=stream.source_defined_primary_key,
378                    sync_mode=SyncMode.incremental,
379                )
380                for stream in self.discovered_catalog.streams
381                if stream.name in selected_streams
382            ],
383        )
384
385    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
386        """Return the JSON Schema spec for the specified stream name."""
387        catalog: AirbyteCatalog = self.discovered_catalog
388        found: list[AirbyteStream] = [
389            stream for stream in catalog.streams if stream.name == stream_name
390        ]
391
392        if len(found) == 0:
393            raise exc.PyAirbyteInputError(
394                message="Stream name does not exist in catalog.",
395                input_value=stream_name,
396            )
397
398        if len(found) > 1:
399            raise exc.PyAirbyteInternalError(
400                message="Duplicate streams found with the same name.",
401                context={
402                    "found_streams": found,
403                },
404            )
405
406        return found[0].json_schema
407
408    def get_records(
409        self,
410        stream: str,
411        *,
412        normalize_field_names: bool = False,
413        prune_undeclared_fields: bool = True,
414    ) -> LazyDataset:
415        """Read a stream from the connector.
416
417        Args:
418            stream: The name of the stream to read.
419            normalize_field_names: When `True`, field names will be normalized to lower case, with
420                special characters removed. This matches the behavior of PyAirbyte caches and most
421                Airbyte destinations.
422            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
423                which generally matches the behavior of PyAirbyte caches and most Airbyte
424                destinations, specifically when you expect the catalog may be stale. You can disable
425                this to keep all fields in the records.
426
427        This involves the following steps:
428        * Call discover to get the catalog
429        * Generate a configured catalog that syncs the given stream in full_refresh mode
430        * Write the configured catalog and the config to a temporary file
431        * execute the connector with read --config <config_file> --catalog <catalog_file>
432        * Listen to the messages and return the first AirbyteRecordMessages that come along.
433        * Make sure the subprocess is killed when the function returns.
434        """
435        discovered_catalog: AirbyteCatalog = self.discovered_catalog
436        configured_catalog = ConfiguredAirbyteCatalog(
437            streams=[
438                ConfiguredAirbyteStream(
439                    stream=s,
440                    sync_mode=SyncMode.full_refresh,
441                    destination_sync_mode=DestinationSyncMode.overwrite,
442                )
443                for s in discovered_catalog.streams
444                if s.name == stream
445            ],
446        )
447        if len(configured_catalog.streams) == 0:
448            raise exc.PyAirbyteInputError(
449                message="Requested stream does not exist.",
450                context={
451                    "stream": stream,
452                    "available_streams": self.get_available_streams(),
453                    "connector_name": self.name,
454                },
455            ) from KeyError(stream)
456
457        configured_stream = configured_catalog.streams[0]
458
459        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
460            yield from records
461
462        stream_record_handler = StreamRecordHandler(
463            json_schema=self.get_stream_json_schema(stream),
464            prune_extra_fields=prune_undeclared_fields,
465            normalize_keys=normalize_field_names,
466        )
467
468        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
469        progress_tracker = ProgressTracker(
470            ProgressStyle.PLAIN,
471            source=self,
472            cache=None,
473            destination=None,
474            expected_streams=[stream],
475        )
476
477        iterator: Iterator[dict[str, Any]] = (
478            StreamRecord.from_record_message(
479                record_message=record.record,
480                stream_record_handler=stream_record_handler,
481            )
482            for record in self._read_with_catalog(
483                catalog=configured_catalog,
484                progress_tracker=progress_tracker,
485            )
486            if record.record
487        )
488        progress_tracker.log_success()
489        return LazyDataset(
490            iterator,
491            stream_metadata=configured_stream,
492        )
493
494    def get_documents(
495        self,
496        stream: str,
497        title_property: str | None = None,
498        content_properties: list[str] | None = None,
499        metadata_properties: list[str] | None = None,
500        *,
501        render_metadata: bool = False,
502    ) -> Iterable[Document]:
503        """Read a stream from the connector and return the records as documents.
504
505        If metadata_properties is not set, all properties that are not content will be added to
506        the metadata.
507
508        If render_metadata is True, metadata will be rendered in the document, as well as the
509        the main content.
510        """
511        return self.get_records(stream).to_documents(
512            title_property=title_property,
513            content_properties=content_properties,
514            metadata_properties=metadata_properties,
515            render_metadata=render_metadata,
516        )
517
518    def _get_airbyte_message_iterator(
519        self,
520        *,
521        streams: Literal["*"] | list[str] | None = None,
522        state_provider: StateProviderBase | None = None,
523        progress_tracker: ProgressTracker,
524        force_full_refresh: bool = False,
525    ) -> AirbyteMessageIterator:
526        """Get an AirbyteMessageIterator for this source."""
527        return AirbyteMessageIterator(
528            self._read_with_catalog(
529                catalog=self.get_configured_catalog(streams=streams),
530                state=state_provider if not force_full_refresh else None,
531                progress_tracker=progress_tracker,
532            )
533        )
534
535    def _read_with_catalog(
536        self,
537        catalog: ConfiguredAirbyteCatalog,
538        progress_tracker: ProgressTracker,
539        state: StateProviderBase | None = None,
540    ) -> Generator[AirbyteMessage, None, None]:
541        """Call read on the connector.
542
543        This involves the following steps:
544        * Write the config to a temporary file
545        * execute the connector with read --config <config_file> --catalog <catalog_file>
546        * Listen to the messages and return the AirbyteRecordMessages that come along.
547        * Send out telemetry on the performed sync (with information about which source was used and
548          the type of the cache)
549        """
550        with as_temp_files(
551            [
552                self._config,
553                catalog.model_dump_json(),
554                state.to_state_input_file_text() if state else "[]",
555            ]
556        ) as [
557            config_file,
558            catalog_file,
559            state_file,
560        ]:
561            message_generator = self._execute(
562                [
563                    "read",
564                    "--config",
565                    config_file,
566                    "--catalog",
567                    catalog_file,
568                    "--state",
569                    state_file,
570                ],
571                progress_tracker=progress_tracker,
572            )
573            yield from progress_tracker.tally_records_read(message_generator)
574        progress_tracker.log_read_complete()
575
576    def _peek_airbyte_message(
577        self,
578        message: AirbyteMessage,
579        *,
580        raise_on_error: bool = True,
581    ) -> None:
582        """Process an Airbyte message.
583
584        This method handles reading Airbyte messages and taking action, if needed, based on the
585        message type. For instance, log messages are logged, records are tallied, and errors are
586        raised as exceptions if `raise_on_error` is True.
587
588        Raises:
589            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
590        """
591        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
592
593    def _log_incremental_streams(
594        self,
595        *,
596        incremental_streams: set[str] | None = None,
597    ) -> None:
598        """Log the streams which are using incremental sync mode."""
599        log_message = (
600            "The following streams are currently using incremental sync:\n"
601            f"{incremental_streams}\n"
602            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
603        )
604        print(log_message)
605
606    def read(
607        self,
608        cache: CacheBase | None = None,
609        *,
610        streams: str | list[str] | None = None,
611        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
612        force_full_refresh: bool = False,
613        skip_validation: bool = False,
614    ) -> ReadResult:
615        """Read from the connector and write to the cache.
616
617        Args:
618            cache: The cache to write to. If not set, a default cache will be used.
619            streams: Optional if already set. A list of stream names to select for reading. If set
620                to "*", all streams will be selected.
621            write_strategy: The strategy to use when writing to the cache. If a string, it must be
622                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
623                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
624                WriteStrategy.AUTO.
625            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
626                streams will be read in incremental mode if supported by the connector. This option
627                must be True when using the "replace" strategy.
628            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
629                running the connector. This can be helpful in debugging, when you want to send
630                configurations to the connector that otherwise might be rejected by JSON Schema
631                validation rules.
632        """
633        cache = cache or get_default_cache()
634        progress_tracker = ProgressTracker(
635            source=self,
636            cache=cache,
637            destination=None,
638            expected_streams=None,  # Will be set later
639        )
640
641        # Set up state provider if not in full refresh mode
642        if force_full_refresh:
643            state_provider: StateProviderBase | None = None
644        else:
645            state_provider = cache.get_state_provider(
646                source_name=self._name,
647            )
648        state_writer = cache.get_state_writer(source_name=self._name)
649
650        if streams:
651            self.select_streams(streams)
652
653        if not self._selected_stream_names:
654            raise exc.PyAirbyteNoStreamsSelectedError(
655                connector_name=self.name,
656                available_streams=self.get_available_streams(),
657            )
658
659        try:
660            result = self._read_to_cache(
661                cache=cache,
662                catalog_provider=CatalogProvider(self.configured_catalog),
663                stream_names=self._selected_stream_names,
664                state_provider=state_provider,
665                state_writer=state_writer,
666                write_strategy=write_strategy,
667                force_full_refresh=force_full_refresh,
668                skip_validation=skip_validation,
669                progress_tracker=progress_tracker,
670            )
671        except exc.PyAirbyteInternalError as ex:
672            progress_tracker.log_failure(exception=ex)
673            raise exc.AirbyteConnectorFailedError(
674                connector_name=self.name,
675                log_text=self._last_log_messages,
676            ) from ex
677        except Exception as ex:
678            progress_tracker.log_failure(exception=ex)
679            raise
680
681        progress_tracker.log_success()
682        return result
683
684    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
685        self,
686        cache: CacheBase,
687        *,
688        catalog_provider: CatalogProvider,
689        stream_names: list[str],
690        state_provider: StateProviderBase | None,
691        state_writer: StateWriterBase | None,
692        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
693        force_full_refresh: bool = False,
694        skip_validation: bool = False,
695        progress_tracker: ProgressTracker,
696    ) -> ReadResult:
697        """Internal read method."""
698        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
699            warnings.warn(
700                message=(
701                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
702                    "could result in data loss. "
703                    "To silence this warning, use the following: "
704                    'warnings.filterwarnings("ignore", '
705                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
706                ),
707                category=exc.PyAirbyteDataLossWarning,
708                stacklevel=1,
709            )
710        if isinstance(write_strategy, str):
711            try:
712                write_strategy = WriteStrategy(write_strategy)
713            except ValueError:
714                raise exc.PyAirbyteInputError(
715                    message="Invalid strategy",
716                    context={
717                        "write_strategy": write_strategy,
718                        "available_strategies": [s.value for s in WriteStrategy],
719                    },
720                ) from None
721
722        # Run optional validation step
723        if not skip_validation:
724            self.validate_config()
725
726        # Log incremental streams, if any are known
727        if state_provider and state_provider.known_stream_names:
728            # Retrieve the set of known streams that support incremental sync
729            incremental_streams = (
730                set(self._get_incremental_stream_names())
731                & state_provider.known_stream_names
732                & set(self.get_selected_streams())
733            )
734            if incremental_streams:
735                self._log_incremental_streams(incremental_streams=incremental_streams)
736
737        airbyte_message_iterator = AirbyteMessageIterator(
738            self._read_with_catalog(
739                catalog=catalog_provider.configured_catalog,
740                state=state_provider,
741                progress_tracker=progress_tracker,
742            )
743        )
744        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
745            stdin=airbyte_message_iterator,
746            catalog_provider=catalog_provider,
747            write_strategy=write_strategy,
748            state_writer=state_writer,
749            progress_tracker=progress_tracker,
750        )
751
752        # Flush the WAL, if applicable
753        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
754
755        return ReadResult(
756            source_name=self.name,
757            progress_tracker=progress_tracker,
758            processed_streams=stream_names,
759            cache=cache,
760        )
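
Note the string-to-enum coercion near the top of _read_to_cache: callers may pass either a WriteStrategy member or its string value. A minimal sketch of the equivalence, using only the public enum:

    from airbyte.strategies import WriteStrategy

    # A string value resolves to the canonical enum member; an invalid
    # string raises ValueError, which _read_to_cache re-raises as a
    # PyAirbyteInputError listing the available strategies.
    assert WriteStrategy("append") is WriteStrategy.APPEND
    print([s.value for s in WriteStrategy])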

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False)
58    def __init__(
59        self,
60        executor: Executor,
61        name: str,
62        config: dict[str, Any] | None = None,
63        *,
64        config_change_callback: ConfigChangeCallback | None = None,
65        streams: str | list[str] | None = None,
66        validate: bool = False,
67    ) -> None:
68        """Initialize the source.
69
70        If config is provided, it will be validated against the spec if validate is True.
71        """
72        self._to_be_selected_streams: list[str] | str = []
73        """Used to hold selection criteria before catalog is known."""
74
75        super().__init__(
76            executor=executor,
77            name=name,
78            config=config,
79            config_change_callback=config_change_callback,
80            validate=validate,
81        )
82        self._config_dict: dict[str, Any] | None = None
83        self._last_log_messages: list[str] = []
84        self._discovered_catalog: AirbyteCatalog | None = None
85        self._selected_stream_names: list[str] = []
86        if config is not None:
87            self.set_config(config, validate=validate)
88        if streams is not None:
89            self.select_streams(streams)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
91    def set_streams(self, streams: list[str]) -> None:
92        """Deprecated. See select_streams()."""
93        warnings.warn(
94            "The 'set_streams' method is deprecated and will be removed in a future version. "
95            "Please use the 'select_streams' method instead.",
96            DeprecationWarning,
97            stacklevel=2,
98        )
99        self.select_streams(streams)

Deprecated. See select_streams().

def select_all_streams(self) -> None:
113    def select_all_streams(self) -> None:
114        """Select all streams.
115
116        This is a more streamlined equivalent to:
117        > source.select_streams(source.get_available_streams()).
118        """
119        if self._config_dict is None:
120            self._to_be_selected_streams = "*"
121            self._log_warning_preselected_stream(self._to_be_selected_streams)
122            return
123
124        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
126    def select_streams(self, streams: str | list[str]) -> None:
127        """Select the stream names that should be read from the connector.
128
129        Args:
130            streams: A list of stream names to select. If set to "*", all streams will be selected.
131
132        Currently, if this is not set, all streams will be read.
133        """
134        if self._config_dict is None:
135            self._to_be_selected_streams = streams
136            self._log_warning_preselected_stream(streams)
137            return
138
139        if streams == "*":
140            self.select_all_streams()
141            return
142
143        if isinstance(streams, str):
144            # If a single stream is provided, convert it to a one-item list
145            streams = [streams]
146
147        available_streams = self.get_available_streams()
148        for stream in streams:
149            if stream not in available_streams:
150                raise exc.AirbyteStreamNotFoundError(
151                    stream_name=stream,
152                    connector_name=self.name,
153                    available_streams=available_streams,
154                )
155        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
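
A short usage sketch (the connector name, config, and stream names are illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    # Select a subset of streams by name; an unknown name raises
    # AirbyteStreamNotFoundError listing the available streams.
    source.select_streams(["users", "products"])
    print(source.get_selected_streams())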

def get_selected_streams(self) -> list[str]:
157    def get_selected_streams(self) -> list[str]:
158        """Get the selected streams.
159
160        If no streams are selected, return an empty list.
161        """
162        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
164    def set_config(
165        self,
166        config: dict[str, Any],
167        *,
168        validate: bool = True,
169    ) -> None:
170        """Set the config for the connector.
171
172        If validate is True, raise an exception if the config fails validation.
173
174        If validate is False, validation will be deferred until check() or validate_config()
175        is called.
176        """
177        if validate:
178            self.validate_config(config)
179
180        self._config_dict = config
181
182        if self._to_be_selected_streams:
183            self.select_streams(self._to_be_selected_streams)
184            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
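
For example, validation can be deferred while a config is still being assembled (values are illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker")
    # Defer JSON Schema validation until check() or validate_config():
    source.set_config({"count": 100}, validate=False)
    source.check()  # validates the config and tests connectivity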

def get_config(self) -> dict[str, typing.Any]:
186    def get_config(self) -> dict[str, Any]:
187        """Get the config for the connector."""
188        return self._config

Get the config for the connector.

def get_available_streams(self) -> list[str]:
217    def get_available_streams(self) -> list[str]:
218        """Get the available streams from the spec."""
219        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.

config_spec: dict[str, typing.Any]
251    @property
252    def config_spec(self) -> dict[str, Any]:
253        """Generate a configuration spec for this connector, as a JSON Schema definition.
254
255        This property returns the JSON Schema configuration spec for the
256        current connector, as a dictionary.
257
258        Returns:
259            dict: The JSON Schema configuration spec as a dictionary.
260        """
261        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This property returns the JSON Schema configuration spec for the current connector, as a dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.
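
Because the result is a plain JSON Schema dictionary, it can be inspected directly; a sketch (connector name illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker")
    spec = source.config_spec
    print(spec.get("required", []))          # required property names, if any
    print(list(spec.get("properties", {})))  # all declared property names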

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
263    def print_config_spec(
264        self,
265        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
266        *,
267        output_file: Path | str | None = None,
268    ) -> None:
269        """Print the configuration spec for this connector.
270
271        Args:
272            format: The format to print the spec in. Must be "yaml" or "json".
273            output_file: Optional. If set, the spec will be written to the given file path.
274                Otherwise, it will be printed to the console.
275        """
276        if format not in {"yaml", "json"}:
277            raise exc.PyAirbyteInputError(
278                message="Invalid format. Expected 'yaml' or 'json'",
279                input_value=format,
280            )
281        if isinstance(output_file, str):
282            output_file = Path(output_file)
283
284        if format == "yaml":
285            content = yaml.dump(self.config_spec, indent=2)
286        elif format == "json":
287            content = json.dumps(self.config_spec, indent=2)
288
289        if output_file:
290            output_file.write_text(content)
291            return
292
293        syntax_highlighted = Syntax(content, format)
294        print(syntax_highlighted)

Print the configuration spec for this connector.

Arguments:
  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
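
For example (the connector name and output path are illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker")
    source.print_config_spec(format="json")  # syntax-highlighted console output
    # Or write the YAML spec to a file instead of printing it:
    source.print_config_spec(output_file="faker_spec.yaml")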
docs_url: str
311    @property
312    def docs_url(self) -> str:
313        """Get the URL to the connector's documentation."""
314        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
315            "source-", ""
316        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
318    @property
319    def discovered_catalog(self) -> AirbyteCatalog:
320        """Get the raw catalog for the given streams.
321
322        If the catalog is not yet known, we call discover to get it.
323        """
324        if self._discovered_catalog is None:
325            self._discovered_catalog = self._discover()
326
327        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
329    @property
330    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
331        """Get the configured catalog for the given streams.
332
333        If the raw catalog is not yet known, we call discover to get it.
334
335        If no specific streams are selected, we return a catalog that syncs all available streams.
336
337        TODO: We should consider disabling by default the streams that the connector would
338        disable by default. (For instance, streams that require a premium license are sometimes
339        disabled by default within the connector.)
340        """
341        # Ensure discovered catalog is cached before we start
342        _ = self.discovered_catalog
343
344        # Filter for selected streams if set, otherwise use all available streams:
345        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
346        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
348    def get_configured_catalog(
349        self,
350        streams: Literal["*"] | list[str] | None = None,
351    ) -> ConfiguredAirbyteCatalog:
352        """Get a configured catalog for the given streams.
353
354        If no streams are provided, the selected streams will be used. If no streams are selected,
355        all available streams will be used.
356
357        If '*' is provided, all available streams will be used.
358        """
359        selected_streams: list[str] = []
360        if streams is None:
361            selected_streams = self._selected_stream_names or self.get_available_streams()
362        elif streams == "*":
363            selected_streams = self.get_available_streams()
364        elif isinstance(streams, list):
365            selected_streams = streams
366        else:
367            raise exc.PyAirbyteInputError(
368                message="Invalid streams argument.",
369                input_value=streams,
370            )
371
372        return ConfiguredAirbyteCatalog(
373            streams=[
374                ConfiguredAirbyteStream(
375                    stream=stream,
376                    destination_sync_mode=DestinationSyncMode.overwrite,
377                    primary_key=stream.source_defined_primary_key,
378                    sync_mode=SyncMode.incremental,
379                )
380                for stream in self.discovered_catalog.streams
381                if stream.name in selected_streams
382            ],
383        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.
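
A sketch of inspecting the result (connector and stream names are illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    catalog = source.get_configured_catalog(streams=["users"])
    for configured_stream in catalog.streams:
        print(configured_stream.stream.name, configured_stream.sync_mode)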

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
385    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
386        """Return the JSON Schema spec for the specified stream name."""
387        catalog: AirbyteCatalog = self.discovered_catalog
388        found: list[AirbyteStream] = [
389            stream for stream in catalog.streams if stream.name == stream_name
390        ]
391
392        if len(found) == 0:
393            raise exc.PyAirbyteInputError(
394                message="Stream name does not exist in catalog.",
395                input_value=stream_name,
396            )
397
398        if len(found) > 1:
399            raise exc.PyAirbyteInternalError(
400                message="Duplicate streams found with the same name.",
401                context={
402                    "found_streams": found,
403                },
404            )
405
406        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
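
For example (connector and stream name are illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    schema = source.get_stream_json_schema("users")
    # The schema is standard JSON Schema for one record of the stream:
    print(list(schema.get("properties", {})))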

def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
408    def get_records(
409        self,
410        stream: str,
411        *,
412        normalize_field_names: bool = False,
413        prune_undeclared_fields: bool = True,
414    ) -> LazyDataset:
415        """Read a stream from the connector.
416
417        Args:
418            stream: The name of the stream to read.
419            normalize_field_names: When `True`, field names will be normalized to lower case, with
420                special characters removed. This matches the behavior of PyAirbyte caches and most
421                Airbyte destinations.
422            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
423                which generally matches the behavior of PyAirbyte caches and most Airbyte
424                destinations, specifically when you expect the catalog may be stale. You can disable
425                this to keep all fields in the records.
426
427        This involves the following steps:
428        * Call discover to get the catalog
429        * Generate a configured catalog that syncs the given stream in full_refresh mode
430        * Write the configured catalog and the config to a temporary file
431        * Execute the connector with read --config <config_file> --catalog <catalog_file>
432        * Listen to the messages and return the AirbyteRecordMessages as they arrive.
433        * Make sure the subprocess is killed when the function returns.
434        """
435        discovered_catalog: AirbyteCatalog = self.discovered_catalog
436        configured_catalog = ConfiguredAirbyteCatalog(
437            streams=[
438                ConfiguredAirbyteStream(
439                    stream=s,
440                    sync_mode=SyncMode.full_refresh,
441                    destination_sync_mode=DestinationSyncMode.overwrite,
442                )
443                for s in discovered_catalog.streams
444                if s.name == stream
445            ],
446        )
447        if len(configured_catalog.streams) == 0:
448            raise exc.PyAirbyteInputError(
449                message="Requested stream does not exist.",
450                context={
451                    "stream": stream,
452                    "available_streams": self.get_available_streams(),
453                    "connector_name": self.name,
454                },
455            ) from KeyError(stream)
456
457        configured_stream = configured_catalog.streams[0]
458
459        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
460            yield from records
461
462        stream_record_handler = StreamRecordHandler(
463            json_schema=self.get_stream_json_schema(stream),
464            prune_extra_fields=prune_undeclared_fields,
465            normalize_keys=normalize_field_names,
466        )
467
468        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
469        progress_tracker = ProgressTracker(
470            ProgressStyle.PLAIN,
471            source=self,
472            cache=None,
473            destination=None,
474            expected_streams=[stream],
475        )
476
477        iterator: Iterator[dict[str, Any]] = (
478            StreamRecord.from_record_message(
479                record_message=record.record,
480                stream_record_handler=stream_record_handler,
481            )
482            for record in self._read_with_catalog(
483                catalog=configured_catalog,
484                progress_tracker=progress_tracker,
485            )
486            if record.record
487        )
488        progress_tracker.log_success()
489        return LazyDataset(
490            iterator,
491            stream_metadata=configured_stream,
492        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the AirbyteRecordMessages as they arrive.
  • Make sure the subprocess is killed when the function returns.
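
A usage sketch (connector and stream are illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    # Returns a LazyDataset; records are yielded as the connector emits
    # them, without writing to any cache.
    for record in source.get_records("users"):
        print(record)  # a dict-like StreamRecord keyed per the stream schema
        break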
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
494    def get_documents(
495        self,
496        stream: str,
497        title_property: str | None = None,
498        content_properties: list[str] | None = None,
499        metadata_properties: list[str] | None = None,
500        *,
501        render_metadata: bool = False,
502    ) -> Iterable[Document]:
503        """Read a stream from the connector and return the records as documents.
504
505        If metadata_properties is not set, all properties that are not content will be added to
506        the metadata.
507
508        If render_metadata is True, metadata will be rendered in the document, as well as
509        the main content.
510        """
511        return self.get_records(stream).to_documents(
512            title_property=title_property,
513            content_properties=content_properties,
514            metadata_properties=metadata_properties,
515            render_metadata=render_metadata,
516        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
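
A sketch (the stream and property names are hypothetical; use fields from your own stream's schema):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    for doc in source.get_documents(
        "products",
        title_property="make",         # hypothetical title field
        content_properties=["model"],  # hypothetical content field
        render_metadata=True,
    ):
        print(doc)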

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
606    def read(
607        self,
608        cache: CacheBase | None = None,
609        *,
610        streams: str | list[str] | None = None,
611        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
612        force_full_refresh: bool = False,
613        skip_validation: bool = False,
614    ) -> ReadResult:
615        """Read from the connector and write to the cache.
616
617        Args:
618            cache: The cache to write to. If not set, a default cache will be used.
619            streams: Optional if already set. A list of stream names to select for reading. If set
620                to "*", all streams will be selected.
621            write_strategy: The strategy to use when writing to the cache. If a string, it must be
622                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
623                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
624                WriteStrategy.AUTO.
625            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
626                streams will be read in incremental mode if supported by the connector. This option
627                must be True when using the "replace" strategy.
628            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
629                running the connector. This can be helpful in debugging, when you want to send
630                configurations to the connector that otherwise might be rejected by JSON Schema
631                validation rules.
632        """
633        cache = cache or get_default_cache()
634        progress_tracker = ProgressTracker(
635            source=self,
636            cache=cache,
637            destination=None,
638            expected_streams=None,  # Will be set later
639        )
640
641        # Set up state provider if not in full refresh mode
642        if force_full_refresh:
643            state_provider: StateProviderBase | None = None
644        else:
645            state_provider = cache.get_state_provider(
646                source_name=self._name,
647            )
648        state_writer = cache.get_state_writer(source_name=self._name)
649
650        if streams:
651            self.select_streams(streams)
652
653        if not self._selected_stream_names:
654            raise exc.PyAirbyteNoStreamsSelectedError(
655                connector_name=self.name,
656                available_streams=self.get_available_streams(),
657            )
658
659        try:
660            result = self._read_to_cache(
661                cache=cache,
662                catalog_provider=CatalogProvider(self.configured_catalog),
663                stream_names=self._selected_stream_names,
664                state_provider=state_provider,
665                state_writer=state_writer,
666                write_strategy=write_strategy,
667                force_full_refresh=force_full_refresh,
668                skip_validation=skip_validation,
669                progress_tracker=progress_tracker,
670            )
671        except exc.PyAirbyteInternalError as ex:
672            progress_tracker.log_failure(exception=ex)
673            raise exc.AirbyteConnectorFailedError(
674                connector_name=self.name,
675                log_text=self._last_log_messages,
676            ) from ex
677        except Exception as ex:
678            progress_tracker.log_failure(exception=ex)
679            raise
680
681        progress_tracker.log_success()
682        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
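
End to end, a typical call looks roughly like this (connector, config, and the replace/full-refresh combination are illustrative; a default DuckDB cache is created when none is passed, and the sketch assumes the mapping-style streams accessor on ReadResult):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 1000})
    source.select_all_streams()

    result = source.read()  # incremental where supported, default cache

    # Re-read everything, replacing previously cached records:
    result = source.read(write_strategy="replace", force_full_refresh=True)

    for stream_name, dataset in result.streams.items():
        print(stream_name, len(list(dataset)))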
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
config_hash
validate_config
connector_version
check
install
uninstall