airbyte.sources

Sources connectors module for PyAirbyte.

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sources connectors module for PyAirbyte."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.sources.base import Source
from airbyte.sources.registry import (
    ConnectorMetadata,
    get_available_connectors,
    get_connector_metadata,
)
from airbyte.sources.util import (
    get_benchmark_source,
    get_source,
)


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004  # imports used for more than type checking
    from airbyte.sources import (
        base,
        registry,
        util,
    )

# Explicit public API surface of `airbyte.sources`, grouped for readability.
__all__ = [
    # Submodules
    "base",
    "registry",
    "util",
    # Factories
    "get_source",
    "get_benchmark_source",
    # Helper Functions
    "get_available_connectors",
    "get_connector_metadata",
    # Classes
    "Source",
    "ConnectorMetadata",
]
def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    pip_url: str | None = None,
 56    local_executable: Path | str | None = None,
 57    docker_image: bool | str | None = None,
 58    use_host_network: bool = False,
 59    source_manifest: bool | dict | Path | str | None = None,
 60    install_if_missing: bool = True,
 61    install_root: Path | None = None,
 62) -> Source:
 63    """Get a connector by name and version.
 64
 65    If an explicit install or execution method is requested (e.g. `local_executable`,
 66    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 67
 68    Otherwise, an appropriate method will be selected based on the available connector metadata:
 69    1. If the connector is registered and has a YAML source manifest is available, the YAML manifest
 70       will be downloaded and used to to execute the connector.
 71    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 72    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 73       will be executed using Docker.
 74
 75    Args:
 76        name: connector name
 77        config: connector config - if not provided, you need to set it later via the set_config
 78            method.
 79        config_change_callback: callback function to be called when the connector config changes.
 80        streams: list of stream names to select for reading. If set to "*", all streams will be
 81            selected. If not provided, you can set it later via the `select_streams()` or
 82            `select_all_streams()` method.
 83        version: connector version - if not provided, the currently installed version will be used.
 84            If no version is installed, the latest available version will be used. The version can
 85            also be set to "latest" to force the use of the latest available version.
 86        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 87            connector name.
 88        local_executable: If set, the connector will be assumed to already be installed and will be
 89            executed using this path or executable name. Otherwise, the connector will be installed
 90            automatically in a virtual environment.
 91        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 92            to use the default image for the connector, or you can specify a custom image name.
 93            If `version` is specified and your image name does not already contain a tag
 94            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
 95        use_host_network: If set, along with docker_image, the connector will be executed using
 96            the host network. This is useful for connectors that need to access resources on
 97            the host machine, such as a local database. This parameter is ignored when
 98            `docker_image` is not set.
 99        source_manifest: If set, the connector will be executed based on a declarative YAML
100            source definition. This input can be `True` to attempt to auto-download a YAML spec,
101            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
102            the local file system, or `str` to pull the definition from a web URL.
103        install_if_missing: Whether to install the connector if it is not available locally. This
104            parameter is ignored when `local_executable` or `source_manifest` are set.
105        install_root: (Optional.) The root directory where the virtual environment will be
106            created. If not provided, the current working directory will be used.
107    """
108    return Source(
109        name=name,
110        config=config,
111        config_change_callback=config_change_callback,
112        streams=streams,
113        executor=get_connector_executor(
114            name=name,
115            version=version,
116            pip_url=pip_url,
117            local_executable=local_executable,
118            docker_image=docker_image,
119            use_host_network=use_host_network,
120            source_manifest=source_manifest,
121            install_if_missing=install_if_missing,
122            install_root=install_root,
123        ),
124    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
def get_benchmark_source( num_records: int | str = '5e5', *, install_if_missing: bool = True) -> Source:
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.

    Raises:
        PyAirbyteInputError: If `num_records` is a string that cannot be parsed
            as a number.
    """
    if isinstance(num_records, str):
        try:
            # Decimal accepts both plain integers and scientific notation (e.g. "5e6").
            # Underscores are stripped first so "1_000_000" is also accepted.
            num_records = int(Decimal(num_records.replace("_", "")))
        except InvalidOperation as ex:
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
  • install_if_missing: Whether to install the source if it is not available locally.
Returns:

Source: The source object for benchmarking.

def get_available_connectors( install_type: airbyte.sources.registry.InstallType | str | None = None) -> list[str]:
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Connectors will be returned in alphabetical order, with the standard prefix "source-".
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(entry.name for entry in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        runnable_languages = {Language.PYTHON, Language.MANIFEST_ONLY}
        return sorted(
            entry.name
            for entry in _get_registry_cache().values()
            if entry.language in runnable_languages
        )

    # Coerce strings to the enum; calling InstallType() on a member is a no-op.
    install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            entry.name
            for entry in _get_registry_cache().values()
            if entry.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            entry.name
            for entry in _get_registry_cache().values()
            if entry.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(entry.name for entry in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            entry.name
            for entry in _get_registry_cache().values()
            if InstallType.YAML in entry.install_types
            and entry.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Check the cache for the connector.

    If the cache is empty, populate by calling update_cache.
    """
    registry_url = _get_registry_url()

    # A disabled registry means metadata cannot (and should not) be resolved.
    if _is_registry_disabled(registry_url):
        return None

    # Work on a shallow copy so callers cannot mutate the shared cache.
    registry_snapshot = copy(_get_registry_cache())

    if not registry_snapshot:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": _get_registry_url(),
            },
        )

    if name not in registry_snapshot:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(),
            },
        )

    return registry_snapshot[name]

