airbyte.sources.base

Base class implementation for sources.
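
Usage example (a minimal sketch; the connector name, config values, and stream
name below are illustrative and assume the PyAirbyte `get_source()` helper):

    import airbyte as ab

    # Declare the source and validate the config and connection:
    source = ab.get_source(
        "source-faker",
        config={"count": 1_000},
        install_if_missing=True,
    )
    source.check()

    # Select streams, then read into the default (DuckDB) cache:
    source.select_all_streams()
    result = source.read()
    print(result["users"].to_pandas())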

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Base class implementation for sources."""
  3
  4from __future__ import annotations
  5
  6import sys
  7import warnings
  8from typing import TYPE_CHECKING, Any, Literal
  9
 10import yaml
 11from rich import print  # noqa: A004  # Allow shadowing the built-in
 12
 13from airbyte_protocol.models import (
 14    AirbyteCatalog,
 15    AirbyteMessage,
 16    ConfiguredAirbyteCatalog,
 17    ConfiguredAirbyteStream,
 18    DestinationSyncMode,
 19    SyncMode,
 20    Type,
 21)
 22
 23from airbyte import exceptions as exc
 24from airbyte._connector_base import ConnectorBase
 25from airbyte._message_iterators import AirbyteMessageIterator
 26from airbyte._util.temp_files import as_temp_files
 27from airbyte.caches.util import get_default_cache
 28from airbyte.datasets._lazy import LazyDataset
 29from airbyte.progress import ProgressStyle, ProgressTracker
 30from airbyte.records import StreamRecord, StreamRecordHandler
 31from airbyte.results import ReadResult
 32from airbyte.shared.catalog_providers import CatalogProvider
 33from airbyte.strategies import WriteStrategy
 34
 35
 36if TYPE_CHECKING:
 37    from collections.abc import Generator, Iterable, Iterator
 38
 39    from airbyte_cdk import ConnectorSpecification
 40    from airbyte_protocol.models import AirbyteStream
 41
 42    from airbyte._executors.base import Executor
 43    from airbyte.caches import CacheBase
 44    from airbyte.callbacks import ConfigChangeCallback
 45    from airbyte.documents import Document
 46    from airbyte.shared.state_providers import StateProviderBase
 47    from airbyte.shared.state_writers import StateWriterBase
 48
 49
 50class Source(ConnectorBase):
 51    """A class representing a source that can be called."""
 52
 53    connector_type = "source"
 54
 55    def __init__(
 56        self,
 57        executor: Executor,
 58        name: str,
 59        config: dict[str, Any] | None = None,
 60        *,
 61        config_change_callback: ConfigChangeCallback | None = None,
 62        streams: str | list[str] | None = None,
 63        validate: bool = False,
 64        cursor_key_overrides: dict[str, str] | None = None,
 65        primary_key_overrides: dict[str, str | list[str]] | None = None,
 66    ) -> None:
 67        """Initialize the source.
 68
 69        If config is provided, it will be validated against the spec if validate is True.
 70        """
 71        self._to_be_selected_streams: list[str] | str = []
 72        """Used to hold selection criteria before catalog is known."""
 73
 74        super().__init__(
 75            executor=executor,
 76            name=name,
 77            config=config,
 78            config_change_callback=config_change_callback,
 79            validate=validate,
 80        )
 81        self._config_dict: dict[str, Any] | None = None
 82        self._last_log_messages: list[str] = []
 83        self._discovered_catalog: AirbyteCatalog | None = None
 84        self._selected_stream_names: list[str] = []
 85
 86        self._cursor_key_overrides: dict[str, str] = {}
 87        """A mapping of lower-cased stream names to cursor key overrides."""
 88
 89        self._primary_key_overrides: dict[str, list[str]] = {}
 90        """A mapping of lower-cased stream names to primary key overrides."""
 91
 92        if config is not None:
 93            self.set_config(config, validate=validate)
 94        if streams is not None:
 95            self.select_streams(streams)
 96        if cursor_key_overrides is not None:
 97            self.set_cursor_keys(**cursor_key_overrides)
 98        if primary_key_overrides is not None:
 99            self.set_primary_keys(**primary_key_overrides)
100
101    def set_streams(self, streams: list[str]) -> None:
102        """Deprecated. See select_streams()."""
103        warnings.warn(
104            "The 'set_streams' method is deprecated and will be removed in a future version. "
105            "Please use the 'select_streams' method instead.",
106            DeprecationWarning,
107            stacklevel=2,
108        )
109        self.select_streams(streams)
110
111    def set_cursor_key(
112        self,
113        stream_name: str,
114        cursor_key: str,
115    ) -> None:
116        """Set the cursor for a single stream.
117
118        Note:
119        - This does not unset previously set cursors.
120        - The cursor key must be a single field name.
121        - Not all streams support custom cursors. If a stream does not support custom cursors,
122          the override may be ignored.
123        - Stream names are case insensitive, while field names are case sensitive.
124        - Stream names are not validated by PyAirbyte. If the stream name
125          does not exist in the catalog, the override may be ignored.
126        """
127        self._cursor_key_overrides[stream_name.lower()] = cursor_key
128
129    def set_cursor_keys(
130        self,
131        **kwargs: str,
132    ) -> None:
133        """Override the cursor key for one or more streams.
134
135        Usage:
136            source.set_cursor_keys(
137                stream1="cursor1",
138                stream2="cursor2",
139            )
140
141        Note:
142        - This does not unset previously set cursors.
143        - The cursor key must be a single field name.
144        - Not all streams support custom cursors. If a stream does not support custom cursors,
145          the override may be ignored.
146        - Stream names are case insensitive, while field names are case sensitive.
147        - Stream names are not validated by PyAirbyte. If the stream name
148          does not exist in the catalog, the override may be ignored.
149        """
150        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
151
152    def set_primary_key(
153        self,
154        stream_name: str,
155        primary_key: str | list[str],
156    ) -> None:
157        """Set the primary key for a single stream.
158
159        Note:
160        - This does not unset previously set primary keys.
161        - The primary key must be a single field name or a list of field names.
162        - Not all streams support overriding primary keys. If a stream does not support overriding
163          primary keys, the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._primary_key_overrides[stream_name.lower()] = (
169            primary_key if isinstance(primary_key, list) else [primary_key]
170        )
171
172    def set_primary_keys(
173        self,
174        **kwargs: str | list[str],
175    ) -> None:
176        """Override the primary keys for one or more streams.
177
178        This does not unset previously set primary keys.
179
180        Usage:
181            source.set_primary_keys(
182                stream1="pk1",
183                stream2=["pk1", "pk2"],
184            )
185
186        Note:
187        - This does not unset previously set primary keys.
188        - The primary key must be a single field name or a list of field names.
189        - Not all streams support overriding primary keys. If a stream does not support overriding
190          primary keys, the override may be ignored.
191        - Stream names are case insensitive, while field names are case sensitive.
192        - Stream names are not validated by PyAirbyte. If the stream name
193          does not exist in the catalog, the override may be ignored.
194        """
195        self._primary_key_overrides.update(
196            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
197        )
198
199    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
200        """Logs a warning message indicating stream selection which are not selected yet."""
201        if streams == "*":
202            print(
203                "Warning: Config is not set yet. All streams will be selected after config is set.",
204                file=sys.stderr,
205            )
206        else:
207            print(
208                "Warning: Config is not set yet. "
209                f"Streams to be selected after config is set: {streams}",
210                file=sys.stderr,
211            )
212
213    def select_all_streams(self) -> None:
214        """Select all streams.
215
216        This is a more streamlined equivalent to:
217        > source.select_streams(source.get_available_streams()).
218        """
219        if self._config_dict is None:
220            self._to_be_selected_streams = "*"
221            self._log_warning_preselected_stream(self._to_be_selected_streams)
222            return
223
224        self._selected_stream_names = self.get_available_streams()
225
226    def select_streams(self, streams: str | list[str]) -> None:
227        """Select the stream names that should be read from the connector.
228
229        Args:
230            streams: A list of stream names to select. If set to "*", all streams will be selected.
231
232        Currently, if this is not set, all streams will be read.
233        """
234        if self._config_dict is None:
235            self._to_be_selected_streams = streams
236            self._log_warning_preselected_stream(streams)
237            return
238
239        if streams == "*":
240            self.select_all_streams()
241            return
242
243        if isinstance(streams, str):
244            # If a single stream is provided, convert it to a one-item list
245            streams = [streams]
246
247        available_streams = self.get_available_streams()
248        for stream in streams:
249            if stream not in available_streams:
250                raise exc.AirbyteStreamNotFoundError(
251                    stream_name=stream,
252                    connector_name=self.name,
253                    available_streams=available_streams,
254                )
255        self._selected_stream_names = streams
256
257    def get_selected_streams(self) -> list[str]:
258        """Get the selected streams.
259
260        If no streams are selected, return an empty list.
261        """
262        return self._selected_stream_names
263
264    def set_config(
265        self,
266        config: dict[str, Any],
267        *,
268        validate: bool = True,
269    ) -> None:
270        """Set the config for the connector.
271
272        If validate is True, raise an exception if the config fails validation.
273
274        If validate is False, validation will be deferred until check() or validate_config()
275        is called.
276        """
277        if validate:
278            self.validate_config(config)
279
280        self._config_dict = config
281
282        if self._to_be_selected_streams:
283            self.select_streams(self._to_be_selected_streams)
284            self._to_be_selected_streams = []
285
286    def _discover(self) -> AirbyteCatalog:
287        """Call discover on the connector.
288
289        This involves the following steps:
290        - Write the config to a temporary file
291        - execute the connector with discover --config <config_file>
292        - Listen to the messages and return the first AirbyteCatalog that comes along.
293        - Make sure the subprocess is killed when the function returns.
294        """
295        with as_temp_files([self._hydrated_config]) as [config_file]:
296            for msg in self._execute(["discover", "--config", config_file]):
297                if msg.type == Type.CATALOG and msg.catalog:
298                    return msg.catalog
299            raise exc.AirbyteConnectorMissingCatalogError(
300                connector_name=self.name,
301                log_text=self._last_log_messages,
302            )
303
304    def get_available_streams(self) -> list[str]:
305        """Get the available streams from the spec."""
306        return [s.name for s in self.discovered_catalog.streams]
307
308    def _get_incremental_stream_names(self) -> list[str]:
309        """Get the name of streams that support incremental sync."""
310        return [
311            stream.name
312            for stream in self.discovered_catalog.streams
313            if SyncMode.incremental in stream.supported_sync_modes
314        ]
315
316    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
317        """Call spec on the connector.
318
319        This involves the following steps:
320        * execute the connector with spec
321        * Listen to the messages and return the first ConnectorSpecification that comes along.
322        * Make sure the subprocess is killed when the function returns.
323        """
324        if force_refresh or self._spec is None:
325            for msg in self._execute(["spec"]):
326                if msg.type == Type.SPEC and msg.spec:
327                    self._spec = msg.spec
328                    break
329
330        if self._spec:
331            return self._spec
332
333        raise exc.AirbyteConnectorMissingSpecError(
334            connector_name=self.name,
335            log_text=self._last_log_messages,
336        )
337
338    @property
339    def config_spec(self) -> dict[str, Any]:
340        """Generate a configuration spec for this connector, as a JSON Schema definition.
341
342        This function generates a JSON Schema dictionary describing the configuration
343        options for the current connector.
344
345        Returns:
346            dict: The JSON Schema configuration spec as a dictionary.
347        """
348        return self._get_spec(force_refresh=True).connectionSpecification
349
350    @property
351    def _yaml_spec(self) -> str:
352        """Get the spec as a yaml string.
353
354        For now, the primary use case is for writing and debugging a valid config for a source.
355
356        This is private for now because we probably want better polish before exposing this
357        as a stable interface. This will also get easier when we have docs links with this info
358        for each connector.
359        """
360        spec_obj: ConnectorSpecification = self._get_spec()
361        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
362        # convert to a yaml string
363        return yaml.dump(spec_dict)
364
365    @property
366    def docs_url(self) -> str:
367        """Get the URL to the connector's documentation."""
368        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
369            "source-", ""
370        )
371
372    @property
373    def discovered_catalog(self) -> AirbyteCatalog:
374        """Get the raw catalog for the given streams.
375
376        If the catalog is not yet known, we call discover to get it.
377        """
378        if self._discovered_catalog is None:
379            self._discovered_catalog = self._discover()
380
381        return self._discovered_catalog
382
383    @property
384    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
385        """Get the configured catalog for the given streams.
386
387        If the raw catalog is not yet known, we call discover to get it.
388
389        If no specific streams are selected, we return a catalog that syncs all available streams.
390
391        TODO: We should consider disabling by default the streams that the connector would
392        disable by default. (For instance, streams that require a premium license are sometimes
393        disabled by default within the connector.)
394        """
395        # Ensure discovered catalog is cached before we start
396        _ = self.discovered_catalog
397
398        # Filter for selected streams if set, otherwise use all available streams:
399        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
400        return self.get_configured_catalog(streams=streams_filter)
401
402    def get_configured_catalog(
403        self,
404        streams: Literal["*"] | list[str] | None = None,
405    ) -> ConfiguredAirbyteCatalog:
406        """Get a configured catalog for the given streams.
407
408        If no streams are provided, the selected streams will be used. If no streams are selected,
409        all available streams will be used.
410
411        If '*' is provided, all available streams will be used.
412        """
413        selected_streams: list[str] = []
414        if streams is None:
415            selected_streams = self._selected_stream_names or self.get_available_streams()
416        elif streams == "*":
417            selected_streams = self.get_available_streams()
418        elif isinstance(streams, list):
419            selected_streams = streams
420        else:
421            raise exc.PyAirbyteInputError(
422                message="Invalid streams argument.",
423                input_value=streams,
424            )
425
426        return ConfiguredAirbyteCatalog(
427            streams=[
428                ConfiguredAirbyteStream(
429                    stream=stream,
430                    destination_sync_mode=DestinationSyncMode.overwrite,
431                    sync_mode=SyncMode.incremental,
432                    primary_key=(
433                        [self._primary_key_overrides[stream.name.lower()]]
434                        if stream.name.lower() in self._primary_key_overrides
435                        else stream.source_defined_primary_key
436                    ),
437                    cursor_field=(
438                        [self._cursor_key_overrides[stream.name.lower()]]
439                        if stream.name.lower() in self._cursor_key_overrides
440                        else stream.default_cursor_field
441                    ),
442                    # These are unused in the current implementation:
443                    generation_id=None,
444                    minimum_generation_id=None,
445                    sync_id=None,
446                )
447                for stream in self.discovered_catalog.streams
448                if stream.name in selected_streams
449            ],
450        )
451
452    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
453        """Return the JSON Schema spec for the specified stream name."""
454        catalog: AirbyteCatalog = self.discovered_catalog
455        found: list[AirbyteStream] = [
456            stream for stream in catalog.streams if stream.name == stream_name
457        ]
458
459        if len(found) == 0:
460            raise exc.PyAirbyteInputError(
461                message="Stream name does not exist in catalog.",
462                input_value=stream_name,
463            )
464
465        if len(found) > 1:
466            raise exc.PyAirbyteInternalError(
467                message="Duplicate streams found with the same name.",
468                context={
469                    "found_streams": found,
470                },
471            )
472
473        return found[0].json_schema
474
475    def get_records(
476        self,
477        stream: str,
478        *,
479        normalize_field_names: bool = False,
480        prune_undeclared_fields: bool = True,
481    ) -> LazyDataset:
482        """Read a stream from the connector.
483
484        Args:
485            stream: The name of the stream to read.
486            normalize_field_names: When `True`, field names will be normalized to lower case, with
487                special characters removed. This matches the behavior of PyAirbyte caches and most
488                Airbyte destinations.
489            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
490                which generally matches the behavior of PyAirbyte caches and most Airbyte
491                destinations. This is especially useful when the catalog may be stale. You can
492                disable this to keep all fields in the records.
493
494        This involves the following steps:
495        * Call discover to get the catalog
496        * Generate a configured catalog that syncs the given stream in full_refresh mode
497        * Write the configured catalog and the config to a temporary file
498        * execute the connector with read --config <config_file> --catalog <catalog_file>
499        * Listen to the messages and yield the AirbyteRecordMessages that come along.
500        * Make sure the subprocess is killed when the function returns.
501        """
502        configured_catalog = self.get_configured_catalog(streams=[stream])
503        if len(configured_catalog.streams) == 0:
504            raise exc.PyAirbyteInputError(
505                message="Requested stream does not exist.",
506                context={
507                    "stream": stream,
508                    "available_streams": self.get_available_streams(),
509                    "connector_name": self.name,
510                },
511            ) from KeyError(stream)
512
513        configured_stream = configured_catalog.streams[0]
514
515        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
516            yield from records
517
518        stream_record_handler = StreamRecordHandler(
519            json_schema=self.get_stream_json_schema(stream),
520            prune_extra_fields=prune_undeclared_fields,
521            normalize_keys=normalize_field_names,
522        )
523
524        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
525        progress_tracker = ProgressTracker(
526            ProgressStyle.PLAIN,
527            source=self,
528            cache=None,
529            destination=None,
530            expected_streams=[stream],
531        )
532
533        iterator: Iterator[dict[str, Any]] = (
534            StreamRecord.from_record_message(
535                record_message=record.record,
536                stream_record_handler=stream_record_handler,
537            )
538            for record in self._read_with_catalog(
539                catalog=configured_catalog,
540                progress_tracker=progress_tracker,
541            )
542            if record.record
543        )
544        progress_tracker.log_success()
545        return LazyDataset(
546            iterator,
547            stream_metadata=configured_stream,
548        )
549
550    def get_documents(
551        self,
552        stream: str,
553        title_property: str | None = None,
554        content_properties: list[str] | None = None,
555        metadata_properties: list[str] | None = None,
556        *,
557        render_metadata: bool = False,
558    ) -> Iterable[Document]:
559        """Read a stream from the connector and return the records as documents.
560
561        If metadata_properties is not set, all properties that are not content will be added to
562        the metadata.
563
564        If render_metadata is True, metadata will be rendered in the document, as well as
565        the main content.
566        """
567        return self.get_records(stream).to_documents(
568            title_property=title_property,
569            content_properties=content_properties,
570            metadata_properties=metadata_properties,
571            render_metadata=render_metadata,
572        )
573
574    def _get_airbyte_message_iterator(
575        self,
576        *,
577        streams: Literal["*"] | list[str] | None = None,
578        state_provider: StateProviderBase | None = None,
579        progress_tracker: ProgressTracker,
580        force_full_refresh: bool = False,
581    ) -> AirbyteMessageIterator:
582        """Get an AirbyteMessageIterator for this source."""
583        return AirbyteMessageIterator(
584            self._read_with_catalog(
585                catalog=self.get_configured_catalog(streams=streams),
586                state=state_provider if not force_full_refresh else None,
587                progress_tracker=progress_tracker,
588            )
589        )
590
591    def _read_with_catalog(
592        self,
593        catalog: ConfiguredAirbyteCatalog,
594        progress_tracker: ProgressTracker,
595        state: StateProviderBase | None = None,
596    ) -> Generator[AirbyteMessage, None, None]:
597        """Call read on the connector.
598
599        This involves the following steps:
600        * Write the config to a temporary file
601        * execute the connector with read --config <config_file> --catalog <catalog_file>
602        * Listen to the messages and return the AirbyteRecordMessages that come along.
603        * Send out telemetry on the performed sync (with information about which source was used and
604          the type of the cache)
605        """
606        with as_temp_files(
607            [
608                self._hydrated_config,
609                catalog.model_dump_json(),
610                state.to_state_input_file_text() if state else "[]",
611            ]
612        ) as [
613            config_file,
614            catalog_file,
615            state_file,
616        ]:
617            message_generator = self._execute(
618                [
619                    "read",
620                    "--config",
621                    config_file,
622                    "--catalog",
623                    catalog_file,
624                    "--state",
625                    state_file,
626                ],
627                progress_tracker=progress_tracker,
628            )
629            yield from progress_tracker.tally_records_read(message_generator)
630        progress_tracker.log_read_complete()
631
632    def _peek_airbyte_message(
633        self,
634        message: AirbyteMessage,
635        *,
636        raise_on_error: bool = True,
637    ) -> None:
638        """Process an Airbyte message.
639
640        This method handles reading Airbyte messages and taking action, if needed, based on the
641        message type. For instance, log messages are logged, records are tallied, and errors are
642        raised as exceptions if `raise_on_error` is True.
643
644        Raises:
645            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
646        """
647        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
648
649    def _log_incremental_streams(
650        self,
651        *,
652        incremental_streams: set[str] | None = None,
653    ) -> None:
654        """Log the streams which are using incremental sync mode."""
655        log_message = (
656            "The following streams are currently using incremental sync:\n"
657            f"{incremental_streams}\n"
658            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
659        )
660        print(log_message, file=sys.stderr)
661
662    def read(
663        self,
664        cache: CacheBase | None = None,
665        *,
666        streams: str | list[str] | None = None,
667        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
668        force_full_refresh: bool = False,
669        skip_validation: bool = False,
670    ) -> ReadResult:
671        """Read from the connector and write to the cache.
672
673        Args:
674            cache: The cache to write to. If not set, a default cache will be used.
675            streams: Optional if already set. A list of stream names to select for reading. If set
676                to "*", all streams will be selected.
677            write_strategy: The strategy to use when writing to the cache. If a string, it must be
678                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
679                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
680                WriteStrategy.AUTO.
681            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
682                streams will be read in incremental mode if supported by the connector. This option
683                must be True when using the "replace" strategy.
684            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
685                running the connector. This can be helpful in debugging, when you want to send
686                configurations to the connector that otherwise might be rejected by JSON Schema
687                validation rules.
688        """
689        cache = cache or get_default_cache()
690        progress_tracker = ProgressTracker(
691            source=self,
692            cache=cache,
693            destination=None,
694            expected_streams=None,  # Will be set later
695        )
696
697        # Set up state provider if not in full refresh mode
698        if force_full_refresh:
699            state_provider: StateProviderBase | None = None
700        else:
701            state_provider = cache.get_state_provider(
702                source_name=self._name,
703            )
704        state_writer = cache.get_state_writer(source_name=self._name)
705
706        if streams:
707            self.select_streams(streams)
708
709        if not self._selected_stream_names:
710            raise exc.PyAirbyteNoStreamsSelectedError(
711                connector_name=self.name,
712                available_streams=self.get_available_streams(),
713            )
714
715        try:
716            result = self._read_to_cache(
717                cache=cache,
718                catalog_provider=CatalogProvider(self.configured_catalog),
719                stream_names=self._selected_stream_names,
720                state_provider=state_provider,
721                state_writer=state_writer,
722                write_strategy=write_strategy,
723                force_full_refresh=force_full_refresh,
724                skip_validation=skip_validation,
725                progress_tracker=progress_tracker,
726            )
727        except exc.PyAirbyteInternalError as ex:
728            progress_tracker.log_failure(exception=ex)
729            raise exc.AirbyteConnectorFailedError(
730                connector_name=self.name,
731                log_text=self._last_log_messages,
732            ) from ex
733        except Exception as ex:
734            progress_tracker.log_failure(exception=ex)
735            raise
736
737        progress_tracker.log_success()
738        return result
739
740    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
741        self,
742        cache: CacheBase,
743        *,
744        catalog_provider: CatalogProvider,
745        stream_names: list[str],
746        state_provider: StateProviderBase | None,
747        state_writer: StateWriterBase | None,
748        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
749        force_full_refresh: bool = False,
750        skip_validation: bool = False,
751        progress_tracker: ProgressTracker,
752    ) -> ReadResult:
753        """Internal read method."""
754        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
755            warnings.warn(
756                message=(
757                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
758                    "could result in data loss. "
759                    "To silence this warning, use the following: "
760                    'warnings.filterwarnings("ignore", '
761                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
762                ),
763                category=exc.PyAirbyteDataLossWarning,
764                stacklevel=1,
765            )
766        if isinstance(write_strategy, str):
767            try:
768                write_strategy = WriteStrategy(write_strategy)
769            except ValueError:
770                raise exc.PyAirbyteInputError(
771                    message="Invalid strategy",
772                    context={
773                        "write_strategy": write_strategy,
774                        "available_strategies": [s.value for s in WriteStrategy],
775                    },
776                ) from None
777
778        # Run optional validation step
779        if not skip_validation:
780            self.validate_config()
781
782        # Log incremental streams if any are known
783        if state_provider and state_provider.known_stream_names:
784            # Retrieve the set of known streams that support incremental sync
785            incremental_streams = (
786                set(self._get_incremental_stream_names())
787                & state_provider.known_stream_names
788                & set(self.get_selected_streams())
789            )
790            if incremental_streams:
791                self._log_incremental_streams(incremental_streams=incremental_streams)
792
793        airbyte_message_iterator = AirbyteMessageIterator(
794            self._read_with_catalog(
795                catalog=catalog_provider.configured_catalog,
796                state=state_provider,
797                progress_tracker=progress_tracker,
798            )
799        )
800        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
801            stdin=airbyte_message_iterator,
802            catalog_provider=catalog_provider,
803            write_strategy=write_strategy,
804            state_writer=state_writer,
805            progress_tracker=progress_tracker,
806        )
807
808        # Flush the WAL, if applicable
809        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
810
811        return ReadResult(
812            source_name=self.name,
813            progress_tracker=progress_tracker,
814            processed_streams=stream_names,
815            cache=cache,
816        )
817
818
819__all__ = [
820    "Source",
821]
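
Example: stream selection and key overrides. A short sketch against the
`Source` class above; the stream and field names ("users", "purchases",
"updated_at", "id") are hypothetical:

    source.select_streams(["users", "purchases"])  # or "*" for all streams
    source.set_cursor_key("users", "updated_at")   # field names are case sensitive
    source.set_primary_keys(users="id", purchases=["id", "user_id"])

    # Overrides are reflected in the configured catalog used for reads:
    catalog = source.get_configured_catalog()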
class Source(airbyte._connector_base.ConnectorBase):
 51class Source(ConnectorBase):
 52    """A class representing a source that can be called."""
 53
 54    connector_type = "source"
 55
 56    def __init__(
 57        self,
 58        executor: Executor,
 59        name: str,
 60        config: dict[str, Any] | None = None,
 61        *,
 62        config_change_callback: ConfigChangeCallback | None = None,
 63        streams: str | list[str] | None = None,
 64        validate: bool = False,
 65        cursor_key_overrides: dict[str, str] | None = None,
 66        primary_key_overrides: dict[str, str | list[str]] | None = None,
 67    ) -> None:
 68        """Initialize the source.
 69
 70        If config is provided, it will be validated against the spec if validate is True.
 71        """
 72        self._to_be_selected_streams: list[str] | str = []
 73        """Used to hold selection criteria before catalog is known."""
 74
 75        super().__init__(
 76            executor=executor,
 77            name=name,
 78            config=config,
 79            config_change_callback=config_change_callback,
 80            validate=validate,
 81        )
 82        self._config_dict: dict[str, Any] | None = None
 83        self._last_log_messages: list[str] = []
 84        self._discovered_catalog: AirbyteCatalog | None = None
 85        self._selected_stream_names: list[str] = []
 86
 87        self._cursor_key_overrides: dict[str, str] = {}
 88        """A mapping of lower-cased stream names to cursor key overrides."""
 89
 90        self._primary_key_overrides: dict[str, list[str]] = {}
 91        """A mapping of lower-cased stream names to primary key overrides."""
 92
 93        if config is not None:
 94            self.set_config(config, validate=validate)
 95        if streams is not None:
 96            self.select_streams(streams)
 97        if cursor_key_overrides is not None:
 98            self.set_cursor_keys(**cursor_key_overrides)
 99        if primary_key_overrides is not None:
100            self.set_primary_keys(**primary_key_overrides)
101
102    def set_streams(self, streams: list[str]) -> None:
103        """Deprecated. See select_streams()."""
104        warnings.warn(
105            "The 'set_streams' method is deprecated and will be removed in a future version. "
106            "Please use the 'select_streams' method instead.",
107            DeprecationWarning,
108            stacklevel=2,
109        )
110        self.select_streams(streams)
111
112    def set_cursor_key(
113        self,
114        stream_name: str,
115        cursor_key: str,
116    ) -> None:
117        """Set the cursor for a single stream.
118
119        Note:
120        - This does not unset previously set cursors.
121        - The cursor key must be a single field name.
122        - Not all streams support custom cursors. If a stream does not support custom cursors,
123          the override may be ignored.
124        - Stream names are case insensitive, while field names are case sensitive.
125        - Stream names are not validated by PyAirbyte. If the stream name
126          does not exist in the catalog, the override may be ignored.
127        """
128        self._cursor_key_overrides[stream_name.lower()] = cursor_key
129
130    def set_cursor_keys(
131        self,
132        **kwargs: str,
133    ) -> None:
134        """Override the cursor key for one or more streams.
135
136        Usage:
137            source.set_cursor_keys(
138                stream1="cursor1",
139                stream2="cursor2",
140            )
141
142        Note:
143        - This does not unset previously set cursors.
144        - The cursor key must be a single field name.
145        - Not all streams support custom cursors. If a stream does not support custom cursors,
146          the override may be ignored.
147        - Stream names are case insensitive, while field names are case sensitive.
148        - Stream names are not validated by PyAirbyte. If the stream name
149          does not exist in the catalog, the override may be ignored.
150        """
151        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
152
153    def set_primary_key(
154        self,
155        stream_name: str,
156        primary_key: str | list[str],
157    ) -> None:
158        """Set the primary key for a single stream.
159
160        Note:
161        - This does not unset previously set primary keys.
162        - The primary key must be a single field name or a list of field names.
163        - Not all streams support overriding primary keys. If a stream does not support overriding
164          primary keys, the override may be ignored.
165        - Stream names are case insensitive, while field names are case sensitive.
166        - Stream names are not validated by PyAirbyte. If the stream name
167          does not exist in the catalog, the override may be ignored.
168        """
169        self._primary_key_overrides[stream_name.lower()] = (
170            primary_key if isinstance(primary_key, list) else [primary_key]
171        )
172
173    def set_primary_keys(
174        self,
175        **kwargs: str | list[str],
176    ) -> None:
177        """Override the primary keys for one or more streams.
178
179        This does not unset previously set primary keys.
180
181        Usage:
182            source.set_primary_keys(
183                stream1="pk1",
184                stream2=["pk1", "pk2"],
185            )
186
187        Note:
188        - This does not unset previously set primary keys.
189        - The primary key must be a single field name or a list of field names.
190        - Not all streams support overriding primary keys. If a stream does not support overriding
191          primary keys, the override may be ignored.
192        - Stream names are case insensitive, while field names are case sensitive.
193        - Stream names are not validated by PyAirbyte. If the stream name
194          does not exist in the catalog, the override may be ignored.
195        """
196        self._primary_key_overrides.update(
197            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
198        )
199
200    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
201        """Logs a warning message indicating stream selection which are not selected yet."""
202        if streams == "*":
203            print(
204                "Warning: Config is not set yet. All streams will be selected after config is set.",
205                file=sys.stderr,
206            )
207        else:
208            print(
209                "Warning: Config is not set yet. "
210                f"Streams to be selected after config is set: {streams}",
211                file=sys.stderr,
212            )
213
214    def select_all_streams(self) -> None:
215        """Select all streams.
216
217        This is a more streamlined equivalent to:
218        > source.select_streams(source.get_available_streams()).
219        """
220        if self._config_dict is None:
221            self._to_be_selected_streams = "*"
222            self._log_warning_preselected_stream(self._to_be_selected_streams)
223            return
224
225        self._selected_stream_names = self.get_available_streams()
226
227    def select_streams(self, streams: str | list[str]) -> None:
228        """Select the stream names that should be read from the connector.
229
230        Args:
231            streams: A list of stream names to select. If set to "*", all streams will be selected.
232
233        Currently, if this is not set, all streams will be read.
234        """
235        if self._config_dict is None:
236            self._to_be_selected_streams = streams
237            self._log_warning_preselected_stream(streams)
238            return
239
240        if streams == "*":
241            self.select_all_streams()
242            return
243
244        if isinstance(streams, str):
245            # If a single stream is provided, convert it to a one-item list
246            streams = [streams]
247
248        available_streams = self.get_available_streams()
249        for stream in streams:
250            if stream not in available_streams:
251                raise exc.AirbyteStreamNotFoundError(
252                    stream_name=stream,
253                    connector_name=self.name,
254                    available_streams=available_streams,
255                )
256        self._selected_stream_names = streams
257
258    def get_selected_streams(self) -> list[str]:
259        """Get the selected streams.
260
261        If no streams are selected, return an empty list.
262        """
263        return self._selected_stream_names
264
265    def set_config(
266        self,
267        config: dict[str, Any],
268        *,
269        validate: bool = True,
270    ) -> None:
271        """Set the config for the connector.
272
273        If validate is True, raise an exception if the config fails validation.
274
275        If validate is False, validation will be deferred until check() or validate_config()
276        is called.
277        """
278        if validate:
279            self.validate_config(config)
280
281        self._config_dict = config
282
283        if self._to_be_selected_streams:
284            self.select_streams(self._to_be_selected_streams)
285            self._to_be_selected_streams = []
286
287    def _discover(self) -> AirbyteCatalog:
288        """Call discover on the connector.
289
290        This involves the following steps:
291        - Write the config to a temporary file
292        - execute the connector with discover --config <config_file>
293        - Listen to the messages and return the first AirbyteCatalog that comes along.
294        - Make sure the subprocess is killed when the function returns.
295        """
296        with as_temp_files([self._hydrated_config]) as [config_file]:
297            for msg in self._execute(["discover", "--config", config_file]):
298                if msg.type == Type.CATALOG and msg.catalog:
299                    return msg.catalog
300            raise exc.AirbyteConnectorMissingCatalogError(
301                connector_name=self.name,
302                log_text=self._last_log_messages,
303            )
304
305    def get_available_streams(self) -> list[str]:
306        """Get the available streams from the spec."""
307        return [s.name for s in self.discovered_catalog.streams]
308
309    def _get_incremental_stream_names(self) -> list[str]:
310        """Get the name of streams that support incremental sync."""
311        return [
312            stream.name
313            for stream in self.discovered_catalog.streams
314            if SyncMode.incremental in stream.supported_sync_modes
315        ]
316
317    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
318        """Call spec on the connector.
319
320        This involves the following steps:
321        * execute the connector with spec
322        * Listen to the messages and return the first AirbyteCatalog that comes along.
323        * Make sure the subprocess is killed when the function returns.
324        """
325        if force_refresh or self._spec is None:
326            for msg in self._execute(["spec"]):
327                if msg.type == Type.SPEC and msg.spec:
328                    self._spec = msg.spec
329                    break
330
331        if self._spec:
332            return self._spec
333
334        raise exc.AirbyteConnectorMissingSpecError(
335            connector_name=self.name,
336            log_text=self._last_log_messages,
337        )
338
339    @property
340    def config_spec(self) -> dict[str, Any]:
341        """Generate a configuration spec for this connector, as a JSON Schema definition.
342
343        This function generates a JSON Schema dictionary with configuration specs for the
344        current connector, as a dictionary.
345
346        Returns:
347            dict: The JSON Schema configuration spec as a dictionary.
348        """
349        return self._get_spec(force_refresh=True).connectionSpecification
350
351    @property
352    def _yaml_spec(self) -> str:
353        """Get the spec as a yaml string.
354
355        For now, the primary use case is for writing and debugging a valid config for a source.
356
357        This is private for now because we probably want better polish before exposing this
358        as a stable interface. This will also get easier when we have docs links with this info
359        for each connector.
360        """
361        spec_obj: ConnectorSpecification = self._get_spec()
362        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
363        # convert to a yaml string
364        return yaml.dump(spec_dict)
365
366    @property
367    def docs_url(self) -> str:
368        """Get the URL to the connector's documentation."""
369        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
370            "source-", ""
371        )
372
373    @property
374    def discovered_catalog(self) -> AirbyteCatalog:
375        """Get the raw catalog for the given streams.
376
377        If the catalog is not yet known, we call discover to get it.
378        """
379        if self._discovered_catalog is None:
380            self._discovered_catalog = self._discover()
381
382        return self._discovered_catalog
383
384    @property
385    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
386        """Get the configured catalog for the given streams.
387
388        If the raw catalog is not yet known, we call discover to get it.
389
390        If no specific streams are selected, we return a catalog that syncs all available streams.
391
392        TODO: We should consider disabling by default the streams that the connector would
393        disable by default. (For instance, streams that require a premium license are sometimes
394        disabled by default within the connector.)
395        """
396        # Ensure discovered catalog is cached before we start
397        _ = self.discovered_catalog
398
399        # Filter for selected streams if set, otherwise use all available streams:
400        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
401        return self.get_configured_catalog(streams=streams_filter)
402
403    def get_configured_catalog(
404        self,
405        streams: Literal["*"] | list[str] | None = None,
406    ) -> ConfiguredAirbyteCatalog:
407        """Get a configured catalog for the given streams.
408
409        If no streams are provided, the selected streams will be used. If no streams are selected,
410        all available streams will be used.
411
412        If '*' is provided, all available streams will be used.
413        """
414        selected_streams: list[str] = []
415        if streams is None:
416            selected_streams = self._selected_stream_names or self.get_available_streams()
417        elif streams == "*":
418            selected_streams = self.get_available_streams()
419        elif isinstance(streams, list):
420            selected_streams = streams
421        else:
422            raise exc.PyAirbyteInputError(
423                message="Invalid streams argument.",
424                input_value=streams,
425            )
426
427        return ConfiguredAirbyteCatalog(
428            streams=[
429                ConfiguredAirbyteStream(
430                    stream=stream,
431                    destination_sync_mode=DestinationSyncMode.overwrite,
432                    sync_mode=SyncMode.incremental,
433                    primary_key=(
434                        [self._primary_key_overrides[stream.name.lower()]]
435                        if stream.name.lower() in self._primary_key_overrides
436                        else stream.source_defined_primary_key
437                    ),
438                    cursor_field=(
439                        [self._cursor_key_overrides[stream.name.lower()]]
440                        if stream.name.lower() in self._cursor_key_overrides
441                        else stream.default_cursor_field
442                    ),
443                    # These are unused in the current implementation:
444                    generation_id=None,
445                    minimum_generation_id=None,
446                    sync_id=None,
447                )
448                for stream in self.discovered_catalog.streams
449                if stream.name in selected_streams
450            ],
451        )
452
453    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
454        """Return the JSON Schema spec for the specified stream name."""
455        catalog: AirbyteCatalog = self.discovered_catalog
456        found: list[AirbyteStream] = [
457            stream for stream in catalog.streams if stream.name == stream_name
458        ]
459
460        if len(found) == 0:
461            raise exc.PyAirbyteInputError(
462                message="Stream name does not exist in catalog.",
463                input_value=stream_name,
464            )
465
466        if len(found) > 1:
467            raise exc.PyAirbyteInternalError(
468                message="Duplicate streams found with the same name.",
469                context={
470                    "found_streams": found,
471                },
472            )
473
474        return found[0].json_schema
475
476    def get_records(
477        self,
478        stream: str,
479        *,
480        normalize_field_names: bool = False,
481        prune_undeclared_fields: bool = True,
482    ) -> LazyDataset:
483        """Read a stream from the connector.
484
485        Args:
486            stream: The name of the stream to read.
487            normalize_field_names: When `True`, field names will be normalized to lower case, with
488                special characters removed. This matches the behavior of PyAirbyte caches and most
489                Airbyte destinations.
490            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
491                which generally matches the behavior of PyAirbyte caches and most Airbyte
492                destinations, specifically when you expect the catalog may be stale. You can disable
493                this to keep all fields in the records.
494
495        This involves the following steps:
496        * Call discover to get the catalog
497        * Generate a configured catalog that syncs the given stream in full_refresh mode
498        * Write the configured catalog and the config to a temporary file
499        * execute the connector with read --config <config_file> --catalog <catalog_file>
500        * Listen to the messages and return the first AirbyteRecordMessages that come along.
501        * Make sure the subprocess is killed when the function returns.
502        """
503        configured_catalog = self.get_configured_catalog(streams=[stream])
504        if len(configured_catalog.streams) == 0:
505            raise exc.PyAirbyteInputError(
506                message="Requested stream does not exist.",
507                context={
508                    "stream": stream,
509                    "available_streams": self.get_available_streams(),
510                    "connector_name": self.name,
511                },
512            ) from KeyError(stream)
513
514        configured_stream = configured_catalog.streams[0]
515
516        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
517            yield from records
518
519        stream_record_handler = StreamRecordHandler(
520            json_schema=self.get_stream_json_schema(stream),
521            prune_extra_fields=prune_undeclared_fields,
522            normalize_keys=normalize_field_names,
523        )
524
525        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
526        progress_tracker = ProgressTracker(
527            ProgressStyle.PLAIN,
528            source=self,
529            cache=None,
530            destination=None,
531            expected_streams=[stream],
532        )
533
534        iterator: Iterator[dict[str, Any]] = (
535            StreamRecord.from_record_message(
536                record_message=record.record,
537                stream_record_handler=stream_record_handler,
538            )
539            for record in self._read_with_catalog(
540                catalog=configured_catalog,
541                progress_tracker=progress_tracker,
542            )
543            if record.record
544        )
545        progress_tracker.log_success()
546        return LazyDataset(
547            iterator,
548            stream_metadata=configured_stream,
549        )
550
551    def get_documents(
552        self,
553        stream: str,
554        title_property: str | None = None,
555        content_properties: list[str] | None = None,
556        metadata_properties: list[str] | None = None,
557        *,
558        render_metadata: bool = False,
559    ) -> Iterable[Document]:
560        """Read a stream from the connector and return the records as documents.
561
562        If metadata_properties is not set, all properties that are not content will be added to
563        the metadata.
564
565        If render_metadata is True, metadata will be rendered in the document, as well as the
566        the main content.
567        """
568        return self.get_records(stream).to_documents(
569            title_property=title_property,
570            content_properties=content_properties,
571            metadata_properties=metadata_properties,
572            render_metadata=render_metadata,
573        )
574
575    def _get_airbyte_message_iterator(
576        self,
577        *,
578        streams: Literal["*"] | list[str] | None = None,
579        state_provider: StateProviderBase | None = None,
580        progress_tracker: ProgressTracker,
581        force_full_refresh: bool = False,
582    ) -> AirbyteMessageIterator:
583        """Get an AirbyteMessageIterator for this source."""
584        return AirbyteMessageIterator(
585            self._read_with_catalog(
586                catalog=self.get_configured_catalog(streams=streams),
587                state=state_provider if not force_full_refresh else None,
588                progress_tracker=progress_tracker,
589            )
590        )
591
592    def _read_with_catalog(
593        self,
594        catalog: ConfiguredAirbyteCatalog,
595        progress_tracker: ProgressTracker,
596        state: StateProviderBase | None = None,
597    ) -> Generator[AirbyteMessage, None, None]:
598        """Call read on the connector.
599
600        This involves the following steps:
601        * Write the config to a temporary file
602        * Execute the connector with read --config <config_file> --catalog <catalog_file>
603        * Listen to the messages and return the AirbyteRecordMessages that come along.
604        * Send out telemetry on the performed sync (with information about which source was used and
605          the type of the cache)
606        """
607        with as_temp_files(
608            [
609                self._hydrated_config,
610                catalog.model_dump_json(),
611                state.to_state_input_file_text() if state else "[]",
612            ]
613        ) as [
614            config_file,
615            catalog_file,
616            state_file,
617        ]:
618            message_generator = self._execute(
619                [
620                    "read",
621                    "--config",
622                    config_file,
623                    "--catalog",
624                    catalog_file,
625                    "--state",
626                    state_file,
627                ],
628                progress_tracker=progress_tracker,
629            )
630            yield from progress_tracker.tally_records_read(message_generator)
631        progress_tracker.log_read_complete()
632
633    def _peek_airbyte_message(
634        self,
635        message: AirbyteMessage,
636        *,
637        raise_on_error: bool = True,
638    ) -> None:
639        """Process an Airbyte message.
640
641        This method handles reading Airbyte messages and taking action, if needed, based on the
642        message type. For instance, log messages are logged, records are tallied, and errors are
643        raised as exceptions if `raise_on_error` is True.
644
645        Raises:
646            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
647        """
648        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
649
650    def _log_incremental_streams(
651        self,
652        *,
653        incremental_streams: set[str] | None = None,
654    ) -> None:
655        """Log the streams which are using incremental sync mode."""
656        log_message = (
657            "The following streams are currently using incremental sync:\n"
658            f"{incremental_streams}\n"
659            "To perform a full refresh, set 'force_full_refresh=True' when calling 'read()'."
660        )
661        print(log_message, file=sys.stderr)
662
663    def read(
664        self,
665        cache: CacheBase | None = None,
666        *,
667        streams: str | list[str] | None = None,
668        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
669        force_full_refresh: bool = False,
670        skip_validation: bool = False,
671    ) -> ReadResult:
672        """Read from the connector and write to the cache.
673
674        Args:
675            cache: The cache to write to. If not set, a default cache will be used.
676            streams: Optional if already set. A list of stream names to select for reading. If set
677                to "*", all streams will be selected.
678            write_strategy: The strategy to use when writing to the cache. If a string, it must be
679                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
680                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
681                WriteStrategy.AUTO.
682            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
683                streams will be read in incremental mode if supported by the connector. This option
684                must be True when using the "replace" strategy.
685            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
686                running the connector. This can be helpful in debugging, when you want to send
687                configurations to the connector that otherwise might be rejected by JSON Schema
688                validation rules.
689        """
690        cache = cache or get_default_cache()
691        progress_tracker = ProgressTracker(
692            source=self,
693            cache=cache,
694            destination=None,
695            expected_streams=None,  # Will be set later
696        )
697
698        # Set up state provider if not in full refresh mode
699        if force_full_refresh:
700            state_provider: StateProviderBase | None = None
701        else:
702            state_provider = cache.get_state_provider(
703                source_name=self._name,
704            )
705        state_writer = cache.get_state_writer(source_name=self._name)
706
707        if streams:
708            self.select_streams(streams)
709
710        if not self._selected_stream_names:
711            raise exc.PyAirbyteNoStreamsSelectedError(
712                connector_name=self.name,
713                available_streams=self.get_available_streams(),
714            )
715
716        try:
717            result = self._read_to_cache(
718                cache=cache,
719                catalog_provider=CatalogProvider(self.configured_catalog),
720                stream_names=self._selected_stream_names,
721                state_provider=state_provider,
722                state_writer=state_writer,
723                write_strategy=write_strategy,
724                force_full_refresh=force_full_refresh,
725                skip_validation=skip_validation,
726                progress_tracker=progress_tracker,
727            )
728        except exc.PyAirbyteInternalError as ex:
729            progress_tracker.log_failure(exception=ex)
730            raise exc.AirbyteConnectorFailedError(
731                connector_name=self.name,
732                log_text=self._last_log_messages,
733            ) from ex
734        except Exception as ex:
735            progress_tracker.log_failure(exception=ex)
736            raise
737
738        progress_tracker.log_success()
739        return result
740
741    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
742        self,
743        cache: CacheBase,
744        *,
745        catalog_provider: CatalogProvider,
746        stream_names: list[str],
747        state_provider: StateProviderBase | None,
748        state_writer: StateWriterBase | None,
749        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
750        force_full_refresh: bool = False,
751        skip_validation: bool = False,
752        progress_tracker: ProgressTracker,
753    ) -> ReadResult:
754        """Internal read method."""
755        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
756            warnings.warn(
757                message=(
758                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
759                    "could result in data loss. "
760                    "To silence this warning, use the following: "
761                    'warnings.filterwarnings("ignore", '
762                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
763                ),
764                category=exc.PyAirbyteDataLossWarning,
765                stacklevel=1,
766            )
767        if isinstance(write_strategy, str):
768            try:
769                write_strategy = WriteStrategy(write_strategy)
770            except ValueError:
771                raise exc.PyAirbyteInputError(
772                    message="Invalid strategy",
773                    context={
774                        "write_strategy": write_strategy,
775                        "available_strategies": [s.value for s in WriteStrategy],
776                    },
777                ) from None
778
779        # Run optional validation step
780        if not skip_validation:
781            self.validate_config()
782
783        # Log the incremental streams, if any are known
784        if state_provider and state_provider.known_stream_names:
785            # Retrieve the set of known streams which support incremental sync
786            incremental_streams = (
787                set(self._get_incremental_stream_names())
788                & state_provider.known_stream_names
789                & set(self.get_selected_streams())
790            )
791            if incremental_streams:
792                self._log_incremental_streams(incremental_streams=incremental_streams)
793
794        airbyte_message_iterator = AirbyteMessageIterator(
795            self._read_with_catalog(
796                catalog=catalog_provider.configured_catalog,
797                state=state_provider,
798                progress_tracker=progress_tracker,
799            )
800        )
801        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
802            stdin=airbyte_message_iterator,
803            catalog_provider=catalog_provider,
804            write_strategy=write_strategy,
805            state_writer=state_writer,
806            progress_tracker=progress_tracker,
807        )
808
809        # Flush the WAL, if applicable
810        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
811
812        return ReadResult(
813            source_name=self.name,
814            progress_tracker=progress_tracker,
815            processed_streams=stream_names,
816            cache=cache,
817        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False, cursor_key_overrides: dict[str, str] | None = None, primary_key_overrides: dict[str, str | list[str]] | None = None)
 56    def __init__(
 57        self,
 58        executor: Executor,
 59        name: str,
 60        config: dict[str, Any] | None = None,
 61        *,
 62        config_change_callback: ConfigChangeCallback | None = None,
 63        streams: str | list[str] | None = None,
 64        validate: bool = False,
 65        cursor_key_overrides: dict[str, str] | None = None,
 66        primary_key_overrides: dict[str, str | list[str]] | None = None,
 67    ) -> None:
 68        """Initialize the source.
 69
 70        If config is provided, it will be validated against the spec if validate is True.
 71        """
 72        self._to_be_selected_streams: list[str] | str = []
 73        """Used to hold selection criteria before catalog is known."""
 74
 75        super().__init__(
 76            executor=executor,
 77            name=name,
 78            config=config,
 79            config_change_callback=config_change_callback,
 80            validate=validate,
 81        )
 82        self._config_dict: dict[str, Any] | None = None
 83        self._last_log_messages: list[str] = []
 84        self._discovered_catalog: AirbyteCatalog | None = None
 85        self._selected_stream_names: list[str] = []
 86
 87        self._cursor_key_overrides: dict[str, str] = {}
 88        """A mapping of lower-cased stream names to cursor key overrides."""
 89
 90        self._primary_key_overrides: dict[str, list[str]] = {}
 91        """A mapping of lower-cased stream names to primary key overrides."""
 92
 93        if config is not None:
 94            self.set_config(config, validate=validate)
 95        if streams is not None:
 96            self.select_streams(streams)
 97        if cursor_key_overrides is not None:
 98            self.set_cursor_keys(**cursor_key_overrides)
 99        if primary_key_overrides is not None:
100            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
102    def set_streams(self, streams: list[str]) -> None:
103        """Deprecated. See select_streams()."""
104        warnings.warn(
105            "The 'set_streams' method is deprecated and will be removed in a future version. "
106            "Please use the 'select_streams' method instead.",
107            DeprecationWarning,
108            stacklevel=2,
109        )
110        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
112    def set_cursor_key(
113        self,
114        stream_name: str,
115        cursor_key: str,
116    ) -> None:
117        """Set the cursor for a single stream.
118
119        Note:
120        - This does not unset previously set cursors.
121        - The cursor key must be a single field name.
122        - Not all streams support custom cursors. If a stream does not support custom cursors,
123          the override may be ignored.
124        - Stream names are case insensitive, while field names are case sensitive.
125        - Stream names are not validated by PyAirbyte. If the stream name
126          does not exist in the catalog, the override may be ignored.
127        """
128        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_cursor_keys(self, **kwargs: str) -> None:
130    def set_cursor_keys(
131        self,
132        **kwargs: str,
133    ) -> None:
134        """Override the cursor key for one or more streams.
135
136        Usage:
137            source.set_cursor_keys(
138                stream1="cursor1",
139                stream2="cursor2",
140            )
141
142        Note:
143        - This does not unset previously set cursors.
144        - The cursor key must be a single field name.
145        - Not all streams support custom cursors. If a stream does not support custom cursors,
146          the override may be ignored.
147        - Stream names are case insensitive, while field names are case sensitive.
148        - Stream names are not validated by PyAirbyte. If the stream name
149          does not exist in the catalog, the override may be ignored.
150        """
151        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
153    def set_primary_key(
154        self,
155        stream_name: str,
156        primary_key: str | list[str],
157    ) -> None:
158        """Set the primary key for a single stream.
159
160        Note:
161        - This does not unset previously set primary keys.
162        - The primary key must be a single field name or a list of field names.
163        - Not all streams support overriding primary keys. If a stream does not support overriding
164          primary keys, the override may be ignored.
165        - Stream names are case insensitive, while field names are case sensitive.
166        - Stream names are not validated by PyAirbyte. If the stream name
167          does not exist in the catalog, the override may be ignored.
168        """
169        self._primary_key_overrides[stream_name.lower()] = (
170            primary_key if isinstance(primary_key, list) else [primary_key]
171        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
173    def set_primary_keys(
174        self,
175        **kwargs: str | list[str],
176    ) -> None:
177        """Override the primary keys for one or more streams.
178
179        This does not unset previously set primary keys.
180
181        Usage:
182            source.set_primary_keys(
183                stream1="pk1",
184                stream2=["pk1", "pk2"],
185            )
186
187        Note:
188        - This does not unset previously set primary keys.
189        - The primary key must be a single field name or a list of field names.
190        - Not all streams support overriding primary keys. If a stream does not support overriding
191          primary keys, the override may be ignored.
192        - Stream names are case insensitive, while field names are case sensitive.
193        - Stream names are not validated by PyAirbyte. If the stream name
194          does not exist in the catalog, the override may be ignored.
195        """
196        self._primary_key_overrides.update(
197            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
198        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
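
For illustration, a hedged sketch combining both override methods; the connector name, stream names, and field names below are placeholders rather than part of this API:

    import airbyte as ab

    source = ab.get_source("source-example", config={"api_key": "..."})  # hypothetical connector

    # Stream names are matched case-insensitively; field names are case sensitive.
    source.set_cursor_keys(users="updated_at", orders="modified_at")
    source.set_primary_keys(users="id", orders=["order_id", "line_item_id"])
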
def select_all_streams(self) -> None:
214    def select_all_streams(self) -> None:
215        """Select all streams.
216
217        This is a more streamlined equivalent to:
218        > source.select_streams(source.get_available_streams()).
219        """
220        if self._config_dict is None:
221            self._to_be_selected_streams = "*"
222            self._log_warning_preselected_stream(self._to_be_selected_streams)
223            return
224
225        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
227    def select_streams(self, streams: str | list[str]) -> None:
228        """Select the stream names that should be read from the connector.
229
230        Args:
231            streams: A list of stream names to select. If set to "*", all streams will be selected.
232
233        Currently, if this is not set, all streams will be read.
234        """
235        if self._config_dict is None:
236            self._to_be_selected_streams = streams
237            self._log_warning_preselected_stream(streams)
238            return
239
240        if streams == "*":
241            self.select_all_streams()
242            return
243
244        if isinstance(streams, str):
245            # If a single stream is provided, convert it to a one-item list
246            streams = [streams]
247
248        available_streams = self.get_available_streams()
249        for stream in streams:
250            if stream not in available_streams:
251                raise exc.AirbyteStreamNotFoundError(
252                    stream_name=stream,
253                    connector_name=self.name,
254                    available_streams=available_streams,
255                )
256        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
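
A short sketch of the accepted argument forms, using placeholder connector and stream names:

    import airbyte as ab

    source = ab.get_source("source-example", config={"api_key": "..."})  # hypothetical connector

    source.select_streams("users")              # a single stream name
    source.select_streams(["users", "orders"])  # an explicit list of streams
    source.select_streams("*")                  # every available stream

Unknown stream names raise AirbyteStreamNotFoundError, so typos fail fast rather than silently syncing nothing.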

def get_selected_streams(self) -> list[str]:
258    def get_selected_streams(self) -> list[str]:
259        """Get the selected streams.
260
261        If no streams are selected, return an empty list.
262        """
263        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
265    def set_config(
266        self,
267        config: dict[str, Any],
268        *,
269        validate: bool = True,
270    ) -> None:
271        """Set the config for the connector.
272
273        If validate is True, raise an exception if the config fails validation.
274
275        If validate is False, validation will be deferred until check() or validate_config()
276        is called.
277        """
278        if validate:
279            self.validate_config(config)
280
281        self._config_dict = config
282
283        if self._to_be_selected_streams:
284            self.select_streams(self._to_be_selected_streams)
285            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
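
A sketch of deferred validation under these rules (the connector name and config values are placeholders):

    import airbyte as ab

    source = ab.get_source("source-example")  # hypothetical connector, no config yet

    source.set_config({"api_key": "..."}, validate=False)  # defer JSON Schema validation
    source.check()  # validation (and a connection test) happens here instead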

def get_available_streams(self) -> list[str]:
305    def get_available_streams(self) -> list[str]:
306        """Get the available streams from the discovered catalog."""
307        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the discovered catalog.

config_spec: dict[str, typing.Any]
339    @property
340    def config_spec(self) -> dict[str, Any]:
341        """Generate a configuration spec for this connector, as a JSON Schema definition.
342
343        This function generates a JSON Schema dictionary with configuration specs for the
344        current connector, as a dictionary.
345
346        Returns:
347            dict: The JSON Schema configuration spec as a dictionary.
348        """
349        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.
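
Because the spec is returned as a plain dictionary, it can be inspected with ordinary JSON Schema conventions; a sketch with a placeholder connector:

    import airbyte as ab

    source = ab.get_source("source-example")  # hypothetical connector

    spec = source.config_spec
    print(spec.get("required", []))          # top-level required config fields
    print(list(spec.get("properties", {})))  # all configurable properties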

docs_url: str
366    @property
367    def docs_url(self) -> str:
368        """Get the URL to the connector's documentation."""
369        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
370            "source-", ""
371        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
373    @property
374    def discovered_catalog(self) -> AirbyteCatalog:
375        """Get the raw catalog for the given streams.
376
377        If the catalog is not yet known, we call discover to get it.
378        """
379        if self._discovered_catalog is None:
380            self._discovered_catalog = self._discover()
381
382        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
384    @property
385    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
386        """Get the configured catalog for the given streams.
387
388        If the raw catalog is not yet known, we call discover to get it.
389
390        If no specific streams are selected, we return a catalog that syncs all available streams.
391
392        TODO: We should consider disabling by default the streams that the connector would
393        disable by default. (For instance, streams that require a premium license are sometimes
394        disabled by default within the connector.)
395        """
396        # Ensure discovered catalog is cached before we start
397        _ = self.discovered_catalog
398
399        # Filter for selected streams if set, otherwise use all available streams:
400        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
401        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
403    def get_configured_catalog(
404        self,
405        streams: Literal["*"] | list[str] | None = None,
406    ) -> ConfiguredAirbyteCatalog:
407        """Get a configured catalog for the given streams.
408
409        If no streams are provided, the selected streams will be used. If no streams are selected,
410        all available streams will be used.
411
412        If '*' is provided, all available streams will be used.
413        """
414        selected_streams: list[str] = []
415        if streams is None:
416            selected_streams = self._selected_stream_names or self.get_available_streams()
417        elif streams == "*":
418            selected_streams = self.get_available_streams()
419        elif isinstance(streams, list):
420            selected_streams = streams
421        else:
422            raise exc.PyAirbyteInputError(
423                message="Invalid streams argument.",
424                input_value=streams,
425            )
426
427        return ConfiguredAirbyteCatalog(
428            streams=[
429                ConfiguredAirbyteStream(
430                    stream=stream,
431                    destination_sync_mode=DestinationSyncMode.overwrite,
432                    sync_mode=SyncMode.incremental,
433                    primary_key=(
434                        [self._primary_key_overrides[stream.name.lower()]]
435                        if stream.name.lower() in self._primary_key_overrides
436                        else stream.source_defined_primary_key
437                    ),
438                    cursor_field=(
439                        [self._cursor_key_overrides[stream.name.lower()]]
440                        if stream.name.lower() in self._cursor_key_overrides
441                        else stream.default_cursor_field
442                    ),
443                    # These are unused in the current implementation:
444                    generation_id=None,
445                    minimum_generation_id=None,
446                    sync_id=None,
447                )
448                for stream in self.discovered_catalog.streams
449                if stream.name in selected_streams
450            ],
451        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.
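
A sketch of the three accepted forms of the streams argument (connector and stream names are placeholders):

    import airbyte as ab

    source = ab.get_source("source-example", config={"api_key": "..."})  # hypothetical connector

    catalog = source.get_configured_catalog()                   # selected streams, else all
    catalog = source.get_configured_catalog(streams="*")        # explicitly all available streams
    catalog = source.get_configured_catalog(streams=["users"])  # an explicit subset

    for configured_stream in catalog.streams:
        print(configured_stream.stream.name, configured_stream.sync_mode)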

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
453    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
454        """Return the JSON Schema spec for the specified stream name."""
455        catalog: AirbyteCatalog = self.discovered_catalog
456        found: list[AirbyteStream] = [
457            stream for stream in catalog.streams if stream.name == stream_name
458        ]
459
460        if len(found) == 0:
461            raise exc.PyAirbyteInputError(
462                message="Stream name does not exist in catalog.",
463                input_value=stream_name,
464            )
465
466        if len(found) > 1:
467            raise exc.PyAirbyteInternalError(
468                message="Duplicate streams found with the same name.",
469                context={
470                    "found_streams": found,
471                },
472            )
473
474        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
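
Since the return value is the stream's raw JSON Schema, the declared fields can be listed directly; a sketch with placeholder names:

    import airbyte as ab

    source = ab.get_source("source-example", config={"api_key": "..."})  # hypothetical connector

    schema = source.get_stream_json_schema("users")  # "users" is a placeholder stream name
    print(list(schema.get("properties", {})))        # fields declared for the stream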

def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
476    def get_records(
477        self,
478        stream: str,
479        *,
480        normalize_field_names: bool = False,
481        prune_undeclared_fields: bool = True,
482    ) -> LazyDataset:
483        """Read a stream from the connector.
484
485        Args:
486            stream: The name of the stream to read.
487            normalize_field_names: When `True`, field names will be normalized to lower case, with
488                special characters removed. This matches the behavior of PyAirbyte caches and most
489                Airbyte destinations.
490            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
491                which generally matches the behavior of PyAirbyte caches and most Airbyte
492                destinations, specifically when you expect the catalog may be stale. You can disable
493                this to keep all fields in the records.
494
495        This involves the following steps:
496        * Call discover to get the catalog
497        * Generate a configured catalog that syncs the given stream in full_refresh mode
498        * Write the configured catalog and the config to a temporary file
499        * Execute the connector with read --config <config_file> --catalog <catalog_file>
500        * Listen to the messages and return the AirbyteRecordMessages as they come along.
501        * Make sure the subprocess is killed when the function returns.
502        """
503        configured_catalog = self.get_configured_catalog(streams=[stream])
504        if len(configured_catalog.streams) == 0:
505            raise exc.PyAirbyteInputError(
506                message="Requested stream does not exist.",
507                context={
508                    "stream": stream,
509                    "available_streams": self.get_available_streams(),
510                    "connector_name": self.name,
511                },
512            ) from KeyError(stream)
513
514        configured_stream = configured_catalog.streams[0]
515
516        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
517            yield from records
518
519        stream_record_handler = StreamRecordHandler(
520            json_schema=self.get_stream_json_schema(stream),
521            prune_extra_fields=prune_undeclared_fields,
522            normalize_keys=normalize_field_names,
523        )
524
525        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
526        progress_tracker = ProgressTracker(
527            ProgressStyle.PLAIN,
528            source=self,
529            cache=None,
530            destination=None,
531            expected_streams=[stream],
532        )
533
534        iterator: Iterator[dict[str, Any]] = (
535            StreamRecord.from_record_message(
536                record_message=record.record,
537                stream_record_handler=stream_record_handler,
538            )
539            for record in self._read_with_catalog(
540                catalog=configured_catalog,
541                progress_tracker=progress_tracker,
542            )
543            if record.record
544        )
545        progress_tracker.log_success()
546        return LazyDataset(
547            iterator,
548            stream_metadata=configured_stream,
549        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the AirbyteRecordMessages as they come along.
  • Make sure the subprocess is killed when the function returns.
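
Because the return value is a LazyDataset, no records are read until the iterator is consumed; a hedged sketch with placeholder names:

    import airbyte as ab

    source = ab.get_source("source-example", config={"api_key": "..."})  # hypothetical connector

    dataset = source.get_records("users")  # non-blocking; nothing is read yet
    for record in dataset:                 # the connector streams records as you iterate
        print(record["id"])                # field access follows the stream's JSON Schema
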
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
551    def get_documents(
552        self,
553        stream: str,
554        title_property: str | None = None,
555        content_properties: list[str] | None = None,
556        metadata_properties: list[str] | None = None,
557        *,
558        render_metadata: bool = False,
559    ) -> Iterable[Document]:
560        """Read a stream from the connector and return the records as documents.
561
562        If metadata_properties is not set, all properties that are not content will be added to
563        the metadata.
564
565        If render_metadata is True, metadata will be rendered in the document, as well as the
566        main content.
567        """
568        return self.get_records(stream).to_documents(
569            title_property=title_property,
570            content_properties=content_properties,
571            metadata_properties=metadata_properties,
572            render_metadata=render_metadata,
573        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
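
A sketch of document extraction with placeholder stream and property names:

    import airbyte as ab

    source = ab.get_source("source-example", config={"api_key": "..."})  # hypothetical connector

    for doc in source.get_documents(
        stream="articles",           # placeholder stream name
        title_property="title",
        content_properties=["body"],
        render_metadata=True,        # render metadata alongside the main content
    ):
        print(doc)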

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
663    def read(
664        self,
665        cache: CacheBase | None = None,
666        *,
667        streams: str | list[str] | None = None,
668        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
669        force_full_refresh: bool = False,
670        skip_validation: bool = False,
671    ) -> ReadResult:
672        """Read from the connector and write to the cache.
673
674        Args:
675            cache: The cache to write to. If not set, a default cache will be used.
676            streams: Optional if already set. A list of stream names to select for reading. If set
677                to "*", all streams will be selected.
678            write_strategy: The strategy to use when writing to the cache. If a string, it must be
679                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
680                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
681                WriteStrategy.AUTO.
682            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
683                streams will be read in incremental mode if supported by the connector. This option
684                must be True when using the "replace" strategy.
685            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
686                running the connector. This can be helpful in debugging, when you want to send
687                configurations to the connector that otherwise might be rejected by JSON Schema
688                validation rules.
689        """
690        cache = cache or get_default_cache()
691        progress_tracker = ProgressTracker(
692            source=self,
693            cache=cache,
694            destination=None,
695            expected_streams=None,  # Will be set later
696        )
697
698        # Set up state provider if not in full refresh mode
699        if force_full_refresh:
700            state_provider: StateProviderBase | None = None
701        else:
702            state_provider = cache.get_state_provider(
703                source_name=self._name,
704            )
705        state_writer = cache.get_state_writer(source_name=self._name)
706
707        if streams:
708            self.select_streams(streams)
709
710        if not self._selected_stream_names:
711            raise exc.PyAirbyteNoStreamsSelectedError(
712                connector_name=self.name,
713                available_streams=self.get_available_streams(),
714            )
715
716        try:
717            result = self._read_to_cache(
718                cache=cache,
719                catalog_provider=CatalogProvider(self.configured_catalog),
720                stream_names=self._selected_stream_names,
721                state_provider=state_provider,
722                state_writer=state_writer,
723                write_strategy=write_strategy,
724                force_full_refresh=force_full_refresh,
725                skip_validation=skip_validation,
726                progress_tracker=progress_tracker,
727            )
728        except exc.PyAirbyteInternalError as ex:
729            progress_tracker.log_failure(exception=ex)
730            raise exc.AirbyteConnectorFailedError(
731                connector_name=self.name,
732                log_text=self._last_log_messages,
733            ) from ex
734        except Exception as ex:
735            progress_tracker.log_failure(exception=ex)
736            raise
737
738        progress_tracker.log_success()
739        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
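
A hedged end-to-end sketch; the connector name, config, and stream names are placeholders, and the ReadResult access shown (processed_records, stream indexing) reflects common usage:

    import airbyte as ab

    source = ab.get_source("source-example", config={"api_key": "..."})  # hypothetical connector
    source.select_streams("*")

    cache = ab.get_default_cache()  # local DuckDB cache by default
    result = source.read(cache=cache, write_strategy="auto")

    print(result.processed_records)   # total records written to the cache
    df = result["users"].to_pandas()  # read one stream back as a DataFrame
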
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
get_config
config_hash
validate_config
print_config_spec
connector_version
check
install
uninstall