airbyte.sources.base

Base class implementation for sources.
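
Typical usage goes through the package-level factory rather than constructing `Source` directly. The sketch below is illustrative only: it assumes the public `airbyte.get_source` helper and the `source-faker` connector, and the config values are hypothetical.

    import airbyte as ab

    # get_source resolves (and optionally installs) the connector,
    # then returns a configured Source instance.
    source = ab.get_source(
        "source-faker",
        config={"count": 100},  # hypothetical config values
        install_if_missing=True,
    )
    source.check()                    # Validate config and connectivity
    source.select_streams(["users"])  # Or: source.select_all_streams()
    result = source.read()            # Sync into the default cache
    for record in result["users"]:
        print(record)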

   1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
   2"""Base class implementation for sources."""
   3
   4from __future__ import annotations
   5
   6import sys
   7import threading
   8import time
   9import warnings
  10from itertools import islice
  11from typing import TYPE_CHECKING, Any, Literal
  12
  13import yaml
  14from rich import print  # noqa: A004  # Allow shadowing the built-in
  15from rich.console import Console
  16from rich.markdown import Markdown
  17from rich.markup import escape
  18from rich.table import Table
  19from typing_extensions import override
  20
  21from airbyte_protocol.models import (
  22    AirbyteCatalog,
  23    AirbyteMessage,
  24    ConfiguredAirbyteCatalog,
  25    ConfiguredAirbyteStream,
  26    DestinationSyncMode,
  27    SyncMode,
  28    Type,
  29)
  30
  31from airbyte import exceptions as exc
  32from airbyte._connector_base import ConnectorBase
  33from airbyte._message_iterators import AirbyteMessageIterator
  34from airbyte._util.temp_files import as_temp_files
  35from airbyte.caches.util import get_default_cache
  36from airbyte.datasets._lazy import LazyDataset
  37from airbyte.progress import ProgressStyle, ProgressTracker
  38from airbyte.records import StreamRecord, StreamRecordHandler
  39from airbyte.results import ReadResult
  40from airbyte.shared.catalog_providers import CatalogProvider
  41from airbyte.strategies import WriteStrategy
  42
  43
  44if TYPE_CHECKING:
  45    from collections.abc import Generator, Iterable, Iterator
  46
  47    from airbyte_protocol.models import (
  48        AirbyteStream,
  49        ConnectorSpecification,
  50    )
  51
  52    from airbyte._executors.base import Executor
  53    from airbyte.caches import CacheBase
  54    from airbyte.callbacks import ConfigChangeCallback
  55    from airbyte.datasets._inmemory import InMemoryDataset
  56    from airbyte.documents import Document
  57    from airbyte.shared.state_providers import StateProviderBase
  58    from airbyte.shared.state_writers import StateWriterBase
  59
  60from airbyte.constants import (
  61    AB_EXTRACTED_AT_COLUMN,
  62    AB_META_COLUMN,
  63    AB_RAW_ID_COLUMN,
  64)
  65
  66
  67class Source(ConnectorBase):  # noqa: PLR0904
  68    """A class representing a source that can be called."""
  69
  70    connector_type = "source"
  71
  72    def __init__(
  73        self,
  74        executor: Executor,
  75        name: str,
  76        config: dict[str, Any] | None = None,
  77        *,
  78        config_change_callback: ConfigChangeCallback | None = None,
  79        streams: str | list[str] | None = None,
  80        validate: bool = False,
  81        cursor_key_overrides: dict[str, str] | None = None,
  82        primary_key_overrides: dict[str, str | list[str]] | None = None,
  83    ) -> None:
  84        """Initialize the source.
  85
  86        If config is provided, it will be validated against the spec if validate is True.
  87        """
  88        self._to_be_selected_streams: list[str] | str = []
  89        """Used to hold selection criteria before catalog is known."""
  90
  91        super().__init__(
  92            executor=executor,
  93            name=name,
  94            config=config,
  95            config_change_callback=config_change_callback,
  96            validate=validate,
  97        )
  98        self._config_dict: dict[str, Any] | None = None
  99        self._last_log_messages: list[str] = []
 100        self._discovered_catalog: AirbyteCatalog | None = None
 101        self._selected_stream_names: list[str] = []
 102
 103        self._cursor_key_overrides: dict[str, str] = {}
 104        """A mapping of lower-cased stream names to cursor key overrides."""
 105
 106        self._primary_key_overrides: dict[str, list[str]] = {}
 107        """A mapping of lower-cased stream names to primary key overrides."""
 108
 109        if config is not None:
 110            self.set_config(config, validate=validate)
 111        if streams is not None:
 112            self.select_streams(streams)
 113        if cursor_key_overrides is not None:
 114            self.set_cursor_keys(**cursor_key_overrides)
 115        if primary_key_overrides is not None:
 116            self.set_primary_keys(**primary_key_overrides)
 117
 118    def set_streams(self, streams: list[str]) -> None:
 119        """Deprecated. See select_streams()."""
 120        warnings.warn(
 121            "The 'set_streams' method is deprecated and will be removed in a future version. "
 122            "Please use the 'select_streams' method instead.",
 123            DeprecationWarning,
 124            stacklevel=2,
 125        )
 126        self.select_streams(streams)
 127
 128    def set_cursor_key(
 129        self,
 130        stream_name: str,
 131        cursor_key: str,
 132    ) -> None:
 133        """Set the cursor for a single stream.
 134
 135        Note:
 136        - This does not unset previously set cursors.
 137        - The cursor key must be a single field name.
 138        - Not all streams support custom cursors. If a stream does not support custom cursors,
 139          the override may be ignored.
 140        - Stream names are case insensitive, while field names are case sensitive.
 141        - Stream names are not validated by PyAirbyte. If the stream name
 142          does not exist in the catalog, the override may be ignored.
 143        """
 144        self._cursor_key_overrides[stream_name.lower()] = cursor_key
 145
 146    def set_cursor_keys(
 147        self,
 148        **kwargs: str,
 149    ) -> None:
 150        """Override the cursor key for one or more streams.
 151
 152        Usage:
 153            source.set_cursor_keys(
 154                stream1="cursor1",
 155                stream2="cursor2",
 156            )
 157
 158        Note:
 159        - This does not unset previously set cursors.
 160        - The cursor key must be a single field name.
 161        - Not all streams support custom cursors. If a stream does not support custom cursors,
 162          the override may be ignored.
 163        - Stream names are case insensitive, while field names are case sensitive.
 164        - Stream names are not validated by PyAirbyte. If the stream name
 165          does not exist in the catalog, the override may be ignored.
 166        """
 167        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
 168
 169    def set_primary_key(
 170        self,
 171        stream_name: str,
 172        primary_key: str | list[str],
 173    ) -> None:
 174        """Set the primary key for a single stream.
 175
 176        Note:
 177        - This does not unset previously set primary keys.
 178        - The primary key must be a single field name or a list of field names.
 179        - Not all streams support overriding primary keys. If a stream does not support overriding
 180          primary keys, the override may be ignored.
 181        - Stream names are case insensitive, while field names are case sensitive.
 182        - Stream names are not validated by PyAirbyte. If the stream name
 183          does not exist in the catalog, the override may be ignored.
 184        """
 185        self._primary_key_overrides[stream_name.lower()] = (
 186            primary_key if isinstance(primary_key, list) else [primary_key]
 187        )
 188
 189    def set_primary_keys(
 190        self,
 191        **kwargs: str | list[str],
 192    ) -> None:
 193        """Override the primary keys for one or more streams.
 194
 195        This does not unset previously set primary keys.
 196
 197        Usage:
 198            source.set_primary_keys(
 199                stream1="pk1",
 200                stream2=["pk1", "pk2"],
 201            )
 202
 203        Note:
 204        - This does not unset previously set primary keys.
 205        - The primary key must be a single field name or a list of field names.
 206        - Not all streams support overriding primary keys. If a stream does not support overriding
 207          primary keys, the override may be ignored.
 208        - Stream names are case insensitive, while field names are case sensitive.
 209        - Stream names are not validated by PyAirbyte. If the stream name
 210          does not exist in the catalog, the override may be ignored.
 211        """
 212        self._primary_key_overrides.update(
 213            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
 214        )
 215
 216    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
 217        """Logs a warning message indicating stream selection which are not selected yet."""
 218        if streams == "*":
 219            print(
 220                "Warning: Config is not set yet. All streams will be selected after config is set.",
 221                file=sys.stderr,
 222            )
 223        else:
 224            print(
 225                "Warning: Config is not set yet. "
 226                f"Streams to be selected after config is set: {streams}",
 227                file=sys.stderr,
 228            )
 229
 230    def select_all_streams(self) -> None:
 231        """Select all streams.
 232
 233        This is a more streamlined equivalent to:
 234        > source.select_streams(source.get_available_streams()).
 235        """
 236        if self._config_dict is None:
 237            self._to_be_selected_streams = "*"
 238            self._log_warning_preselected_stream(self._to_be_selected_streams)
 239            return
 240
 241        self._selected_stream_names = self.get_available_streams()
 242
 243    def select_streams(self, streams: str | list[str]) -> None:
 244        """Select the stream names that should be read from the connector.
 245
 246        Args:
 247            streams: A list of stream names to select. If set to "*", all streams will be selected.
 248
 249        Currently, if this is not set, all streams will be read.
 250        """
 251        if self._config_dict is None:
 252            self._to_be_selected_streams = streams
 253            self._log_warning_preselected_stream(streams)
 254            return
 255
 256        if streams == "*":
 257            self.select_all_streams()
 258            return
 259
 260        if isinstance(streams, str):
 261            # If a single stream is provided, convert it to a one-item list
 262            streams = [streams]
 263
 264        available_streams = self.get_available_streams()
 265        for stream in streams:
 266            if stream not in available_streams:
 267                raise exc.AirbyteStreamNotFoundError(
 268                    stream_name=stream,
 269                    connector_name=self.name,
 270                    available_streams=available_streams,
 271                )
 272        self._selected_stream_names = streams
 273
 274    def get_selected_streams(self) -> list[str]:
 275        """Get the selected streams.
 276
 277        If no streams are selected, return an empty list.
 278        """
 279        return self._selected_stream_names
 280
 281    def set_config(
 282        self,
 283        config: dict[str, Any],
 284        *,
 285        validate: bool = True,
 286    ) -> None:
 287        """Set the config for the connector.
 288
 289        If validate is True, raise an exception if the config fails validation.
 290
 291        If validate is False, validation will be deferred until check() or validate_config()
 292        is called.
 293        """
 294        if validate:
 295            self.validate_config(config)
 296
 297        self._config_dict = config
 298
 299        if self._to_be_selected_streams:
 300            self.select_streams(self._to_be_selected_streams)
 301            self._to_be_selected_streams = []
 302
 303    def _discover(self) -> AirbyteCatalog:
 304        """Call discover on the connector.
 305
 306        This involves the following steps:
 307        - Write the config to a temporary file
 308        - execute the connector with discover --config <config_file>
 309        - Listen to the messages and return the first AirbyteCatalog that comes along.
 310        - Make sure the subprocess is killed when the function returns.
 311        """
 312        with as_temp_files([self._hydrated_config]) as [config_file]:
 313            for msg in self._execute(["discover", "--config", config_file]):
 314                if msg.type == Type.CATALOG and msg.catalog:
 315                    return msg.catalog
 316            raise exc.AirbyteConnectorMissingCatalogError(
 317                connector_name=self.name,
 318                log_text=self._last_log_messages,
 319            )
 320
 321    def get_available_streams(self) -> list[str]:
 322        """Get the available streams from the spec."""
 323        return [s.name for s in self.discovered_catalog.streams]
 324
 325    def _get_incremental_stream_names(self) -> list[str]:
 326        """Get the name of streams that support incremental sync."""
 327        return [
 328            stream.name
 329            for stream in self.discovered_catalog.streams
 330            if SyncMode.incremental in stream.supported_sync_modes
 331        ]
 332
 333    @override
 334    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
 335        """Call spec on the connector.
 336
 337        This involves the following steps:
 338        * execute the connector with spec
 339        * Listen to the messages and return the first ConnectorSpecification that comes along.
 340        * Make sure the subprocess is killed when the function returns.
 341        """
 342        if force_refresh or self._spec is None:
 343            for msg in self._execute(["spec"]):
 344                if msg.type == Type.SPEC and msg.spec:
 345                    self._spec = msg.spec
 346                    break
 347
 348        if self._spec:
 349            return self._spec
 350
 351        raise exc.AirbyteConnectorMissingSpecError(
 352            connector_name=self.name,
 353            log_text=self._last_log_messages,
 354        )
 355
 356    @property
 357    def config_spec(self) -> dict[str, Any]:
 358        """Generate a configuration spec for this connector, as a JSON Schema definition.
 359
 360        This property returns the JSON Schema describing the configuration options for the
 361        current connector.
 362
 363        Returns:
 364            dict: The JSON Schema configuration spec as a dictionary.
 365        """
 366        return self._get_spec(force_refresh=True).connectionSpecification
 367
 368    @property
 369    def _yaml_spec(self) -> str:
 370        """Get the spec as a yaml string.
 371
 372        For now, the primary use case is for writing and debugging a valid config for a source.
 373
 374        This is private for now because we probably want better polish before exposing this
 375        as a stable interface. This will also get easier when we have docs links with this info
 376        for each connector.
 377        """
 378        spec_obj: ConnectorSpecification = self._get_spec()
 379        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
 380        # convert to a yaml string
 381        return yaml.dump(spec_dict)
 382
 383    @property
 384    def docs_url(self) -> str:
 385        """Get the URL to the connector's documentation."""
 386        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
 387            "source-", ""
 388        )
 389
 390    @property
 391    def discovered_catalog(self) -> AirbyteCatalog:
 392        """Get the raw catalog for the given streams.
 393
 394        If the catalog is not yet known, we call discover to get it.
 395        """
 396        if self._discovered_catalog is None:
 397            self._discovered_catalog = self._discover()
 398
 399        return self._discovered_catalog
 400
 401    @property
 402    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
 403        """Get the configured catalog for the given streams.
 404
 405        If the raw catalog is not yet known, we call discover to get it.
 406
 407        If no specific streams are selected, we return a catalog that syncs all available streams.
 408
 409        TODO: We should consider disabling by default the streams that the connector would
 410        disable by default. (For instance, streams that require a premium license are sometimes
 411        disabled by default within the connector.)
 412        """
 413        # Ensure discovered catalog is cached before we start
 414        _ = self.discovered_catalog
 415
 416        # Filter for selected streams if set, otherwise use all available streams:
 417        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
 418        return self.get_configured_catalog(streams=streams_filter)
 419
 420    def get_configured_catalog(
 421        self,
 422        streams: Literal["*"] | list[str] | None = None,
 423        *,
 424        force_full_refresh: bool = False,
 425    ) -> ConfiguredAirbyteCatalog:
 426        """Get a configured catalog for the given streams.
 427
 428        If no streams are provided, the selected streams will be used. If no streams are selected,
 429        all available streams will be used.
 430
 431        If '*' is provided, all available streams will be used.
 432
 433        If force_full_refresh is True, streams will be configured with full_refresh sync mode
 434        when supported by the stream. Otherwise, incremental sync mode is used when supported.
 435        """
 436        selected_streams: list[str] = []
 437        if streams is None:
 438            selected_streams = self._selected_stream_names or self.get_available_streams()
 439        elif streams == "*":
 440            selected_streams = self.get_available_streams()
 441        elif isinstance(streams, list):
 442            selected_streams = streams
 443        else:
 444            raise exc.PyAirbyteInputError(
 445                message="Invalid streams argument.",
 446                input_value=streams,
 447            )
 448
 449        def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
 450            """Determine the sync mode for a stream based on force_full_refresh and support."""
 451            # Use getattr to handle mocks or streams without supported_sync_modes attribute
 452            supported_modes = getattr(stream, "supported_sync_modes", None)
 453
 454            if force_full_refresh:
 455                # When force_full_refresh is True, prefer full_refresh if supported
 456                if supported_modes and SyncMode.full_refresh in supported_modes:
 457                    return SyncMode.full_refresh
 458                # Fall back to incremental if full_refresh is not supported
 459                return SyncMode.incremental
 460
 461            # Default behavior: preserve previous semantics (always incremental)
 462            return SyncMode.incremental
 463
 464        return ConfiguredAirbyteCatalog(
 465            streams=[
 466                ConfiguredAirbyteStream(
 467                    stream=stream,
 468                    destination_sync_mode=DestinationSyncMode.overwrite,
 469                    sync_mode=_get_sync_mode(stream),
 470                    primary_key=(
 471                        [self._primary_key_overrides[stream.name.lower()]]
 472                        if stream.name.lower() in self._primary_key_overrides
 473                        else stream.source_defined_primary_key
 474                    ),
 475                    cursor_field=(
 476                        [self._cursor_key_overrides[stream.name.lower()]]
 477                        if stream.name.lower() in self._cursor_key_overrides
 478                        else stream.default_cursor_field
 479                    ),
 480                    # These are unused in the current implementation:
 481                    generation_id=None,
 482                    minimum_generation_id=None,
 483                    sync_id=None,
 484                )
 485                for stream in self.discovered_catalog.streams
 486                if stream.name in selected_streams
 487            ],
 488        )
 489
 490    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
 491        """Return the JSON Schema spec for the specified stream name."""
 492        catalog: AirbyteCatalog = self.discovered_catalog
 493        found: list[AirbyteStream] = [
 494            stream for stream in catalog.streams if stream.name == stream_name
 495        ]
 496
 497        if len(found) == 0:
 498            raise exc.PyAirbyteInputError(
 499                message="Stream name does not exist in catalog.",
 500                input_value=stream_name,
 501            )
 502
 503        if len(found) > 1:
 504            raise exc.PyAirbyteInternalError(
 505                message="Duplicate streams found with the same name.",
 506                context={
 507                    "found_streams": found,
 508                },
 509            )
 510
 511        return found[0].json_schema
 512
 513    def get_records(
 514        self,
 515        stream: str,
 516        *,
 517        limit: int | None = None,
 518        stop_event: threading.Event | None = None,
 519        normalize_field_names: bool = False,
 520        prune_undeclared_fields: bool = True,
 521    ) -> LazyDataset:
 522        """Read a stream from the connector.
 523
 524        Args:
 525            stream: The name of the stream to read.
 526            limit: The maximum number of records to read. If None, all records will be read.
 527            stop_event: If set, the event can be triggered by the caller to stop reading records
 528                and terminate the process.
 529            normalize_field_names: When `True`, field names will be normalized to lower case, with
 530                special characters removed. This matches the behavior of PyAirbyte caches and most
 531                Airbyte destinations.
 532            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
 533                which generally matches the behavior of PyAirbyte caches and most Airbyte
 534                destinations, specifically when you expect the catalog may be stale. You can disable
 535                this to keep all fields in the records.
 536
 537        This involves the following steps:
 538        * Call discover to get the catalog
 539        * Generate a configured catalog that syncs only the given stream
 540        * Write the configured catalog and the config to a temporary file
 541        * execute the connector with read --config <config_file> --catalog <catalog_file>
 542        * Listen to the messages and yield the AirbyteRecordMessages that come along.
 543        * Make sure the subprocess is killed when the function returns.
 544        """
 545        stop_event = stop_event or threading.Event()
 546        configured_catalog = self.get_configured_catalog(streams=[stream])
 547        if len(configured_catalog.streams) == 0:
 548            raise exc.PyAirbyteInputError(
 549                message="Requested stream does not exist.",
 550                context={
 551                    "stream": stream,
 552                    "available_streams": self.get_available_streams(),
 553                    "connector_name": self.name,
 554                },
 555            ) from KeyError(stream)
 556
 557        configured_stream = configured_catalog.streams[0]
 558
 559        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
 560            yield from records
 561
 562        stream_record_handler = StreamRecordHandler(
 563            json_schema=self.get_stream_json_schema(stream),
 564            prune_extra_fields=prune_undeclared_fields,
 565            normalize_keys=normalize_field_names,
 566        )
 567
 568        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
 569        progress_tracker = ProgressTracker(
 570            ProgressStyle.PLAIN,
 571            source=self,
 572            cache=None,
 573            destination=None,
 574            expected_streams=[stream],
 575        )
 576
 577        iterator: Iterator[dict[str, Any]] = (
 578            StreamRecord.from_record_message(
 579                record_message=record.record,
 580                stream_record_handler=stream_record_handler,
 581            )
 582            for record in self._read_with_catalog(
 583                catalog=configured_catalog,
 584                progress_tracker=progress_tracker,
 585                stop_event=stop_event,
 586            )
 587            if record.record
 588        )
 589        if limit is not None:
 590            # Stop the iterator after the limit is reached
 591            iterator = islice(iterator, limit)
 592
 593        return LazyDataset(
 594            iterator,
 595            stream_metadata=configured_stream,
 596            stop_event=stop_event,
 597            progress_tracker=progress_tracker,
 598        )
 599
 600    def get_documents(
 601        self,
 602        stream: str,
 603        title_property: str | None = None,
 604        content_properties: list[str] | None = None,
 605        metadata_properties: list[str] | None = None,
 606        *,
 607        render_metadata: bool = False,
 608    ) -> Iterable[Document]:
 609        """Read a stream from the connector and return the records as documents.
 610
 611        If metadata_properties is not set, all properties that are not content will be added to
 612        the metadata.
 613
 614        If render_metadata is True, metadata will be rendered in the document alongside
 615        the main content.
 616        """
 617        return self.get_records(stream).to_documents(
 618            title_property=title_property,
 619            content_properties=content_properties,
 620            metadata_properties=metadata_properties,
 621            render_metadata=render_metadata,
 622        )
 623
 624    def get_samples(
 625        self,
 626        streams: list[str] | Literal["*"] | None = None,
 627        *,
 628        limit: int = 5,
 629        on_error: Literal["raise", "ignore", "log"] = "raise",
 630    ) -> dict[str, InMemoryDataset | None]:
 631        """Get a sample of records from the given streams."""
 632        if streams == "*":
 633            streams = self.get_available_streams()
 634        elif streams is None:
 635            streams = self.get_selected_streams()
 636
 637        results: dict[str, InMemoryDataset | None] = {}
 638        for stream in streams:
 639            stop_event = threading.Event()
 640            try:
 641                results[stream] = self.get_records(
 642                    stream,
 643                    limit=limit,
 644                    stop_event=stop_event,
 645                ).fetch_all()
 646                stop_event.set()
 647            except Exception as ex:
 648                results[stream] = None
 649                if on_error == "ignore":
 650                    continue
 651
 652                if on_error == "raise":
 653                    raise ex from None
 654
 655                if on_error == "log":
 656                    print(f"Error fetching sample for stream '{stream}': {ex}")
 657
 658        return results
 659
 660    def print_samples(
 661        self,
 662        streams: list[str] | Literal["*"] | None = None,
 663        *,
 664        limit: int = 5,
 665        on_error: Literal["raise", "ignore", "log"] = "log",
 666    ) -> None:
 667        """Print a sample of records from the given streams."""
 668        internal_cols: list[str] = [
 669            AB_EXTRACTED_AT_COLUMN,
 670            AB_META_COLUMN,
 671            AB_RAW_ID_COLUMN,
 672        ]
 673        col_limit = 10
 674        if streams == "*":
 675            streams = self.get_available_streams()
 676        elif streams is None:
 677            streams = self.get_selected_streams()
 678
 679        console = Console()
 680
 681        console.print(
 682            Markdown(
 683                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
 684                justify="left",
 685            )
 686        )
 687
 688        for stream in streams:
 689            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
 690            samples = self.get_samples(
 691                streams=[stream],
 692                limit=limit,
 693                on_error=on_error,
 694            )
 695            dataset = samples[stream]
 696
 697            table = Table(
 698                show_header=True,
 699                show_lines=True,
 700            )
 701            if dataset is None:
 702                console.print(
 703                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
 704                )
 705                continue
 706
 707            if len(dataset.column_names) > col_limit:
 708                # We'll pivot the columns so each column is its own row
 709                table.add_column("Column Name")
 710                for _ in range(len(dataset)):
 711                    table.add_column(overflow="fold")
 712                for col in dataset.column_names:
 713                    table.add_row(
 714                        Markdown(f"**`{col}`**"),
 715                        *[escape(str(record[col])) for record in dataset],
 716                    )
 717            else:
 718                for col in dataset.column_names:
 719                    table.add_column(
 720                        Markdown(f"**`{col}`**"),
 721                        overflow="fold",
 722                    )
 723
 724                for record in dataset:
 725                    table.add_row(
 726                        *[
 727                            escape(str(val))
 728                            for key, val in record.items()
 729                            # Exclude internal Airbyte columns.
 730                            if key not in internal_cols
 731                        ]
 732                    )
 733
 734            console.print(table)
 735
 736        console.print(Markdown("--------------"))
 737
 738    def _get_airbyte_message_iterator(
 739        self,
 740        *,
 741        streams: Literal["*"] | list[str] | None = None,
 742        state_provider: StateProviderBase | None = None,
 743        progress_tracker: ProgressTracker,
 744        force_full_refresh: bool = False,
 745    ) -> AirbyteMessageIterator:
 746        """Get an AirbyteMessageIterator for this source."""
 747        return AirbyteMessageIterator(
 748            self._read_with_catalog(
 749                catalog=self.get_configured_catalog(
 750                    streams=streams,
 751                    force_full_refresh=force_full_refresh,
 752                ),
 753                state=state_provider if not force_full_refresh else None,
 754                progress_tracker=progress_tracker,
 755            )
 756        )
 757
 758    def _read_with_catalog(
 759        self,
 760        catalog: ConfiguredAirbyteCatalog,
 761        progress_tracker: ProgressTracker,
 762        *,
 763        state: StateProviderBase | None = None,
 764        stop_event: threading.Event | None = None,
 765    ) -> Generator[AirbyteMessage, None, None]:
 766        """Call read on the connector.
 767
 768        This involves the following steps:
 769        * Write the config to a temporary file
 770        * execute the connector with read --config <config_file> --catalog <catalog_file>
 771        * Listen to the messages and return the AirbyteRecordMessages that come along.
 772        * Send out telemetry on the performed sync (with information about which source was used and
 773          the type of the cache)
 774        """
 775        with as_temp_files(
 776            [
 777                self._hydrated_config,
 778                catalog.model_dump_json(exclude_none=True),
 779                state.to_state_input_file_text() if state else "[]",
 780            ]
 781        ) as [
 782            config_file,
 783            catalog_file,
 784            state_file,
 785        ]:
 786            message_generator = self._execute(
 787                [
 788                    "read",
 789                    "--config",
 790                    config_file,
 791                    "--catalog",
 792                    catalog_file,
 793                    "--state",
 794                    state_file,
 795                ],
 796                progress_tracker=progress_tracker,
 797            )
 798            for message in progress_tracker.tally_records_read(message_generator):
 799                if stop_event and stop_event.is_set():
 800                    progress_tracker._log_sync_cancel()  # noqa: SLF001
 801                    time.sleep(0.1)
 802                    return
 803
 804                yield message
 805
 806        progress_tracker.log_read_complete()
 807
 808    def _peek_airbyte_message(
 809        self,
 810        message: AirbyteMessage,
 811        *,
 812        raise_on_error: bool = True,
 813    ) -> None:
 814        """Process an Airbyte message.
 815
 816        This method handles reading Airbyte messages and taking action, if needed, based on the
 817        message type. For instance, log messages are logged, records are tallied, and errors are
 818        raised as exceptions if `raise_on_error` is True.
 819
 820        Raises:
 821            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
 822        """
 823        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
 824
 825    def _log_incremental_streams(
 826        self,
 827        *,
 828        incremental_streams: set[str] | None = None,
 829    ) -> None:
 830        """Log the streams which are using incremental sync mode."""
 831        log_message = (
 832            "The following streams are currently using incremental sync:\n"
 833            f"{incremental_streams}\n"
 834            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
 835        )
 836        print(log_message, file=sys.stderr)
 837
 838    def read(
 839        self,
 840        cache: CacheBase | None = None,
 841        *,
 842        streams: str | list[str] | None = None,
 843        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
 844        force_full_refresh: bool = False,
 845        skip_validation: bool = False,
 846    ) -> ReadResult:
 847        """Read from the connector and write to the cache.
 848
 849        Args:
 850            cache: The cache to write to. If not set, a default cache will be used.
 851            streams: Optional if already set. A list of stream names to select for reading. If set
 852                to "*", all streams will be selected.
 853            write_strategy: The strategy to use when writing to the cache. If a string, it must be
 854                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
 855                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
 856                WriteStrategy.AUTO.
 857            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
 858                streams will be read in incremental mode if supported by the connector. This option
 859                must be True when using the "replace" strategy.
 860            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
 861                running the connector. This can be helpful in debugging, when you want to send
 862                configurations to the connector that otherwise might be rejected by JSON Schema
 863                validation rules.
 864        """
 865        cache = cache or get_default_cache()
 866        progress_tracker = ProgressTracker(
 867            source=self,
 868            cache=cache,
 869            destination=None,
 870            expected_streams=None,  # Will be set later
 871        )
 872
 873        # Set up state provider if not in full refresh mode
 874        if force_full_refresh:
 875            state_provider: StateProviderBase | None = None
 876        else:
 877            state_provider = cache.get_state_provider(
 878                source_name=self._name,
 879            )
 880        state_writer = cache.get_state_writer(source_name=self._name)
 881
 882        if streams:
 883            self.select_streams(streams)
 884
 885        if not self._selected_stream_names:
 886            raise exc.PyAirbyteNoStreamsSelectedError(
 887                connector_name=self.name,
 888                available_streams=self.get_available_streams(),
 889            )
 890
 891        try:
 892            result = self._read_to_cache(
 893                cache=cache,
 894                catalog_provider=CatalogProvider(
 895                    self.get_configured_catalog(force_full_refresh=force_full_refresh)
 896                ),
 897                stream_names=self._selected_stream_names,
 898                state_provider=state_provider,
 899                state_writer=state_writer,
 900                write_strategy=write_strategy,
 901                force_full_refresh=force_full_refresh,
 902                skip_validation=skip_validation,
 903                progress_tracker=progress_tracker,
 904            )
 905        except exc.PyAirbyteInternalError as ex:
 906            progress_tracker.log_failure(exception=ex)
 907            raise exc.AirbyteConnectorFailedError(
 908                connector_name=self.name,
 909                log_text=self._last_log_messages,
 910            ) from ex
 911        except Exception as ex:
 912            progress_tracker.log_failure(exception=ex)
 913            raise
 914
 915        progress_tracker.log_success()
 916        return result
 917
 918    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
 919        self,
 920        cache: CacheBase,
 921        *,
 922        catalog_provider: CatalogProvider,
 923        stream_names: list[str],
 924        state_provider: StateProviderBase | None,
 925        state_writer: StateWriterBase | None,
 926        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
 927        force_full_refresh: bool = False,
 928        skip_validation: bool = False,
 929        progress_tracker: ProgressTracker,
 930    ) -> ReadResult:
 931        """Internal read method."""
 932        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
 933            warnings.warn(
 934                message=(
 935                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
 936                    "could result in data loss. "
 937                    "To silence this warning, use the following: "
 938                    'warnings.filterwarnings("ignore", '
 939                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
 940                ),
 941                category=exc.PyAirbyteDataLossWarning,
 942                stacklevel=1,
 943            )
 944        if isinstance(write_strategy, str):
 945            try:
 946                write_strategy = WriteStrategy(write_strategy)
 947            except ValueError:
 948                raise exc.PyAirbyteInputError(
 949                    message="Invalid strategy",
 950                    context={
 951                        "write_strategy": write_strategy,
 952                        "available_strategies": [
 953                            s.value
 954                            for s in WriteStrategy  # pyrefly: ignore[not-iterable]
 955                        ],
 956                    },
 957                ) from None
 958
 959        # Run optional validation step
 960        if not skip_validation:
 961            self.validate_config()
 962
 963        # Log the incremental streams, if any are known
 964        if state_provider and state_provider.known_stream_names:
 965            # Retrieve the set of known streams that support incremental sync
 966            incremental_streams = (
 967                set(self._get_incremental_stream_names())
 968                & state_provider.known_stream_names
 969                & set(self.get_selected_streams())
 970            )
 971            if incremental_streams:
 972                self._log_incremental_streams(incremental_streams=incremental_streams)
 973
 974        airbyte_message_iterator = AirbyteMessageIterator(
 975            self._read_with_catalog(
 976                catalog=catalog_provider.configured_catalog,
 977                state=state_provider,
 978                progress_tracker=progress_tracker,
 979            )
 980        )
 981        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
 982            stdin=airbyte_message_iterator,
 983            catalog_provider=catalog_provider,
 984            write_strategy=write_strategy,
 985            state_writer=state_writer,
 986            progress_tracker=progress_tracker,
 987        )
 988
 989        # Flush the WAL, if applicable
 990        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
 991
 992        return ReadResult(
 993            source_name=self.name,
 994            progress_tracker=progress_tracker,
 995            processed_streams=stream_names,
 996            cache=cache,
 997        )
 998
 999
1000__all__ = [
1001    "Source",
1002]
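
As a further hedged sketch, the override and sampling helpers defined above compose as follows. The stream and field names here are hypothetical and not taken from any real connector's catalog:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})

    # Stream names are matched case-insensitively; field names are case sensitive.
    source.set_cursor_keys(users="updated_at")  # hypothetical cursor field
    source.set_primary_keys(users="id")         # hypothetical primary key

    # Peek at records lazily, without writing to a cache.
    dataset = source.get_records("users", limit=5)
    for record in dataset:
        print(record)

    # Or render formatted samples for all selected streams.
    source.select_all_streams()
    source.print_samples(limit=3)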
500                message="Stream name does not exist in catalog.",
501                input_value=stream_name,
502            )
503
504        if len(found) > 1:
505            raise exc.PyAirbyteInternalError(
506                message="Duplicate streams found with the same name.",
507                context={
508                    "found_streams": found,
509                },
510            )
511
512        return found[0].json_schema
513
514    def get_records(
515        self,
516        stream: str,
517        *,
518        limit: int | None = None,
519        stop_event: threading.Event | None = None,
520        normalize_field_names: bool = False,
521        prune_undeclared_fields: bool = True,
522    ) -> LazyDataset:
523        """Read a stream from the connector.
524
525        Args:
526            stream: The name of the stream to read.
527            limit: The maximum number of records to read. If None, all records will be read.
528            stop_event: If set, the event can be triggered by the caller to stop reading records
529                and terminate the process.
530            normalize_field_names: When `True`, field names will be normalized to lower case, with
531                special characters removed. This matches the behavior of PyAirbyte caches and most
532                Airbyte destinations.
533            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
534                which generally matches the behavior of PyAirbyte caches and most Airbyte
535                destinations, specifically when you expect the catalog may be stale. You can disable
536                this to keep all fields in the records.
537
538        This involves the following steps:
539        * Call discover to get the catalog
540        * Generate a configured catalog that syncs the given stream in full_refresh mode
541        * Write the configured catalog and the config to temporary files
542        * Execute the connector with read --config <config_file> --catalog <catalog_file>
543        * Listen to the messages and return the first AirbyteRecordMessages that come along.
544        * Make sure the subprocess is killed when the function returns.
545        """
546        stop_event = stop_event or threading.Event()
547        configured_catalog = self.get_configured_catalog(streams=[stream])
548        if len(configured_catalog.streams) == 0:
549            raise exc.PyAirbyteInputError(
550                message="Requested stream does not exist.",
551                context={
552                    "stream": stream,
553                    "available_streams": self.get_available_streams(),
554                    "connector_name": self.name,
555                },
556            ) from KeyError(stream)
557
558        configured_stream = configured_catalog.streams[0]
559
560        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
561            yield from records
562
563        stream_record_handler = StreamRecordHandler(
564            json_schema=self.get_stream_json_schema(stream),
565            prune_extra_fields=prune_undeclared_fields,
566            normalize_keys=normalize_field_names,
567        )
568
569        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
570        progress_tracker = ProgressTracker(
571            ProgressStyle.PLAIN,
572            source=self,
573            cache=None,
574            destination=None,
575            expected_streams=[stream],
576        )
577
578        iterator: Iterator[dict[str, Any]] = (
579            StreamRecord.from_record_message(
580                record_message=record.record,
581                stream_record_handler=stream_record_handler,
582            )
583            for record in self._read_with_catalog(
584                catalog=configured_catalog,
585                progress_tracker=progress_tracker,
586                stop_event=stop_event,
587            )
588            if record.record
589        )
590        if limit is not None:
591            # Stop the iterator after the limit is reached
592            iterator = islice(iterator, limit)
593
594        return LazyDataset(
595            iterator,
596            stream_metadata=configured_stream,
597            stop_event=stop_event,
598            progress_tracker=progress_tracker,
599        )
600
601    def get_documents(
602        self,
603        stream: str,
604        title_property: str | None = None,
605        content_properties: list[str] | None = None,
606        metadata_properties: list[str] | None = None,
607        *,
608        render_metadata: bool = False,
609    ) -> Iterable[Document]:
610        """Read a stream from the connector and return the records as documents.
611
612        If metadata_properties is not set, all properties that are not content will be added to
613        the metadata.
614
615        If render_metadata is True, metadata will be rendered in the document alongside
616        the main content.
617        """
618        return self.get_records(stream).to_documents(
619            title_property=title_property,
620            content_properties=content_properties,
621            metadata_properties=metadata_properties,
622            render_metadata=render_metadata,
623        )
624
625    def get_samples(
626        self,
627        streams: list[str] | Literal["*"] | None = None,
628        *,
629        limit: int = 5,
630        on_error: Literal["raise", "ignore", "log"] = "raise",
631    ) -> dict[str, InMemoryDataset | None]:
632        """Get a sample of records from the given streams."""
633        if streams == "*":
634            streams = self.get_available_streams()
635        elif streams is None:
636            streams = self.get_selected_streams()
637
638        results: dict[str, InMemoryDataset | None] = {}
639        for stream in streams:
640            stop_event = threading.Event()
641            try:
642                results[stream] = self.get_records(
643                    stream,
644                    limit=limit,
645                    stop_event=stop_event,
646                ).fetch_all()
647                stop_event.set()
648            except Exception as ex:
649                results[stream] = None
650                if on_error == "ignore":
651                    continue
652
653                if on_error == "raise":
654                    raise ex from None
655
656                if on_error == "log":
657                    print(f"Error fetching sample for stream '{stream}': {ex}")
658
659        return results
660
661    def print_samples(
662        self,
663        streams: list[str] | Literal["*"] | None = None,
664        *,
665        limit: int = 5,
666        on_error: Literal["raise", "ignore", "log"] = "log",
667    ) -> None:
668        """Print a sample of records from the given streams."""
669        internal_cols: list[str] = [
670            AB_EXTRACTED_AT_COLUMN,
671            AB_META_COLUMN,
672            AB_RAW_ID_COLUMN,
673        ]
674        col_limit = 10
675        if streams == "*":
676            streams = self.get_available_streams()
677        elif streams is None:
678            streams = self.get_selected_streams()
679
680        console = Console()
681
682        console.print(
683            Markdown(
684                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
685                justify="left",
686            )
687        )
688
689        for stream in streams:
690            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
691            samples = self.get_samples(
692                streams=[stream],
693                limit=limit,
694                on_error=on_error,
695            )
696            dataset = samples[stream]
697
698            table = Table(
699                show_header=True,
700                show_lines=True,
701            )
702            if dataset is None:
703                console.print(
704                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
705                )
706                continue
707
708            if len(dataset.column_names) > col_limit:
709                # We'll pivot the columns so each column is its own row
710                table.add_column("Column Name")
711                for _ in range(len(dataset)):
712                    table.add_column(overflow="fold")
713                for col in dataset.column_names:
714                    table.add_row(
715                        Markdown(f"**`{col}`**"),
716                        *[escape(str(record[col])) for record in dataset],
717                    )
718            else:
719                for col in dataset.column_names:
720                    table.add_column(
721                        Markdown(f"**`{col}`**"),
722                        overflow="fold",
723                    )
724
725                for record in dataset:
726                    table.add_row(
727                        *[
728                            escape(str(val))
729                            for key, val in record.items()
730                            # Exclude internal Airbyte columns.
731                            if key not in internal_cols
732                        ]
733                    )
734
735            console.print(table)
736
737        console.print(Markdown("--------------"))
738
739    def _get_airbyte_message_iterator(
740        self,
741        *,
742        streams: Literal["*"] | list[str] | None = None,
743        state_provider: StateProviderBase | None = None,
744        progress_tracker: ProgressTracker,
745        force_full_refresh: bool = False,
746    ) -> AirbyteMessageIterator:
747        """Get an AirbyteMessageIterator for this source."""
748        return AirbyteMessageIterator(
749            self._read_with_catalog(
750                catalog=self.get_configured_catalog(
751                    streams=streams,
752                    force_full_refresh=force_full_refresh,
753                ),
754                state=state_provider if not force_full_refresh else None,
755                progress_tracker=progress_tracker,
756            )
757        )
758
759    def _read_with_catalog(
760        self,
761        catalog: ConfiguredAirbyteCatalog,
762        progress_tracker: ProgressTracker,
763        *,
764        state: StateProviderBase | None = None,
765        stop_event: threading.Event | None = None,
766    ) -> Generator[AirbyteMessage, None, None]:
767        """Call read on the connector.
768
769        This involves the following steps:
770        * Write the config, catalog, and state to temporary files
771        * Execute the connector with read --config <config_file> --catalog <catalog_file> --state <state_file>
772        * Listen to the messages and return the AirbyteRecordMessages that come along.
773        * Send out telemetry on the performed sync (with information about which source was used and
774          the type of the cache)
775        """
776        with as_temp_files(
777            [
778                self._hydrated_config,
779                catalog.model_dump_json(exclude_none=True),
780                state.to_state_input_file_text() if state else "[]",
781            ]
782        ) as [
783            config_file,
784            catalog_file,
785            state_file,
786        ]:
787            message_generator = self._execute(
788                [
789                    "read",
790                    "--config",
791                    config_file,
792                    "--catalog",
793                    catalog_file,
794                    "--state",
795                    state_file,
796                ],
797                progress_tracker=progress_tracker,
798            )
799            for message in progress_tracker.tally_records_read(message_generator):
800                if stop_event and stop_event.is_set():
801                    progress_tracker._log_sync_cancel()  # noqa: SLF001
802                    time.sleep(0.1)
803                    return
804
805                yield message
806
807        progress_tracker.log_read_complete()
808
809    def _peek_airbyte_message(
810        self,
811        message: AirbyteMessage,
812        *,
813        raise_on_error: bool = True,
814    ) -> None:
815        """Process an Airbyte message.
816
817        This method handles reading Airbyte messages and taking action, if needed, based on the
818        message type. For instance, log messages are logged, records are tallied, and errors are
819        raised as exceptions if `raise_on_error` is True.
820
821        Raises:
822            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
823        """
824        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
825
826    def _log_incremental_streams(
827        self,
828        *,
829        incremental_streams: set[str] | None = None,
830    ) -> None:
831        """Log the streams which are using incremental sync mode."""
832        log_message = (
833            "The following streams are currently using incremental sync:\n"
834            f"{incremental_streams}\n"
835            "To perform a full refresh, set 'force_full_refresh=True' in the 'read()' method."
836        )
837        print(log_message, file=sys.stderr)
838
839    def read(
840        self,
841        cache: CacheBase | None = None,
842        *,
843        streams: str | list[str] | None = None,
844        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
845        force_full_refresh: bool = False,
846        skip_validation: bool = False,
847    ) -> ReadResult:
848        """Read from the connector and write to the cache.
849
850        Args:
851            cache: The cache to write to. If not set, a default cache will be used.
852            streams: Optional if already set. A list of stream names to select for reading. If set
853                to "*", all streams will be selected.
854            write_strategy: The strategy to use when writing to the cache. If a string, it must be
855                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
856                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
857                WriteStrategy.AUTO.
858            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
859                streams will be read in incremental mode if supported by the connector. This option
860                must be True when using the "replace" strategy.
861            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
862                running the connector. This can be helpful in debugging, when you want to send
863                configurations to the connector that otherwise might be rejected by JSON Schema
864                validation rules.
865        """
866        cache = cache or get_default_cache()
867        progress_tracker = ProgressTracker(
868            source=self,
869            cache=cache,
870            destination=None,
871            expected_streams=None,  # Will be set later
872        )
873
874        # Set up state provider if not in full refresh mode
875        if force_full_refresh:
876            state_provider: StateProviderBase | None = None
877        else:
878            state_provider = cache.get_state_provider(
879                source_name=self._name,
880            )
881        state_writer = cache.get_state_writer(source_name=self._name)
882
883        if streams:
884            self.select_streams(streams)
885
886        if not self._selected_stream_names:
887            raise exc.PyAirbyteNoStreamsSelectedError(
888                connector_name=self.name,
889                available_streams=self.get_available_streams(),
890            )
891
892        try:
893            result = self._read_to_cache(
894                cache=cache,
895                catalog_provider=CatalogProvider(
896                    self.get_configured_catalog(force_full_refresh=force_full_refresh)
897                ),
898                stream_names=self._selected_stream_names,
899                state_provider=state_provider,
900                state_writer=state_writer,
901                write_strategy=write_strategy,
902                force_full_refresh=force_full_refresh,
903                skip_validation=skip_validation,
904                progress_tracker=progress_tracker,
905            )
906        except exc.PyAirbyteInternalError as ex:
907            progress_tracker.log_failure(exception=ex)
908            raise exc.AirbyteConnectorFailedError(
909                connector_name=self.name,
910                log_text=self._last_log_messages,
911            ) from ex
912        except Exception as ex:
913            progress_tracker.log_failure(exception=ex)
914            raise
915
916        progress_tracker.log_success()
917        return result
918
919    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
920        self,
921        cache: CacheBase,
922        *,
923        catalog_provider: CatalogProvider,
924        stream_names: list[str],
925        state_provider: StateProviderBase | None,
926        state_writer: StateWriterBase | None,
927        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
928        force_full_refresh: bool = False,
929        skip_validation: bool = False,
930        progress_tracker: ProgressTracker,
931    ) -> ReadResult:
932        """Internal read method."""
933        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
934            warnings.warn(
935                message=(
936                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
937                    "could result in data loss. "
938                    "To silence this warning, use the following: "
939                    'warnings.filterwarnings("ignore", '
940                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
941                ),
942                category=exc.PyAirbyteDataLossWarning,
943                stacklevel=1,
944            )
945        if isinstance(write_strategy, str):
946            try:
947                write_strategy = WriteStrategy(write_strategy)
948            except ValueError:
949                raise exc.PyAirbyteInputError(
950                    message="Invalid strategy",
951                    context={
952                        "write_strategy": write_strategy,
953                        "available_strategies": [
954                            s.value
955                            for s in WriteStrategy  # pyrefly: ignore[not-iterable]
956                        ],
957                    },
958                ) from None
959
960        # Run optional validation step
961        if not skip_validation:
962            self.validate_config()
963
964        # Log the streams using incremental sync, if any are known
965        if state_provider and state_provider.known_stream_names:
966            # Retrieve the set of known streams which support incremental sync
967            incremental_streams = (
968                set(self._get_incremental_stream_names())
969                & state_provider.known_stream_names
970                & set(self.get_selected_streams())
971            )
972            if incremental_streams:
973                self._log_incremental_streams(incremental_streams=incremental_streams)
974
975        airbyte_message_iterator = AirbyteMessageIterator(
976            self._read_with_catalog(
977                catalog=catalog_provider.configured_catalog,
978                state=state_provider,
979                progress_tracker=progress_tracker,
980            )
981        )
982        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
983            stdin=airbyte_message_iterator,
984            catalog_provider=catalog_provider,
985            write_strategy=write_strategy,
986            state_writer=state_writer,
987            progress_tracker=progress_tracker,
988        )
989
990        # Flush the WAL, if applicable
991        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
992
993        return ReadResult(
994            source_name=self.name,
995            progress_tracker=progress_tracker,
996            processed_streams=stream_names,
997            cache=cache,
998        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False, cursor_key_overrides: dict[str, str] | None = None, primary_key_overrides: dict[str, str | list[str]] | None = None)
 73    def __init__(
 74        self,
 75        executor: Executor,
 76        name: str,
 77        config: dict[str, Any] | None = None,
 78        *,
 79        config_change_callback: ConfigChangeCallback | None = None,
 80        streams: str | list[str] | None = None,
 81        validate: bool = False,
 82        cursor_key_overrides: dict[str, str] | None = None,
 83        primary_key_overrides: dict[str, str | list[str]] | None = None,
 84    ) -> None:
 85        """Initialize the source.
 86
 87        If config is provided, it will be validated against the spec if validate is True.
 88        """
 89        self._to_be_selected_streams: list[str] | str = []
 90        """Used to hold selection criteria before catalog is known."""
 91
 92        super().__init__(
 93            executor=executor,
 94            name=name,
 95            config=config,
 96            config_change_callback=config_change_callback,
 97            validate=validate,
 98        )
 99        self._config_dict: dict[str, Any] | None = None
100        self._last_log_messages: list[str] = []
101        self._discovered_catalog: AirbyteCatalog | None = None
102        self._selected_stream_names: list[str] = []
103
104        self._cursor_key_overrides: dict[str, str] = {}
105        """A mapping of lower-cased stream names to cursor key overrides."""
106
107        self._primary_key_overrides: dict[str, list[str]] = {}
108        """A mapping of lower-cased stream names to primary key overrides."""
109
110        if config is not None:
111            self.set_config(config, validate=validate)
112        if streams is not None:
113            self.select_streams(streams)
114        if cursor_key_overrides is not None:
115            self.set_cursor_keys(**cursor_key_overrides)
116        if primary_key_overrides is not None:
117            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
119    def set_streams(self, streams: list[str]) -> None:
120        """Deprecated. See select_streams()."""
121        warnings.warn(
122            "The 'set_streams' method is deprecated and will be removed in a future version. "
123            "Please use the 'select_streams' method instead.",
124            DeprecationWarning,
125            stacklevel=2,
126        )
127        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
129    def set_cursor_key(
130        self,
131        stream_name: str,
132        cursor_key: str,
133    ) -> None:
134        """Set the cursor for a single stream.
135
136        Note:
137        - This does not unset previously set cursors.
138        - The cursor key must be a single field name.
139        - Not all streams support custom cursors. If a stream does not support custom cursors,
140          the override may be ignored.
141        - Stream names are case insensitive, while field names are case sensitive.
142        - Stream names are not validated by PyAirbyte. If the stream name
143          does not exist in the catalog, the override may be ignored.
144        """
145        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
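
A minimal sketch of a single-stream cursor override, assuming a `source` object created via `airbyte.get_source()`; the stream and field names are hypothetical:

    # "users" is matched case-insensitively; "updated_at" is case sensitive:
    source.set_cursor_key("users", "updated_at")  # hypothetical names
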
def set_cursor_keys(self, **kwargs: str) -> None:
147    def set_cursor_keys(
148        self,
149        **kwargs: str,
150    ) -> None:
151        """Override the cursor key for one or more streams.
152
153        Usage:
154            source.set_cursor_keys(
155                stream1="cursor1",
156                stream2="cursor2",
157            )
158
159        Note:
160        - This does not unset previously set cursors.
161        - The cursor key must be a single field name.
162        - Not all streams support custom cursors. If a stream does not support custom cursors,
163          the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
170    def set_primary_key(
171        self,
172        stream_name: str,
173        primary_key: str | list[str],
174    ) -> None:
175        """Set the primary key for a single stream.
176
177        Note:
178        - This does not unset previously set primary keys.
179        - The primary key must be a single field name or a list of field names.
180        - Not all streams support overriding primary keys. If a stream does not support overriding
181          primary keys, the override may be ignored.
182        - Stream names are case insensitive, while field names are case sensitive.
183        - Stream names are not validated by PyAirbyte. If the stream name
184          does not exist in the catalog, the override may be ignored.
185        """
186        self._primary_key_overrides[stream_name.lower()] = (
187            primary_key if isinstance(primary_key, list) else [primary_key]
188        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
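
A sketch showing both forms of the override; the stream and field names are hypothetical:

    # Single-field primary key:
    source.set_primary_key("users", "id")  # hypothetical names
    # Composite primary key, passed as a list of field names:
    source.set_primary_key("purchases", ["user_id", "product_id"])
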
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
190    def set_primary_keys(
191        self,
192        **kwargs: str | list[str],
193    ) -> None:
194        """Override the primary keys for one or more streams.
195
196        This does not unset previously set primary keys.
197
198        Usage:
199            source.set_primary_keys(
200                stream1="pk1",
201                stream2=["pk1", "pk2"],
202            )
203
204        Note:
205        - This does not unset previously set primary keys.
206        - The primary key must be a single field name or a list of field names.
207        - Not all streams support overriding primary keys. If a stream does not support overriding
208          primary keys, the override may be ignored.
209        - Stream names are case insensitive, while field names are case sensitive.
210        - Stream names are not validated by PyAirbyte. If the stream name
211          does not exist in the catalog, the override may be ignored.
212        """
213        self._primary_key_overrides.update(
214            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
215        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def select_all_streams(self) -> None:
231    def select_all_streams(self) -> None:
232        """Select all streams.
233
234        This is a more streamlined equivalent to:
235        > source.select_streams(source.get_available_streams()).
236        """
237        if self._config_dict is None:
238            self._to_be_selected_streams = "*"
239            self._log_warning_preselected_stream(self._to_be_selected_streams)
240            return
241
242        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
244    def select_streams(self, streams: str | list[str]) -> None:
245        """Select the stream names that should be read from the connector.
246
247        Args:
248            streams: A list of stream names to select. If set to "*", all streams will be selected.
249
250        Currently, if this is not set, all streams will be read.
251        """
252        if self._config_dict is None:
253            self._to_be_selected_streams = streams
254            self._log_warning_preselected_stream(streams)
255            return
256
257        if streams == "*":
258            self.select_all_streams()
259            return
260
261        if isinstance(streams, str):
262            # If a single stream is provided, convert it to a one-item list
263            streams = [streams]
264
265        available_streams = self.get_available_streams()
266        for stream in streams:
267            if stream not in available_streams:
268                raise exc.AirbyteStreamNotFoundError(
269                    stream_name=stream,
270                    connector_name=self.name,
271                    available_streams=available_streams,
272                )
273        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
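
A short sketch of stream selection with hypothetical stream names; selecting a name that is not in the catalog raises AirbyteStreamNotFoundError once the config is set:

    source.select_streams(["users", "purchases"])  # hypothetical stream names
    print(source.get_selected_streams())           # ['users', 'purchases']
    source.select_streams("*")                     # same effect as select_all_streams()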

def get_selected_streams(self) -> list[str]:
275    def get_selected_streams(self) -> list[str]:
276        """Get the selected streams.
277
278        If no streams are selected, return an empty list.
279        """
280        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
282    def set_config(
283        self,
284        config: dict[str, Any],
285        *,
286        validate: bool = True,
287    ) -> None:
288        """Set the config for the connector.
289
290        If validate is True, raise an exception if the config fails validation.
291
292        If validate is False, validation will be deferred until check() or validate_config()
293        is called.
294        """
295        if validate:
296            self.validate_config(config)
297
298        self._config_dict = config
299
300        if self._to_be_selected_streams:
301            self.select_streams(self._to_be_selected_streams)
302            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
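
A sketch of deferred validation; the config keys are hypothetical and would normally be taken from `source.config_spec`:

    # Skip validation at assignment time:
    source.set_config({"api_key": "..."}, validate=False)  # hypothetical key
    # Validate later, raising if the config fails the connector's JSON Schema:
    source.validate_config()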

def get_available_streams(self) -> list[str]:
322    def get_available_streams(self) -> list[str]:
323        """Get the available streams from the spec."""
324        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.

config_spec: dict[str, typing.Any]
357    @property
358    def config_spec(self) -> dict[str, Any]:
359        """Generate a configuration spec for this connector, as a JSON Schema definition.
360
361        This function generates a JSON Schema dictionary describing the configuration
362        options for the current connector.
363
364        Returns:
365            dict: The JSON Schema configuration spec as a dictionary.
366        """
367        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary describing the configuration options for the current connector.

Returns:

dict: The JSON Schema configuration spec as a dictionary.
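
Because the result is a plain JSON Schema dictionary, the standard `properties` and `required` keys can be inspected directly; a brief sketch:

    spec = source.config_spec
    print(sorted(spec.get("properties", {})))  # available config fields
    print(spec.get("required", []))            # fields the connector requires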

docs_url: str
384    @property
385    def docs_url(self) -> str:
386        """Get the URL to the connector's documentation."""
387        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
388            "source-", ""
389        )

Get the URL to the connector's documentation.
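
The URL is derived by stripping the `source-` prefix from the connector name, so for example:

    # For a connector named "source-github", this prints:
    #   https://docs.airbyte.com/integrations/sources/github
    print(source.docs_url)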

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
391    @property
392    def discovered_catalog(self) -> AirbyteCatalog:
393        """Get the raw catalog for the given streams.
394
395        If the catalog is not yet known, we call discover to get it.
396        """
397        if self._discovered_catalog is None:
398            self._discovered_catalog = self._discover()
399
400        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.
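
A sketch of inspecting the discovered catalog; the first access triggers a `discover` call and the result is cached on the instance:

    catalog = source.discovered_catalog  # runs discover on first access only
    for stream in catalog.streams:
        print(stream.name, stream.supported_sync_modes)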

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
402    @property
403    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
404        """Get the configured catalog for the given streams.
405
406        If the raw catalog is not yet known, we call discover to get it.
407
408        If no specific streams are selected, we return a catalog that syncs all available streams.
409
410        TODO: We should consider disabling by default the streams that the connector would
411        disable by default. (For instance, streams that require a premium license are sometimes
412        disabled by default within the connector.)
413        """
414        # Ensure discovered catalog is cached before we start
415        _ = self.discovered_catalog
416
417        # Filter for selected streams if set, otherwise use all available streams:
418        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
419        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, force_full_refresh: bool = False) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
421    def get_configured_catalog(
422        self,
423        streams: Literal["*"] | list[str] | None = None,
424        *,
425        force_full_refresh: bool = False,
426    ) -> ConfiguredAirbyteCatalog:
427        """Get a configured catalog for the given streams.
428
429        If no streams are provided, the selected streams will be used. If no streams are selected,
430        all available streams will be used.
431
432        If '*' is provided, all available streams will be used.
433
434        If force_full_refresh is True, streams will be configured with full_refresh sync mode
435        when supported by the stream. Otherwise, incremental sync mode is used when supported.
436        """
437        selected_streams: list[str] = []
438        if streams is None:
439            selected_streams = self._selected_stream_names or self.get_available_streams()
440        elif streams == "*":
441            selected_streams = self.get_available_streams()
442        elif isinstance(streams, list):
443            selected_streams = streams
444        else:
445            raise exc.PyAirbyteInputError(
446                message="Invalid streams argument.",
447                input_value=streams,
448            )
449
450        def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
451            """Determine the sync mode for a stream based on force_full_refresh and support."""
452            # Use getattr to handle mocks or streams without supported_sync_modes attribute
453            supported_modes = getattr(stream, "supported_sync_modes", None)
454
455            if force_full_refresh:
456                # When force_full_refresh is True, prefer full_refresh if supported
457                if supported_modes and SyncMode.full_refresh in supported_modes:
458                    return SyncMode.full_refresh
459                # Fall back to incremental if full_refresh is not supported
460                return SyncMode.incremental
461
462            # Default behavior: preserve previous semantics (always incremental)
463            return SyncMode.incremental
464
465        return ConfiguredAirbyteCatalog(
466            streams=[
467                ConfiguredAirbyteStream(
468                    stream=stream,
469                    destination_sync_mode=DestinationSyncMode.overwrite,
470                    sync_mode=_get_sync_mode(stream),
471                    primary_key=(
472                        [self._primary_key_overrides[stream.name.lower()]]
473                        if stream.name.lower() in self._primary_key_overrides
474                        else stream.source_defined_primary_key
475                    ),
476                    cursor_field=(
477                        [self._cursor_key_overrides[stream.name.lower()]]
478                        if stream.name.lower() in self._cursor_key_overrides
479                        else stream.default_cursor_field
480                    ),
481                    # These are unused in the current implementation:
482                    generation_id=None,
483                    minimum_generation_id=None,
484                    sync_id=None,
485                )
486                for stream in self.discovered_catalog.streams
487                if stream.name in selected_streams
488            ],
489        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.

If force_full_refresh is True, streams will be configured with full_refresh sync mode when supported by the stream. Otherwise, incremental sync mode is used when supported.
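
A sketch of building and inspecting a configured catalog, with hypothetical stream names:

    catalog = source.get_configured_catalog(
        streams=["users"],        # or "*" for all available streams
        force_full_refresh=True,  # prefer full_refresh where supported
    )
    for configured in catalog.streams:
        print(configured.stream.name, configured.sync_mode, configured.cursor_field)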

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
491    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
492        """Return the JSON Schema spec for the specified stream name."""
493        catalog: AirbyteCatalog = self.discovered_catalog
494        found: list[AirbyteStream] = [
495            stream for stream in catalog.streams if stream.name == stream_name
496        ]
497
498        if len(found) == 0:
499            raise exc.PyAirbyteInputError(
500                message="Stream name does not exist in catalog.",
501                input_value=stream_name,
502            )
503
504        if len(found) > 1:
505            raise exc.PyAirbyteInternalError(
506                message="Duplicate streams found with the same name.",
507                context={
508                    "found_streams": found,
509                },
510            )
511
512        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
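
A brief sketch, assuming a hypothetical stream name:

    schema = source.get_stream_json_schema("users")  # hypothetical stream
    print(sorted(schema.get("properties", {})))      # declared field names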

def get_records( self, stream: str, *, limit: int | None = None, stop_event: threading.Event | None = None, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
514    def get_records(
515        self,
516        stream: str,
517        *,
518        limit: int | None = None,
519        stop_event: threading.Event | None = None,
520        normalize_field_names: bool = False,
521        prune_undeclared_fields: bool = True,
522    ) -> LazyDataset:
523        """Read a stream from the connector.
524
525        Args:
526            stream: The name of the stream to read.
527            limit: The maximum number of records to read. If None, all records will be read.
528            stop_event: If set, the event can be triggered by the caller to stop reading records
529                and terminate the process.
530            normalize_field_names: When `True`, field names will be normalized to lower case, with
531                special characters removed. This matches the behavior of PyAirbyte caches and most
532                Airbyte destinations.
533            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
534                which generally matches the behavior of PyAirbyte caches and most Airbyte
535                destinations, specifically when you expect the catalog may be stale. You can disable
536                this to keep all fields in the records.
537
538        This involves the following steps:
539        * Call discover to get the catalog
540        * Generate a configured catalog that syncs the given stream in full_refresh mode
541        * Write the configured catalog and the config to temporary files
542        * Execute the connector with read --config <config_file> --catalog <catalog_file>
543        * Listen to the messages and return the first AirbyteRecordMessages that come along.
544        * Make sure the subprocess is killed when the function returns.
545        """
546        stop_event = stop_event or threading.Event()
547        configured_catalog = self.get_configured_catalog(streams=[stream])
548        if len(configured_catalog.streams) == 0:
549            raise exc.PyAirbyteInputError(
550                message="Requested stream does not exist.",
551                context={
552                    "stream": stream,
553                    "available_streams": self.get_available_streams(),
554                    "connector_name": self.name,
555                },
556            ) from KeyError(stream)
557
558        configured_stream = configured_catalog.streams[0]
559
560        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
561            yield from records
562
563        stream_record_handler = StreamRecordHandler(
564            json_schema=self.get_stream_json_schema(stream),
565            prune_extra_fields=prune_undeclared_fields,
566            normalize_keys=normalize_field_names,
567        )
568
569        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
570        progress_tracker = ProgressTracker(
571            ProgressStyle.PLAIN,
572            source=self,
573            cache=None,
574            destination=None,
575            expected_streams=[stream],
576        )
577
578        iterator: Iterator[dict[str, Any]] = (
579            StreamRecord.from_record_message(
580                record_message=record.record,
581                stream_record_handler=stream_record_handler,
582            )
583            for record in self._read_with_catalog(
584                catalog=configured_catalog,
585                progress_tracker=progress_tracker,
586                stop_event=stop_event,
587            )
588            if record.record
589        )
590        if limit is not None:
591            # Stop the iterator after the limit is reached
592            iterator = islice(iterator, limit)
593
594        return LazyDataset(
595            iterator,
596            stream_metadata=configured_stream,
597            stop_event=stop_event,
598            progress_tracker=progress_tracker,
599        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • limit: The maximum number of records to read. If None, all records will be read.
  • stop_event: If set, the event can be triggered by the caller to stop reading records and terminate the process.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to temporary files
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the first AirbyteRecordMessages that come along.
  • Make sure the subprocess is killed when the function returns.
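
A sketch of a bounded, cancellable read with a hypothetical stream name; the returned dataset is lazy, so records stream as you iterate:

    import threading

    stop = threading.Event()
    dataset = source.get_records("users", limit=10, stop_event=stop)  # hypothetical stream
    for record in dataset:  # records arrive lazily during iteration
        print(record)
    stop.set()  # ask the connector subprocess to stop, if it is still running
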
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
601    def get_documents(
602        self,
603        stream: str,
604        title_property: str | None = None,
605        content_properties: list[str] | None = None,
606        metadata_properties: list[str] | None = None,
607        *,
608        render_metadata: bool = False,
609    ) -> Iterable[Document]:
610        """Read a stream from the connector and return the records as documents.
611
612        If metadata_properties is not set, all properties that are not content will be added to
613        the metadata.
614
615        If render_metadata is True, metadata will be rendered in the document alongside
616        the main content.
617        """
618        return self.get_records(stream).to_documents(
619            title_property=title_property,
620            content_properties=content_properties,
621            metadata_properties=metadata_properties,
622            render_metadata=render_metadata,
623        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document alongside the main content.
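
A sketch with hypothetical stream and property names:

    for doc in source.get_documents(
        "users",                     # hypothetical stream
        title_property="name",       # hypothetical title field
        content_properties=["bio"],  # hypothetical content fields
        render_metadata=True,
    ):
        print(doc)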

def get_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'raise') -> dict[str, airbyte.datasets._inmemory.InMemoryDataset | None]:
625    def get_samples(
626        self,
627        streams: list[str] | Literal["*"] | None = None,
628        *,
629        limit: int = 5,
630        on_error: Literal["raise", "ignore", "log"] = "raise",
631    ) -> dict[str, InMemoryDataset | None]:
632        """Get a sample of records from the given streams."""
633        if streams == "*":
634            streams = self.get_available_streams()
635        elif streams is None:
636            streams = self.get_selected_streams()
637
638        results: dict[str, InMemoryDataset | None] = {}
639        for stream in streams:
640            stop_event = threading.Event()
641            try:
642                results[stream] = self.get_records(
643                    stream,
644                    limit=limit,
645                    stop_event=stop_event,
646                ).fetch_all()
647                stop_event.set()
648            except Exception as ex:
649                results[stream] = None
650                if on_error == "ignore":
651                    continue
652
653                if on_error == "raise":
654                    raise ex from None
655
656                if on_error == "log":
657                    print(f"Error fetching sample for stream '{stream}': {ex}")
658
659        return results

Get a sample of records from the given streams.
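
A sketch of sampling every available stream while tolerating per-stream failures; streams that fail map to None in the result:

    samples = source.get_samples(streams="*", limit=3, on_error="log")
    for name, dataset in samples.items():
        if dataset is None:
            print(f"{name}: failed to fetch sample")
        else:
            print(f"{name}: {len(dataset)} records")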

def print_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'log') -> None:
661    def print_samples(
662        self,
663        streams: list[str] | Literal["*"] | None = None,
664        *,
665        limit: int = 5,
666        on_error: Literal["raise", "ignore", "log"] = "log",
667    ) -> None:
668        """Print a sample of records from the given streams."""
669        internal_cols: list[str] = [
670            AB_EXTRACTED_AT_COLUMN,
671            AB_META_COLUMN,
672            AB_RAW_ID_COLUMN,
673        ]
674        col_limit = 10
675        if streams == "*":
676            streams = self.get_available_streams()
677        elif streams is None:
678            streams = self.get_selected_streams()
679
680        console = Console()
681
682        console.print(
683            Markdown(
684                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
685                justify="left",
686            )
687        )
688
689        for stream in streams:
690            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
691            samples = self.get_samples(
692                streams=[stream],
693                limit=limit,
694                on_error=on_error,
695            )
696            dataset = samples[stream]
697
698            table = Table(
699                show_header=True,
700                show_lines=True,
701            )
702            if dataset is None:
703                console.print(
704                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
705                )
706                continue
707
708            if len(dataset.column_names) > col_limit:
709                # We'll pivot the columns so each column is its own row
710                table.add_column("Column Name")
711                for _ in range(len(dataset)):
712                    table.add_column(overflow="fold")
713                for col in dataset.column_names:
714                    table.add_row(
715                        Markdown(f"**`{col}`**"),
716                        *[escape(str(record[col])) for record in dataset],
717                    )
718            else:
719                for col in dataset.column_names:
720                    table.add_column(
721                        Markdown(f"**`{col}`**"),
722                        overflow="fold",
723                    )
724
725                for record in dataset:
726                    table.add_row(
727                        *[
728                            escape(str(val))
729                            for key, val in record.items()
730                            # Exclude internal Airbyte columns.
731                            if key not in internal_cols
732                        ]
733                    )
734
735            console.print(table)
736
737        console.print(Markdown("--------------"))

Print a sample of records from the given streams.
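
A minimal sketch; streams with more than ten columns are rendered pivoted, with one table row per column:

    # Render sample tables for all currently selected streams:
    source.print_samples(limit=3)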

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If set
                to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must be
                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This option
                must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
                running the connector. This can be helpful in debugging, when you want to send
                configurations to the connector that otherwise might be rejected by JSON Schema
                validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(
                    self.get_configured_catalog(force_full_refresh=force_full_refresh)
                ),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
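
Putting the options together, a hedged end-to-end sketch. The connector name and config are placeholders; when no cache is passed, the default cache is used, as documented above.

    import airbyte as ab

    source = ab.get_source(
        "source-faker",            # hypothetical connector, for illustration only
        config={"count": 1_000},   # assumed config shape
    )

    # Incremental read of all streams into the default cache:
    result = source.read(streams="*", write_strategy="auto")

    # A full rebuild: the "replace" strategy requires force_full_refresh=True.
    result = source.read(
        streams="*",
        write_strategy="replace",
        force_full_refresh=True,
    )

Note the pairing in the second call: per the docstring, "replace" only operates correctly in full refresh mode, so force_full_refresh must accompany it; the incremental state provider is skipped entirely in that mode, as the source above shows.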