Check the cache for the connector.

If the cache is empty, populate by calling update_cache.

class Source(airbyte._connector_base.ConnectorBase):
 53class Source(ConnectorBase):
 54    """A class representing a source that can be called."""
 55
 56    connector_type = "source"
 57
    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.

        Args:
            executor: The executor used to run the connector.
            name: The connector name.
            config: The connector config. If not provided, set it later via `set_config()`.
            config_change_callback: Called when the connector config changes.
            streams: Stream name(s) to pre-select, or "*" for all streams.
            validate: Whether to validate `config` against the connector spec immediately.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []
        # NOTE(review): config is both passed to the base class and applied again here —
        # presumably so this class's own `_config_dict` state is initialized; confirm.
        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
 90
 91    def set_streams(self, streams: list[str]) -> None:
 92        """Deprecated. See select_streams()."""
 93        warnings.warn(
 94            "The 'set_streams' method is deprecated and will be removed in a future version. "
 95            "Please use the 'select_streams' method instead.",
 96            DeprecationWarning,
 97            stacklevel=2,
 98        )
 99        self.select_streams(streams)
100
101    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
102        """Logs a warning message indicating stream selection which are not selected yet."""
103        if streams == "*":
104            print(
105                "Warning: Config is not set yet. All streams will be selected after config is set."
106            )
107        else:
108            print(
109                "Warning: Config is not set yet. "
110                f"Streams to be selected after config is set: {streams}"
111            )
112
113    def select_all_streams(self) -> None:
114        """Select all streams.
115
116        This is a more streamlined equivalent to:
117        > source.select_streams(source.get_available_streams()).
118        """
119        if self._config_dict is None:
120            self._to_be_selected_streams = "*"
121            self._log_warning_preselected_stream(self._to_be_selected_streams)
122            return
123
124        self._selected_stream_names = self.get_available_streams()
125
126    def select_streams(self, streams: str | list[str]) -> None:
127        """Select the stream names that should be read from the connector.
128
129        Args:
130            streams: A list of stream names to select. If set to "*", all streams will be selected.
131
132        Currently, if this is not set, all streams will be read.
133        """
134        if self._config_dict is None:
135            self._to_be_selected_streams = streams
136            self._log_warning_preselected_stream(streams)
137            return
138
139        if streams == "*":
140            self.select_all_streams()
141            return
142
143        if isinstance(streams, str):
144            # If a single stream is provided, convert it to a one-item list
145            streams = [streams]
146
147        available_streams = self.get_available_streams()
148        for stream in streams:
149            if stream not in available_streams:
150                raise exc.AirbyteStreamNotFoundError(
151                    stream_name=stream,
152                    connector_name=self.name,
153                    available_streams=available_streams,
154                )
155        self._selected_stream_names = streams
156
157    def get_selected_streams(self) -> list[str]:
158        """Get the selected streams.
159
160        If no streams are selected, return an empty list.
161        """
162        return self._selected_stream_names
163
164    def set_config(
165        self,
166        config: dict[str, Any],
167        *,
168        validate: bool = True,
169    ) -> None:
170        """Set the config for the connector.
171
172        If validate is True, raise an exception if the config fails validation.
173
174        If validate is False, validation will be deferred until check() or validate_config()
175        is called.
176        """
177        if validate:
178            self.validate_config(config)
179
180        self._config_dict = config
181
182        if self._to_be_selected_streams:
183            self.select_streams(self._to_be_selected_streams)
184            self._to_be_selected_streams = []
185
    def get_config(self) -> dict[str, Any]:
        """Get the config for the connector."""
        # Delegates to the `_config` property, which raises if no config is set yet.
        return self._config
