airbyte.sources.base

Base class implementation for sources.

# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Base class implementation for sources."""

from __future__ import annotations

import json
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich import print  # noqa: A004  # Allow shadowing the built-in
from rich.syntax import Syntax

from airbyte_protocol.models import (
    AirbyteCatalog,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    SyncMode,
    Type,
)

from airbyte import exceptions as exc
from airbyte._connector_base import ConnectorBase
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte._util.temp_files import as_temp_files
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import ProgressStyle, ProgressTracker
from airbyte.records import StreamRecord, StreamRecordHandler
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.strategies import WriteStrategy


if TYPE_CHECKING:
    from collections.abc import Generator, Iterable, Iterator

    from airbyte_cdk import ConnectorSpecification
    from airbyte_protocol.models import AirbyteStream

    from airbyte._executors.base import Executor
    from airbyte.caches import CacheBase
    from airbyte.callbacks import ConfigChangeCallback
    from airbyte.documents import Document
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase


class Source(ConnectorBase):  # noqa: PLR0904
    """A class representing a source that can be called."""

    connector_type = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)

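    # Usage sketch (hedged): `get_source()` is the typical entry point (per the
    # guidance in `_config` below) and is assumed here to forward these keyword
    # arguments to `Source.__init__`. The connector name, config, and
    # stream/field names are illustrative only.
    #
    #   import airbyte as ab
    #
    #   source = ab.get_source(
    #       "source-faker",
    #       config={"count": 100},
    #       streams="*",
    #       cursor_key_overrides={"purchases": "updated_at"},
    #       primary_key_overrides={"purchases": "id"},
    #   )
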
    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key

    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

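    # Usage sketch (stream and field names are illustrative): stream name
    # matching is case insensitive, so "Users" and "users" refer to the same
    # stream, while the field names themselves are case sensitive.
    #
    #   source.set_cursor_key("users", "updated_at")
    #   source.set_cursor_keys(orders="modified_at", invoices="modified_at")
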
    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support overriding
          primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )

    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support overriding
          primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )

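    # Usage sketch (stream and field names are illustrative): a composite
    # primary key is expressed as a list of field names.
    #
    #   source.set_primary_key("users", "id")
    #   source.set_primary_keys(order_items=["order_id", "line_number"])
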
    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Log a warning that stream selection will be applied after the config is set."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set."
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}"
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will be selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

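    # Usage sketch (stream names are illustrative): select a subset of streams,
    # or pass "*" to select everything. Once the config is set, an unknown
    # stream name raises AirbyteStreamNotFoundError.
    #
    #   source.select_streams(["users", "purchases"])
    #   source.select_streams("*")  # Equivalent to select_all_streams()
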
    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

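    # Usage sketch (config keys are connector-specific and illustrative):
    # defer validation when assigning the config, then validate explicitly.
    #
    #   source.set_config({"api_key": "..."}, validate=False)
    #   source.validate_config()  # Raises if the config fails the spec
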
    def get_config(self) -> dict[str, Any]:
        """Get the config for the connector."""
        return self._config

    @property
    def _config(self) -> dict[str, Any]:
        if self._config_dict is None:
            raise exc.AirbyteConnectorConfigurationMissingError(
                connector_name=self.name,
                guidance="Provide via get_source() or set_config()",
            )
        return self._config_dict

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - execute the connector with discover --config <config_file>
        - Listen to the messages and return the first AirbyteCatalog that comes along.
        - Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the names of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * execute the connector with spec
        * Listen to the messages and return the first ConnectorSpecification that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This function returns the configuration spec for the current connector as a
        JSON Schema dictionary.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

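    # Usage sketch: inspect the spec's top-level JSON Schema keys. The
    # "required" and "properties" keys are standard JSON Schema; the actual
    # contents vary by connector.
    #
    #   spec = source.config_spec
    #   print(spec.get("required", []))
    #   print(list(spec.get("properties", {})))
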
    def print_config_spec(
        self,
        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
        *,
        output_file: Path | str | None = None,
    ) -> None:
        """Print the configuration spec for this connector.

        Args:
            format: The format to print the spec in. Must be "yaml" or "json".
            output_file: Optional. If set, the spec will be written to the given file path.
                Otherwise, it will be printed to the console.
        """
        if format not in {"yaml", "json"}:
            raise exc.PyAirbyteInputError(
                message="Invalid format. Expected 'yaml' or 'json'",
                input_value=format,
            )
        if isinstance(output_file, str):
            output_file = Path(output_file)

        if format == "yaml":
            content = yaml.dump(self.config_spec, indent=2)
        elif format == "json":
            content = json.dumps(self.config_spec, indent=2)

        if output_file:
            output_file.write_text(content)
            return

        syntax_highlighted = Syntax(content, format)
        print(syntax_highlighted)

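    # Usage sketch: print the spec as JSON to the console, or write the
    # default YAML rendering to a file (file name illustrative).
    #
    #   source.print_config_spec(format="json")
    #   source.print_config_spec(output_file="spec.yml")
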
    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw discovered catalog for this source.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are selected,
        all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    sync_mode=SyncMode.incremental,
                    primary_key=(
                        [self._primary_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._primary_key_overrides
                        else stream.source_defined_primary_key
                    ),
                    cursor_field=(
                        [self._cursor_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._cursor_key_overrides
                        else stream.default_cursor_field
                    ),
                    # These are unused in the current implementation:
                    generation_id=None,
                    minimum_generation_id=None,
                    sync_id=None,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

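    # Usage sketch (stream name illustrative): build a catalog for a single
    # stream and inspect its resolved cursor field and primary key.
    #
    #   catalog = source.get_configured_catalog(streams=["users"])
    #   print(catalog.streams[0].cursor_field)
    #   print(catalog.streams[0].primary_key)
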
    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

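    # Usage sketch (stream name illustrative): list the declared top-level
    # properties for a stream.
    #
    #   schema = source.get_stream_json_schema("users")
    #   print(list(schema.get("properties", {})))
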
    def get_records(
        self,
        stream: str,
        *,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            normalize_field_names: When `True`, field names will be normalized to lower case, with
                special characters removed. This matches the behavior of PyAirbyte caches and most
                Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
                which generally matches the behavior of PyAirbyte caches and most Airbyte
                destinations, specifically when you expect the catalog may be stale. You can disable
                this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        configured_catalog = self.get_configured_catalog(streams=[stream])
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
            )
            if record.record
        )
        progress_tracker.log_success()
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )

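    # Usage sketch (stream name illustrative): the returned LazyDataset wraps a
    # generator, so records are pulled from the connector as you consume it.
    #
    #   for record in source.get_records("users"):
    #       print(record["id"])
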
    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as
        the main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

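    # Usage sketch (stream and property names are illustrative): convert
    # records to documents, e.g. for downstream LLM workflows.
    #
    #   docs = source.get_documents(
    #       "users",
    #       title_property="name",
    #       content_properties=["bio"],
    #       render_metadata=True,
    #   )
    #   for doc in docs:
    #       print(str(doc))
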
    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source."""
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        state: StateProviderBase | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Send out telemetry on the performed sync (with information about which source was used and
          the type of the cache)
        """
        with as_temp_files(
            [
                self._config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            yield from progress_tracker.tally_records_read(message_generator)
        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in the 'read()' method."
        )
        print(log_message)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If set
                to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must be
                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This option
                must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
                running the connector. This can be helpful in debugging, when you want to send
                configurations to the connector that otherwise might be rejected by JSON Schema
                validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

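    # Usage sketch (stream name illustrative): read selected streams into the
    # default cache and retrieve one stream. `ReadResult` is assumed here to
    # support item access returning a dataset with a `to_pandas()` helper.
    #
    #   result = source.read()
    #   dataset = result["users"]
    #   df = dataset.to_pandas()
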
    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method."""
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental streams if any are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams which support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )


__all__ = [
    "Source",
]
class Source(airbyte._connector_base.ConnectorBase):
 53class Source(ConnectorBase):  # noqa: PLR0904
 54    """A class representing a source that can be called."""
 55
 56    connector_type = "source"
 57
 58    def __init__(
 59        self,
 60        executor: Executor,
 61        name: str,
 62        config: dict[str, Any] | None = None,
 63        *,
 64        config_change_callback: ConfigChangeCallback | None = None,
 65        streams: str | list[str] | None = None,
 66        validate: bool = False,
 67        cursor_key_overrides: dict[str, str] | None = None,
 68        primary_key_overrides: dict[str, str | list[str]] | None = None,
 69    ) -> None:
 70        """Initialize the source.
 71
 72        If config is provided, it will be validated against the spec if validate is True.
 73        """
 74        self._to_be_selected_streams: list[str] | str = []
 75        """Used to hold selection criteria before catalog is known."""
 76
 77        super().__init__(
 78            executor=executor,
 79            name=name,
 80            config=config,
 81            config_change_callback=config_change_callback,
 82            validate=validate,
 83        )
 84        self._config_dict: dict[str, Any] | None = None
 85        self._last_log_messages: list[str] = []
 86        self._discovered_catalog: AirbyteCatalog | None = None
 87        self._selected_stream_names: list[str] = []
 88
 89        self._cursor_key_overrides: dict[str, str] = {}
 90        """A mapping of lower-cased stream names to cursor key overrides."""
 91
 92        self._primary_key_overrides: dict[str, list[str]] = {}
 93        """A mapping of lower-cased stream names to primary key overrides."""
 94
 95        if config is not None:
 96            self.set_config(config, validate=validate)
 97        if streams is not None:
 98            self.select_streams(streams)
 99        if cursor_key_overrides is not None:
100            self.set_cursor_keys(**cursor_key_overrides)
101        if primary_key_overrides is not None:
102            self.set_primary_keys(**primary_key_overrides)
103
104    def set_streams(self, streams: list[str]) -> None:
105        """Deprecated. See select_streams()."""
106        warnings.warn(
107            "The 'set_streams' method is deprecated and will be removed in a future version. "
108            "Please use the 'select_streams' method instead.",
109            DeprecationWarning,
110            stacklevel=2,
111        )
112        self.select_streams(streams)
113
114    def set_cursor_key(
115        self,
116        stream_name: str,
117        cursor_key: str,
118    ) -> None:
119        """Set the cursor for a single stream.
120
121        Note:
122        - This does not unset previously set cursors.
123        - The cursor key must be a single field name.
124        - Not all streams support custom cursors. If a stream does not support custom cursors,
125          the override may be ignored.
126        - Stream names are case insensitive, while field names are case sensitive.
127        - Stream names are not validated by PyAirbyte. If the stream name
128          does not exist in the catalog, the override may be ignored.
129        """
130        self._cursor_key_overrides[stream_name.lower()] = cursor_key
131
132    def set_cursor_keys(
133        self,
134        **kwargs: str,
135    ) -> None:
136        """Override the cursor key for one or more streams.
137
138        Usage:
139            source.set_cursor_keys(
140                stream1="cursor1",
141                stream2="cursor2",
142            )
143
144        Note:
145        - This does not unset previously set cursors.
146        - The cursor key must be a single field name.
147        - Not all streams support custom cursors. If a stream does not support custom cursors,
148          the override may be ignored.
149        - Stream names are case insensitive, while field names are case sensitive.
150        - Stream names are not validated by PyAirbyte. If the stream name
151          does not exist in the catalog, the override may be ignored.
152        """
153        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
154
155    def set_primary_key(
156        self,
157        stream_name: str,
158        primary_key: str | list[str],
159    ) -> None:
160        """Set the primary key for a single stream.
161
162        Note:
163        - This does not unset previously set primary keys.
164        - The primary key must be a single field name or a list of field names.
165        - Not all streams support overriding primary keys. If a stream does not support overriding
166          primary keys, the override may be ignored.
167        - Stream names are case insensitive, while field names are case sensitive.
168        - Stream names are not validated by PyAirbyte. If the stream name
169          does not exist in the catalog, the override may be ignored.
170        """
171        self._primary_key_overrides[stream_name.lower()] = (
172            primary_key if isinstance(primary_key, list) else [primary_key]
173        )
174
175    def set_primary_keys(
176        self,
177        **kwargs: str | list[str],
178    ) -> None:
179        """Override the primary keys for one or more streams.
180
181        This does not unset previously set primary keys.
182
183        Usage:
184            source.set_primary_keys(
185                stream1="pk1",
186                stream2=["pk1", "pk2"],
187            )
188
189        Note:
190        - This does not unset previously set primary keys.
191        - The primary key must be a single field name or a list of field names.
192        - Not all streams support overriding primary keys. If a stream does not support overriding
193          primary keys, the override may be ignored.
194        - Stream names are case insensitive, while field names are case sensitive.
195        - Stream names are not validated by PyAirbyte. If the stream name
196          does not exist in the catalog, the override may be ignored.
197        """
198        self._primary_key_overrides.update(
199            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
200        )
201
202    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
203        """Logs a warning message indicating stream selection which are not selected yet."""
204        if streams == "*":
205            print(
206                "Warning: Config is not set yet. All streams will be selected after config is set."
207            )
208        else:
209            print(
210                "Warning: Config is not set yet. "
211                f"Streams to be selected after config is set: {streams}"
212            )
213
214    def select_all_streams(self) -> None:
215        """Select all streams.
216
217        This is a more streamlined equivalent to:
218        > source.select_streams(source.get_available_streams()).
219        """
220        if self._config_dict is None:
221            self._to_be_selected_streams = "*"
222            self._log_warning_preselected_stream(self._to_be_selected_streams)
223            return
224
225        self._selected_stream_names = self.get_available_streams()
226
227    def select_streams(self, streams: str | list[str]) -> None:
228        """Select the stream names that should be read from the connector.
229
230        Args:
231            streams: A list of stream names to select. If set to "*", all streams will be selected.
232
233        Currently, if this is not set, all streams will be read.
234        """
235        if self._config_dict is None:
236            self._to_be_selected_streams = streams
237            self._log_warning_preselected_stream(streams)
238            return
239
240        if streams == "*":
241            self.select_all_streams()
242            return
243
244        if isinstance(streams, str):
245            # If a single stream is provided, convert it to a one-item list
246            streams = [streams]
247
248        available_streams = self.get_available_streams()
249        for stream in streams:
250            if stream not in available_streams:
251                raise exc.AirbyteStreamNotFoundError(
252                    stream_name=stream,
253                    connector_name=self.name,
254                    available_streams=available_streams,
255                )
256        self._selected_stream_names = streams
257
258    def get_selected_streams(self) -> list[str]:
259        """Get the selected streams.
260
261        If no streams are selected, return an empty list.
262        """
263        return self._selected_stream_names
264
265    def set_config(
266        self,
267        config: dict[str, Any],
268        *,
269        validate: bool = True,
270    ) -> None:
271        """Set the config for the connector.
272
273        If validate is True, raise an exception if the config fails validation.
274
275        If validate is False, validation will be deferred until check() or validate_config()
276        is called.
277        """
278        if validate:
279            self.validate_config(config)
280
281        self._config_dict = config
282
283        if self._to_be_selected_streams:
284            self.select_streams(self._to_be_selected_streams)
285            self._to_be_selected_streams = []
286
287    def get_config(self) -> dict[str, Any]:
288        """Get the config for the connector."""
289        return self._config
290
291    @property
292    def _config(self) -> dict[str, Any]:
293        if self._config_dict is None:
294            raise exc.AirbyteConnectorConfigurationMissingError(
295                connector_name=self.name,
296                guidance="Provide via get_source() or set_config()",
297            )
298        return self._config_dict
299
300    def _discover(self) -> AirbyteCatalog:
301        """Call discover on the connector.
302
303        This involves the following steps:
304        - Write the config to a temporary file
305        - execute the connector with discover --config <config_file>
306        - Listen to the messages and return the first AirbyteCatalog that comes along.
307        - Make sure the subprocess is killed when the function returns.
308        """
309        with as_temp_files([self._config]) as [config_file]:
310            for msg in self._execute(["discover", "--config", config_file]):
311                if msg.type == Type.CATALOG and msg.catalog:
312                    return msg.catalog
313            raise exc.AirbyteConnectorMissingCatalogError(
314                connector_name=self.name,
315                log_text=self._last_log_messages,
316            )
317
318    def get_available_streams(self) -> list[str]:
319        """Get the available streams from the spec."""
320        return [s.name for s in self.discovered_catalog.streams]
321
322    def _get_incremental_stream_names(self) -> list[str]:
323        """Get the name of streams that support incremental sync."""
324        return [
325            stream.name
326            for stream in self.discovered_catalog.streams
327            if SyncMode.incremental in stream.supported_sync_modes
328        ]
329
330    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
331        """Call spec on the connector.
332
333        This involves the following steps:
334        * execute the connector with spec
335        * Listen to the messages and return the first AirbyteCatalog that comes along.
336        * Make sure the subprocess is killed when the function returns.
337        """
338        if force_refresh or self._spec is None:
339            for msg in self._execute(["spec"]):
340                if msg.type == Type.SPEC and msg.spec:
341                    self._spec = msg.spec
342                    break
343
344        if self._spec:
345            return self._spec
346
347        raise exc.AirbyteConnectorMissingSpecError(
348            connector_name=self.name,
349            log_text=self._last_log_messages,
350        )
351
352    @property
353    def config_spec(self) -> dict[str, Any]:
354        """Generate a configuration spec for this connector, as a JSON Schema definition.
355
356        This function generates a JSON Schema dictionary with configuration specs for the
357        current connector, as a dictionary.
358
359        Returns:
360            dict: The JSON Schema configuration spec as a dictionary.
361        """
362        return self._get_spec(force_refresh=True).connectionSpecification
363
364    def print_config_spec(
365        self,
366        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
367        *,
368        output_file: Path | str | None = None,
369    ) -> None:
370        """Print the configuration spec for this connector.
371
372        Args:
373            format: The format to print the spec in. Must be "yaml" or "json".
374            output_file: Optional. If set, the spec will be written to the given file path.
375                Otherwise, it will be printed to the console.
376        """
377        if format not in {"yaml", "json"}:
378            raise exc.PyAirbyteInputError(
379                message="Invalid format. Expected 'yaml' or 'json'",
380                input_value=format,
381            )
382        if isinstance(output_file, str):
383            output_file = Path(output_file)
384
385        if format == "yaml":
386            content = yaml.dump(self.config_spec, indent=2)
387        elif format == "json":
388            content = json.dumps(self.config_spec, indent=2)
389
390        if output_file:
391            output_file.write_text(content)
392            return
393
394        syntax_highlighted = Syntax(content, format)
395        print(syntax_highlighted)
396
397    @property
398    def _yaml_spec(self) -> str:
399        """Get the spec as a yaml string.
400
401        For now, the primary use case is for writing and debugging a valid config for a source.
402
403        This is private for now because we probably want better polish before exposing this
404        as a stable interface. This will also get easier when we have docs links with this info
405        for each connector.
406        """
407        spec_obj: ConnectorSpecification = self._get_spec()
408        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
409        # convert to a yaml string
410        return yaml.dump(spec_dict)
411
412    @property
413    def docs_url(self) -> str:
414        """Get the URL to the connector's documentation."""
415        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
416            "source-", ""
417        )
418
419    @property
420    def discovered_catalog(self) -> AirbyteCatalog:
421        """Get the raw catalog for the given streams.
422
423        If the catalog is not yet known, we call discover to get it.
424        """
425        if self._discovered_catalog is None:
426            self._discovered_catalog = self._discover()
427
428        return self._discovered_catalog
429
430    @property
431    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
432        """Get the configured catalog for the given streams.
433
434        If the raw catalog is not yet known, we call discover to get it.
435
436        If no specific streams are selected, we return a catalog that syncs all available streams.
437
438        TODO: We should consider disabling by default the streams that the connector would
439        disable by default. (For instance, streams that require a premium license are sometimes
440        disabled by default within the connector.)
441        """
442        # Ensure discovered catalog is cached before we start
443        _ = self.discovered_catalog
444
445        # Filter for selected streams if set, otherwise use all available streams:
446        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
447        return self.get_configured_catalog(streams=streams_filter)
448
449    def get_configured_catalog(
450        self,
451        streams: Literal["*"] | list[str] | None = None,
452    ) -> ConfiguredAirbyteCatalog:
453        """Get a configured catalog for the given streams.
454
455        If no streams are provided, the selected streams will be used. If no streams are selected,
456        all available streams will be used.
457
458        If '*' is provided, all available streams will be used.
459        """
460        selected_streams: list[str] = []
461        if streams is None:
462            selected_streams = self._selected_stream_names or self.get_available_streams()
463        elif streams == "*":
464            selected_streams = self.get_available_streams()
465        elif isinstance(streams, list):
466            selected_streams = streams
467        else:
468            raise exc.PyAirbyteInputError(
469                message="Invalid streams argument.",
470                input_value=streams,
471            )
472
473        return ConfiguredAirbyteCatalog(
474            streams=[
475                ConfiguredAirbyteStream(
476                    stream=stream,
477                    destination_sync_mode=DestinationSyncMode.overwrite,
478                    sync_mode=SyncMode.incremental,
479                    primary_key=(
480                        [self._primary_key_overrides[stream.name.lower()]]
481                        if stream.name.lower() in self._primary_key_overrides
482                        else stream.source_defined_primary_key
483                    ),
484                    cursor_field=(
485                        [self._cursor_key_overrides[stream.name.lower()]]
486                        if stream.name.lower() in self._cursor_key_overrides
487                        else stream.default_cursor_field
488                    ),
489                    # These are unused in the current implementation:
490                    generation_id=None,
491                    minimum_generation_id=None,
492                    sync_id=None,
493                )
494                for stream in self.discovered_catalog.streams
495                if stream.name in selected_streams
496            ],
497        )
498
499    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
500        """Return the JSON Schema spec for the specified stream name."""
501        catalog: AirbyteCatalog = self.discovered_catalog
502        found: list[AirbyteStream] = [
503            stream for stream in catalog.streams if stream.name == stream_name
504        ]
505
506        if len(found) == 0:
507            raise exc.PyAirbyteInputError(
508                message="Stream name does not exist in catalog.",
509                input_value=stream_name,
510            )
511
512        if len(found) > 1:
513            raise exc.PyAirbyteInternalError(
514                message="Duplicate streams found with the same name.",
515                context={
516                    "found_streams": found,
517                },
518            )
519
520        return found[0].json_schema
521
522    def get_records(
523        self,
524        stream: str,
525        *,
526        normalize_field_names: bool = False,
527        prune_undeclared_fields: bool = True,
528    ) -> LazyDataset:
529        """Read a stream from the connector.
530
531        Args:
532            stream: The name of the stream to read.
533            normalize_field_names: When `True`, field names will be normalized to lower case, with
534                special characters removed. This matches the behavior of PyAirbyte caches and most
535                Airbyte destinations.
536            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
537                which generally matches the behavior of PyAirbyte caches and most Airbyte
538                destinations, specifically when you expect the catalog may be stale. You can disable
539                this to keep all fields in the records.
540
541        This involves the following steps:
542        * Call discover to get the catalog
543        * Generate a configured catalog that syncs the given stream in full_refresh mode
544        * Write the configured catalog and the config to a temporary file
545        * execute the connector with read --config <config_file> --catalog <catalog_file>
546        * Listen to the messages and return the first AirbyteRecordMessages that come along.
547        * Make sure the subprocess is killed when the function returns.
548        """
549        configured_catalog = self.get_configured_catalog(streams=[stream])
550        if len(configured_catalog.streams) == 0:
551            raise exc.PyAirbyteInputError(
552                message="Requested stream does not exist.",
553                context={
554                    "stream": stream,
555                    "available_streams": self.get_available_streams(),
556                    "connector_name": self.name,
557                },
558            ) from KeyError(stream)
559
560        configured_stream = configured_catalog.streams[0]
561
562        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
563            yield from records
564
565        stream_record_handler = StreamRecordHandler(
566            json_schema=self.get_stream_json_schema(stream),
567            prune_extra_fields=prune_undeclared_fields,
568            normalize_keys=normalize_field_names,
569        )
570
571        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
572        progress_tracker = ProgressTracker(
573            ProgressStyle.PLAIN,
574            source=self,
575            cache=None,
576            destination=None,
577            expected_streams=[stream],
578        )
579
580        iterator: Iterator[dict[str, Any]] = (
581            StreamRecord.from_record_message(
582                record_message=record.record,
583                stream_record_handler=stream_record_handler,
584            )
585            for record in self._read_with_catalog(
586                catalog=configured_catalog,
587                progress_tracker=progress_tracker,
588            )
589            if record.record
590        )
591        progress_tracker.log_success()
592        return LazyDataset(
593            iterator,
594            stream_metadata=configured_stream,
595        )
596
597    def get_documents(
598        self,
599        stream: str,
600        title_property: str | None = None,
601        content_properties: list[str] | None = None,
602        metadata_properties: list[str] | None = None,
603        *,
604        render_metadata: bool = False,
605    ) -> Iterable[Document]:
606        """Read a stream from the connector and return the records as documents.
607
608        If metadata_properties is not set, all properties that are not content will be added to
609        the metadata.
610
611        If render_metadata is True, metadata will be rendered in the document, as well as
612        the main content.
613        """
614        return self.get_records(stream).to_documents(
615            title_property=title_property,
616            content_properties=content_properties,
617            metadata_properties=metadata_properties,
618            render_metadata=render_metadata,
619        )
620
621    def _get_airbyte_message_iterator(
622        self,
623        *,
624        streams: Literal["*"] | list[str] | None = None,
625        state_provider: StateProviderBase | None = None,
626        progress_tracker: ProgressTracker,
627        force_full_refresh: bool = False,
628    ) -> AirbyteMessageIterator:
629        """Get an AirbyteMessageIterator for this source."""
630        return AirbyteMessageIterator(
631            self._read_with_catalog(
632                catalog=self.get_configured_catalog(streams=streams),
633                state=state_provider if not force_full_refresh else None,
634                progress_tracker=progress_tracker,
635            )
636        )
637
638    def _read_with_catalog(
639        self,
640        catalog: ConfiguredAirbyteCatalog,
641        progress_tracker: ProgressTracker,
642        state: StateProviderBase | None = None,
643    ) -> Generator[AirbyteMessage, None, None]:
644        """Call read on the connector.
645
646        This involves the following steps:
647        * Write the config, catalog, and state to temporary files
648        * Execute the connector with read --config <config_file> --catalog <catalog_file> --state <state_file>
649        * Listen to the messages and return the AirbyteRecordMessages that come along.
650        * Send out telemetry on the performed sync (with information about which source was used and
651          the type of the cache)
652        """
653        with as_temp_files(
654            [
655                self._config,
656                catalog.model_dump_json(),
657                state.to_state_input_file_text() if state else "[]",
658            ]
659        ) as [
660            config_file,
661            catalog_file,
662            state_file,
663        ]:
664            message_generator = self._execute(
665                [
666                    "read",
667                    "--config",
668                    config_file,
669                    "--catalog",
670                    catalog_file,
671                    "--state",
672                    state_file,
673                ],
674                progress_tracker=progress_tracker,
675            )
676            yield from progress_tracker.tally_records_read(message_generator)
677        progress_tracker.log_read_complete()
678
679    def _peek_airbyte_message(
680        self,
681        message: AirbyteMessage,
682        *,
683        raise_on_error: bool = True,
684    ) -> None:
685        """Process an Airbyte message.
686
687        This method handles reading Airbyte messages and taking action, if needed, based on the
688        message type. For instance, log messages are logged, records are tallied, and errors are
689        raised as exceptions if `raise_on_error` is True.
690
691        Raises:
692            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
693        """
694        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
695
696    def _log_incremental_streams(
697        self,
698        *,
699        incremental_streams: set[str] | None = None,
700    ) -> None:
701        """Log the streams which are using incremental sync mode."""
702        log_message = (
703            "The following streams are currently using incremental sync:\n"
704            f"{incremental_streams}\n"
705            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
706        )
707        print(log_message)
708
709    def read(
710        self,
711        cache: CacheBase | None = None,
712        *,
713        streams: str | list[str] | None = None,
714        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
715        force_full_refresh: bool = False,
716        skip_validation: bool = False,
717    ) -> ReadResult:
718        """Read from the connector and write to the cache.
719
720        Args:
721            cache: The cache to write to. If not set, a default cache will be used.
722            streams: Optional if already set. A list of stream names to select for reading. If set
723                to "*", all streams will be selected.
724            write_strategy: The strategy to use when writing to the cache. If a string, it must be
725                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
726                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
727                WriteStrategy.AUTO.
728            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
729                streams will be read in incremental mode if supported by the connector. This option
730                must be True when using the "replace" strategy.
731            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
732                running the connector. This can be helpful in debugging, when you want to send
733                configurations to the connector that otherwise might be rejected by JSON Schema
734                validation rules.
735        """
736        cache = cache or get_default_cache()
737        progress_tracker = ProgressTracker(
738            source=self,
739            cache=cache,
740            destination=None,
741            expected_streams=None,  # Will be set later
742        )
743
744        # Set up state provider if not in full refresh mode
745        if force_full_refresh:
746            state_provider: StateProviderBase | None = None
747        else:
748            state_provider = cache.get_state_provider(
749                source_name=self._name,
750            )
751        state_writer = cache.get_state_writer(source_name=self._name)
752
753        if streams:
754            self.select_streams(streams)
755
756        if not self._selected_stream_names:
757            raise exc.PyAirbyteNoStreamsSelectedError(
758                connector_name=self.name,
759                available_streams=self.get_available_streams(),
760            )
761
762        try:
763            result = self._read_to_cache(
764                cache=cache,
765                catalog_provider=CatalogProvider(self.configured_catalog),
766                stream_names=self._selected_stream_names,
767                state_provider=state_provider,
768                state_writer=state_writer,
769                write_strategy=write_strategy,
770                force_full_refresh=force_full_refresh,
771                skip_validation=skip_validation,
772                progress_tracker=progress_tracker,
773            )
774        except exc.PyAirbyteInternalError as ex:
775            progress_tracker.log_failure(exception=ex)
776            raise exc.AirbyteConnectorFailedError(
777                connector_name=self.name,
778                log_text=self._last_log_messages,
779            ) from ex
780        except Exception as ex:
781            progress_tracker.log_failure(exception=ex)
782            raise
783
784        progress_tracker.log_success()
785        return result
786
787    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
788        self,
789        cache: CacheBase,
790        *,
791        catalog_provider: CatalogProvider,
792        stream_names: list[str],
793        state_provider: StateProviderBase | None,
794        state_writer: StateWriterBase | None,
795        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
796        force_full_refresh: bool = False,
797        skip_validation: bool = False,
798        progress_tracker: ProgressTracker,
799    ) -> ReadResult:
800        """Internal read method."""
801        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
802            warnings.warn(
803                message=(
804                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
805                    "could result in data loss. "
806                    "To silence this warning, use the following: "
807                    'warnings.filterwarnings("ignore", '
808                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
809                ),
810                category=exc.PyAirbyteDataLossWarning,
811                stacklevel=1,
812            )
813        if isinstance(write_strategy, str):
814            try:
815                write_strategy = WriteStrategy(write_strategy)
816            except ValueError:
817                raise exc.PyAirbyteInputError(
818                    message="Invalid strategy",
819                    context={
820                        "write_strategy": write_strategy,
821                        "available_strategies": [s.value for s in WriteStrategy],
822                    },
823                ) from None
824
825        # Run optional validation step
826        if not skip_validation:
827            self.validate_config()
828
829        # Log incremental stream if incremental streams are known
830        if state_provider and state_provider.known_stream_names:
831            # Retrieve the set of known streams that support incremental sync
832            incremental_streams = (
833                set(self._get_incremental_stream_names())
834                & state_provider.known_stream_names
835                & set(self.get_selected_streams())
836            )
837            if incremental_streams:
838                self._log_incremental_streams(incremental_streams=incremental_streams)
839
840        airbyte_message_iterator = AirbyteMessageIterator(
841            self._read_with_catalog(
842                catalog=catalog_provider.configured_catalog,
843                state=state_provider,
844                progress_tracker=progress_tracker,
845            )
846        )
847        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
848            stdin=airbyte_message_iterator,
849            catalog_provider=catalog_provider,
850            write_strategy=write_strategy,
851            state_writer=state_writer,
852            progress_tracker=progress_tracker,
853        )
854
855        # Flush the WAL, if applicable
856        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
857
858        return ReadResult(
859            source_name=self.name,
860            progress_tracker=progress_tracker,
861            processed_streams=stream_names,
862            cache=cache,
863        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False, cursor_key_overrides: dict[str, str] | None = None, primary_key_overrides: dict[str, str | list[str]] | None = None)
 58    def __init__(
 59        self,
 60        executor: Executor,
 61        name: str,
 62        config: dict[str, Any] | None = None,
 63        *,
 64        config_change_callback: ConfigChangeCallback | None = None,
 65        streams: str | list[str] | None = None,
 66        validate: bool = False,
 67        cursor_key_overrides: dict[str, str] | None = None,
 68        primary_key_overrides: dict[str, str | list[str]] | None = None,
 69    ) -> None:
 70        """Initialize the source.
 71
 72        If config is provided, it will be validated against the spec if validate is True.
 73        """
 74        self._to_be_selected_streams: list[str] | str = []
 75        """Used to hold selection criteria before catalog is known."""
 76
 77        super().__init__(
 78            executor=executor,
 79            name=name,
 80            config=config,
 81            config_change_callback=config_change_callback,
 82            validate=validate,
 83        )
 84        self._config_dict: dict[str, Any] | None = None
 85        self._last_log_messages: list[str] = []
 86        self._discovered_catalog: AirbyteCatalog | None = None
 87        self._selected_stream_names: list[str] = []
 88
 89        self._cursor_key_overrides: dict[str, str] = {}
 90        """A mapping of lower-cased stream names to cursor key overrides."""
 91
 92        self._primary_key_overrides: dict[str, list[str]] = {}
 93        """A mapping of lower-cased stream names to primary key overrides."""
 94
 95        if config is not None:
 96            self.set_config(config, validate=validate)
 97        if streams is not None:
 98            self.select_streams(streams)
 99        if cursor_key_overrides is not None:
100            self.set_cursor_keys(**cursor_key_overrides)
101        if primary_key_overrides is not None:
102            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
104    def set_streams(self, streams: list[str]) -> None:
105        """Deprecated. See select_streams()."""
106        warnings.warn(
107            "The 'set_streams' method is deprecated and will be removed in a future version. "
108            "Please use the 'select_streams' method instead.",
109            DeprecationWarning,
110            stacklevel=2,
111        )
112        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
114    def set_cursor_key(
115        self,
116        stream_name: str,
117        cursor_key: str,
118    ) -> None:
119        """Set the cursor for a single stream.
120
121        Note:
122        - This does not unset previously set cursors.
123        - The cursor key must be a single field name.
124        - Not all streams support custom cursors. If a stream does not support custom cursors,
125          the override may be ignored.
126        - Stream names are case insensitive, while field names are case sensitive.
127        - Stream names are not validated by PyAirbyte. If the stream name
128          does not exist in the catalog, the override may be ignored.
129        """
130        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
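
A minimal usage sketch (the stream name "users" and the cursor field "updated_at" are hypothetical placeholders, not defined in this module):

    # Override the cursor for one stream; the field name is case sensitive.
    source.set_cursor_key("users", "updated_at")
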
def set_cursor_keys(self, **kwargs: str) -> None:
132    def set_cursor_keys(
133        self,
134        **kwargs: str,
135    ) -> None:
136        """Override the cursor key for one or more streams.
137
138        Usage:
139            source.set_cursor_keys(
140                stream1="cursor1",
141                stream2="cursor2",
142            )
143
144        Note:
145        - This does not unset previously set cursors.
146        - The cursor key must be a single field name.
147        - Not all streams support custom cursors. If a stream does not support custom cursors,
148          the override may be ignored.
149        - Stream names are case insensitive, while field names are case sensitive.
150        - Stream names are not validated by PyAirbyte. If the stream name
151          does not exist in the catalog, the override may be ignored.
152        """
153        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
155    def set_primary_key(
156        self,
157        stream_name: str,
158        primary_key: str | list[str],
159    ) -> None:
160        """Set the primary key for a single stream.
161
162        Note:
163        - This does not unset previously set primary keys.
164        - The primary key must be a single field name or a list of field names.
165        - Not all streams support overriding primary keys. If a stream does not support overriding
166          primary keys, the override may be ignored.
167        - Stream names are case insensitive, while field names are case sensitive.
168        - Stream names are not validated by PyAirbyte. If the stream name
169          does not exist in the catalog, the override may be ignored.
170        """
171        self._primary_key_overrides[stream_name.lower()] = (
172            primary_key if isinstance(primary_key, list) else [primary_key]
173        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
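
A minimal usage sketch (the stream and field names are hypothetical placeholders):

    # Single-field primary key:
    source.set_primary_key("users", "id")
    # Composite primary key, passed as a list of field names:
    source.set_primary_key("line_items", ["order_id", "line_number"])
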
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
175    def set_primary_keys(
176        self,
177        **kwargs: str | list[str],
178    ) -> None:
179        """Override the primary keys for one or more streams.
180
181        This does not unset previously set primary keys.
182
183        Usage:
184            source.set_primary_keys(
185                stream1="pk1",
186                stream2=["pk1", "pk2"],
187            )
188
189        Note:
190        - This does not unset previously set primary keys.
191        - The primary key must be a single field name or a list of field names.
192        - Not all streams support overriding primary keys. If a stream does not support overriding
193          primary keys, the override may be ignored.
194        - Stream names are case insensitive, while field names are case sensitive.
195        - Stream names are not validated by PyAirbyte. If the stream name
196          does not exist in the catalog, the override may be ignored.
197        """
198        self._primary_key_overrides.update(
199            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
200        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def select_all_streams(self) -> None:
214    def select_all_streams(self) -> None:
215        """Select all streams.
216
217        This is a more streamlined equivalent to:
218        > source.select_streams(source.get_available_streams()).
219        """
220        if self._config_dict is None:
221            self._to_be_selected_streams = "*"
222            self._log_warning_preselected_stream(self._to_be_selected_streams)
223            return
224
225        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
227    def select_streams(self, streams: str | list[str]) -> None:
228        """Select the stream names that should be read from the connector.
229
230        Args:
231            streams: A list of stream names to select. If set to "*", all streams will be selected.
232
233        Currently, if this is not set, all streams will be read.
234        """
235        if self._config_dict is None:
236            self._to_be_selected_streams = streams
237            self._log_warning_preselected_stream(streams)
238            return
239
240        if streams == "*":
241            self.select_all_streams()
242            return
243
244        if isinstance(streams, str):
245            # If a single stream is provided, convert it to a one-item list
246            streams = [streams]
247
248        available_streams = self.get_available_streams()
249        for stream in streams:
250            if stream not in available_streams:
251                raise exc.AirbyteStreamNotFoundError(
252                    stream_name=stream,
253                    connector_name=self.name,
254                    available_streams=available_streams,
255                )
256        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
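
A minimal usage sketch (the stream names are hypothetical placeholders):

    # Select a subset of the available streams:
    source.select_streams(["users", "purchases"])
    # Or select everything, equivalent to select_all_streams():
    source.select_streams("*")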

def get_selected_streams(self) -> list[str]:
258    def get_selected_streams(self) -> list[str]:
259        """Get the selected streams.
260
261        If no streams are selected, return an empty list.
262        """
263        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
265    def set_config(
266        self,
267        config: dict[str, Any],
268        *,
269        validate: bool = True,
270    ) -> None:
271        """Set the config for the connector.
272
273        If validate is True, raise an exception if the config fails validation.
274
275        If validate is False, validation will be deferred until check() or validate_config()
276        is called.
277        """
278        if validate:
279            self.validate_config(config)
280
281        self._config_dict = config
282
283        if self._to_be_selected_streams:
284            self.select_streams(self._to_be_selected_streams)
285            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
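
A minimal usage sketch (the config keys shown are hypothetical; real keys come from the connector's config_spec):

    # Validate eagerly against the connector spec:
    source.set_config({"api_key": "...", "start_date": "2024-01-01"}, validate=True)

    # Or defer validation until check() or validate_config() is called:
    source.set_config({"api_key": "..."}, validate=False)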

def get_config(self) -> dict[str, typing.Any]:
287    def get_config(self) -> dict[str, Any]:
288        """Get the config for the connector."""
289        return self._config

Get the config for the connector.

def get_available_streams(self) -> list[str]:
318    def get_available_streams(self) -> list[str]:
319        """Get the available streams from the spec."""
320        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.
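
For example, the returned names can be fed back into select_streams() (the "audit_log" stream name is a hypothetical placeholder):

    # Select every stream except one:
    streams = [name for name in source.get_available_streams() if name != "audit_log"]
    source.select_streams(streams)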

config_spec: dict[str, typing.Any]
352    @property
353    def config_spec(self) -> dict[str, Any]:
354        """Generate a configuration spec for this connector, as a JSON Schema definition.
355
356        This function generates a JSON Schema dictionary with configuration specs for the
357        current connector, as a dictionary.
358
359        Returns:
360            dict: The JSON Schema configuration spec as a dictionary.
361        """
362        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.
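
A minimal sketch of inspecting the spec (this assumes only that the returned dict is a standard JSON Schema object, per the docstring above):

    spec = source.config_spec
    # List the top-level config fields and which of them are required:
    print(list(spec.get("properties", {})))
    print(spec.get("required", []))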

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
364    def print_config_spec(
365        self,
366        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
367        *,
368        output_file: Path | str | None = None,
369    ) -> None:
370        """Print the configuration spec for this connector.
371
372        Args:
373            format: The format to print the spec in. Must be "yaml" or "json".
374            output_file: Optional. If set, the spec will be written to the given file path.
375                Otherwise, it will be printed to the console.
376        """
377        if format not in {"yaml", "json"}:
378            raise exc.PyAirbyteInputError(
379                message="Invalid format. Expected 'yaml' or 'json'",
380                input_value=format,
381            )
382        if isinstance(output_file, str):
383            output_file = Path(output_file)
384
385        if format == "yaml":
386            content = yaml.dump(self.config_spec, indent=2)
387        elif format == "json":
388            content = json.dumps(self.config_spec, indent=2)
389
390        if output_file:
391            output_file.write_text(content)
392            return
393
394        syntax_highlighted = Syntax(content, format)
395        print(syntax_highlighted)

Print the configuration spec for this connector.

Arguments:
  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
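
A minimal usage sketch (the output path "spec.json" is an arbitrary example):

    # Print the spec to the console as YAML (the default):
    source.print_config_spec()

    # Or write it to a JSON file instead of printing:
    source.print_config_spec(format="json", output_file="spec.json")
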
docs_url: str
412    @property
413    def docs_url(self) -> str:
414        """Get the URL to the connector's documentation."""
415        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
416            "source-", ""
417        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
419    @property
420    def discovered_catalog(self) -> AirbyteCatalog:
421        """Get the raw catalog for the given streams.
422
423        If the catalog is not yet known, we call discover to get it.
424        """
425        if self._discovered_catalog is None:
426            self._discovered_catalog = self._discover()
427
428        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
430    @property
431    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
432        """Get the configured catalog for the given streams.
433
434        If the raw catalog is not yet known, we call discover to get it.
435
436        If no specific streams are selected, we return a catalog that syncs all available streams.
437
438        TODO: We should consider disabling by default the streams that the connector would
439        disable by default. (For instance, streams that require a premium license are sometimes
440        disabled by default within the connector.)
441        """
442        # Ensure discovered catalog is cached before we start
443        _ = self.discovered_catalog
444
445        # Filter for selected streams if set, otherwise use all available streams:
446        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
447        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
449    def get_configured_catalog(
450        self,
451        streams: Literal["*"] | list[str] | None = None,
452    ) -> ConfiguredAirbyteCatalog:
453        """Get a configured catalog for the given streams.
454
455        If no streams are provided, the selected streams will be used. If no streams are selected,
456        all available streams will be used.
457
458        If '*' is provided, all available streams will be used.
459        """
460        selected_streams: list[str] = []
461        if streams is None:
462            selected_streams = self._selected_stream_names or self.get_available_streams()
463        elif streams == "*":
464            selected_streams = self.get_available_streams()
465        elif isinstance(streams, list):
466            selected_streams = streams
467        else:
468            raise exc.PyAirbyteInputError(
469                message="Invalid streams argument.",
470                input_value=streams,
471            )
472
473        return ConfiguredAirbyteCatalog(
474            streams=[
475                ConfiguredAirbyteStream(
476                    stream=stream,
477                    destination_sync_mode=DestinationSyncMode.overwrite,
478                    sync_mode=SyncMode.incremental,
479                    primary_key=(
480                        [self._primary_key_overrides[stream.name.lower()]]
481                        if stream.name.lower() in self._primary_key_overrides
482                        else stream.source_defined_primary_key
483                    ),
484                    cursor_field=(
485                        [self._cursor_key_overrides[stream.name.lower()]]
486                        if stream.name.lower() in self._cursor_key_overrides
487                        else stream.default_cursor_field
488                    ),
489                    # These are unused in the current implementation:
490                    generation_id=None,
491                    minimum_generation_id=None,
492                    sync_id=None,
493                )
494                for stream in self.discovered_catalog.streams
495                if stream.name in selected_streams
496            ],
497        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.
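
A minimal usage sketch (the stream name "users" is a hypothetical placeholder):

    # Build a catalog for a single stream without changing the selection state:
    catalog = source.get_configured_catalog(streams=["users"])
    print([configured.stream.name for configured in catalog.streams])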

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
499    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
500        """Return the JSON Schema spec for the specified stream name."""
501        catalog: AirbyteCatalog = self.discovered_catalog
502        found: list[AirbyteStream] = [
503            stream for stream in catalog.streams if stream.name == stream_name
504        ]
505
506        if len(found) == 0:
507            raise exc.PyAirbyteInputError(
508                message="Stream name does not exist in catalog.",
509                input_value=stream_name,
510            )
511
512        if len(found) > 1:
513            raise exc.PyAirbyteInternalError(
514                message="Duplicate streams found with the same name.",
515                context={
516                    "found_streams": found,
517                },
518            )
519
520        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
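
A minimal usage sketch (the stream name "users" is a hypothetical placeholder):

    schema = source.get_stream_json_schema("users")
    # Inspect the fields declared for the stream:
    print(list(schema.get("properties", {})))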

def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
522    def get_records(
523        self,
524        stream: str,
525        *,
526        normalize_field_names: bool = False,
527        prune_undeclared_fields: bool = True,
528    ) -> LazyDataset:
529        """Read a stream from the connector.
530
531        Args:
532            stream: The name of the stream to read.
533            normalize_field_names: When `True`, field names will be normalized to lower case, with
534                special characters removed. This matches the behavior of PyAirbyte caches and most
535                Airbyte destinations.
536            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
537                which generally matches the behavior of PyAirbyte caches and most Airbyte
538                destinations, specifically when you expect the catalog may be stale. You can disable
539                this to keep all fields in the records.
540
541        This involves the following steps:
542        * Call discover to get the catalog
543        * Generate a configured catalog that syncs the given stream in full_refresh mode
544        * Write the configured catalog and the config to temporary files
545        * Execute the connector with read --config <config_file> --catalog <catalog_file>
546        * Listen to the messages and yield the AirbyteRecordMessages as they arrive.
547        * Make sure the subprocess is killed when the function returns.
548        """
549        configured_catalog = self.get_configured_catalog(streams=[stream])
550        if len(configured_catalog.streams) == 0:
551            raise exc.PyAirbyteInputError(
552                message="Requested stream does not exist.",
553                context={
554                    "stream": stream,
555                    "available_streams": self.get_available_streams(),
556                    "connector_name": self.name,
557                },
558            ) from KeyError(stream)
559
560        configured_stream = configured_catalog.streams[0]
561
562        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
563            yield from records
564
565        stream_record_handler = StreamRecordHandler(
566            json_schema=self.get_stream_json_schema(stream),
567            prune_extra_fields=prune_undeclared_fields,
568            normalize_keys=normalize_field_names,
569        )
570
571        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
572        progress_tracker = ProgressTracker(
573            ProgressStyle.PLAIN,
574            source=self,
575            cache=None,
576            destination=None,
577            expected_streams=[stream],
578        )
579
580        iterator: Iterator[dict[str, Any]] = (
581            StreamRecord.from_record_message(
582                record_message=record.record,
583                stream_record_handler=stream_record_handler,
584            )
585            for record in self._read_with_catalog(
586                catalog=configured_catalog,
587                progress_tracker=progress_tracker,
588            )
589            if record.record
590        )
591        progress_tracker.log_success()
592        return LazyDataset(
593            iterator,
594            stream_metadata=configured_stream,
595        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to temporary files
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and yield the AirbyteRecordMessages as they arrive.
  • Make sure the subprocess is killed when the function returns.
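
A minimal usage sketch (the stream name "users" is a hypothetical placeholder):

    # Records are read lazily as the returned LazyDataset is iterated:
    for record in source.get_records("users"):
        print(record)
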
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
597    def get_documents(
598        self,
599        stream: str,
600        title_property: str | None = None,
601        content_properties: list[str] | None = None,
602        metadata_properties: list[str] | None = None,
603        *,
604        render_metadata: bool = False,
605    ) -> Iterable[Document]:
606        """Read a stream from the connector and return the records as documents.
607
608        If metadata_properties is not set, all properties that are not content will be added to
609        the metadata.
610
611        If render_metadata is True, metadata will be rendered in the document, as well as
612        the main content.
613        """
614        return self.get_records(stream).to_documents(
615            title_property=title_property,
616            content_properties=content_properties,
617            metadata_properties=metadata_properties,
618            render_metadata=render_metadata,
619        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
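
A minimal usage sketch (the stream and property names are hypothetical placeholders):

    for doc in source.get_documents(
        "products",
        title_property="name",
        content_properties=["description"],
        render_metadata=True,
    ):
        print(doc)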

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
709    def read(
710        self,
711        cache: CacheBase | None = None,
712        *,
713        streams: str | list[str] | None = None,
714        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
715        force_full_refresh: bool = False,
716        skip_validation: bool = False,
717    ) -> ReadResult:
718        """Read from the connector and write to the cache.
719
720        Args:
721            cache: The cache to write to. If not set, a default cache will be used.
722            streams: Optional if already set. A list of stream names to select for reading. If set
723                to "*", all streams will be selected.
724            write_strategy: The strategy to use when writing to the cache. If a string, it must be
725                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
726                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
727                WriteStrategy.AUTO.
728            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
729                streams will be read in incremental mode if supported by the connector. This option
730                must be True when using the "replace" strategy.
731            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
732                running the connector. This can be helpful in debugging, when you want to send
733                configurations to the connector that otherwise might be rejected by JSON Schema
734                validation rules.
735        """
736        cache = cache or get_default_cache()
737        progress_tracker = ProgressTracker(
738            source=self,
739            cache=cache,
740            destination=None,
741            expected_streams=None,  # Will be set later
742        )
743
744        # Set up state provider if not in full refresh mode
745        if force_full_refresh:
746            state_provider: StateProviderBase | None = None
747        else:
748            state_provider = cache.get_state_provider(
749                source_name=self._name,
750            )
751        state_writer = cache.get_state_writer(source_name=self._name)
752
753        if streams:
754            self.select_streams(streams)
755
756        if not self._selected_stream_names:
757            raise exc.PyAirbyteNoStreamsSelectedError(
758                connector_name=self.name,
759                available_streams=self.get_available_streams(),
760            )
761
762        try:
763            result = self._read_to_cache(
764                cache=cache,
765                catalog_provider=CatalogProvider(self.configured_catalog),
766                stream_names=self._selected_stream_names,
767                state_provider=state_provider,
768                state_writer=state_writer,
769                write_strategy=write_strategy,
770                force_full_refresh=force_full_refresh,
771                skip_validation=skip_validation,
772                progress_tracker=progress_tracker,
773            )
774        except exc.PyAirbyteInternalError as ex:
775            progress_tracker.log_failure(exception=ex)
776            raise exc.AirbyteConnectorFailedError(
777                connector_name=self.name,
778                log_text=self._last_log_messages,
779            ) from ex
780        except Exception as ex:
781            progress_tracker.log_failure(exception=ex)
782            raise
783
784        progress_tracker.log_success()
785        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
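
A minimal end-to-end sketch. Hedged: ab.get_source, the "source-faker" connector, and its "count" config key are assumptions not defined in this module:

    import airbyte as ab

    # Hypothetical connector and config, for illustration only:
    source = ab.get_source("source-faker", config={"count": 1_000})
    source.select_all_streams()

    # With no cache argument, a default local cache is used:
    result = source.read(write_strategy="append")

Note that passing write_strategy="replace" without force_full_refresh=True triggers the PyAirbyteDataLossWarning described in the source above.
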
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
config_hash
validate_config
connector_version
check
install
uninstall