airbyte.sources.base

Base class implementation for sources.
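
A minimal usage sketch (the connector name and config below are hypothetical;
`get_source` is the package-level factory that returns `Source` instances):

    import airbyte as ab

    source = ab.get_source(
        "source-faker",         # hypothetical connector name
        config={"count": 100},  # hypothetical config
    )
    source.check()              # validate config and connectivity
    source.select_streams("*")  # see Source.select_streams below
    result = source.read()      # read into the default local cache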

# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base class implementation for sources."""

from __future__ import annotations

import sys
import threading
import time
import warnings
from itertools import islice
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich import print  # noqa: A004  # Allow shadowing the built-in
from rich.console import Console
from rich.markdown import Markdown
from rich.markup import escape
from rich.table import Table
from typing_extensions import override

from airbyte_protocol.models import (
    AirbyteCatalog,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

from airbyte import exceptions as exc
from airbyte._connector_base import ConnectorBase
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte._util.temp_files import as_temp_files
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import ProgressStyle, ProgressTracker
from airbyte.records import StreamRecord, StreamRecordHandler
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.strategies import WriteStrategy


if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from airbyte_protocol.models import (
        AirbyteStream,
        ConnectorSpecification,
    )

    from airbyte._executors.base import Executor
    from airbyte.caches import CacheBase
    from airbyte.callbacks import ConfigChangeCallback
    from airbyte.datasets._inmemory import InMemoryDataset
    from airbyte.documents import Document
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase

from airbyte.constants import (
    AB_EXTRACTED_AT_COLUMN,
    AB_META_COLUMN,
    AB_RAW_ID_COLUMN,
)

class Source(ConnectorBase):  # noqa: PLR0904
    """A class representing a source that can be called."""

    connector_type = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key

    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support overriding
          primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )

    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        This does not unset previously set primary keys.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support overriding
          primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )

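    # Example (sketch): overriding cursors and primary keys together; the
    # `source` instance, stream names, and field names are hypothetical.
    #
    #     source.set_cursor_keys(users="updated_at", orders="modified_at")
    #     source.set_primary_keys(users="id", orders=["id", "region"])
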
    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Log a warning for streams selected before the config has been set."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set.",
                file=sys.stderr,
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}",
                file=sys.stderr,
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will be selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

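    # Example (sketch): equivalent ways to select streams on a hypothetical
    # `source`; stream names are illustrative only.
    #
    #     source.select_streams("users")              # a single stream
    #     source.select_streams(["users", "orders"])  # an explicit list
    #     source.select_streams("*")                  # everything available
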
    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

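    # Example (sketch): deferring config validation; the config key shown is
    # hypothetical.
    #
    #     source.set_config({"api_key": "..."}, validate=False)
    #     source.check()  # validation (and a connection check) happens here
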
    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - execute the connector with discover --config <config_file>
        - Listen to the messages and return the first AirbyteCatalog that comes along.
        - Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._hydrated_config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the names of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    @override
    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * execute the connector with spec
        * Listen to the messages and return the first ConnectorSpecification that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This function returns the configuration spec for the current connector as a
        JSON Schema dictionary.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

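    # Example (sketch): inspecting the connector's expected config fields,
    # assuming the returned JSON Schema uses standard "properties"/"required"
    # keys.
    #
    #     schema = source.config_spec
    #     print(sorted(schema.get("properties", {})))
    #     print(schema.get("required", []))
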
    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are selected,
        all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    sync_mode=SyncMode.incremental,
                    primary_key=(
                        [self._primary_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._primary_key_overrides
                        else stream.source_defined_primary_key
                    ),
                    cursor_field=(
                        [self._cursor_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._cursor_key_overrides
                        else stream.default_cursor_field
                    ),
                    # These are unused in the current implementation:
                    generation_id=None,
                    minimum_generation_id=None,
                    sync_id=None,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

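    # Example (sketch): building catalogs for different stream subsets on a
    # hypothetical `source`; stream names are illustrative.
    #
    #     full_catalog = source.get_configured_catalog("*")
    #     users_only = source.get_configured_catalog(streams=["users"])
    #     print([s.stream.name for s in users_only.streams])
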
    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

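    # Example (sketch): listing the declared fields of a hypothetical
    # "users" stream, assuming a standard JSON Schema "properties" key.
    #
    #     schema = source.get_stream_json_schema("users")
    #     print(sorted(schema.get("properties", {})))
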
    def get_records(
        self,
        stream: str,
        *,
        limit: int | None = None,
        stop_event: threading.Event | None = None,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            limit: The maximum number of records to read. If None, all records will be read.
            stop_event: If set, the event can be triggered by the caller to stop reading records
                and terminate the process.
            normalize_field_names: When `True`, field names will be normalized to lower case, with
                special characters removed. This matches the behavior of PyAirbyte caches and most
                Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
                which generally matches the behavior of PyAirbyte caches and most Airbyte
                destinations, specifically when you expect the catalog may be stale. You can disable
                this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and yield the AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        stop_event = stop_event or threading.Event()
        configured_catalog = self.get_configured_catalog(streams=[stream])
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
                stop_event=stop_event,
            )
            if record.record
        )
        if limit is not None:
            # Stop the iterator after the limit is reached
            iterator = islice(iterator, limit)

        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
            stop_event=stop_event,
            progress_tracker=progress_tracker,
        )

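    # Example (sketch): lazily iterating a capped sample of records, with a
    # stop event for early termination; the stream name and the
    # `looks_suspicious` predicate are hypothetical.
    #
    #     import threading
    #
    #     stop = threading.Event()
    #     for record in source.get_records("users", limit=10, stop_event=stop):
    #         print(record)
    #         if looks_suspicious(record):
    #             stop.set()  # terminates the underlying read
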
    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

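    # Example (sketch): rendering records as documents, e.g. for LLM
    # ingestion; the stream and property names are hypothetical.
    #
    #     for doc in source.get_documents(
    #         "articles",
    #         title_property="title",
    #         content_properties=["body"],
    #         metadata_properties=["author", "published_at"],
    #     ):
    #         print(doc)
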
    def get_samples(
        self,
        streams: list[str] | Literal["*"] | None = None,
        *,
        limit: int = 5,
        on_error: Literal["raise", "ignore", "log"] = "raise",
    ) -> dict[str, InMemoryDataset | None]:
        """Get a sample of records from the given streams."""
        if streams == "*":
            streams = self.get_available_streams()
        elif streams is None:
            streams = self.get_selected_streams()

        results: dict[str, InMemoryDataset | None] = {}
        for stream in streams:
            stop_event = threading.Event()
            try:
                results[stream] = self.get_records(
                    stream,
                    limit=limit,
                    stop_event=stop_event,
                ).fetch_all()
                stop_event.set()
            except Exception as ex:
                results[stream] = None
                if on_error == "ignore":
                    continue

                if on_error == "raise":
                    raise ex from None

                if on_error == "log":
                    print(f"Error fetching sample for stream '{stream}': {ex}")

        return results

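    # Example (sketch): sampling a few records per stream without failing the
    # whole call when a single stream errors.
    #
    #     samples = source.get_samples("*", limit=3, on_error="log")
    #     for name, dataset in samples.items():
    #         print(name, "-> error" if dataset is None else "-> ok")
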
    def print_samples(
        self,
        streams: list[str] | Literal["*"] | None = None,
        *,
        limit: int = 5,
        on_error: Literal["raise", "ignore", "log"] = "log",
    ) -> None:
        """Print a sample of records from the given streams."""
        internal_cols: list[str] = [
            AB_EXTRACTED_AT_COLUMN,
            AB_META_COLUMN,
            AB_RAW_ID_COLUMN,
        ]
        col_limit = 10
        if streams == "*":
            streams = self.get_available_streams()
        elif streams is None:
            streams = self.get_selected_streams()

        console = Console()

        console.print(
            Markdown(
                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
                justify="left",
            )
        )

        for stream in streams:
            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
            samples = self.get_samples(
                streams=[stream],
                limit=limit,
                on_error=on_error,
            )
            dataset = samples[stream]

            table = Table(
                show_header=True,
                show_lines=True,
            )
            if dataset is None:
                console.print(
                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
                )
                continue

            if len(dataset.column_names) > col_limit:
                # We'll pivot the columns so each column is its own row
                table.add_column("Column Name")
                for _ in range(len(dataset)):
                    table.add_column(overflow="fold")
                for col in dataset.column_names:
                    table.add_row(
                        Markdown(f"**`{col}`**"),
                        *[escape(str(record[col])) for record in dataset],
                    )
            else:
                for col in dataset.column_names:
                    table.add_column(
                        Markdown(f"**`{col}`**"),
                        overflow="fold",
                    )

                for record in dataset:
                    table.add_row(
                        *[
                            escape(str(val))
                            for key, val in record.items()
                            # Exclude internal Airbyte columns.
                            if key not in internal_cols
                        ]
                    )

            console.print(table)

        console.print(Markdown("--------------"))

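    # Example (sketch): printing formatted samples for all selected streams.
    #
    #     source.select_streams("*")
    #     source.print_samples(limit=5)
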
    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source."""
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        *,
        state: StateProviderBase | None = None,
        stop_event: threading.Event | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and yield the AirbyteRecordMessages that come along.
        * Send out telemetry on the performed sync (with information about which source was used and
          the type of the cache)
        """
        with as_temp_files(
            [
                self._hydrated_config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            for message in progress_tracker.tally_records_read(message_generator):
                if stop_event and stop_event.is_set():
                    progress_tracker._log_sync_cancel()  # noqa: SLF001
                    time.sleep(0.1)
                    return

                yield message

        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
        )
        print(log_message, file=sys.stderr)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If set
                to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must be
                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This option
                must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
                running the connector. This can be helpful in debugging, when you want to send
                configurations to the connector that otherwise might be rejected by JSON Schema
                validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

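    # Example (sketch): reading into an explicit cache with a merge strategy.
    # Assumes `airbyte.caches.DuckDBCache` with a `db_path` argument; the
    # path and stream name are hypothetical.
    #
    #     from airbyte.caches import DuckDBCache
    #
    #     cache = DuckDBCache(db_path="./pyairbyte.duckdb")
    #     result = source.read(cache=cache, write_strategy="merge")
    #     df = result["users"].to_pandas()
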
    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method."""
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental streams if incremental streams are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams that support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )


__all__ = [
    "Source",
]
559            StreamRecord.from_record_message(
560                record_message=record.record,
561                stream_record_handler=stream_record_handler,
562            )
563            for record in self._read_with_catalog(
564                catalog=configured_catalog,
565                progress_tracker=progress_tracker,
566                stop_event=stop_event,
567            )
568            if record.record
569        )
570        if limit is not None:
571            # Stop the iterator after the limit is reached
572            iterator = islice(iterator, limit)
573
574        return LazyDataset(
575            iterator,
576            stream_metadata=configured_stream,
577            stop_event=stop_event,
578            progress_tracker=progress_tracker,
579        )
580
581    def get_documents(
582        self,
583        stream: str,
584        title_property: str | None = None,
585        content_properties: list[str] | None = None,
586        metadata_properties: list[str] | None = None,
587        *,
588        render_metadata: bool = False,
589    ) -> Iterable[Document]:
590        """Read a stream from the connector and return the records as documents.
591
592        If metadata_properties is not set, all properties that are not content will be added to
593        the metadata.
594
595        If render_metadata is True, metadata will be rendered in the document, as well as
596        the main content.
597        """
598        return self.get_records(stream).to_documents(
599            title_property=title_property,
600            content_properties=content_properties,
601            metadata_properties=metadata_properties,
602            render_metadata=render_metadata,
603        )
604
605    def get_samples(
606        self,
607        streams: list[str] | Literal["*"] | None = None,
608        *,
609        limit: int = 5,
610        on_error: Literal["raise", "ignore", "log"] = "raise",
611    ) -> dict[str, InMemoryDataset | None]:
612        """Get a sample of records from the given streams."""
613        if streams == "*":
614            streams = self.get_available_streams()
615        elif streams is None:
616            streams = self.get_selected_streams()
617
618        results: dict[str, InMemoryDataset | None] = {}
619        for stream in streams:
620            stop_event = threading.Event()
621            try:
622                results[stream] = self.get_records(
623                    stream,
624                    limit=limit,
625                    stop_event=stop_event,
626                ).fetch_all()
627                stop_event.set()
628            except Exception as ex:
629                results[stream] = None
630                if on_error == "ignore":
631                    continue
632
633                if on_error == "raise":
634                    raise ex from None
635
636                if on_error == "log":
637                    print(f"Error fetching sample for stream '{stream}': {ex}")
638
639        return results
640
641    def print_samples(
642        self,
643        streams: list[str] | Literal["*"] | None = None,
644        *,
645        limit: int = 5,
646        on_error: Literal["raise", "ignore", "log"] = "log",
647    ) -> None:
648        """Print a sample of records from the given streams."""
649        internal_cols: list[str] = [
650            AB_EXTRACTED_AT_COLUMN,
651            AB_META_COLUMN,
652            AB_RAW_ID_COLUMN,
653        ]
654        col_limit = 10
655        if streams == "*":
656            streams = self.get_available_streams()
657        elif streams is None:
658            streams = self.get_selected_streams()
659
660        console = Console()
661
662        console.print(
663            Markdown(
664                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
665                justify="left",
666            )
667        )
668
669        for stream in streams:
670            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
671            samples = self.get_samples(
672                streams=[stream],
673                limit=limit,
674                on_error=on_error,
675            )
676            dataset = samples[stream]
677
678            table = Table(
679                show_header=True,
680                show_lines=True,
681            )
682            if dataset is None:
683                console.print(
684                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
685                )
686                continue
687
688            if len(dataset.column_names) > col_limit:
689                # We'll pivot the columns so each column is its own row
690                table.add_column("Column Name")
691                for _ in range(len(dataset)):
692                    table.add_column(overflow="fold")
693                for col in dataset.column_names:
694                    table.add_row(
695                        Markdown(f"**`{col}`**"),
696                        *[escape(str(record[col])) for record in dataset],
697                    )
698            else:
699                for col in dataset.column_names:
700                    table.add_column(
701                        Markdown(f"**`{col}`**"),
702                        overflow="fold",
703                    )
704
705                for record in dataset:
706                    table.add_row(
707                        *[
708                            escape(str(val))
709                            for key, val in record.items()
710                            # Exclude internal Airbyte columns.
711                            if key not in internal_cols
712                        ]
713                    )
714
715            console.print(table)
716
717        console.print(Markdown("--------------"))
718
719    def _get_airbyte_message_iterator(
720        self,
721        *,
722        streams: Literal["*"] | list[str] | None = None,
723        state_provider: StateProviderBase | None = None,
724        progress_tracker: ProgressTracker,
725        force_full_refresh: bool = False,
726    ) -> AirbyteMessageIterator:
727        """Get an AirbyteMessageIterator for this source."""
728        return AirbyteMessageIterator(
729            self._read_with_catalog(
730                catalog=self.get_configured_catalog(streams=streams),
731                state=state_provider if not force_full_refresh else None,
732                progress_tracker=progress_tracker,
733            )
734        )
735
736    def _read_with_catalog(
737        self,
738        catalog: ConfiguredAirbyteCatalog,
739        progress_tracker: ProgressTracker,
740        *,
741        state: StateProviderBase | None = None,
742        stop_event: threading.Event | None = None,
743    ) -> Generator[AirbyteMessage, None, None]:
744        """Call read on the connector.
745
746        This involves the following steps:
747        * Write the config to a temporary file
748        * execute the connector with read --config <config_file> --catalog <catalog_file>
749        * Listen to the messages and return the AirbyteRecordMessages that come along.
750        * Send out telemetry on the performed sync (with information about which source was used and
751          the type of the cache)
752        """
753        with as_temp_files(
754            [
755                self._hydrated_config,
756                catalog.model_dump_json(),
757                state.to_state_input_file_text() if state else "[]",
758            ]
759        ) as [
760            config_file,
761            catalog_file,
762            state_file,
763        ]:
764            message_generator = self._execute(
765                [
766                    "read",
767                    "--config",
768                    config_file,
769                    "--catalog",
770                    catalog_file,
771                    "--state",
772                    state_file,
773                ],
774                progress_tracker=progress_tracker,
775            )
776            for message in progress_tracker.tally_records_read(message_generator):
777                if stop_event and stop_event.is_set():
778                    progress_tracker._log_sync_cancel()  # noqa: SLF001
779                    time.sleep(0.1)
780                    return
781
782                yield message
783
784        progress_tracker.log_read_complete()
785
786    def _peek_airbyte_message(
787        self,
788        message: AirbyteMessage,
789        *,
790        raise_on_error: bool = True,
791    ) -> None:
792        """Process an Airbyte message.
793
794        This method handles reading Airbyte messages and taking action, if needed, based on the
795        message type. For instance, log messages are logged, records are tallied, and errors are
796        raised as exceptions if `raise_on_error` is True.
797
798        Raises:
799            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
800        """
801        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
802
803    def _log_incremental_streams(
804        self,
805        *,
806        incremental_streams: set[str] | None = None,
807    ) -> None:
808        """Log the streams which are using incremental sync mode."""
809        log_message = (
810            "The following streams are currently using incremental sync:\n"
811            f"{incremental_streams}\n"
812            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
813        )
814        print(log_message, file=sys.stderr)
815
816    def read(
817        self,
818        cache: CacheBase | None = None,
819        *,
820        streams: str | list[str] | None = None,
821        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
822        force_full_refresh: bool = False,
823        skip_validation: bool = False,
824    ) -> ReadResult:
825        """Read from the connector and write to the cache.
826
827        Args:
828            cache: The cache to write to. If not set, a default cache will be used.
829            streams: Optional if already set. A list of stream names to select for reading. If set
830                to "*", all streams will be selected.
831            write_strategy: The strategy to use when writing to the cache. If a string, it must be
832                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
833                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
834                WriteStrategy.AUTO.
835            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
836                streams will be read in incremental mode if supported by the connector. This option
837                must be True when using the "replace" strategy.
838            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
839                running the connector. This can be helpful in debugging, when you want to send
840                configurations to the connector that otherwise might be rejected by JSON Schema
841                validation rules.
842        """
843        cache = cache or get_default_cache()
844        progress_tracker = ProgressTracker(
845            source=self,
846            cache=cache,
847            destination=None,
848            expected_streams=None,  # Will be set later
849        )
850
851        # Set up state provider if not in full refresh mode
852        if force_full_refresh:
853            state_provider: StateProviderBase | None = None
854        else:
855            state_provider = cache.get_state_provider(
856                source_name=self._name,
857            )
858        state_writer = cache.get_state_writer(source_name=self._name)
859
860        if streams:
861            self.select_streams(streams)
862
863        if not self._selected_stream_names:
864            raise exc.PyAirbyteNoStreamsSelectedError(
865                connector_name=self.name,
866                available_streams=self.get_available_streams(),
867            )
868
869        try:
870            result = self._read_to_cache(
871                cache=cache,
872                catalog_provider=CatalogProvider(self.configured_catalog),
873                stream_names=self._selected_stream_names,
874                state_provider=state_provider,
875                state_writer=state_writer,
876                write_strategy=write_strategy,
877                force_full_refresh=force_full_refresh,
878                skip_validation=skip_validation,
879                progress_tracker=progress_tracker,
880            )
881        except exc.PyAirbyteInternalError as ex:
882            progress_tracker.log_failure(exception=ex)
883            raise exc.AirbyteConnectorFailedError(
884                connector_name=self.name,
885                log_text=self._last_log_messages,
886            ) from ex
887        except Exception as ex:
888            progress_tracker.log_failure(exception=ex)
889            raise
890
891        progress_tracker.log_success()
892        return result
893
894    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
895        self,
896        cache: CacheBase,
897        *,
898        catalog_provider: CatalogProvider,
899        stream_names: list[str],
900        state_provider: StateProviderBase | None,
901        state_writer: StateWriterBase | None,
902        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
903        force_full_refresh: bool = False,
904        skip_validation: bool = False,
905        progress_tracker: ProgressTracker,
906    ) -> ReadResult:
907        """Internal read method."""
908        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
909            warnings.warn(
910                message=(
911                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
912                    "could result in data loss. "
913                    "To silence this warning, use the following: "
914                    '`warnings.filterwarnings("ignore", '
915                    'category=airbyte.exceptions.PyAirbyteDataLossWarning)`'
916                ),
917                category=exc.PyAirbyteDataLossWarning,
918                stacklevel=1,
919            )
920        if isinstance(write_strategy, str):
921            try:
922                write_strategy = WriteStrategy(write_strategy)
923            except ValueError:
924                raise exc.PyAirbyteInputError(
925                    message="Invalid strategy",
926                    context={
927                        "write_strategy": write_strategy,
928                        "available_strategies": [s.value for s in WriteStrategy],
929                    },
930                ) from None
931
932        # Run optional validation step
933        if not skip_validation:
934            self.validate_config()
935
936        # Log incremental streams if incremental streams are known
937        if state_provider and state_provider.known_stream_names:
938            # Retrieve the set of known streams which support incremental sync
939            incremental_streams = (
940                set(self._get_incremental_stream_names())
941                & state_provider.known_stream_names
942                & set(self.get_selected_streams())
943            )
944            if incremental_streams:
945                self._log_incremental_streams(incremental_streams=incremental_streams)
946
947        airbyte_message_iterator = AirbyteMessageIterator(
948            self._read_with_catalog(
949                catalog=catalog_provider.configured_catalog,
950                state=state_provider,
951                progress_tracker=progress_tracker,
952            )
953        )
954        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
955            stdin=airbyte_message_iterator,
956            catalog_provider=catalog_provider,
957            write_strategy=write_strategy,
958            state_writer=state_writer,
959            progress_tracker=progress_tracker,
960        )
961
962        # Flush the WAL, if applicable
963        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
964
965        return ReadResult(
966            source_name=self.name,
967            progress_tracker=progress_tracker,
968            processed_streams=stream_names,
969            cache=cache,
970        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False, cursor_key_overrides: dict[str, str] | None = None, primary_key_overrides: dict[str, str | list[str]] | None = None)
 73    def __init__(
 74        self,
 75        executor: Executor,
 76        name: str,
 77        config: dict[str, Any] | None = None,
 78        *,
 79        config_change_callback: ConfigChangeCallback | None = None,
 80        streams: str | list[str] | None = None,
 81        validate: bool = False,
 82        cursor_key_overrides: dict[str, str] | None = None,
 83        primary_key_overrides: dict[str, str | list[str]] | None = None,
 84    ) -> None:
 85        """Initialize the source.
 86
 87        If config is provided, it will be validated against the spec if validate is True.
 88        """
 89        self._to_be_selected_streams: list[str] | str = []
 90        """Used to hold selection criteria before catalog is known."""
 91
 92        super().__init__(
 93            executor=executor,
 94            name=name,
 95            config=config,
 96            config_change_callback=config_change_callback,
 97            validate=validate,
 98        )
 99        self._config_dict: dict[str, Any] | None = None
100        self._last_log_messages: list[str] = []
101        self._discovered_catalog: AirbyteCatalog | None = None
102        self._selected_stream_names: list[str] = []
103
104        self._cursor_key_overrides: dict[str, str] = {}
105        """A mapping of lower-cased stream names to cursor key overrides."""
106
107        self._primary_key_overrides: dict[str, list[str]] = {}
108        """A mapping of lower-cased stream names to primary key overrides."""
109
110        if config is not None:
111            self.set_config(config, validate=validate)
112        if streams is not None:
113            self.select_streams(streams)
114        if cursor_key_overrides is not None:
115            self.set_cursor_keys(**cursor_key_overrides)
116        if primary_key_overrides is not None:
117            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
119    def set_streams(self, streams: list[str]) -> None:
120        """Deprecated. See select_streams()."""
121        warnings.warn(
122            "The 'set_streams' method is deprecated and will be removed in a future version. "
123            "Please use the 'select_streams' method instead.",
124            DeprecationWarning,
125            stacklevel=2,
126        )
127        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
129    def set_cursor_key(
130        self,
131        stream_name: str,
132        cursor_key: str,
133    ) -> None:
134        """Set the cursor for a single stream.
135
136        Note:
137        - This does not unset previously set cursors.
138        - The cursor key must be a single field name.
139        - Not all streams support custom cursors. If a stream does not support custom cursors,
140          the override may be ignored.
141        - Stream names are case insensitive, while field names are case sensitive.
142        - Stream names are not validated by PyAirbyte. If the stream name
143          does not exist in the catalog, the override may be ignored.
144        """
145        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_cursor_keys(self, **kwargs: str) -> None:
147    def set_cursor_keys(
148        self,
149        **kwargs: str,
150    ) -> None:
151        """Override the cursor key for one or more streams.
152
153        Usage:
154            source.set_cursor_keys(
155                stream1="cursor1",
156                stream2="cursor2",
157            )
158
159        Note:
160        - This does not unset previously set cursors.
161        - The cursor key must be a single field name.
162        - Not all streams support custom cursors. If a stream does not support custom cursors,
163          the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

    source.set_cursor_keys(
        stream1="cursor1",
        stream2="cursor2",
    )

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
170    def set_primary_key(
171        self,
172        stream_name: str,
173        primary_key: str | list[str],
174    ) -> None:
175        """Set the primary key for a single stream.
176
177        Note:
178        - This does not unset previously set primary keys.
179        - The primary key must be a single field name or a list of field names.
180        - Not all streams support overriding primary keys. If a stream does not support overriding
181          primary keys, the override may be ignored.
182        - Stream names are case insensitive, while field names are case sensitive.
183        - Stream names are not validated by PyAirbyte. If the stream name
184          does not exist in the catalog, the override may be ignored.
185        """
186        self._primary_key_overrides[stream_name.lower()] = (
187            primary_key if isinstance(primary_key, list) else [primary_key]
188        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
190    def set_primary_keys(
191        self,
192        **kwargs: str | list[str],
193    ) -> None:
194        """Override the primary keys for one or more streams.
195
196        This does not unset previously set primary keys.
197
198        Usage:
199            source.set_primary_keys(
200                stream1="pk1",
201                stream2=["pk1", "pk2"],
202            )
203
204        Note:
205        - This does not unset previously set primary keys.
206        - The primary key must be a single field name or a list of field names.
207        - Not all streams support overriding primary keys. If a stream does not support overriding
208          primary keys, the override may be ignored.
209        - Stream names are case insensitive, while field names are case sensitive.
210        - Stream names are not validated by PyAirbyte. If the stream name
211          does not exist in the catalog, the override may be ignored.
212        """
213        self._primary_key_overrides.update(
214            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
215        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

    source.set_primary_keys(
        stream1="pk1",
        stream2=["pk1", "pk2"],
    )

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def select_all_streams(self) -> None:
231    def select_all_streams(self) -> None:
232        """Select all streams.
233
234        This is a more streamlined equivalent to:
235        > source.select_streams(source.get_available_streams()).
236        """
237        if self._config_dict is None:
238            self._to_be_selected_streams = "*"
239            self._log_warning_preselected_stream(self._to_be_selected_streams)
240            return
241
242        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
244    def select_streams(self, streams: str | list[str]) -> None:
245        """Select the stream names that should be read from the connector.
246
247        Args:
248            streams: A list of stream names to select. If set to "*", all streams will be selected.
249
250        Currently, if this is not set, all streams will be read.
251        """
252        if self._config_dict is None:
253            self._to_be_selected_streams = streams
254            self._log_warning_preselected_stream(streams)
255            return
256
257        if streams == "*":
258            self.select_all_streams()
259            return
260
261        if isinstance(streams, str):
262            # If a single stream is provided, convert it to a one-item list
263            streams = [streams]
264
265        available_streams = self.get_available_streams()
266        for stream in streams:
267            if stream not in available_streams:
268                raise exc.AirbyteStreamNotFoundError(
269                    stream_name=stream,
270                    connector_name=self.name,
271                    available_streams=available_streams,
272                )
273        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
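
For example, a minimal sketch using the source-faker connector (any installed source connector works the same way; the stream names below are specific to source-faker):

    import airbyte as ab

    # Create the source, then narrow the selection to two streams by name.
    source = ab.get_source("source-faker", config={"count": 100})
    source.select_streams(["users", "purchases"])  # or select_streams("*") for all
    print(source.get_selected_streams())  # -> ['users', 'purchases']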

def get_selected_streams(self) -> list[str]:
275    def get_selected_streams(self) -> list[str]:
276        """Get the selected streams.
277
278        If no streams are selected, return an empty list.
279        """
280        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
282    def set_config(
283        self,
284        config: dict[str, Any],
285        *,
286        validate: bool = True,
287    ) -> None:
288        """Set the config for the connector.
289
290        If validate is True, raise an exception if the config fails validation.
291
292        If validate is False, validation will be deferred until check() or validate_config()
293        is called.
294        """
295        if validate:
296            self.validate_config(config)
297
298        self._config_dict = config
299
300        if self._to_be_selected_streams:
301            self.select_streams(self._to_be_selected_streams)
302            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
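
A sketch of deferred validation, reusing the source from the sketch above; validation is skipped at assignment time and performed explicitly later:

    # Assign the config without validating it up front...
    source.set_config({"count": 100}, validate=False)
    # ...then validate explicitly when ready (raises on failure):
    source.validate_config()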

def get_available_streams(self) -> list[str]:
322    def get_available_streams(self) -> list[str]:
323        """Get the available streams from the discovered catalog."""
324        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the discovered catalog.
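
For instance, continuing the earlier sketch:

    print(source.get_available_streams())  # e.g. ['users', 'products', 'purchases']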

config_spec: dict[str, typing.Any]
357    @property
358    def config_spec(self) -> dict[str, Any]:
359        """Generate a configuration spec for this connector, as a JSON Schema definition.
360
361        This function generates a JSON Schema dictionary with configuration specs for the
362        current connector.
363
364        Returns:
365            dict: The JSON Schema configuration spec as a dictionary.
366        """
367        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary with configuration specs for the current connector.

Returns:

dict: The JSON Schema configuration spec as a dictionary.
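
For example, a sketch that lists the spec's declared configuration fields, assuming a standard JSON Schema layout with a top-level "properties" key:

    spec = source.config_spec
    print(sorted(spec.get("properties", {})))  # configuration field names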

docs_url: str
384    @property
385    def docs_url(self) -> str:
386        """Get the URL to the connector's documentation."""
387        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
388            "source-", ""
389        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
391    @property
392    def discovered_catalog(self) -> AirbyteCatalog:
393        """Get the raw catalog for the given streams.
394
395        If the catalog is not yet known, we call discover to get it.
396        """
397        if self._discovered_catalog is None:
398            self._discovered_catalog = self._discover()
399
400        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.
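
For instance, a sketch that inspects the cached discovery result; the first access triggers discover, and later accesses reuse the cached catalog:

    catalog = source.discovered_catalog
    for stream in catalog.streams:
        print(stream.name, stream.supported_sync_modes)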

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
402    @property
403    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
404        """Get the configured catalog for the given streams.
405
406        If the raw catalog is not yet known, we call discover to get it.
407
408        If no specific streams are selected, we return a catalog that syncs all available streams.
409
410        TODO: We should consider disabling by default the streams that the connector would
411        disable by default. (For instance, streams that require a premium license are sometimes
412        disabled by default within the connector.)
413        """
414        # Ensure discovered catalog is cached before we start
415        _ = self.discovered_catalog
416
417        # Filter for selected streams if set, otherwise use all available streams:
418        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
419        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
421    def get_configured_catalog(
422        self,
423        streams: Literal["*"] | list[str] | None = None,
424    ) -> ConfiguredAirbyteCatalog:
425        """Get a configured catalog for the given streams.
426
427        If no streams are provided, the selected streams will be used. If no streams are selected,
428        all available streams will be used.
429
430        If '*' is provided, all available streams will be used.
431        """
432        selected_streams: list[str] = []
433        if streams is None:
434            selected_streams = self._selected_stream_names or self.get_available_streams()
435        elif streams == "*":
436            selected_streams = self.get_available_streams()
437        elif isinstance(streams, list):
438            selected_streams = streams
439        else:
440            raise exc.PyAirbyteInputError(
441                message="Invalid streams argument.",
442                input_value=streams,
443            )
444
445        return ConfiguredAirbyteCatalog(
446            streams=[
447                ConfiguredAirbyteStream(
448                    stream=stream,
449                    destination_sync_mode=DestinationSyncMode.overwrite,
450                    sync_mode=SyncMode.incremental,
451                    primary_key=(
452                        [self._primary_key_overrides[stream.name.lower()]]
453                        if stream.name.lower() in self._primary_key_overrides
454                        else stream.source_defined_primary_key
455                    ),
456                    cursor_field=(
457                        [self._cursor_key_overrides[stream.name.lower()]]
458                        if stream.name.lower() in self._cursor_key_overrides
459                        else stream.default_cursor_field
460                    ),
461                    # These are unused in the current implementation:
462                    generation_id=None,
463                    minimum_generation_id=None,
464                    sync_id=None,
465                )
466                for stream in self.discovered_catalog.streams
467                if stream.name in selected_streams
468            ],
469        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.
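
A sketch of the three accepted forms of the streams argument; since the catalog is a pydantic model, it can be serialized with model_dump_json() (as the read implementation itself does):

    catalog = source.get_configured_catalog(streams=["users"])  # explicit list
    catalog = source.get_configured_catalog(streams="*")        # all available streams
    catalog = source.get_configured_catalog()                   # selected, else all
    print(catalog.model_dump_json())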

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
471    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
472        """Return the JSON Schema spec for the specified stream name."""
473        catalog: AirbyteCatalog = self.discovered_catalog
474        found: list[AirbyteStream] = [
475            stream for stream in catalog.streams if stream.name == stream_name
476        ]
477
478        if len(found) == 0:
479            raise exc.PyAirbyteInputError(
480                message="Stream name does not exist in catalog.",
481                input_value=stream_name,
482            )
483
484        if len(found) > 1:
485            raise exc.PyAirbyteInternalError(
486                message="Duplicate streams found with the same name.",
487                context={
488                    "found_streams": found,
489                },
490            )
491
492        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
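
For example, a sketch that lists the declared top-level properties of the users stream:

    schema = source.get_stream_json_schema("users")
    print(sorted(schema.get("properties", {})))  # declared field names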

def get_records( self, stream: str, *, limit: int | None = None, stop_event: threading.Event | None = None, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
494    def get_records(
495        self,
496        stream: str,
497        *,
498        limit: int | None = None,
499        stop_event: threading.Event | None = None,
500        normalize_field_names: bool = False,
501        prune_undeclared_fields: bool = True,
502    ) -> LazyDataset:
503        """Read a stream from the connector.
504
505        Args:
506            stream: The name of the stream to read.
507            limit: The maximum number of records to read. If None, all records will be read.
508            stop_event: If set, the event can be triggered by the caller to stop reading records
509                and terminate the process.
510            normalize_field_names: When `True`, field names will be normalized to lower case, with
511                special characters removed. This matches the behavior of PyAirbyte caches and most
512                Airbyte destinations.
513            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
514                which generally matches the behavior of PyAirbyte caches and most Airbyte
515                destinations, specifically when you expect the catalog may be stale. You can disable
516                this to keep all fields in the records.
517
518        This involves the following steps:
519        * Call discover to get the catalog
520        * Generate a configured catalog that syncs the given stream in full_refresh mode
521        * Write the configured catalog and the config to a temporary file
522        * execute the connector with read --config <config_file> --catalog <catalog_file>
523        * Listen to the messages and return the first AirbyteRecordMessages that come along.
524        * Make sure the subprocess is killed when the function returns.
525        """
526        stop_event = stop_event or threading.Event()
527        configured_catalog = self.get_configured_catalog(streams=[stream])
528        if len(configured_catalog.streams) == 0:
529            raise exc.PyAirbyteInputError(
530                message="Requested stream does not exist.",
531                context={
532                    "stream": stream,
533                    "available_streams": self.get_available_streams(),
534                    "connector_name": self.name,
535                },
536            ) from KeyError(stream)
537
538        configured_stream = configured_catalog.streams[0]
539
540        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
541            yield from records
542
543        stream_record_handler = StreamRecordHandler(
544            json_schema=self.get_stream_json_schema(stream),
545            prune_extra_fields=prune_undeclared_fields,
546            normalize_keys=normalize_field_names,
547        )
548
549        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
550        progress_tracker = ProgressTracker(
551            ProgressStyle.PLAIN,
552            source=self,
553            cache=None,
554            destination=None,
555            expected_streams=[stream],
556        )
557
558        iterator: Iterator[dict[str, Any]] = (
559            StreamRecord.from_record_message(
560                record_message=record.record,
561                stream_record_handler=stream_record_handler,
562            )
563            for record in self._read_with_catalog(
564                catalog=configured_catalog,
565                progress_tracker=progress_tracker,
566                stop_event=stop_event,
567            )
568            if record.record
569        )
570        if limit is not None:
571            # Stop the iterator after the limit is reached
572            iterator = islice(iterator, limit)
573
574        return LazyDataset(
575            iterator,
576            stream_metadata=configured_stream,
577            stop_event=stop_event,
578            progress_tracker=progress_tracker,
579        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • limit: The maximum number of records to read. If None, all records will be read.
  • stop_event: If set, the event can be triggered by the caller to stop reading records and terminate the process.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the first AirbyteRecordMessages that come along.
  • Make sure the subprocess is killed when the function returns.
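
A sketch of a bounded, cancellable read; the "id" field below is an assumption about the stream's schema:

    import threading

    stop = threading.Event()
    dataset = source.get_records("users", limit=10, stop_event=stop)
    for record in dataset:  # records are yielded lazily as the connector emits them
        print(record["id"])  # assumes the stream declares an "id" field
    stop.set()  # ask the connector subprocess to stop, if it is still running
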
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
581    def get_documents(
582        self,
583        stream: str,
584        title_property: str | None = None,
585        content_properties: list[str] | None = None,
586        metadata_properties: list[str] | None = None,
587        *,
588        render_metadata: bool = False,
589    ) -> Iterable[Document]:
590        """Read a stream from the connector and return the records as documents.
591
592        If metadata_properties is not set, all properties that are not content will be added to
593        the metadata.
594
595        If render_metadata is True, metadata will be rendered in the document, as well as
596        the main content.
597        """
598        return self.get_records(stream).to_documents(
599            title_property=title_property,
600            content_properties=content_properties,
601            metadata_properties=metadata_properties,
602            render_metadata=render_metadata,
603        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
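
A sketch for turning records into documents, e.g. for LLM ingestion; the property names are hypothetical and must exist in the stream's schema:

    for doc in source.get_documents(
        "users",
        title_property="name",         # hypothetical title field
        content_properties=["email"],  # hypothetical content field(s)
        render_metadata=True,
    ):
        print(doc)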

def get_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'raise') -> dict[str, airbyte.datasets._inmemory.InMemoryDataset | None]:
605    def get_samples(
606        self,
607        streams: list[str] | Literal["*"] | None = None,
608        *,
609        limit: int = 5,
610        on_error: Literal["raise", "ignore", "log"] = "raise",
611    ) -> dict[str, InMemoryDataset | None]:
612        """Get a sample of records from the given streams."""
613        if streams == "*":
614            streams = self.get_available_streams()
615        elif streams is None:
616            streams = self.get_selected_streams()
617
618        results: dict[str, InMemoryDataset | None] = {}
619        for stream in streams:
620            stop_event = threading.Event()
621            try:
622                results[stream] = self.get_records(
623                    stream,
624                    limit=limit,
625                    stop_event=stop_event,
626                ).fetch_all()
627                stop_event.set()
628            except Exception as ex:
629                results[stream] = None
630                if on_error == "ignore":
631                    continue
632
633                if on_error == "raise":
634                    raise ex from None
635
636                if on_error == "log":
637                    print(f"Error fetching sample for stream '{stream}': {ex}")
638
639        return results

Get a sample of records from the given streams.
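
A sketch that samples every available stream, logging failures instead of raising so that one failing stream does not abort the rest:

    samples = source.get_samples(streams="*", limit=3, on_error="log")
    for name, dataset in samples.items():
        print(name, "<error>" if dataset is None else f"{len(dataset)} records")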

def print_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'log') -> None:
641    def print_samples(
642        self,
643        streams: list[str] | Literal["*"] | None = None,
644        *,
645        limit: int = 5,
646        on_error: Literal["raise", "ignore", "log"] = "log",
647    ) -> None:
648        """Print a sample of records from the given streams."""
649        internal_cols: list[str] = [
650            AB_EXTRACTED_AT_COLUMN,
651            AB_META_COLUMN,
652            AB_RAW_ID_COLUMN,
653        ]
654        col_limit = 10
655        if streams == "*":
656            streams = self.get_available_streams()
657        elif streams is None:
658            streams = self.get_selected_streams()
659
660        console = Console()
661
662        console.print(
663            Markdown(
664                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
665                justify="left",
666            )
667        )
668
669        for stream in streams:
670            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
671            samples = self.get_samples(
672                streams=[stream],
673                limit=limit,
674                on_error=on_error,
675            )
676            dataset = samples[stream]
677
678            table = Table(
679                show_header=True,
680                show_lines=True,
681            )
682            if dataset is None:
683                console.print(
684                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
685                )
686                continue
687
688            if len(dataset.column_names) > col_limit:
689                # We'll pivot the columns so each column is its own row
690                table.add_column("Column Name")
691                for _ in range(len(dataset)):
692                    table.add_column(overflow="fold")
693                for col in dataset.column_names:
694                    table.add_row(
695                        Markdown(f"**`{col}`**"),
696                        *[escape(str(record[col])) for record in dataset],
697                    )
698            else:
699                for col in dataset.column_names:
700                    table.add_column(
701                        Markdown(f"**`{col}`**"),
702                        overflow="fold",
703                    )
704
705                for record in dataset:
706                    table.add_row(
707                        *[
708                            escape(str(val))
709                            for key, val in record.items()
710                            # Exclude internal Airbyte columns.
711                            if key not in internal_cols
712                        ]
713                    )
714
715            console.print(table)
716
717        console.print(Markdown("--------------"))

Print a sample of records from the given streams.
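
For example, to print rich-formatted sample tables for all available streams:

    source.print_samples(streams="*", limit=5)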

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
816    def read(
817        self,
818        cache: CacheBase | None = None,
819        *,
820        streams: str | list[str] | None = None,
821        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
822        force_full_refresh: bool = False,
823        skip_validation: bool = False,
824    ) -> ReadResult:
825        """Read from the connector and write to the cache.
826
827        Args:
828            cache: The cache to write to. If not set, a default cache will be used.
829            streams: Optional if already set. A list of stream names to select for reading. If set
830                to "*", all streams will be selected.
831            write_strategy: The strategy to use when writing to the cache. If a string, it must be
832                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
833                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
834                WriteStrategy.AUTO.
835            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
836                streams will be read in incremental mode if supported by the connector. This option
837                must be True when using the "replace" strategy.
838            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
839                running the connector. This can be helpful in debugging, when you want to send
840                configurations to the connector that otherwise might be rejected by JSON Schema
841                validation rules.
842        """
843        cache = cache or get_default_cache()
844        progress_tracker = ProgressTracker(
845            source=self,
846            cache=cache,
847            destination=None,
848            expected_streams=None,  # Will be set later
849        )
850
851        # Set up state provider if not in full refresh mode
852        if force_full_refresh:
853            state_provider: StateProviderBase | None = None
854        else:
855            state_provider = cache.get_state_provider(
856                source_name=self._name,
857            )
858        state_writer = cache.get_state_writer(source_name=self._name)
859
860        if streams:
861            self.select_streams(streams)
862
863        if not self._selected_stream_names:
864            raise exc.PyAirbyteNoStreamsSelectedError(
865                connector_name=self.name,
866                available_streams=self.get_available_streams(),
867            )
868
869        try:
870            result = self._read_to_cache(
871                cache=cache,
872                catalog_provider=CatalogProvider(self.configured_catalog),
873                stream_names=self._selected_stream_names,
874                state_provider=state_provider,
875                state_writer=state_writer,
876                write_strategy=write_strategy,
877                force_full_refresh=force_full_refresh,
878                skip_validation=skip_validation,
879                progress_tracker=progress_tracker,
880            )
881        except exc.PyAirbyteInternalError as ex:
882            progress_tracker.log_failure(exception=ex)
883            raise exc.AirbyteConnectorFailedError(
884                connector_name=self.name,
885                log_text=self._last_log_messages,
886            ) from ex
887        except Exception as ex:
888            progress_tracker.log_failure(exception=ex)
889            raise
890
891        progress_tracker.log_success()
892        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
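
A sketch of a typical end-to-end read into the default DuckDB-backed cache, then loading one stream into pandas; the users stream name is specific to the source-faker sketch above:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, streams="*")
    cache = ab.get_default_cache()
    result = source.read(cache=cache, write_strategy="auto")
    print(cache.streams["users"].to_pandas().head())

Note that the "replace" strategy should be paired with force_full_refresh=True; otherwise the method emits a PyAirbyteDataLossWarning (see _read_to_cache above).
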
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
get_config
config_hash
validate_config
print_config_spec
connector_version
check
install
uninstall