189
190    @property
191    def _config(self) -> dict[str, Any]:
192        if self._config_dict is None:
193            raise exc.AirbyteConnectorConfigurationMissingError(
194                connector_name=self.name,
195                guidance="Provide via get_source() or set_config()",
196            )
197        return self._config_dict
198
199    def _discover(self) -> AirbyteCatalog:
200        """Call discover on the connector.
201
202        This involves the following steps:
203        - Write the config to a temporary file
204        - execute the connector with discover --config <config_file>
205        - Listen to the messages and return the first AirbyteCatalog that comes along.
206        - Make sure the subprocess is killed when the function returns.
207        """
208        with as_temp_files([self._config]) as [config_file]:
209            for msg in self._execute(["discover", "--config", config_file]):
210                if msg.type == Type.CATALOG and msg.catalog:
211                    return msg.catalog
212            raise exc.AirbyteConnectorMissingCatalogError(
213                connector_name=self.name,
214                log_text=self._last_log_messages,
215            )
216
217    def get_available_streams(self) -> list[str]:
218        """Get the available streams from the spec."""
219        return [s.name for s in self.discovered_catalog.streams]
220
221    def _get_incremental_stream_names(self) -> list[str]:
222        """Get the name of streams that support incremental sync."""
223        return [
224            stream.name
225            for stream in self.discovered_catalog.streams
226            if SyncMode.incremental in stream.supported_sync_modes
227        ]
228
229    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
230        """Call spec on the connector.
231
232        This involves the following steps:
233        * execute the connector with spec
234        * Listen to the messages and return the first AirbyteCatalog that comes along.
235        * Make sure the subprocess is killed when the function returns.
236        """
237        if force_refresh or self._spec is None:
238            for msg in self._execute(["spec"]):
239                if msg.type == Type.SPEC and msg.spec:
240                    self._spec = msg.spec
241                    break
242
243        if self._spec:
244            return self._spec
245
246        raise exc.AirbyteConnectorMissingSpecError(
247            connector_name=self.name,
248            log_text=self._last_log_messages,
249        )
250
251    @property
252    def config_spec(self) -> dict[str, Any]:
253        """Generate a configuration spec for this connector, as a JSON Schema definition.
254
255        This function generates a JSON Schema dictionary with configuration specs for the
256        current connector, as a dictionary.
257
258        Returns:
259            dict: The JSON Schema configuration spec as a dictionary.
260        """
261        return self._get_spec(force_refresh=True).connectionSpecification
262
263    def print_config_spec(
264        self,
265        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
266        *,
267        output_file: Path | str | None = None,
268    ) -> None:
269        """Print the configuration spec for this connector.
270
271        Args:
272            format: The format to print the spec in. Must be "yaml" or "json".
273            output_file: Optional. If set, the spec will be written to the given file path.
274                Otherwise, it will be printed to the console.
275        """
276        if format not in {"yaml", "json"}:
277            raise exc.PyAirbyteInputError(
278                message="Invalid format. Expected 'yaml' or 'json'",
279                input_value=format,
280            )
281        if isinstance(output_file, str):
282            output_file = Path(output_file)
283
284        if format == "yaml":
285            content = yaml.dump(self.config_spec, indent=2)
286        elif format == "json":
287            content = json.dumps(self.config_spec, indent=2)
288
289        if output_file:
290            output_file.write_text(content)
291            return
292
293        syntax_highlighted = Syntax(content, format)
294        print(syntax_highlighted)
295
296    @property
297    def _yaml_spec(self) -> str:
298        """Get the spec as a yaml string.
299
300        For now, the primary use case is for writing and debugging a valid config for a source.
301
302        This is private for now because we probably want better polish before exposing this
303        as a stable interface. This will also get easier when we have docs links with this info
304        for each connector.
305        """
306        spec_obj: ConnectorSpecification = self._get_spec()
307        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
308        # convert to a yaml string
309        return yaml.dump(spec_dict)
310
311    @property
312    def docs_url(self) -> str:
313        """Get the URL to the connector's documentation."""
314        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
315            "source-", ""
316        )
317
    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        # Lazily run `discover` exactly once and cache the result for later calls.
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog
328
329    @property
330    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
331        """Get the configured catalog for the given streams.
332
333        If the raw catalog is not yet known, we call discover to get it.
334
335        If no specific streams are selected, we return a catalog that syncs all available streams.
336
337        TODO: We should consider disabling by default the streams that the connector would
338        disable by default. (For instance, streams that require a premium license are sometimes
339        disabled by default within the connector.)
340        """
341        # Ensure discovered catalog is cached before we start
342        _ = self.discovered_catalog
343
344        # Filter for selected streams if set, otherwise use all available streams:
345        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
346        return self.get_configured_catalog(streams=streams_filter)
347
348    def get_configured_catalog(
349        self,
350        streams: Literal["*"] | list[str] | None = None,
351    ) -> ConfiguredAirbyteCatalog:
352        """Get a configured catalog for the given streams.
353
354        If no streams are provided, the selected streams will be used. If no streams are selected,
355        all available streams will be used.
356
357        If '*' is provided, all available streams will be used.
358        """
359        selected_streams: list[str] = []
360        if streams is None:
361            selected_streams = self._selected_stream_names or self.get_available_streams()
362        elif streams == "*":
363            selected_streams = self.get_available_streams()
364        elif isinstance(streams, list):
365            selected_streams = streams
366        else:
367            raise exc.PyAirbyteInputError(
368                message="Invalid streams argument.",
369                input_value=streams,
370            )
371
372        return ConfiguredAirbyteCatalog(
373            streams=[
374                ConfiguredAirbyteStream(
375                    stream=stream,
376                    destination_sync_mode=DestinationSyncMode.overwrite,
377                    primary_key=stream.source_defined_primary_key,
378                    sync_mode=SyncMode.incremental,
379                )
380                for stream in self.discovered_catalog.streams
381                if stream.name in selected_streams
382            ],
383        )
384
385    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
386        """Return the JSON Schema spec for the specified stream name."""
387        catalog: AirbyteCatalog = self.discovered_catalog
388        found: list[AirbyteStream] = [
389            stream for stream in catalog.streams if stream.name == stream_name
390        ]
391
392        if len(found) == 0:
393            raise exc.PyAirbyteInputError(
394                message="Stream name does not exist in catalog.",
395                input_value=stream_name,
396            )
397
398        if len(found) > 1:
399            raise exc.PyAirbyteInternalError(
400                message="Duplicate streams found with the same name.",
401                context={
402                    "found_streams": found,
403                },
404            )
405
406        return found[0].json_schema
407
408    def get_records(
409        self,
410        stream: str,
411        *,
412        normalize_field_names: bool = False,
413        prune_undeclared_fields: bool = True,
414    ) -> LazyDataset:
415        """Read a stream from the connector.
416
417        Args:
418            stream: The name of the stream to read.
419            normalize_field_names: When `True`, field names will be normalized to lower case, with
420                special characters removed. This matches the behavior of PyAirbyte caches and most
421                Airbyte destinations.
422            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
423                which generally matches the behavior of PyAirbyte caches and most Airbyte
424                destinations, specifically when you expect the catalog may be stale. You can disable
425                this to keep all fields in the records.
426
427        This involves the following steps:
428        * Call discover to get the catalog
429        * Generate a configured catalog that syncs the given stream in full_refresh mode
430        * Write the configured catalog and the config to a temporary file
431        * execute the connector with read --config <config_file> --catalog <catalog_file>
432        * Listen to the messages and return the first AirbyteRecordMessages that come along.
433        * Make sure the subprocess is killed when the function returns.
434        """
435        discovered_catalog: AirbyteCatalog = self.discovered_catalog
436        configured_catalog = ConfiguredAirbyteCatalog(
437            streams=[
438                ConfiguredAirbyteStream(
439                    stream=s,
440                    sync_mode=SyncMode.full_refresh,
441                    destination_sync_mode=DestinationSyncMode.overwrite,
442                )
443                for s in discovered_catalog.streams
444                if s.name == stream
445            ],
446        )
447        if len(configured_catalog.streams) == 0:
448            raise exc.PyAirbyteInputError(
449                message="Requested stream does not exist.",
450                context={
451                    "stream": stream,
452                    "available_streams": self.get_available_streams(),
453                    "connector_name": self.name,
454                },
455            ) from KeyError(stream)
456
457        configured_stream = configured_catalog.streams[0]
458
459        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
460            yield from records
461
462        stream_record_handler = StreamRecordHandler(
463            json_schema=self.get_stream_json_schema(stream),
464            prune_extra_fields=prune_undeclared_fields,
465            normalize_keys=normalize_field_names,
466        )
467
468        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
469        progress_tracker = ProgressTracker(
470            ProgressStyle.PLAIN,
471            source=self,
472            cache=None,
473            destination=None,
474            expected_streams=[stream],
475        )
476
477        iterator: Iterator[dict[str, Any]] = (
478            StreamRecord.from_record_message(
479                record_message=record.record,
480                stream_record_handler=stream_record_handler,
481            )
482            for record in self._read_with_catalog(
483                catalog=configured_catalog,
484                progress_tracker=progress_tracker,
485            )
486            if record.record
487        )
488        progress_tracker.log_success()
489        return LazyDataset(
490            iterator,
491            stream_metadata=configured_stream,
492        )
493
    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as
        the main content.
        """
        # Delegates to LazyDataset.to_documents() on the lazily-read record stream.
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )
517
518    def _get_airbyte_message_iterator(
519        self,
520        *,
521        streams: Literal["*"] | list[str] | None = None,
522        state_provider: StateProviderBase | None = None,
523        progress_tracker: ProgressTracker,
524        force_full_refresh: bool = False,
525    ) -> AirbyteMessageIterator:
526        """Get an AirbyteMessageIterator for this source."""
527        return AirbyteMessageIterator(
528            self._read_with_catalog(
529                catalog=self.get_configured_catalog(streams=streams),
530                state=state_provider if not force_full_refresh else None,
531                progress_tracker=progress_tracker,
532            )
533        )
534
535    def _read_with_catalog(
536        self,
537        catalog: ConfiguredAirbyteCatalog,
538        progress_tracker: ProgressTracker,
539        state: StateProviderBase | None = None,
540    ) -> Generator[AirbyteMessage, None, None]:
541        """Call read on the connector.
542
543        This involves the following steps:
544        * Write the config to a temporary file
545        * execute the connector with read --config <config_file> --catalog <catalog_file>
546        * Listen to the messages and return the AirbyteRecordMessages that come along.
547        * Send out telemetry on the performed sync (with information about which source was used and
548          the type of the cache)
549        """
550        with as_temp_files(
551            [
552                self._config,
553                catalog.model_dump_json(),
554                state.to_state_input_file_text() if state else "[]",
555            ]
556        ) as [
557            config_file,
558            catalog_file,
559            state_file,
560        ]:
561            message_generator = self._execute(
562                [
563                    "read",
564                    "--config",
565                    config_file,
566                    "--catalog",
567                    catalog_file,
568                    "--state",
569                    state_file,
570                ],
571                progress_tracker=progress_tracker,
572            )
573            yield from progress_tracker.tally_records_read(message_generator)
574        progress_tracker.log_read_complete()
575
    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        # Pure delegation: all handling lives in the base connector class.
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
592
593    def _log_incremental_streams(
594        self,
595        *,
596        incremental_streams: set[str] | None = None,
597    ) -> None:
598        """Log the streams which are using incremental sync mode."""
599        log_message = (
600            "The following streams are currently using incremental sync:\n"
601            f"{incremental_streams}\n"
602            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
603        )
604        print(log_message)
605
606    def read(
607        self,
608        cache: CacheBase | None = None,
609        *,
610        streams: str | list[str] | None = None,
611        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
612        force_full_refresh: bool = False,
613        skip_validation: bool = False,
614    ) -> ReadResult:
615        """Read from the connector and write to the cache.
616
617        Args:
618            cache: The cache to write to. If not set, a default cache will be used.
619            streams: Optional if already set. A list of stream names to select for reading. If set
620                to "*", all streams will be selected.
621            write_strategy: The strategy to use when writing to the cache. If a string, it must be
622                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
623                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
624                WriteStrategy.AUTO.
625            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
626                streams will be read in incremental mode if supported by the connector. This option
627                must be True when using the "replace" strategy.
628            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
629                running the connector. This can be helpful in debugging, when you want to send
630                configurations to the connector that otherwise might be rejected by JSON Schema
631                validation rules.
632        """
633        cache = cache or get_default_cache()
634        progress_tracker = ProgressTracker(
635            source=self,
636            cache=cache,
637            destination=None,
638            expected_streams=None,  # Will be set later
639        )
640
641        # Set up state provider if not in full refresh mode
642        if force_full_refresh:
643            state_provider: StateProviderBase | None = None
644        else:
645            state_provider = cache.get_state_provider(
646                source_name=self._name,
647            )
648        state_writer = cache.get_state_writer(source_name=self._name)
649
650        if streams:
651            self.select_streams(streams)
652
653        if not self._selected_stream_names:
654            raise exc.PyAirbyteNoStreamsSelectedError(
655                connector_name=self.name,
656                available_streams=self.get_available_streams(),
657            )
658
659        try:
660            result = self._read_to_cache(
661                cache=cache,
662                catalog_provider=CatalogProvider(self.configured_catalog),
663                stream_names=self._selected_stream_names,
664                state_provider=state_provider,
665                state_writer=state_writer,
666                write_strategy=write_strategy,
667                force_full_refresh=force_full_refresh,
668                skip_validation=skip_validation,
669                progress_tracker=progress_tracker,
670            )
671        except exc.PyAirbyteInternalError as ex:
672            progress_tracker.log_failure(exception=ex)
673            raise exc.AirbyteConnectorFailedError(
674                connector_name=self.name,
675                log_text=self._last_log_messages,
676            ) from ex
677        except Exception as ex:
678            progress_tracker.log_failure(exception=ex)
679            raise
680
681        progress_tracker.log_success()
682        return result
683
684    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
685        self,
686        cache: CacheBase,
687        *,
688        catalog_provider: CatalogProvider,
689        stream_names: list[str],
690        state_provider: StateProviderBase | None,
691        state_writer: StateWriterBase | None,
692        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
693        force_full_refresh: bool = False,
694        skip_validation: bool = False,
695        progress_tracker: ProgressTracker,
696    ) -> ReadResult:
697        """Internal read method."""
698        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
699            warnings.warn(
700                message=(
701                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
702                    "could result in data loss. "
703                    "To silence this warning, use the following: "
704                    'warnings.filterwarnings("ignore", '
705                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
706                ),
707                category=exc.PyAirbyteDataLossWarning,
708                stacklevel=1,
709            )
710        if isinstance(write_strategy, str):
711            try:
712                write_strategy = WriteStrategy(write_strategy)
713            except ValueError:
714                raise exc.PyAirbyteInputError(
715                    message="Invalid strategy",
716                    context={
717                        "write_strategy": write_strategy,
718                        "available_strategies": [s.value for s in WriteStrategy],
719                    },
720                ) from None
721
722        # Run optional validation step
723        if not skip_validation:
724            self.validate_config()
725
726        # Log incremental stream if incremental streams are known
727        if state_provider and state_provider.known_stream_names:
728            # Retrieve set of the known streams support which support incremental sync
729            incremental_streams = (
730                set(self._get_incremental_stream_names())
731                & state_provider.known_stream_names
732                & set(self.get_selected_streams())
733            )
734            if incremental_streams:
735                self._log_incremental_streams(incremental_streams=incremental_streams)
736
737        airbyte_message_iterator = AirbyteMessageIterator(
738            self._read_with_catalog(
739                catalog=catalog_provider.configured_catalog,
740                state=state_provider,
741                progress_tracker=progress_tracker,
742            )
743        )
744        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
745            stdin=airbyte_message_iterator,
746            catalog_provider=catalog_provider,
747            write_strategy=write_strategy,
748            state_writer=state_writer,
749            progress_tracker=progress_tracker,
750        )
751
752        # Flush the WAL, if applicable
753        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
754
755        return ReadResult(
756            source_name=self.name,
757            progress_tracker=progress_tracker,
758            processed_streams=stream_names,
759            cache=cache,
760        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False)
    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        # NOTE(review): initialized before super().__init__() — presumably because base
        # initialization (or set_config) may consult deferred selections; confirm before moving.
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None  # Validated config, set below if provided
        self._last_log_messages: list[str] = []  # Recent connector log lines, for error context
        self._discovered_catalog: AirbyteCatalog | None = None  # Lazily populated by discover
        self._selected_stream_names: list[str] = []  # Streams chosen via select_streams()
        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        # Deprecation shim: warn, then forward to the replacement method.
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,  # Point the warning at the caller, not this wrapper
        )
        self.select_streams(streams)

Deprecated. See select_streams().

def select_all_streams(self) -> None:
113    def select_all_streams(self) -> None:
114        """Select all streams.
115
116        This is a more streamlined equivalent to:
117        > source.select_streams(source.get_available_streams()).
118        """
119        if self._config_dict is None:
120            self._to_be_selected_streams = "*"
121            self._log_warning_preselected_stream(self._to_be_selected_streams)
122            return
123
124        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
126    def select_streams(self, streams: str | list[str]) -> None:
127        """Select the stream names that should be read from the connector.
128
129        Args:
130            streams: A list of stream names to select. If set to "*", all streams will be selected.
131
132        Currently, if this is not set, all streams will be read.
133        """
134        if self._config_dict is None:
135            self._to_be_selected_streams = streams
136            self._log_warning_preselected_stream(streams)
137            return
138
139        if streams == "*":
140            self.select_all_streams()
141            return
142
143        if isinstance(streams, str):
144            # If a single stream is provided, convert it to a one-item list
145            streams = [streams]
146
147        available_streams = self.get_available_streams()
148        for stream in streams:
149            if stream not in available_streams:
150                raise exc.AirbyteStreamNotFoundError(
151                    stream_name=stream,
152                    connector_name=self.name,
153                    available_streams=available_streams,
154                )
155        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.

def get_selected_streams(self) -> list[str]:
    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        # Returns the internal list by reference (not a copy).
        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
164    def set_config(
165        self,
166        config: dict[str, Any],
167        *,
168        validate: bool = True,
169    ) -> None:
170        """Set the config for the connector.
171
172        If validate is True, raise an exception if the config fails validation.
173
174        If validate is False, validation will be deferred until check() or validate_config()
175        is called.
176        """
177        if validate:
178            self.validate_config(config)
179
180        self._config_dict = config
181
182        if self._to_be_selected_streams:
183            self.select_streams(self._to_be_selected_streams)
184            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.

def get_config(self) -> dict[str, typing.Any]:
    def get_config(self) -> dict[str, Any]:
        """Get the config for the connector."""
        # NOTE(review): reads `self._config` (presumably a base-class property over
        # `_config_dict`) rather than `_config_dict` directly — confirm in base class.
        return self._config

Get the config for the connector.

def get_available_streams(self) -> list[str]:
217    def get_available_streams(self) -> list[str]:
218        """Get the available streams from the spec."""
219        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.

config_spec: dict[str, typing.Any]
251    @property
252    def config_spec(self) -> dict[str, Any]:
253        """Generate a configuration spec for this connector, as a JSON Schema definition.
254
255        This function generates a JSON Schema dictionary with configuration specs for the
256        current connector, as a dictionary.
257
258        Returns:
259            dict: The JSON Schema configuration spec as a dictionary.
260        """
261        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
263    def print_config_spec(
264        self,
265        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
266        *,
267        output_file: Path | str | None = None,
268    ) -> None:
269        """Print the configuration spec for this connector.
270
271        Args:
272            format: The format to print the spec in. Must be "yaml" or "json".
273            output_file: Optional. If set, the spec will be written to the given file path.
274                Otherwise, it will be printed to the console.
275        """
276        if format not in {"yaml", "json"}:
277            raise exc.PyAirbyteInputError(
278                message="Invalid format. Expected 'yaml' or 'json'",
279                input_value=format,
280            )
281        if isinstance(output_file, str):
282            output_file = Path(output_file)
283
284        if format == "yaml":
285            content = yaml.dump(self.config_spec, indent=2)
286        elif format == "json":
287            content = json.dumps(self.config_spec, indent=2)
288
289        if output_file:
290            output_file.write_text(content)
291            return
292
293        syntax_highlighted = Syntax(content, format)
294        print(syntax_highlighted)

Print the configuration spec for this connector.

Arguments:
  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
docs_url: str
311    @property
312    def docs_url(self) -> str:
313        """Get the URL to the connector's documentation."""
314        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
315            "source-", ""
316        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
318    @property
319    def discovered_catalog(self) -> AirbyteCatalog:
320        """Get the raw catalog for the given streams.
321
322        If the catalog is not yet known, we call discover to get it.
323        """
324        if self._discovered_catalog is None:
325            self._discovered_catalog = self._discover()
326
327        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
329    @property
330    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
331        """Get the configured catalog for the given streams.
332
333        If the raw catalog is not yet known, we call discover to get it.
334
335        If no specific streams are selected, we return a catalog that syncs all available streams.
336
337        TODO: We should consider disabling by default the streams that the connector would
338        disable by default. (For instance, streams that require a premium license are sometimes
339        disabled by default within the connector.)
340        """
341        # Ensure discovered catalog is cached before we start
342        _ = self.discovered_catalog
343
344        # Filter for selected streams if set, otherwise use all available streams:
345        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
346        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
348    def get_configured_catalog(
349        self,
350        streams: Literal["*"] | list[str] | None = None,
351    ) -> ConfiguredAirbyteCatalog:
352        """Get a configured catalog for the given streams.
353
354        If no streams are provided, the selected streams will be used. If no streams are selected,
355        all available streams will be used.
356
357        If '*' is provided, all available streams will be used.
358        """
359        selected_streams: list[str] = []
360        if streams is None:
361            selected_streams = self._selected_stream_names or self.get_available_streams()
362        elif streams == "*":
363            selected_streams = self.get_available_streams()
364        elif isinstance(streams, list):
365            selected_streams = streams
366        else:
367            raise exc.PyAirbyteInputError(
368                message="Invalid streams argument.",
369                input_value=streams,
370            )
371
372        return ConfiguredAirbyteCatalog(
373            streams=[
374                ConfiguredAirbyteStream(
375                    stream=stream,
376                    destination_sync_mode=DestinationSyncMode.overwrite,
377                    primary_key=stream.source_defined_primary_key,
378                    sync_mode=SyncMode.incremental,
379                )
380                for stream in self.discovered_catalog.streams
381                if stream.name in selected_streams
382            ],
383        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
385    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
386        """Return the JSON Schema spec for the specified stream name."""
387        catalog: AirbyteCatalog = self.discovered_catalog
388        found: list[AirbyteStream] = [
389            stream for stream in catalog.streams if stream.name == stream_name
390        ]
391
392        if len(found) == 0:
393            raise exc.PyAirbyteInputError(
394                message="Stream name does not exist in catalog.",
395                input_value=stream_name,
396            )
397
398        if len(found) > 1:
399            raise exc.PyAirbyteInternalError(
400                message="Duplicate streams found with the same name.",
401                context={
402                    "found_streams": found,
403                },
404            )
405
406        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.

def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
408    def get_records(
409        self,
410        stream: str,
411        *,
412        normalize_field_names: bool = False,
413        prune_undeclared_fields: bool = True,
414    ) -> LazyDataset:
415        """Read a stream from the connector.
416
417        Args:
418            stream: The name of the stream to read.
419            normalize_field_names: When `True`, field names will be normalized to lower case, with
420                special characters removed. This matches the behavior of PyAirbyte caches and most
421                Airbyte destinations.
422            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
423                which generally matches the behavior of PyAirbyte caches and most Airbyte
424                destinations, specifically when you expect the catalog may be stale. You can disable
425                this to keep all fields in the records.
426
427        This involves the following steps:
428        * Call discover to get the catalog
429        * Generate a configured catalog that syncs the given stream in full_refresh mode
430        * Write the configured catalog and the config to a temporary file
431        * execute the connector with read --config <config_file> --catalog <catalog_file>
432        * Listen to the messages and return the first AirbyteRecordMessages that come along.
433        * Make sure the subprocess is killed when the function returns.
434        """
435        discovered_catalog: AirbyteCatalog = self.discovered_catalog
436        configured_catalog = ConfiguredAirbyteCatalog(
437            streams=[
438                ConfiguredAirbyteStream(
439                    stream=s,
440                    sync_mode=SyncMode.full_refresh,
441                    destination_sync_mode=DestinationSyncMode.overwrite,
442                )
443                for s in discovered_catalog.streams
444                if s.name == stream
445            ],
446        )
447        if len(configured_catalog.streams) == 0:
448            raise exc.PyAirbyteInputError(
449                message="Requested stream does not exist.",
450                context={
451                    "stream": stream,
452                    "available_streams": self.get_available_streams(),
453                    "connector_name": self.name,
454                },
455            ) from KeyError(stream)
456
457        configured_stream = configured_catalog.streams[0]
458
459        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
460            yield from records
461
462        stream_record_handler = StreamRecordHandler(
463            json_schema=self.get_stream_json_schema(stream),
464            prune_extra_fields=prune_undeclared_fields,
465            normalize_keys=normalize_field_names,
466        )
467
468        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
469        progress_tracker = ProgressTracker(
470            ProgressStyle.PLAIN,
471            source=self,
472            cache=None,
473            destination=None,
474            expected_streams=[stream],
475        )
476
477        iterator: Iterator[dict[str, Any]] = (
478            StreamRecord.from_record_message(
479                record_message=record.record,
480                stream_record_handler=stream_record_handler,
481            )
482            for record in self._read_with_catalog(
483                catalog=configured_catalog,
484                progress_tracker=progress_tracker,
485            )
486            if record.record
487        )
488        progress_tracker.log_success()
489        return LazyDataset(
490            iterator,
491            stream_metadata=configured_stream,
492        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • execute the connector with `read --config <config_file> --catalog <catalog_file>`
  • Listen to the messages and return the first AirbyteRecordMessages that come along.
  • Make sure the subprocess is killed when the function returns.
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
494    def get_documents(
495        self,
496        stream: str,
497        title_property: str | None = None,
498        content_properties: list[str] | None = None,
499        metadata_properties: list[str] | None = None,
500        *,
501        render_metadata: bool = False,
502    ) -> Iterable[Document]:
503        """Read a stream from the connector and return the records as documents.
504
505        If metadata_properties is not set, all properties that are not content will be added to
506        the metadata.
507
508        If render_metadata is True, metadata will be rendered in the document, as well as the
509        the main content.
510        """
511        return self.get_records(stream).to_documents(
512            title_property=title_property,
513            content_properties=content_properties,
514            metadata_properties=metadata_properties,
515            render_metadata=render_metadata,
516        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
606    def read(
607        self,
608        cache: CacheBase | None = None,
609        *,
610        streams: str | list[str] | None = None,
611        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
612        force_full_refresh: bool = False,
613        skip_validation: bool = False,
614    ) -> ReadResult:
615        """Read from the connector and write to the cache.
616
617        Args:
618            cache: The cache to write to. If not set, a default cache will be used.
619            streams: Optional if already set. A list of stream names to select for reading. If set
620                to "*", all streams will be selected.
621            write_strategy: The strategy to use when writing to the cache. If a string, it must be
622                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
623                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
624                WriteStrategy.AUTO.
625            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
626                streams will be read in incremental mode if supported by the connector. This option
627                must be True when using the "replace" strategy.
628            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
629                running the connector. This can be helpful in debugging, when you want to send
630                configurations to the connector that otherwise might be rejected by JSON Schema
631                validation rules.
632        """
633        cache = cache or get_default_cache()
634        progress_tracker = ProgressTracker(
635            source=self,
636            cache=cache,
637            destination=None,
638            expected_streams=None,  # Will be set later
639        )
640
641        # Set up state provider if not in full refresh mode
642        if force_full_refresh:
643            state_provider: StateProviderBase | None = None
644        else:
645            state_provider = cache.get_state_provider(
646                source_name=self._name,
647            )
648        state_writer = cache.get_state_writer(source_name=self._name)
649
650        if streams:
651            self.select_streams(streams)
652
653        if not self._selected_stream_names:
654            raise exc.PyAirbyteNoStreamsSelectedError(
655                connector_name=self.name,
656                available_streams=self.get_available_streams(),
657            )
658
659        try:
660            result = self._read_to_cache(
661                cache=cache,
662                catalog_provider=CatalogProvider(self.configured_catalog),
663                stream_names=self._selected_stream_names,
664                state_provider=state_provider,
665                state_writer=state_writer,
666                write_strategy=write_strategy,
667                force_full_refresh=force_full_refresh,
668                skip_validation=skip_validation,
669                progress_tracker=progress_tracker,
670            )
671        except exc.PyAirbyteInternalError as ex:
672            progress_tracker.log_failure(exception=ex)
673            raise exc.AirbyteConnectorFailedError(
674                connector_name=self.name,
675                log_text=self._last_log_messages,
676            ) from ex
677        except Exception as ex:
678            progress_tracker.log_failure(exception=ex)
679            raise
680
681        progress_tracker.log_success()
682        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
config_hash
validate_config
connector_version
check
install
uninstall
@dataclass
class ConnectorMetadata:
150@dataclass
151class ConnectorMetadata:
152    """Metadata for a connector."""
153
154    name: str
155    """Connector name. For example, "source-google-sheets"."""
156
157    latest_available_version: str | None
158    """The latest available version of the connector."""
159
160    pypi_package_name: str | None
161    """The name of the PyPI package for the connector, if it exists."""
162
163    language: Language | None
164    """The language of the connector."""
165
166    install_types: set[InstallType]
167    """The supported install types for the connector."""
168
169    @property
170    def default_install_type(self) -> InstallType:
171        """Return the default install type for the connector."""
172        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
173            return InstallType.YAML
174
175        if InstallType.PYTHON in self.install_types:
176            return InstallType.PYTHON
177
178        # Else: Java or Docker
179        return InstallType.DOCKER

Metadata for a connector.

ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: airbyte.sources.registry.Language | None, install_types: set[airbyte.sources.registry.InstallType])
name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: airbyte.sources.registry.Language | None

The language of the connector.

install_types: set[airbyte.sources.registry.InstallType]

The supported install types for the connector.

default_install_type: airbyte.sources.registry.InstallType
169    @property
170    def default_install_type(self) -> InstallType:
171        """Return the default install type for the connector."""
172        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
173            return InstallType.YAML
174
175        if InstallType.PYTHON in self.install_types:
176            return InstallType.PYTHON
177
178        # Else: Java or Docker
179        return InstallType.DOCKER

Return the default install type for the connector.