airbyte.sources.base

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2from __future__ import annotations
  3
  4import json
  5import warnings
  6from pathlib import Path
  7from typing import TYPE_CHECKING, Any, cast
  8
  9import jsonschema
 10import pendulum
 11import yaml
 12from rich import print
 13from rich.syntax import Syntax
 14from typing_extensions import Literal
 15
 16from airbyte_protocol.models import (
 17    AirbyteCatalog,
 18    AirbyteMessage,
 19    ConfiguredAirbyteCatalog,
 20    ConfiguredAirbyteStream,
 21    ConnectorSpecification,
 22    DestinationSyncMode,
 23    Status,
 24    SyncMode,
 25    TraceType,
 26    Type,
 27)
 28
 29from airbyte import exceptions as exc
 30from airbyte._future_cdk.catalog_providers import CatalogProvider
 31from airbyte._util.telemetry import (
 32    EventState,
 33    EventType,
 34    log_config_validation_result,
 35    log_source_check_result,
 36    send_telemetry,
 37)
 38from airbyte._util.temp_files import as_temp_files
 39from airbyte.caches.util import get_default_cache
 40from airbyte.datasets._lazy import LazyDataset
 41from airbyte.progress import progress
 42from airbyte.records import StreamRecord
 43from airbyte.results import ReadResult
 44from airbyte.strategies import WriteStrategy
 45from airbyte.warnings import PyAirbyteDataLossWarning
 46
 47
 48if TYPE_CHECKING:
 49    from collections.abc import Generator, Iterable, Iterator
 50
 51    from airbyte_protocol.models.airbyte_protocol import AirbyteStream
 52
 53    from airbyte._executor import Executor
 54    from airbyte._future_cdk.state_providers import StateProviderBase
 55    from airbyte.caches import CacheBase
 56    from airbyte.documents import Document
 57
 58
 59class Source:  # noqa: PLR0904  # Ignore max publish methods
 60    """A class representing a source that can be called."""
 61
 62    def __init__(
 63        self,
 64        executor: Executor,
 65        name: str,
 66        config: dict[str, Any] | None = None,
 67        streams: str | list[str] | None = None,
 68        *,
 69        validate: bool = False,
 70    ) -> None:
 71        """Initialize the source.
 72
 73        If config is provided, it will be validated against the spec if validate is True.
 74        """
 75        self.executor = executor
 76        self.name = name
 77        self._processed_records = 0
 78        self._config_dict: dict[str, Any] | None = None
 79        self._last_log_messages: list[str] = []
 80        self._discovered_catalog: AirbyteCatalog | None = None
 81        self._spec: ConnectorSpecification | None = None
 82        self._selected_stream_names: list[str] = []
 83        if config is not None:
 84            self.set_config(config, validate=validate)
 85        if streams is not None:
 86            self.select_streams(streams)
 87
 88        self._deployed_api_root: str | None = None
 89        self._deployed_workspace_id: str | None = None
 90        self._deployed_source_id: str | None = None
 91
 92    def set_streams(self, streams: list[str]) -> None:
 93        """Deprecated. See select_streams()."""
 94        warnings.warn(
 95            "The 'set_streams' method is deprecated and will be removed in a future version. "
 96            "Please use the 'select_streams' method instead.",
 97            DeprecationWarning,
 98            stacklevel=2,
 99        )
100        self.select_streams(streams)
101
102    def select_all_streams(self) -> None:
103        """Select all streams.
104
105        This is a more streamlined equivalent to:
106        > source.select_streams(source.get_available_streams()).
107        """
108        self._selected_stream_names = self.get_available_streams()
109
110    def select_streams(self, streams: str | list[str]) -> None:
111        """Select the stream names that should be read from the connector.
112
113        Args:
114        - streams: A list of stream names to select. If set to "*", all streams will be selected.
115
116        Currently, if this is not set, all streams will be read.
117        """
118        if streams == "*":
119            self.select_all_streams()
120            return
121
122        if isinstance(streams, str):
123            # If a single stream is provided, convert it to a one-item list
124            streams = [streams]
125
126        available_streams = self.get_available_streams()
127        for stream in streams:
128            if stream not in available_streams:
129                raise exc.AirbyteStreamNotFoundError(
130                    stream_name=stream,
131                    connector_name=self.name,
132                    available_streams=available_streams,
133                )
134        self._selected_stream_names = streams
135
136    def get_selected_streams(self) -> list[str]:
137        """Get the selected streams.
138
139        If no streams are selected, return an empty list.
140        """
141        return self._selected_stream_names
142
143    def set_config(
144        self,
145        config: dict[str, Any],
146        *,
147        validate: bool = True,
148    ) -> None:
149        """Set the config for the connector.
150
151        If validate is True, raise an exception if the config fails validation.
152
153        If validate is False, validation will be deferred until check() or validate_config()
154        is called.
155        """
156        if validate:
157            self.validate_config(config)
158
159        self._config_dict = config
160
161    def get_config(self) -> dict[str, Any]:
162        """Get the config for the connector."""
163        return self._config
164
165    @property
166    def _config(self) -> dict[str, Any]:
167        if self._config_dict is None:
168            raise exc.AirbyteConnectorConfigurationMissingError(
169                guidance="Provide via get_source() or set_config()"
170            )
171        return self._config_dict
172
173    def _discover(self) -> AirbyteCatalog:
174        """Call discover on the connector.
175
176        This involves the following steps:
177        * Write the config to a temporary file
178        * execute the connector with discover --config <config_file>
179        * Listen to the messages and return the first AirbyteCatalog that comes along.
180        * Make sure the subprocess is killed when the function returns.
181        """
182        with as_temp_files([self._config]) as [config_file]:
183            for msg in self._execute(["discover", "--config", config_file]):
184                if msg.type == Type.CATALOG and msg.catalog:
185                    return msg.catalog
186            raise exc.AirbyteConnectorMissingCatalogError(
187                log_text=self._last_log_messages,
188            )
189
190    def validate_config(self, config: dict[str, Any] | None = None) -> None:
191        """Validate the config against the spec.
192
193        If config is not provided, the already-set config will be validated.
194        """
195        spec = self._get_spec(force_refresh=False)
196        config = self._config if config is None else config
197        try:
198            jsonschema.validate(config, spec.connectionSpecification)
199            log_config_validation_result(
200                name=self.name,
201                state=EventState.SUCCEEDED,
202            )
203        except jsonschema.ValidationError as ex:
204            validation_ex = exc.AirbyteConnectorValidationFailedError(
205                message="The provided config is not valid.",
206                context={
207                    "error_message": ex.message,
208                    "error_path": ex.path,
209                    "error_instance": ex.instance,
210                    "error_schema": ex.schema,
211                },
212            )
213            log_config_validation_result(
214                name=self.name,
215                state=EventState.FAILED,
216                exception=validation_ex,
217            )
218            raise validation_ex from ex
219
220    def get_available_streams(self) -> list[str]:
221        """Get the available streams from the spec."""
222        return [s.name for s in self.discovered_catalog.streams]
223
224    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
225        """Call spec on the connector.
226
227        This involves the following steps:
228        * execute the connector with spec
229        * Listen to the messages and return the first AirbyteCatalog that comes along.
230        * Make sure the subprocess is killed when the function returns.
231        """
232        if force_refresh or self._spec is None:
233            for msg in self._execute(["spec"]):
234                if msg.type == Type.SPEC and msg.spec:
235                    self._spec = msg.spec
236                    break
237
238        if self._spec:
239            return self._spec
240
241        raise exc.AirbyteConnectorMissingSpecError(
242            log_text=self._last_log_messages,
243        )
244
245    @property
246    def config_spec(self) -> dict[str, Any]:
247        """Generate a configuration spec for this connector, as a JSON Schema definition.
248
249        This function generates a JSON Schema dictionary with configuration specs for the
250        current connector, as a dictionary.
251
252        Returns:
253            dict: The JSON Schema configuration spec as a dictionary.
254        """
255        return self._get_spec(force_refresh=True).connectionSpecification
256
257    def print_config_spec(
258        self,
259        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
260        *,
261        output_file: Path | str | None = None,
262    ) -> None:
263        """Print the configuration spec for this connector.
264
265        Args:
266        - format: The format to print the spec in. Must be "yaml" or "json".
267        - output_file: Optional. If set, the spec will be written to the given file path. Otherwise,
268          it will be printed to the console.
269        """
270        if format not in {"yaml", "json"}:
271            raise exc.PyAirbyteInputError(
272                message="Invalid format. Expected 'yaml' or 'json'",
273                input_value=format,
274            )
275        if isinstance(output_file, str):
276            output_file = Path(output_file)
277
278        if format == "yaml":
279            content = yaml.dump(self.config_spec, indent=2)
280        elif format == "json":
281            content = json.dumps(self.config_spec, indent=2)
282
283        if output_file:
284            output_file.write_text(content)
285            return
286
287        syntax_highlighted = Syntax(content, format)
288        print(syntax_highlighted)
289
290    @property
291    def _yaml_spec(self) -> str:
292        """Get the spec as a yaml string.
293
294        For now, the primary use case is for writing and debugging a valid config for a source.
295
296        This is private for now because we probably want better polish before exposing this
297        as a stable interface. This will also get easier when we have docs links with this info
298        for each connector.
299        """
300        spec_obj: ConnectorSpecification = self._get_spec()
301        spec_dict = spec_obj.dict(exclude_unset=True)
302        # convert to a yaml string
303        return yaml.dump(spec_dict)
304
305    @property
306    def docs_url(self) -> str:
307        """Get the URL to the connector's documentation."""
308        # TODO: Replace with docs URL from metadata when available
309        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
310            "source-", ""
311        )
312
313    @property
314    def discovered_catalog(self) -> AirbyteCatalog:
315        """Get the raw catalog for the given streams.
316
317        If the catalog is not yet known, we call discover to get it.
318        """
319        if self._discovered_catalog is None:
320            self._discovered_catalog = self._discover()
321
322        return self._discovered_catalog
323
324    @property
325    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
326        """Get the configured catalog for the given streams.
327
328        If the raw catalog is not yet known, we call discover to get it.
329
330        If no specific streams are selected, we return a catalog that syncs all available streams.
331
332        TODO: We should consider disabling by default the streams that the connector would
333        disable by default. (For instance, streams that require a premium license are sometimes
334        disabled by default within the connector.)
335        """
336        # Ensure discovered catalog is cached before we start
337        _ = self.discovered_catalog
338
339        # Filter for selected streams if set, otherwise use all available streams:
340        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
341
342        return ConfiguredAirbyteCatalog(
343            streams=[
344                ConfiguredAirbyteStream(
345                    stream=stream,
346                    destination_sync_mode=DestinationSyncMode.overwrite,
347                    primary_key=stream.source_defined_primary_key,
348                    # TODO: The below assumes all sources can coalesce from incremental sync to
349                    # full_table as needed. CDK supports this, so it might be safe:
350                    sync_mode=SyncMode.incremental,
351                )
352                for stream in self.discovered_catalog.streams
353                if stream.name in streams_filter
354            ],
355        )
356
357    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
358        """Return the JSON Schema spec for the specified stream name."""
359        catalog: AirbyteCatalog = self.discovered_catalog
360        found: list[AirbyteStream] = [
361            stream for stream in catalog.streams if stream.name == stream_name
362        ]
363
364        if len(found) == 0:
365            raise exc.PyAirbyteInputError(
366                message="Stream name does not exist in catalog.",
367                input_value=stream_name,
368            )
369
370        if len(found) > 1:
371            raise exc.PyAirbyteInternalError(
372                message="Duplicate streams found with the same name.",
373                context={
374                    "found_streams": found,
375                },
376            )
377
378        return found[0].json_schema
379
380    def get_records(self, stream: str) -> LazyDataset:
381        """Read a stream from the connector.
382
383        This involves the following steps:
384        * Call discover to get the catalog
385        * Generate a configured catalog that syncs the given stream in full_refresh mode
386        * Write the configured catalog and the config to a temporary file
387        * execute the connector with read --config <config_file> --catalog <catalog_file>
388        * Listen to the messages and return the first AirbyteRecordMessages that come along.
389        * Make sure the subprocess is killed when the function returns.
390        """
391        discovered_catalog: AirbyteCatalog = self.discovered_catalog
392        configured_catalog = ConfiguredAirbyteCatalog(
393            streams=[
394                ConfiguredAirbyteStream(
395                    stream=s,
396                    sync_mode=SyncMode.full_refresh,
397                    destination_sync_mode=DestinationSyncMode.overwrite,
398                )
399                for s in discovered_catalog.streams
400                if s.name == stream
401            ],
402        )
403        if len(configured_catalog.streams) == 0:
404            raise exc.PyAirbyteInputError(
405                message="Requested stream does not exist.",
406                context={
407                    "stream": stream,
408                    "available_streams": self.get_available_streams(),
409                    "connector_name": self.name,
410                },
411            ) from KeyError(stream)
412
413        configured_stream = configured_catalog.streams[0]
414        all_properties = cast(
415            list[str], list(configured_stream.stream.json_schema["properties"].keys())
416        )
417
418        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
419            self._log_sync_start(cache=None)
420            yield from records
421            self._log_sync_success(cache=None)
422
423        iterator: Iterator[dict[str, Any]] = _with_logging(
424            records=(  # Generator comprehension yields StreamRecord objects for each record
425                StreamRecord.from_record_message(
426                    record_message=record.record,
427                    expected_keys=all_properties,
428                    prune_extra_fields=True,
429                )
430                for record in self._read_with_catalog(configured_catalog)
431                if record.record
432            )
433        )
434        return LazyDataset(
435            iterator,
436            stream_metadata=configured_stream,
437        )
438
439    @property
440    def connector_version(self) -> str | None:
441        """Return the version of the connector as reported by the executor.
442
443        Returns None if the version cannot be determined.
444        """
445        return self.executor.get_installed_version()
446
447    def get_documents(
448        self,
449        stream: str,
450        title_property: str | None = None,
451        content_properties: list[str] | None = None,
452        metadata_properties: list[str] | None = None,
453        *,
454        render_metadata: bool = False,
455    ) -> Iterable[Document]:
456        """Read a stream from the connector and return the records as documents.
457
458        If metadata_properties is not set, all properties that are not content will be added to
459        the metadata.
460
461        If render_metadata is True, metadata will be rendered in the document, as well as the
462        the main content.
463        """
464        return self.get_records(stream).to_documents(
465            title_property=title_property,
466            content_properties=content_properties,
467            metadata_properties=metadata_properties,
468            render_metadata=render_metadata,
469        )
470
471    def check(self) -> None:
472        """Call check on the connector.
473
474        This involves the following steps:
475        * Write the config to a temporary file
476        * execute the connector with check --config <config_file>
477        * Listen to the messages and return the first AirbyteCatalog that comes along.
478        * Make sure the subprocess is killed when the function returns.
479        """
480        with as_temp_files([self._config]) as [config_file]:
481            try:
482                for msg in self._execute(["check", "--config", config_file]):
483                    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
484                        if msg.connectionStatus.status != Status.FAILED:
485                            print(f"Connection check succeeded for `{self.name}`.")
486                            log_source_check_result(
487                                name=self.name,
488                                state=EventState.SUCCEEDED,
489                            )
490                            return
491
492                        log_source_check_result(
493                            name=self.name,
494                            state=EventState.FAILED,
495                        )
496                        raise exc.AirbyteConnectorCheckFailedError(
497                            help_url=self.docs_url,
498                            context={
499                                "failure_reason": msg.connectionStatus.message,
500                            },
501                        )
502                raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
503            except exc.AirbyteConnectorReadError as ex:
504                raise exc.AirbyteConnectorCheckFailedError(
505                    message="The connector failed to check the connection.",
506                    log_text=ex.log_text,
507                ) from ex
508
509    def install(self) -> None:
510        """Install the connector if it is not yet installed."""
511        self.executor.install()
512        print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")
513
514    def uninstall(self) -> None:
515        """Uninstall the connector if it is installed.
516
517        This only works if the use_local_install flag wasn't used and installation is managed by
518        PyAirbyte.
519        """
520        self.executor.uninstall()
521
522    def _read_with_catalog(
523        self,
524        catalog: ConfiguredAirbyteCatalog,
525        state: StateProviderBase | None = None,
526    ) -> Iterator[AirbyteMessage]:
527        """Call read on the connector.
528
529        This involves the following steps:
530        * Write the config to a temporary file
531        * execute the connector with read --config <config_file> --catalog <catalog_file>
532        * Listen to the messages and return the AirbyteRecordMessages that come along.
533        * Send out telemetry on the performed sync (with information about which source was used and
534          the type of the cache)
535        """
536        self._processed_records = 0  # Reset the counter before we start
537        with as_temp_files(
538            [
539                self._config,
540                catalog.json(),
541                state.to_state_input_file_text() if state else "[]",
542            ]
543        ) as [
544            config_file,
545            catalog_file,
546            state_file,
547        ]:
548            yield from self._tally_records(
549                self._execute(
550                    [
551                        "read",
552                        "--config",
553                        config_file,
554                        "--catalog",
555                        catalog_file,
556                        "--state",
557                        state_file,
558                    ],
559                )
560            )
561
562    def _add_to_logs(self, message: str) -> None:
563        self._last_log_messages.append(message)
564        self._last_log_messages = self._last_log_messages[-10:]
565
566    def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:
567        """Execute the connector with the given arguments.
568
569        This involves the following steps:
570        * Locate the right venv. It is called ".venv-<connector_name>"
571        * Spawn a subprocess with .venv-<connector_name>/bin/<connector-name> <args>
572        * Read the output line by line of the subprocess and serialize them AirbyteMessage objects.
573          Drop if not valid.
574        """
575        # Fail early if the connector is not installed.
576        self.executor.ensure_installation(auto_fix=False)
577
578        try:
579            self._last_log_messages = []
580            for line in self.executor.execute(args):
581                try:
582                    message = AirbyteMessage.parse_raw(line)
583                    if message.type is Type.RECORD:
584                        self._processed_records += 1
585                    if message.type == Type.LOG:
586                        self._add_to_logs(message.log.message)
587                    if message.type == Type.TRACE and message.trace.type == TraceType.ERROR:
588                        self._add_to_logs(message.trace.error.message)
589                    yield message
590                except Exception:
591                    self._add_to_logs(line)
592        except Exception as e:
593            raise exc.AirbyteConnectorReadError(
594                log_text=self._last_log_messages,
595            ) from e
596
597    def _tally_records(
598        self,
599        messages: Iterable[AirbyteMessage],
600    ) -> Generator[AirbyteMessage, Any, None]:
601        """This method simply tallies the number of records processed and yields the messages."""
602        self._processed_records = 0  # Reset the counter before we start
603        progress.reset(len(self._selected_stream_names or []))
604
605        for message in messages:
606            yield message
607            progress.log_records_read(new_total_count=self._processed_records)
608
609    def _log_sync_start(
610        self,
611        *,
612        cache: CacheBase | None,
613    ) -> None:
614        """Log the start of a sync operation."""
615        print(f"Started `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}...")
616        send_telemetry(
617            source=self,
618            cache=cache,
619            state=EventState.STARTED,
620            event_type=EventType.SYNC,
621        )
622
623    def _log_sync_success(
624        self,
625        *,
626        cache: CacheBase | None,
627    ) -> None:
628        """Log the success of a sync operation."""
629        print(f"Completed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
630        send_telemetry(
631            source=self,
632            cache=cache,
633            state=EventState.SUCCEEDED,
634            number_of_records=self._processed_records,
635            event_type=EventType.SYNC,
636        )
637
638    def _log_sync_failure(
639        self,
640        *,
641        cache: CacheBase | None,
642        exception: Exception,
643    ) -> None:
644        """Log the failure of a sync operation."""
645        print(f"Failed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
646        send_telemetry(
647            state=EventState.FAILED,
648            source=self,
649            cache=cache,
650            number_of_records=self._processed_records,
651            exception=exception,
652            event_type=EventType.SYNC,
653        )
654
655    def read(
656        self,
657        cache: CacheBase | None = None,
658        *,
659        streams: str | list[str] | None = None,
660        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
661        force_full_refresh: bool = False,
662        skip_validation: bool = False,
663    ) -> ReadResult:
664        """Read from the connector and write to the cache.
665
666        Args:
667            cache: The cache to write to. If None, a default cache will be used.
668            write_strategy: The strategy to use when writing to the cache. If a string, it must be
669                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
670                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
671                WriteStrategy.AUTO.
672            streams: Optional if already set. A list of stream names to select for reading. If set
673                to "*", all streams will be selected.
674            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
675                streams will be read in incremental mode if supported by the connector. This option
676                must be True when using the "replace" strategy.
677        """
678        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
679            warnings.warn(
680                message=(
681                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
682                    "could result in data loss. "
683                    "To silence this warning, use the following: "
684                    'warnings.filterwarnings("ignore", '
685                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
686                ),
687                category=PyAirbyteDataLossWarning,
688                stacklevel=1,
689            )
690        if isinstance(write_strategy, str):
691            try:
692                write_strategy = WriteStrategy(write_strategy)
693            except ValueError:
694                raise exc.PyAirbyteInputError(
695                    message="Invalid strategy",
696                    context={
697                        "write_strategy": write_strategy,
698                        "available_strategies": [s.value for s in WriteStrategy],
699                    },
700                ) from None
701
702        if streams:
703            self.select_streams(streams)
704
705        if not self._selected_stream_names:
706            raise exc.PyAirbyteNoStreamsSelectedError(
707                connector_name=self.name,
708                available_streams=self.get_available_streams(),
709            )
710
711        # Run optional validation step
712        if not skip_validation:
713            self.validate_config()
714
715        # Set up cache and related resources
716        if cache is None:
717            cache = get_default_cache()
718
719        # Set up state provider if not in full refresh mode
720        if force_full_refresh:
721            state_provider: StateProviderBase | None = None
722        else:
723            state_provider = cache.get_state_provider(
724                source_name=self.name,
725            )
726
727        self._log_sync_start(cache=cache)
728
729        cache_processor = cache.get_record_processor(
730            source_name=self.name,
731            catalog_provider=CatalogProvider(self.configured_catalog),
732        )
733        try:
734            cache_processor.process_airbyte_messages(
735                self._read_with_catalog(
736                    catalog=self.configured_catalog,
737                    state=state_provider,
738                ),
739                write_strategy=write_strategy,
740            )
741
742        # TODO: We should catch more specific exceptions here
743        except Exception as ex:
744            self._log_sync_failure(cache=cache, exception=ex)
745            raise exc.AirbyteConnectorFailedError(
746                log_text=self._last_log_messages,
747            ) from ex
748
749        self._log_sync_success(cache=cache)
750        return ReadResult(
751            processed_records=self._processed_records,
752            cache=cache,
753            processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
754        )
755
756
# Names exported by `from airbyte.sources.base import *`.
__all__ = ["Source"]
class Source:
 60class Source:  # noqa: PLR0904  # Ignore max publish methods
 61    """A class representing a source that can be called."""
 62
 63    def __init__(
 64        self,
 65        executor: Executor,
 66        name: str,
 67        config: dict[str, Any] | None = None,
 68        streams: str | list[str] | None = None,
 69        *,
 70        validate: bool = False,
 71    ) -> None:
 72        """Initialize the source.
 73
 74        If config is provided, it will be validated against the spec if validate is True.
 75        """
 76        self.executor = executor
 77        self.name = name
 78        self._processed_records = 0
 79        self._config_dict: dict[str, Any] | None = None
 80        self._last_log_messages: list[str] = []
 81        self._discovered_catalog: AirbyteCatalog | None = None
 82        self._spec: ConnectorSpecification | None = None
 83        self._selected_stream_names: list[str] = []
 84        if config is not None:
 85            self.set_config(config, validate=validate)
 86        if streams is not None:
 87            self.select_streams(streams)
 88
 89        self._deployed_api_root: str | None = None
 90        self._deployed_workspace_id: str | None = None
 91        self._deployed_source_id: str | None = None
 92
 93    def set_streams(self, streams: list[str]) -> None:
 94        """Deprecated. See select_streams()."""
 95        warnings.warn(
 96            "The 'set_streams' method is deprecated and will be removed in a future version. "
 97            "Please use the 'select_streams' method instead.",
 98            DeprecationWarning,
 99            stacklevel=2,
100        )
101        self.select_streams(streams)
102
103    def select_all_streams(self) -> None:
104        """Select all streams.
105
106        This is a more streamlined equivalent to:
107        > source.select_streams(source.get_available_streams()).
108        """
109        self._selected_stream_names = self.get_available_streams()
110
111    def select_streams(self, streams: str | list[str]) -> None:
112        """Select the stream names that should be read from the connector.
113
114        Args:
115        - streams: A list of stream names to select. If set to "*", all streams will be selected.
116
117        Currently, if this is not set, all streams will be read.
118        """
119        if streams == "*":
120            self.select_all_streams()
121            return
122
123        if isinstance(streams, str):
124            # If a single stream is provided, convert it to a one-item list
125            streams = [streams]
126
127        available_streams = self.get_available_streams()
128        for stream in streams:
129            if stream not in available_streams:
130                raise exc.AirbyteStreamNotFoundError(
131                    stream_name=stream,
132                    connector_name=self.name,
133                    available_streams=available_streams,
134                )
135        self._selected_stream_names = streams
136
137    def get_selected_streams(self) -> list[str]:
138        """Get the selected streams.
139
140        If no streams are selected, return an empty list.
141        """
142        return self._selected_stream_names
143
144    def set_config(
145        self,
146        config: dict[str, Any],
147        *,
148        validate: bool = True,
149    ) -> None:
150        """Set the config for the connector.
151
152        If validate is True, raise an exception if the config fails validation.
153
154        If validate is False, validation will be deferred until check() or validate_config()
155        is called.
156        """
157        if validate:
158            self.validate_config(config)
159
160        self._config_dict = config
161
162    def get_config(self) -> dict[str, Any]:
163        """Get the config for the connector."""
164        return self._config
165
166    @property
167    def _config(self) -> dict[str, Any]:
168        if self._config_dict is None:
169            raise exc.AirbyteConnectorConfigurationMissingError(
170                guidance="Provide via get_source() or set_config()"
171            )
172        return self._config_dict
173
174    def _discover(self) -> AirbyteCatalog:
175        """Call discover on the connector.
176
177        This involves the following steps:
178        * Write the config to a temporary file
179        * execute the connector with discover --config <config_file>
180        * Listen to the messages and return the first AirbyteCatalog that comes along.
181        * Make sure the subprocess is killed when the function returns.
182        """
183        with as_temp_files([self._config]) as [config_file]:
184            for msg in self._execute(["discover", "--config", config_file]):
185                if msg.type == Type.CATALOG and msg.catalog:
186                    return msg.catalog
187            raise exc.AirbyteConnectorMissingCatalogError(
188                log_text=self._last_log_messages,
189            )
190
191    def validate_config(self, config: dict[str, Any] | None = None) -> None:
192        """Validate the config against the spec.
193
194        If config is not provided, the already-set config will be validated.
195        """
196        spec = self._get_spec(force_refresh=False)
197        config = self._config if config is None else config
198        try:
199            jsonschema.validate(config, spec.connectionSpecification)
200            log_config_validation_result(
201                name=self.name,
202                state=EventState.SUCCEEDED,
203            )
204        except jsonschema.ValidationError as ex:
205            validation_ex = exc.AirbyteConnectorValidationFailedError(
206                message="The provided config is not valid.",
207                context={
208                    "error_message": ex.message,
209                    "error_path": ex.path,
210                    "error_instance": ex.instance,
211                    "error_schema": ex.schema,
212                },
213            )
214            log_config_validation_result(
215                name=self.name,
216                state=EventState.FAILED,
217                exception=validation_ex,
218            )
219            raise validation_ex from ex
220
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]
224
225    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
226        """Call spec on the connector.
227
228        This involves the following steps:
229        * execute the connector with spec
230        * Listen to the messages and return the first AirbyteCatalog that comes along.
231        * Make sure the subprocess is killed when the function returns.
232        """
233        if force_refresh or self._spec is None:
234            for msg in self._execute(["spec"]):
235                if msg.type == Type.SPEC and msg.spec:
236                    self._spec = msg.spec
237                    break
238
239        if self._spec:
240            return self._spec
241
242        raise exc.AirbyteConnectorMissingSpecError(
243            log_text=self._last_log_messages,
244        )
245
246    @property
247    def config_spec(self) -> dict[str, Any]:
248        """Generate a configuration spec for this connector, as a JSON Schema definition.
249
250        This function generates a JSON Schema dictionary with configuration specs for the
251        current connector, as a dictionary.
252
253        Returns:
254            dict: The JSON Schema configuration spec as a dictionary.
255        """
256        return self._get_spec(force_refresh=True).connectionSpecification
257
258    def print_config_spec(
259        self,
260        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
261        *,
262        output_file: Path | str | None = None,
263    ) -> None:
264        """Print the configuration spec for this connector.
265
266        Args:
267        - format: The format to print the spec in. Must be "yaml" or "json".
268        - output_file: Optional. If set, the spec will be written to the given file path. Otherwise,
269          it will be printed to the console.
270        """
271        if format not in {"yaml", "json"}:
272            raise exc.PyAirbyteInputError(
273                message="Invalid format. Expected 'yaml' or 'json'",
274                input_value=format,
275            )
276        if isinstance(output_file, str):
277            output_file = Path(output_file)
278
279        if format == "yaml":
280            content = yaml.dump(self.config_spec, indent=2)
281        elif format == "json":
282            content = json.dumps(self.config_spec, indent=2)
283
284        if output_file:
285            output_file.write_text(content)
286            return
287
288        syntax_highlighted = Syntax(content, format)
289        print(syntax_highlighted)
290
291    @property
292    def _yaml_spec(self) -> str:
293        """Get the spec as a yaml string.
294
295        For now, the primary use case is for writing and debugging a valid config for a source.
296
297        This is private for now because we probably want better polish before exposing this
298        as a stable interface. This will also get easier when we have docs links with this info
299        for each connector.
300        """
301        spec_obj: ConnectorSpecification = self._get_spec()
302        spec_dict = spec_obj.dict(exclude_unset=True)
303        # convert to a yaml string
304        return yaml.dump(spec_dict)
305
306    @property
307    def docs_url(self) -> str:
308        """Get the URL to the connector's documentation."""
309        # TODO: Replace with docs URL from metadata when available
310        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
311            "source-", ""
312        )
313
314    @property
315    def discovered_catalog(self) -> AirbyteCatalog:
316        """Get the raw catalog for the given streams.
317
318        If the catalog is not yet known, we call discover to get it.
319        """
320        if self._discovered_catalog is None:
321            self._discovered_catalog = self._discover()
322
323        return self._discovered_catalog
324
325    @property
326    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
327        """Get the configured catalog for the given streams.
328
329        If the raw catalog is not yet known, we call discover to get it.
330
331        If no specific streams are selected, we return a catalog that syncs all available streams.
332
333        TODO: We should consider disabling by default the streams that the connector would
334        disable by default. (For instance, streams that require a premium license are sometimes
335        disabled by default within the connector.)
336        """
337        # Ensure discovered catalog is cached before we start
338        _ = self.discovered_catalog
339
340        # Filter for selected streams if set, otherwise use all available streams:
341        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
342
343        return ConfiguredAirbyteCatalog(
344            streams=[
345                ConfiguredAirbyteStream(
346                    stream=stream,
347                    destination_sync_mode=DestinationSyncMode.overwrite,
348                    primary_key=stream.source_defined_primary_key,
349                    # TODO: The below assumes all sources can coalesce from incremental sync to
350                    # full_table as needed. CDK supports this, so it might be safe:
351                    sync_mode=SyncMode.incremental,
352                )
353                for stream in self.discovered_catalog.streams
354                if stream.name in streams_filter
355            ],
356        )
357
358    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
359        """Return the JSON Schema spec for the specified stream name."""
360        catalog: AirbyteCatalog = self.discovered_catalog
361        found: list[AirbyteStream] = [
362            stream for stream in catalog.streams if stream.name == stream_name
363        ]
364
365        if len(found) == 0:
366            raise exc.PyAirbyteInputError(
367                message="Stream name does not exist in catalog.",
368                input_value=stream_name,
369            )
370
371        if len(found) > 1:
372            raise exc.PyAirbyteInternalError(
373                message="Duplicate streams found with the same name.",
374                context={
375                    "found_streams": found,
376                },
377            )
378
379        return found[0].json_schema
380
381    def get_records(self, stream: str) -> LazyDataset:
382        """Read a stream from the connector.
383
384        This involves the following steps:
385        * Call discover to get the catalog
386        * Generate a configured catalog that syncs the given stream in full_refresh mode
387        * Write the configured catalog and the config to a temporary file
388        * execute the connector with read --config <config_file> --catalog <catalog_file>
389        * Listen to the messages and return the first AirbyteRecordMessages that come along.
390        * Make sure the subprocess is killed when the function returns.
391        """
392        discovered_catalog: AirbyteCatalog = self.discovered_catalog
393        configured_catalog = ConfiguredAirbyteCatalog(
394            streams=[
395                ConfiguredAirbyteStream(
396                    stream=s,
397                    sync_mode=SyncMode.full_refresh,
398                    destination_sync_mode=DestinationSyncMode.overwrite,
399                )
400                for s in discovered_catalog.streams
401                if s.name == stream
402            ],
403        )
404        if len(configured_catalog.streams) == 0:
405            raise exc.PyAirbyteInputError(
406                message="Requested stream does not exist.",
407                context={
408                    "stream": stream,
409                    "available_streams": self.get_available_streams(),
410                    "connector_name": self.name,
411                },
412            ) from KeyError(stream)
413
414        configured_stream = configured_catalog.streams[0]
415        all_properties = cast(
416            list[str], list(configured_stream.stream.json_schema["properties"].keys())
417        )
418
419        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
420            self._log_sync_start(cache=None)
421            yield from records
422            self._log_sync_success(cache=None)
423
424        iterator: Iterator[dict[str, Any]] = _with_logging(
425            records=(  # Generator comprehension yields StreamRecord objects for each record
426                StreamRecord.from_record_message(
427                    record_message=record.record,
428                    expected_keys=all_properties,
429                    prune_extra_fields=True,
430                )
431                for record in self._read_with_catalog(configured_catalog)
432                if record.record
433            )
434        )
435        return LazyDataset(
436            iterator,
437            stream_metadata=configured_stream,
438        )
439
440    @property
441    def connector_version(self) -> str | None:
442        """Return the version of the connector as reported by the executor.
443
444        Returns None if the version cannot be determined.
445        """
446        return self.executor.get_installed_version()
447
448    def get_documents(
449        self,
450        stream: str,
451        title_property: str | None = None,
452        content_properties: list[str] | None = None,
453        metadata_properties: list[str] | None = None,
454        *,
455        render_metadata: bool = False,
456    ) -> Iterable[Document]:
457        """Read a stream from the connector and return the records as documents.
458
459        If metadata_properties is not set, all properties that are not content will be added to
460        the metadata.
461
462        If render_metadata is True, metadata will be rendered in the document, as well as the
463        the main content.
464        """
465        return self.get_records(stream).to_documents(
466            title_property=title_property,
467            content_properties=content_properties,
468            metadata_properties=metadata_properties,
469            render_metadata=render_metadata,
470        )
471
472    def check(self) -> None:
473        """Call check on the connector.
474
475        This involves the following steps:
476        * Write the config to a temporary file
477        * execute the connector with check --config <config_file>
478        * Listen to the messages and return the first AirbyteCatalog that comes along.
479        * Make sure the subprocess is killed when the function returns.
480        """
481        with as_temp_files([self._config]) as [config_file]:
482            try:
483                for msg in self._execute(["check", "--config", config_file]):
484                    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
485                        if msg.connectionStatus.status != Status.FAILED:
486                            print(f"Connection check succeeded for `{self.name}`.")
487                            log_source_check_result(
488                                name=self.name,
489                                state=EventState.SUCCEEDED,
490                            )
491                            return
492
493                        log_source_check_result(
494                            name=self.name,
495                            state=EventState.FAILED,
496                        )
497                        raise exc.AirbyteConnectorCheckFailedError(
498                            help_url=self.docs_url,
499                            context={
500                                "failure_reason": msg.connectionStatus.message,
501                            },
502                        )
503                raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
504            except exc.AirbyteConnectorReadError as ex:
505                raise exc.AirbyteConnectorCheckFailedError(
506                    message="The connector failed to check the connection.",
507                    log_text=ex.log_text,
508                ) from ex
509
510    def install(self) -> None:
511        """Install the connector if it is not yet installed."""
512        self.executor.install()
513        print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")
514
515    def uninstall(self) -> None:
516        """Uninstall the connector if it is installed.
517
518        This only works if the use_local_install flag wasn't used and installation is managed by
519        PyAirbyte.
520        """
521        self.executor.uninstall()
522
523    def _read_with_catalog(
524        self,
525        catalog: ConfiguredAirbyteCatalog,
526        state: StateProviderBase | None = None,
527    ) -> Iterator[AirbyteMessage]:
528        """Call read on the connector.
529
530        This involves the following steps:
531        * Write the config to a temporary file
532        * execute the connector with read --config <config_file> --catalog <catalog_file>
533        * Listen to the messages and return the AirbyteRecordMessages that come along.
534        * Send out telemetry on the performed sync (with information about which source was used and
535          the type of the cache)
536        """
537        self._processed_records = 0  # Reset the counter before we start
538        with as_temp_files(
539            [
540                self._config,
541                catalog.json(),
542                state.to_state_input_file_text() if state else "[]",
543            ]
544        ) as [
545            config_file,
546            catalog_file,
547            state_file,
548        ]:
549            yield from self._tally_records(
550                self._execute(
551                    [
552                        "read",
553                        "--config",
554                        config_file,
555                        "--catalog",
556                        catalog_file,
557                        "--state",
558                        state_file,
559                    ],
560                )
561            )
562
563    def _add_to_logs(self, message: str) -> None:
564        self._last_log_messages.append(message)
565        self._last_log_messages = self._last_log_messages[-10:]
566
567    def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:
568        """Execute the connector with the given arguments.
569
570        This involves the following steps:
571        * Locate the right venv. It is called ".venv-<connector_name>"
572        * Spawn a subprocess with .venv-<connector_name>/bin/<connector-name> <args>
573        * Read the output line by line of the subprocess and serialize them AirbyteMessage objects.
574          Drop if not valid.
575        """
576        # Fail early if the connector is not installed.
577        self.executor.ensure_installation(auto_fix=False)
578
579        try:
580            self._last_log_messages = []
581            for line in self.executor.execute(args):
582                try:
583                    message = AirbyteMessage.parse_raw(line)
584                    if message.type is Type.RECORD:
585                        self._processed_records += 1
586                    if message.type == Type.LOG:
587                        self._add_to_logs(message.log.message)
588                    if message.type == Type.TRACE and message.trace.type == TraceType.ERROR:
589                        self._add_to_logs(message.trace.error.message)
590                    yield message
591                except Exception:
592                    self._add_to_logs(line)
593        except Exception as e:
594            raise exc.AirbyteConnectorReadError(
595                log_text=self._last_log_messages,
596            ) from e
597
598    def _tally_records(
599        self,
600        messages: Iterable[AirbyteMessage],
601    ) -> Generator[AirbyteMessage, Any, None]:
602        """This method simply tallies the number of records processed and yields the messages."""
603        self._processed_records = 0  # Reset the counter before we start
604        progress.reset(len(self._selected_stream_names or []))
605
606        for message in messages:
607            yield message
608            progress.log_records_read(new_total_count=self._processed_records)
609
610    def _log_sync_start(
611        self,
612        *,
613        cache: CacheBase | None,
614    ) -> None:
615        """Log the start of a sync operation."""
616        print(f"Started `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}...")
617        send_telemetry(
618            source=self,
619            cache=cache,
620            state=EventState.STARTED,
621            event_type=EventType.SYNC,
622        )
623
624    def _log_sync_success(
625        self,
626        *,
627        cache: CacheBase | None,
628    ) -> None:
629        """Log the success of a sync operation."""
630        print(f"Completed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
631        send_telemetry(
632            source=self,
633            cache=cache,
634            state=EventState.SUCCEEDED,
635            number_of_records=self._processed_records,
636            event_type=EventType.SYNC,
637        )
638
639    def _log_sync_failure(
640        self,
641        *,
642        cache: CacheBase | None,
643        exception: Exception,
644    ) -> None:
645        """Log the failure of a sync operation."""
646        print(f"Failed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
647        send_telemetry(
648            state=EventState.FAILED,
649            source=self,
650            cache=cache,
651            number_of_records=self._processed_records,
652            exception=exception,
653            event_type=EventType.SYNC,
654        )
655
656    def read(
657        self,
658        cache: CacheBase | None = None,
659        *,
660        streams: str | list[str] | None = None,
661        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
662        force_full_refresh: bool = False,
663        skip_validation: bool = False,
664    ) -> ReadResult:
665        """Read from the connector and write to the cache.
666
667        Args:
668            cache: The cache to write to. If None, a default cache will be used.
669            write_strategy: The strategy to use when writing to the cache. If a string, it must be
670                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
671                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
672                WriteStrategy.AUTO.
673            streams: Optional if already set. A list of stream names to select for reading. If set
674                to "*", all streams will be selected.
675            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
676                streams will be read in incremental mode if supported by the connector. This option
677                must be True when using the "replace" strategy.
678        """
679        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
680            warnings.warn(
681                message=(
682                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
683                    "could result in data loss. "
684                    "To silence this warning, use the following: "
685                    'warnings.filterwarnings("ignore", '
686                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
687                ),
688                category=PyAirbyteDataLossWarning,
689                stacklevel=1,
690            )
691        if isinstance(write_strategy, str):
692            try:
693                write_strategy = WriteStrategy(write_strategy)
694            except ValueError:
695                raise exc.PyAirbyteInputError(
696                    message="Invalid strategy",
697                    context={
698                        "write_strategy": write_strategy,
699                        "available_strategies": [s.value for s in WriteStrategy],
700                    },
701                ) from None
702
703        if streams:
704            self.select_streams(streams)
705
706        if not self._selected_stream_names:
707            raise exc.PyAirbyteNoStreamsSelectedError(
708                connector_name=self.name,
709                available_streams=self.get_available_streams(),
710            )
711
712        # Run optional validation step
713        if not skip_validation:
714            self.validate_config()
715
716        # Set up cache and related resources
717        if cache is None:
718            cache = get_default_cache()
719
720        # Set up state provider if not in full refresh mode
721        if force_full_refresh:
722            state_provider: StateProviderBase | None = None
723        else:
724            state_provider = cache.get_state_provider(
725                source_name=self.name,
726            )
727
728        self._log_sync_start(cache=cache)
729
730        cache_processor = cache.get_record_processor(
731            source_name=self.name,
732            catalog_provider=CatalogProvider(self.configured_catalog),
733        )
734        try:
735            cache_processor.process_airbyte_messages(
736                self._read_with_catalog(
737                    catalog=self.configured_catalog,
738                    state=state_provider,
739                ),
740                write_strategy=write_strategy,
741            )
742
743        # TODO: We should catch more specific exceptions here
744        except Exception as ex:
745            self._log_sync_failure(cache=cache, exception=ex)
746            raise exc.AirbyteConnectorFailedError(
747                log_text=self._last_log_messages,
748            ) from ex
749
750        self._log_sync_success(cache=cache)
751        return ReadResult(
752            processed_records=self._processed_records,
753            cache=cache,
754            processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
755        )

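A class representing an Airbyte source connector, which can be invoked to fetch its spec, check its connection, discover its streams, and read records.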

Source( executor: airbyte._executor.Executor, name: str, config: dict[str, typing.Any] | None = None, streams: str | list[str] | None = None, *, validate: bool = False)
63    def __init__(
64        self,
65        executor: Executor,
66        name: str,
67        config: dict[str, Any] | None = None,
68        streams: str | list[str] | None = None,
69        *,
70        validate: bool = False,
71    ) -> None:
72        """Initialize the source.
73
74        If config is provided, it will be validated against the spec if validate is True.
75        """
76        self.executor = executor
77        self.name = name
78        self._processed_records = 0
79        self._config_dict: dict[str, Any] | None = None
80        self._last_log_messages: list[str] = []
81        self._discovered_catalog: AirbyteCatalog | None = None
82        self._spec: ConnectorSpecification | None = None
83        self._selected_stream_names: list[str] = []
84        if config is not None:
85            self.set_config(config, validate=validate)
86        if streams is not None:
87            self.select_streams(streams)
88
89        self._deployed_api_root: str | None = None
90        self._deployed_workspace_id: str | None = None
91        self._deployed_source_id: str | None = None

Initialize the source.

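If config is provided, it is stored via set_config() and validated against the spec only if validate is True (the default here is False). If streams is provided, they are selected via select_streams().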

executor
name
def set_streams(self, streams: list[str]) -> None:
 93    def set_streams(self, streams: list[str]) -> None:
 94        """Deprecated. See select_streams()."""
 95        warnings.warn(
 96            "The 'set_streams' method is deprecated and will be removed in a future version. "
 97            "Please use the 'select_streams' method instead.",
 98            DeprecationWarning,
 99            stacklevel=2,
100        )
101        self.select_streams(streams)

Deprecated. See select_streams().

def select_all_streams(self) -> None:
103    def select_all_streams(self) -> None:
104        """Select all streams.
105
106        This is a more streamlined equivalent to:
107        > source.select_streams(source.get_available_streams()).
108        """
109        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

`source.select_streams(source.get_available_streams())`

def select_streams(self, streams: str | list[str]) -> None:
111    def select_streams(self, streams: str | list[str]) -> None:
112        """Select the stream names that should be read from the connector.
113
114        Args:
115        - streams: A list of stream names to select. If set to "*", all streams will be selected.
116
117        Currently, if this is not set, all streams will be read.
118        """
119        if streams == "*":
120            self.select_all_streams()
121            return
122
123        if isinstance(streams, str):
124            # If a single stream is provided, convert it to a one-item list
125            streams = [streams]
126
127        available_streams = self.get_available_streams()
128        for stream in streams:
129            if stream not in available_streams:
130                raise exc.AirbyteStreamNotFoundError(
131                    stream_name=stream,
132                    connector_name=self.name,
133                    available_streams=available_streams,
134                )
135        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Args:

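  • streams: A single stream name or a list of stream names to select. If set to "*", all streams will be selected. Raises AirbyteStreamNotFoundError if any requested name is not an available stream.

If no streams are selected, read() raises PyAirbyteNoStreamsSelectedError, while configured_catalog falls back to all available streams.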

def get_selected_streams(self) -> list[str]:
137    def get_selected_streams(self) -> list[str]:
138        """Get the selected streams.
139
140        If no streams are selected, return an empty list.
141        """
142        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
144    def set_config(
145        self,
146        config: dict[str, Any],
147        *,
148        validate: bool = True,
149    ) -> None:
150        """Set the config for the connector.
151
152        If validate is True, raise an exception if the config fails validation.
153
154        If validate is False, validation will be deferred until check() or validate_config()
155        is called.
156        """
157        if validate:
158            self.validate_config(config)
159
160        self._config_dict = config

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.

def get_config(self) -> dict[str, typing.Any]:
162    def get_config(self) -> dict[str, Any]:
163        """Get the config for the connector."""
164        return self._config

Get the config for the connector.

def validate_config(self, config: dict[str, typing.Any] | None = None) -> None:
191    def validate_config(self, config: dict[str, Any] | None = None) -> None:
192        """Validate the config against the spec.
193
194        If config is not provided, the already-set config will be validated.
195        """
196        spec = self._get_spec(force_refresh=False)
197        config = self._config if config is None else config
198        try:
199            jsonschema.validate(config, spec.connectionSpecification)
200            log_config_validation_result(
201                name=self.name,
202                state=EventState.SUCCEEDED,
203            )
204        except jsonschema.ValidationError as ex:
205            validation_ex = exc.AirbyteConnectorValidationFailedError(
206                message="The provided config is not valid.",
207                context={
208                    "error_message": ex.message,
209                    "error_path": ex.path,
210                    "error_instance": ex.instance,
211                    "error_schema": ex.schema,
212                },
213            )
214            log_config_validation_result(
215                name=self.name,
216                state=EventState.FAILED,
217                exception=validation_ex,
218            )
219            raise validation_ex from ex

Validate the config against the spec.

If config is not provided, the already-set config will be validated.

def get_available_streams(self) -> list[str]:
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]

Get the names of the available streams from the discovered catalog.

config_spec: dict[str, typing.Any]
246    @property
247    def config_spec(self) -> dict[str, Any]:
248        """Generate a configuration spec for this connector, as a JSON Schema definition.
249
250        This function generates a JSON Schema dictionary with configuration specs for the
251        current connector, as a dictionary.
252
253        Returns:
254            dict: The JSON Schema configuration spec as a dictionary.
255        """
256        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function returns the current connector's configuration spec as a JSON Schema dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
258    def print_config_spec(
259        self,
260        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
261        *,
262        output_file: Path | str | None = None,
263    ) -> None:
264        """Print the configuration spec for this connector.
265
266        Args:
267        - format: The format to print the spec in. Must be "yaml" or "json".
268        - output_file: Optional. If set, the spec will be written to the given file path. Otherwise,
269          it will be printed to the console.
270        """
271        if format not in {"yaml", "json"}:
272            raise exc.PyAirbyteInputError(
273                message="Invalid format. Expected 'yaml' or 'json'",
274                input_value=format,
275            )
276        if isinstance(output_file, str):
277            output_file = Path(output_file)
278
279        if format == "yaml":
280            content = yaml.dump(self.config_spec, indent=2)
281        elif format == "json":
282            content = json.dumps(self.config_spec, indent=2)
283
284        if output_file:
285            output_file.write_text(content)
286            return
287
288        syntax_highlighted = Syntax(content, format)
289        print(syntax_highlighted)

Print the configuration spec for this connector.

Args:

  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
docs_url: str
306    @property
307    def docs_url(self) -> str:
308        """Get the URL to the connector's documentation."""
309        # TODO: Replace with docs URL from metadata when available
310        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
311            "source-", ""
312        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
314    @property
315    def discovered_catalog(self) -> AirbyteCatalog:
316        """Get the raw catalog for the given streams.
317
318        If the catalog is not yet known, we call discover to get it.
319        """
320        if self._discovered_catalog is None:
321            self._discovered_catalog = self._discover()
322
323        return self._discovered_catalog

Get the raw catalog discovered from the connector.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
325    @property
326    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
327        """Get the configured catalog for the given streams.
328
329        If the raw catalog is not yet known, we call discover to get it.
330
331        If no specific streams are selected, we return a catalog that syncs all available streams.
332
333        TODO: We should consider disabling by default the streams that the connector would
334        disable by default. (For instance, streams that require a premium license are sometimes
335        disabled by default within the connector.)
336        """
337        # Ensure discovered catalog is cached before we start
338        _ = self.discovered_catalog
339
340        # Filter for selected streams if set, otherwise use all available streams:
341        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
342
343        return ConfiguredAirbyteCatalog(
344            streams=[
345                ConfiguredAirbyteStream(
346                    stream=stream,
347                    destination_sync_mode=DestinationSyncMode.overwrite,
348                    primary_key=stream.source_defined_primary_key,
349                    # TODO: The below assumes all sources can coalesce from incremental sync to
350                    # full_table as needed. CDK supports this, so it might be safe:
351                    sync_mode=SyncMode.incremental,
352                )
353                for stream in self.discovered_catalog.streams
354                if stream.name in streams_filter
355            ],
356        )

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
358    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
359        """Return the JSON Schema spec for the specified stream name."""
360        catalog: AirbyteCatalog = self.discovered_catalog
361        found: list[AirbyteStream] = [
362            stream for stream in catalog.streams if stream.name == stream_name
363        ]
364
365        if len(found) == 0:
366            raise exc.PyAirbyteInputError(
367                message="Stream name does not exist in catalog.",
368                input_value=stream_name,
369            )
370
371        if len(found) > 1:
372            raise exc.PyAirbyteInternalError(
373                message="Duplicate streams found with the same name.",
374                context={
375                    "found_streams": found,
376                },
377            )
378
379        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.

def get_records(self, stream: str) -> airbyte.datasets._lazy.LazyDataset:
381    def get_records(self, stream: str) -> LazyDataset:
382        """Read a stream from the connector.
383
384        This involves the following steps:
385        * Call discover to get the catalog
386        * Generate a configured catalog that syncs the given stream in full_refresh mode
387        * Write the configured catalog and the config to a temporary file
388        * execute the connector with read --config <config_file> --catalog <catalog_file>
389        * Listen to the messages and return the first AirbyteRecordMessages that come along.
390        * Make sure the subprocess is killed when the function returns.
391        """
392        discovered_catalog: AirbyteCatalog = self.discovered_catalog
393        configured_catalog = ConfiguredAirbyteCatalog(
394            streams=[
395                ConfiguredAirbyteStream(
396                    stream=s,
397                    sync_mode=SyncMode.full_refresh,
398                    destination_sync_mode=DestinationSyncMode.overwrite,
399                )
400                for s in discovered_catalog.streams
401                if s.name == stream
402            ],
403        )
404        if len(configured_catalog.streams) == 0:
405            raise exc.PyAirbyteInputError(
406                message="Requested stream does not exist.",
407                context={
408                    "stream": stream,
409                    "available_streams": self.get_available_streams(),
410                    "connector_name": self.name,
411                },
412            ) from KeyError(stream)
413
414        configured_stream = configured_catalog.streams[0]
415        all_properties = cast(
416            list[str], list(configured_stream.stream.json_schema["properties"].keys())
417        )
418
419        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
420            self._log_sync_start(cache=None)
421            yield from records
422            self._log_sync_success(cache=None)
423
424        iterator: Iterator[dict[str, Any]] = _with_logging(
425            records=(  # Generator comprehension yields StreamRecord objects for each record
426                StreamRecord.from_record_message(
427                    record_message=record.record,
428                    expected_keys=all_properties,
429                    prune_extra_fields=True,
430                )
431                for record in self._read_with_catalog(configured_catalog)
432                if record.record
433            )
434        )
435        return LazyDataset(
436            iterator,
437            stream_metadata=configured_stream,
438        )

Read a stream from the connector.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with `read --config <config_file> --catalog <catalog_file>`
  • Listen to the messages and yield each AirbyteRecordMessage as it arrives, wrapped in a lazily evaluated dataset.
  • Make sure the subprocess is killed when the function returns.
connector_version: str | None
440    @property
441    def connector_version(self) -> str | None:
442        """Return the version of the connector as reported by the executor.
443
444        Returns None if the version cannot be determined.
445        """
446        return self.executor.get_installed_version()

Return the version of the connector as reported by the executor.

Returns None if the version cannot be determined.

def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> collections.abc.Iterable[airbyte.documents.Document]:
448    def get_documents(
449        self,
450        stream: str,
451        title_property: str | None = None,
452        content_properties: list[str] | None = None,
453        metadata_properties: list[str] | None = None,
454        *,
455        render_metadata: bool = False,
456    ) -> Iterable[Document]:
457        """Read a stream from the connector and return the records as documents.
458
459        If metadata_properties is not set, all properties that are not content will be added to
460        the metadata.
461
462        If render_metadata is True, metadata will be rendered in the document, as well as the
463        the main content.
464        """
465        return self.get_records(stream).to_documents(
466            title_property=title_property,
467            content_properties=content_properties,
468            metadata_properties=metadata_properties,
469            render_metadata=render_metadata,
470        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.

def check(self) -> None:
472    def check(self) -> None:
473        """Call check on the connector.
474
475        This involves the following steps:
476        * Write the config to a temporary file
477        * execute the connector with check --config <config_file>
478        * Listen to the messages and return the first AirbyteCatalog that comes along.
479        * Make sure the subprocess is killed when the function returns.
480        """
481        with as_temp_files([self._config]) as [config_file]:
482            try:
483                for msg in self._execute(["check", "--config", config_file]):
484                    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
485                        if msg.connectionStatus.status != Status.FAILED:
486                            print(f"Connection check succeeded for `{self.name}`.")
487                            log_source_check_result(
488                                name=self.name,
489                                state=EventState.SUCCEEDED,
490                            )
491                            return
492
493                        log_source_check_result(
494                            name=self.name,
495                            state=EventState.FAILED,
496                        )
497                        raise exc.AirbyteConnectorCheckFailedError(
498                            help_url=self.docs_url,
499                            context={
500                                "failure_reason": msg.connectionStatus.message,
501                            },
502                        )
503                raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
504            except exc.AirbyteConnectorReadError as ex:
505                raise exc.AirbyteConnectorCheckFailedError(
506                    message="The connector failed to check the connection.",
507                    log_text=ex.log_text,
508                ) from ex

Call check on the connector.

This involves the following steps:

  • Write the config to a temporary file
  • Execute the connector with `check --config <config_file>`
  • Listen to the messages and act on the first connection-status message: return if the check succeeded, or raise an error if it failed.
  • Make sure the subprocess is killed when the function returns.
def install(self) -> None:
510    def install(self) -> None:
511        """Install the connector if it is not yet installed."""
512        self.executor.install()
513        print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")

Install the connector if it is not yet installed.

def uninstall(self) -> None:
515    def uninstall(self) -> None:
516        """Uninstall the connector if it is installed.
517
518        This only works if the use_local_install flag wasn't used and installation is managed by
519        PyAirbyte.
520        """
521        self.executor.uninstall()

Uninstall the connector if it is installed.

This only works if the use_local_install flag wasn't used and installation is managed by PyAirbyte.

def read( self, cache: airbyte.caches.base.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.results.ReadResult:
656    def read(
657        self,
658        cache: CacheBase | None = None,
659        *,
660        streams: str | list[str] | None = None,
661        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
662        force_full_refresh: bool = False,
663        skip_validation: bool = False,
664    ) -> ReadResult:
665        """Read from the connector and write to the cache.
666
667        Args:
668            cache: The cache to write to. If None, a default cache will be used.
669            write_strategy: The strategy to use when writing to the cache. If a string, it must be
670                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
671                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
672                WriteStrategy.AUTO.
673            streams: Optional if already set. A list of stream names to select for reading. If set
674                to "*", all streams will be selected.
675            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
676                streams will be read in incremental mode if supported by the connector. This option
677                must be True when using the "replace" strategy.
678        """
679        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
680            warnings.warn(
681                message=(
682                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
683                    "could result in data loss. "
684                    "To silence this warning, use the following: "
685                    'warnings.filterwarnings("ignore", '
686                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
687                ),
688                category=PyAirbyteDataLossWarning,
689                stacklevel=1,
690            )
691        if isinstance(write_strategy, str):
692            try:
693                write_strategy = WriteStrategy(write_strategy)
694            except ValueError:
695                raise exc.PyAirbyteInputError(
696                    message="Invalid strategy",
697                    context={
698                        "write_strategy": write_strategy,
699                        "available_strategies": [s.value for s in WriteStrategy],
700                    },
701                ) from None
702
703        if streams:
704            self.select_streams(streams)
705
706        if not self._selected_stream_names:
707            raise exc.PyAirbyteNoStreamsSelectedError(
708                connector_name=self.name,
709                available_streams=self.get_available_streams(),
710            )
711
712        # Run optional validation step
713        if not skip_validation:
714            self.validate_config()
715
716        # Set up cache and related resources
717        if cache is None:
718            cache = get_default_cache()
719
720        # Set up state provider if not in full refresh mode
721        if force_full_refresh:
722            state_provider: StateProviderBase | None = None
723        else:
724            state_provider = cache.get_state_provider(
725                source_name=self.name,
726            )
727
728        self._log_sync_start(cache=cache)
729
730        cache_processor = cache.get_record_processor(
731            source_name=self.name,
732            catalog_provider=CatalogProvider(self.configured_catalog),
733        )
734        try:
735            cache_processor.process_airbyte_messages(
736                self._read_with_catalog(
737                    catalog=self.configured_catalog,
738                    state=state_provider,
739                ),
740                write_strategy=write_strategy,
741            )
742
743        # TODO: We should catch more specific exceptions here
744        except Exception as ex:
745            self._log_sync_failure(cache=cache, exception=ex)
746            raise exc.AirbyteConnectorFailedError(
747                log_text=self._last_log_messages,
748            ) from ex
749
750        self._log_sync_success(cache=cache)
751        return ReadResult(
752            processed_records=self._processed_records,
753            cache=cache,
754            processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
755        )

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If None, a default cache will be used.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
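  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option should be True when using the "replace" strategy; otherwise a data-loss warning is issued.
  • skip_validation: If True, skip validating the config against the connector's spec before reading.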