airbyte.sources

Sources connectors module for PyAirbyte.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Sources connectors module for PyAirbyte."""
 3
 4from __future__ import annotations
 5
 6from typing import TYPE_CHECKING
 7
 8from airbyte.sources.base import Source
 9from airbyte.sources.registry import (
10    ConnectorMetadata,
11    get_available_connectors,
12    get_connector_metadata,
13)
14from airbyte.sources.util import (
15    get_benchmark_source,
16    get_source,
17)
18
19
20# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
21if TYPE_CHECKING:
22    # ruff: noqa: TC004  # imports used for more than type checking
23    from airbyte.sources import (
24        base,
25        registry,
26        util,
27    )
28
29__all__ = [
30    # Submodules
31    "base",
32    "registry",
33    "util",
34    # Factories
35    "get_source",
36    "get_benchmark_source",
37    # Helper Functions
38    "get_available_connectors",
39    "get_connector_metadata",
40    # Classes
41    "Source",
42    "ConnectorMetadata",
43]
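
The names in __all__ above form the module's public interface; get_source and get_available_connectors are also re-exported from the top-level airbyte package. A minimal import sketch (the connector name and config key are illustrative, patterned on the public source-faker examples):

    from airbyte.sources import get_available_connectors, get_source

    print(get_available_connectors()[:5])  # first few registry entries
    source = get_source("source-faker", config={"count": 100})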
def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    pip_url: str | None = None,
 56    local_executable: Path | str | None = None,
 57    docker_image: bool | str | None = None,
 58    use_host_network: bool = False,
 59    source_manifest: bool | dict | Path | str | None = None,
 60    install_if_missing: bool = True,
 61    install_root: Path | None = None,
 62) -> Source:
 63    """Get a connector by name and version.
 64
 65    If an explicit install or execution method is requested (e.g. `local_executable`,
 66    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 67
 68    Otherwise, an appropriate method will be selected based on the available connector metadata:
 69    1. If the connector is registered and a YAML source manifest is available, the YAML manifest
 70       will be downloaded and used to execute the connector.
 71    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 72    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 73       will be executed using Docker.
 74
 75    Args:
 76        name: connector name
 77        config: connector config - if not provided, you need to set it later via the set_config
 78            method.
 79        config_change_callback: callback function to be called when the connector config changes.
 80        streams: list of stream names to select for reading. If set to "*", all streams will be
 81            selected. If not provided, you can set it later via the `select_streams()` or
 82            `select_all_streams()` method.
 83        version: connector version - if not provided, the currently installed version will be used.
 84            If no version is installed, the latest available version will be used. The version can
 85            also be set to "latest" to force the use of the latest available version.
 86        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 87            connector name.
 88        local_executable: If set, the connector will be assumed to already be installed and will be
 89            executed using this path or executable name. Otherwise, the connector will be installed
 90            automatically in a virtual environment.
 91        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 92            to use the default image for the connector, or you can specify a custom image name.
 93            If `version` is specified and your image name does not already contain a tag
 94            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
 95        use_host_network: If set, along with docker_image, the connector will be executed using
 96            the host network. This is useful for connectors that need to access resources on
 97            the host machine, such as a local database. This parameter is ignored when
 98            `docker_image` is not set.
 99        source_manifest: If set, the connector will be executed based on a declarative YAML
100            source definition. This input can be `True` to attempt to auto-download a YAML spec,
101            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
102            the local file system, or `str` to pull the definition from a web URL.
103        install_if_missing: Whether to install the connector if it is not available locally. This
104            parameter is ignored when `local_executable` or `source_manifest` are set.
105        install_root: (Optional.) The root directory where the virtual environment will be
106            created. If not provided, the current working directory will be used.
107    """
108    return Source(
109        name=name,
110        config=config,
111        config_change_callback=config_change_callback,
112        streams=streams,
113        executor=get_connector_executor(
114            name=name,
115            version=version,
116            pip_url=pip_url,
117            local_executable=local_executable,
118            docker_image=docker_image,
119            use_host_network=use_host_network,
120            source_manifest=source_manifest,
121            install_if_missing=install_if_missing,
122            install_root=install_root,
123        ),
124    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
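
To make the resolution order above concrete, here is a hedged usage sketch (the connector name and config keys are illustrative, patterned on the public source-faker examples):

    from airbyte.sources import get_source

    # Let PyAirbyte pick an execution method from connector metadata:
    source = get_source(
        "source-faker",
        config={"count": 1_000},
        streams="*",
    )

    # Or force a specific method, e.g. Docker with a pinned version tag:
    source = get_source(
        "source-faker",
        config={"count": 1_000},
        docker_image=True,
        version="latest",
    )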
def get_benchmark_source( num_records: int | str = '5e5', *, install_if_missing: bool = True) -> Source:
127def get_benchmark_source(
128    num_records: int | str = "5e5",
129    *,
130    install_if_missing: bool = True,
131) -> Source:
132    """Get a source for benchmarking.
133
134    This source will generate dummy records for performance benchmarking purposes.
135    You can specify the number of records to generate using the `num_records` parameter.
136    The `num_records` parameter can be an integer or a string in scientific notation.
137    For example, `"5e6"` will generate 5 million records. If underscores are provided
138    within a numeric string, they will be ignored.
139
140    Args:
141        num_records: The number of records to generate. Defaults to "5e5", or
142            500,000 records.
143            Can be an integer (`1000`) or a string in scientific notation.
144            For example, `"5e6"` will generate 5 million records.
145        install_if_missing: Whether to install the source if it is not available locally.
146
147    Returns:
148        Source: The source object for benchmarking.
149    """
150    if isinstance(num_records, str):
151        try:
152            num_records = int(Decimal(num_records.replace("_", "")))
153        except InvalidOperation as ex:
154            raise PyAirbyteInputError(
155                message="Invalid number format.",
156                original_exception=ex,
157                input_value=str(num_records),
158            ) from None
159
160    return get_source(
161        name="source-e2e-test",
162        docker_image=True,
163        # docker_image="airbyte/source-e2e-test:latest",
164        config={
165            "type": "BENCHMARK",
166            "schema": "FIVE_STRING_COLUMNS",
167            "terminationCondition": {
168                "type": "MAX_RECORDS",
169                "max": num_records,
170            },
171        },
172        streams="*",
173        install_if_missing=install_if_missing,
174    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
  • install_if_missing: Whether to install the source if it is not available locally.
Returns:

Source: The source object for benchmarking.
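
A short sketch of the numeric parsing and a typical benchmarking call (note the underlying source-e2e-test connector runs via Docker, so Docker must be available):

    from airbyte.sources.util import get_benchmark_source

    source = get_benchmark_source(num_records="5e6")    # 5,000,000 records
    source = get_benchmark_source(num_records="1_000")  # underscores ignored: 1,000
    result = source.read()  # streams dummy records into the default cache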

def get_available_connectors( install_type: airbyte.sources.registry.InstallType | str | None = None) -> list[str]:
234def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
235    """Return a list of all available connectors.
236
237    Connectors will be returned in alphabetical order, with the standard prefix "source-".
238    """
239    if install_type is None:
240        # No install type specified. Filter for whatever is runnable.
241        if is_docker_installed():
242            logger.info("Docker is detected. Returning all connectors.")
243            # If Docker is available, return all connectors.
244            return sorted(conn.name for conn in _get_registry_cache().values())
245
246        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
247
248        # If Docker is not available, return only Python and Manifest-only connectors.
249        return sorted(
250            conn.name
251            for conn in _get_registry_cache().values()
252            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
253        )
254
255    if not isinstance(install_type, InstallType):
256        install_type = InstallType(install_type)
257
258    if install_type == InstallType.PYTHON:
259        return sorted(
260            conn.name
261            for conn in _get_registry_cache().values()
262            if conn.pypi_package_name is not None
263        )
264
265    if install_type == InstallType.JAVA:
266        warnings.warn(
267            message="Java connectors are not yet supported.",
268            stacklevel=2,
269        )
270        return sorted(
271            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
272        )
273
274    if install_type == InstallType.DOCKER:
275        return sorted(conn.name for conn in _get_registry_cache().values())
276
277    if install_type == InstallType.YAML:
278        return sorted(
279            conn.name
280            for conn in _get_registry_cache().values()
281            if InstallType.YAML in conn.install_types
282            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
283        )
284
285    # pragma: no cover  # Should never be reached.
286    raise exc.PyAirbyteInputError(
287        message="Invalid install type.",
288        context={
289            "install_type": install_type,
290        },
291    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".
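
A sketch of filtering by install type; the lowercase string values are assumed to match the InstallType enum members checked in the source above:

    from airbyte.sources import get_available_connectors

    runnable = get_available_connectors()             # whatever can run locally
    python_only = get_available_connectors("python")  # connectors with a PyPI package
    yaml_only = get_available_connectors("yaml")      # manifest-only connectors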

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
204def get_connector_metadata(name: str) -> ConnectorMetadata | None:
205    """Check the cache for the connector.
206
207    If the cache is empty, populate by calling update_cache.
208    """
209    registry_url = _get_registry_url()
210
211    if _is_registry_disabled(registry_url):
212        return None
213
214    cache = copy(_get_registry_cache())
215
216    if not cache:
217        raise exc.PyAirbyteInternalError(
218            message="Connector registry could not be loaded.",
219            context={
220                "registry_url": _get_registry_url(),
221            },
222        )
223    if name not in cache:
224        raise exc.AirbyteConnectorNotRegisteredError(
225            connector_name=name,
226            context={
227                "registry_url": _get_registry_url(),
228                "available_connectors": get_available_connectors(),
229            },
230        )
231    return cache[name]

Check the cache for the connector.

If the cache is empty, populate by calling update_cache.
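
A sketch of a defensive lookup, handling both the disabled-registry case (None) and the not-registered error raised above (the exceptions module path is assumed):

    from airbyte import exceptions as exc
    from airbyte.sources import get_connector_metadata

    try:
        metadata = get_connector_metadata("source-github")
    except exc.AirbyteConnectorNotRegisteredError:
        metadata = None  # e.g. a typo'd or unpublished connector name

    if metadata is not None:
        print(metadata.name)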

class Source(airbyte._connector_base.ConnectorBase):
 53class Source(ConnectorBase):  # noqa: PLR0904
 54    """A class representing a source that can be called."""
 55
 56    connector_type = "source"
 57
 58    def __init__(
 59        self,
 60        executor: Executor,
 61        name: str,
 62        config: dict[str, Any] | None = None,
 63        *,
 64        config_change_callback: ConfigChangeCallback | None = None,
 65        streams: str | list[str] | None = None,
 66        validate: bool = False,
 67        cursor_key_overrides: dict[str, str] | None = None,
 68        primary_key_overrides: dict[str, str | list[str]] | None = None,
 69    ) -> None:
 70        """Initialize the source.
 71
 72        If config is provided, it will be validated against the spec if validate is True.
 73        """
 74        self._to_be_selected_streams: list[str] | str = []
 75        """Used to hold selection criteria before catalog is known."""
 76
 77        super().__init__(
 78            executor=executor,
 79            name=name,
 80            config=config,
 81            config_change_callback=config_change_callback,
 82            validate=validate,
 83        )
 84        self._config_dict: dict[str, Any] | None = None
 85        self._last_log_messages: list[str] = []
 86        self._discovered_catalog: AirbyteCatalog | None = None
 87        self._selected_stream_names: list[str] = []
 88
 89        self._cursor_key_overrides: dict[str, str] = {}
 90        """A mapping of lower-cased stream names to cursor key overrides."""
 91
 92        self._primary_key_overrides: dict[str, list[str]] = {}
 93        """A mapping of lower-cased stream names to primary key overrides."""
 94
 95        if config is not None:
 96            self.set_config(config, validate=validate)
 97        if streams is not None:
 98            self.select_streams(streams)
 99        if cursor_key_overrides is not None:
100            self.set_cursor_keys(**cursor_key_overrides)
101        if primary_key_overrides is not None:
102            self.set_primary_keys(**primary_key_overrides)
103
104    def set_streams(self, streams: list[str]) -> None:
105        """Deprecated. See select_streams()."""
106        warnings.warn(
107            "The 'set_streams' method is deprecated and will be removed in a future version. "
108            "Please use the 'select_streams' method instead.",
109            DeprecationWarning,
110            stacklevel=2,
111        )
112        self.select_streams(streams)
113
114    def set_cursor_key(
115        self,
116        stream_name: str,
117        cursor_key: str,
118    ) -> None:
119        """Set the cursor for a single stream.
120
121        Note:
122        - This does not unset previously set cursors.
123        - The cursor key must be a single field name.
124        - Not all streams support custom cursors. If a stream does not support custom cursors,
125          the override may be ignored.
126        - Stream names are case insensitive, while field names are case sensitive.
127        - Stream names are not validated by PyAirbyte. If the stream name
128          does not exist in the catalog, the override may be ignored.
129        """
130        self._cursor_key_overrides[stream_name.lower()] = cursor_key
131
132    def set_cursor_keys(
133        self,
134        **kwargs: str,
135    ) -> None:
136        """Override the cursor key for one or more streams.
137
138        Usage:
139            source.set_cursor_keys(
140                stream1="cursor1",
141                stream2="cursor2",
142            )
143
144        Note:
145        - This does not unset previously set cursors.
146        - The cursor key must be a single field name.
147        - Not all streams support custom cursors. If a stream does not support custom cursors,
148          the override may be ignored.
149        - Stream names are case insensitive, while field names are case sensitive.
150        - Stream names are not validated by PyAirbyte. If the stream name
151          does not exist in the catalog, the override may be ignored.
152        """
153        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
154
155    def set_primary_key(
156        self,
157        stream_name: str,
158        primary_key: str | list[str],
159    ) -> None:
160        """Set the primary key for a single stream.
161
162        Note:
163        - This does not unset previously set primary keys.
164        - The primary key must be a single field name or a list of field names.
165        - Not all streams support overriding primary keys. If a stream does not support overriding
166          primary keys, the override may be ignored.
167        - Stream names are case insensitive, while field names are case sensitive.
168        - Stream names are not validated by PyAirbyte. If the stream name
169          does not exist in the catalog, the override may be ignored.
170        """
171        self._primary_key_overrides[stream_name.lower()] = (
172            primary_key if isinstance(primary_key, list) else [primary_key]
173        )
174
175    def set_primary_keys(
176        self,
177        **kwargs: str | list[str],
178    ) -> None:
179        """Override the primary keys for one or more streams.
180
181        This does not unset previously set primary keys.
182
183        Usage:
184            source.set_primary_keys(
185                stream1="pk1",
186                stream2=["pk1", "pk2"],
187            )
188
189        Note:
190        - This does not unset previously set primary keys.
191        - The primary key must be a single field name or a list of field names.
192        - Not all streams support overriding primary keys. If a stream does not support overriding
193          primary keys, the override may be ignored.
194        - Stream names are case insensitive, while field names are case sensitive.
195        - Stream names are not validated by PyAirbyte. If the stream name
196          does not exist in the catalog, the override may be ignored.
197        """
198        self._primary_key_overrides.update(
199            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
200        )
201
202    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
203        """Logs a warning message indicating stream selection which are not selected yet."""
204        if streams == "*":
205            print(
206                "Warning: Config is not set yet. All streams will be selected after config is set."
207            )
208        else:
209            print(
210                "Warning: Config is not set yet. "
211                f"Streams to be selected after config is set: {streams}"
212            )
213
214    def select_all_streams(self) -> None:
215        """Select all streams.
216
217        This is a more streamlined equivalent to:
218        > source.select_streams(source.get_available_streams()).
219        """
220        if self._config_dict is None:
221            self._to_be_selected_streams = "*"
222            self._log_warning_preselected_stream(self._to_be_selected_streams)
223            return
224
225        self._selected_stream_names = self.get_available_streams()
226
227    def select_streams(self, streams: str | list[str]) -> None:
228        """Select the stream names that should be read from the connector.
229
230        Args:
231            streams: A list of stream names to select. If set to "*", all streams will be selected.
232
233        Currently, if this is not set, all streams will be read.
234        """
235        if self._config_dict is None:
236            self._to_be_selected_streams = streams
237            self._log_warning_preselected_stream(streams)
238            return
239
240        if streams == "*":
241            self.select_all_streams()
242            return
243
244        if isinstance(streams, str):
245            # If a single stream is provided, convert it to a one-item list
246            streams = [streams]
247
248        available_streams = self.get_available_streams()
249        for stream in streams:
250            if stream not in available_streams:
251                raise exc.AirbyteStreamNotFoundError(
252                    stream_name=stream,
253                    connector_name=self.name,
254                    available_streams=available_streams,
255                )
256        self._selected_stream_names = streams
257
258    def get_selected_streams(self) -> list[str]:
259        """Get the selected streams.
260
261        If no streams are selected, return an empty list.
262        """
263        return self._selected_stream_names
264
265    def set_config(
266        self,
267        config: dict[str, Any],
268        *,
269        validate: bool = True,
270    ) -> None:
271        """Set the config for the connector.
272
273        If validate is True, raise an exception if the config fails validation.
274
275        If validate is False, validation will be deferred until check() or validate_config()
276        is called.
277        """
278        if validate:
279            self.validate_config(config)
280
281        self._config_dict = config
282
283        if self._to_be_selected_streams:
284            self.select_streams(self._to_be_selected_streams)
285            self._to_be_selected_streams = []
286
287    def get_config(self) -> dict[str, Any]:
288        """Get the config for the connector."""
289        return self._config
290
291    @property
292    def _config(self) -> dict[str, Any]:
293        if self._config_dict is None:
294            raise exc.AirbyteConnectorConfigurationMissingError(
295                connector_name=self.name,
296                guidance="Provide via get_source() or set_config()",
297            )
298        return self._config_dict
299
300    def _discover(self) -> AirbyteCatalog:
301        """Call discover on the connector.
302
303        This involves the following steps:
304        - Write the config to a temporary file
305        - execute the connector with discover --config <config_file>
306        - Listen to the messages and return the first AirbyteCatalog that comes along.
307        - Make sure the subprocess is killed when the function returns.
308        """
309        with as_temp_files([self._config]) as [config_file]:
310            for msg in self._execute(["discover", "--config", config_file]):
311                if msg.type == Type.CATALOG and msg.catalog:
312                    return msg.catalog
313            raise exc.AirbyteConnectorMissingCatalogError(
314                connector_name=self.name,
315                log_text=self._last_log_messages,
316            )
317
318    def get_available_streams(self) -> list[str]:
319        """Get the available streams from the spec."""
320        return [s.name for s in self.discovered_catalog.streams]
321
322    def _get_incremental_stream_names(self) -> list[str]:
323        """Get the name of streams that support incremental sync."""
324        return [
325            stream.name
326            for stream in self.discovered_catalog.streams
327            if SyncMode.incremental in stream.supported_sync_modes
328        ]
329
330    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
331        """Call spec on the connector.
332
333        This involves the following steps:
334        * execute the connector with spec
335        * Listen to the messages and return the first spec message that comes along.
336        * Make sure the subprocess is killed when the function returns.
337        """
338        if force_refresh or self._spec is None:
339            for msg in self._execute(["spec"]):
340                if msg.type == Type.SPEC and msg.spec:
341                    self._spec = msg.spec
342                    break
343
344        if self._spec:
345            return self._spec
346
347        raise exc.AirbyteConnectorMissingSpecError(
348            connector_name=self.name,
349            log_text=self._last_log_messages,
350        )
351
352    @property
353    def config_spec(self) -> dict[str, Any]:
354        """Generate a configuration spec for this connector, as a JSON Schema definition.
355
356        This function generates a JSON Schema dictionary with configuration specs for the
357        current connector, as a dictionary.
358
359        Returns:
360            dict: The JSON Schema configuration spec as a dictionary.
361        """
362        return self._get_spec(force_refresh=True).connectionSpecification
363
364    def print_config_spec(
365        self,
366        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
367        *,
368        output_file: Path | str | None = None,
369    ) -> None:
370        """Print the configuration spec for this connector.
371
372        Args:
373            format: The format to print the spec in. Must be "yaml" or "json".
374            output_file: Optional. If set, the spec will be written to the given file path.
375                Otherwise, it will be printed to the console.
376        """
377        if format not in {"yaml", "json"}:
378            raise exc.PyAirbyteInputError(
379                message="Invalid format. Expected 'yaml' or 'json'",
380                input_value=format,
381            )
382        if isinstance(output_file, str):
383            output_file = Path(output_file)
384
385        if format == "yaml":
386            content = yaml.dump(self.config_spec, indent=2)
387        elif format == "json":
388            content = json.dumps(self.config_spec, indent=2)
389
390        if output_file:
391            output_file.write_text(content)
392            return
393
394        syntax_highlighted = Syntax(content, format)
395        print(syntax_highlighted)
396
397    @property
398    def _yaml_spec(self) -> str:
399        """Get the spec as a yaml string.
400
401        For now, the primary use case is for writing and debugging a valid config for a source.
402
403        This is private for now because we probably want better polish before exposing this
404        as a stable interface. This will also get easier when we have docs links with this info
405        for each connector.
406        """
407        spec_obj: ConnectorSpecification = self._get_spec()
408        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
409        # convert to a yaml string
410        return yaml.dump(spec_dict)
411
412    @property
413    def docs_url(self) -> str:
414        """Get the URL to the connector's documentation."""
415        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
416            "source-", ""
417        )
418
419    @property
420    def discovered_catalog(self) -> AirbyteCatalog:
421        """Get the raw catalog for the given streams.
422
423        If the catalog is not yet known, we call discover to get it.
424        """
425        if self._discovered_catalog is None:
426            self._discovered_catalog = self._discover()
427
428        return self._discovered_catalog
429
430    @property
431    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
432        """Get the configured catalog for the given streams.
433
434        If the raw catalog is not yet known, we call discover to get it.
435
436        If no specific streams are selected, we return a catalog that syncs all available streams.
437
438        TODO: We should consider disabling by default the streams that the connector would
439        disable by default. (For instance, streams that require a premium license are sometimes
440        disabled by default within the connector.)
441        """
442        # Ensure discovered catalog is cached before we start
443        _ = self.discovered_catalog
444
445        # Filter for selected streams if set, otherwise use all available streams:
446        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
447        return self.get_configured_catalog(streams=streams_filter)
448
449    def get_configured_catalog(
450        self,
451        streams: Literal["*"] | list[str] | None = None,
452    ) -> ConfiguredAirbyteCatalog:
453        """Get a configured catalog for the given streams.
454
455        If no streams are provided, the selected streams will be used. If no streams are selected,
456        all available streams will be used.
457
458        If '*' is provided, all available streams will be used.
459        """
460        selected_streams: list[str] = []
461        if streams is None:
462            selected_streams = self._selected_stream_names or self.get_available_streams()
463        elif streams == "*":
464            selected_streams = self.get_available_streams()
465        elif isinstance(streams, list):
466            selected_streams = streams
467        else:
468            raise exc.PyAirbyteInputError(
469                message="Invalid streams argument.",
470                input_value=streams,
471            )
472
473        return ConfiguredAirbyteCatalog(
474            streams=[
475                ConfiguredAirbyteStream(
476                    stream=stream,
477                    destination_sync_mode=DestinationSyncMode.overwrite,
478                    sync_mode=SyncMode.incremental,
479                    primary_key=(
480                        [self._primary_key_overrides[stream.name.lower()]]
481                        if stream.name.lower() in self._primary_key_overrides
482                        else stream.source_defined_primary_key
483                    ),
484                    cursor_field=(
485                        [self._cursor_key_overrides[stream.name.lower()]]
486                        if stream.name.lower() in self._cursor_key_overrides
487                        else stream.default_cursor_field
488                    ),
489                    # These are unused in the current implementation:
490                    generation_id=None,
491                    minimum_generation_id=None,
492                    sync_id=None,
493                )
494                for stream in self.discovered_catalog.streams
495                if stream.name in selected_streams
496            ],
497        )
498
499    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
500        """Return the JSON Schema spec for the specified stream name."""
501        catalog: AirbyteCatalog = self.discovered_catalog
502        found: list[AirbyteStream] = [
503            stream for stream in catalog.streams if stream.name == stream_name
504        ]
505
506        if len(found) == 0:
507            raise exc.PyAirbyteInputError(
508                message="Stream name does not exist in catalog.",
509                input_value=stream_name,
510            )
511
512        if len(found) > 1:
513            raise exc.PyAirbyteInternalError(
514                message="Duplicate streams found with the same name.",
515                context={
516                    "found_streams": found,
517                },
518            )
519
520        return found[0].json_schema
521
522    def get_records(
523        self,
524        stream: str,
525        *,
526        normalize_field_names: bool = False,
527        prune_undeclared_fields: bool = True,
528    ) -> LazyDataset:
529        """Read a stream from the connector.
530
531        Args:
532            stream: The name of the stream to read.
533            normalize_field_names: When `True`, field names will be normalized to lower case, with
534                special characters removed. This matches the behavior of PyAirbyte caches and most
535                Airbyte destinations.
536            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
537                which generally matches the behavior of PyAirbyte caches and most Airbyte
538                destinations, specifically when you expect the catalog may be stale. You can disable
539                this to keep all fields in the records.
540
541        This involves the following steps:
542        * Call discover to get the catalog
543        * Generate a configured catalog that syncs the given stream in full_refresh mode
544        * Write the configured catalog and the config to a temporary file
545        * execute the connector with read --config <config_file> --catalog <catalog_file>
546        * Listen to the messages and return the AirbyteRecordMessages that come along.
547        * Make sure the subprocess is killed when the function returns.
548        """
549        configured_catalog = self.get_configured_catalog(streams=[stream])
550        if len(configured_catalog.streams) == 0:
551            raise exc.PyAirbyteInputError(
552                message="Requested stream does not exist.",
553                context={
554                    "stream": stream,
555                    "available_streams": self.get_available_streams(),
556                    "connector_name": self.name,
557                },
558            ) from KeyError(stream)
559
560        configured_stream = configured_catalog.streams[0]
561
562        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
563            yield from records
564
565        stream_record_handler = StreamRecordHandler(
566            json_schema=self.get_stream_json_schema(stream),
567            prune_extra_fields=prune_undeclared_fields,
568            normalize_keys=normalize_field_names,
569        )
570
571        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
572        progress_tracker = ProgressTracker(
573            ProgressStyle.PLAIN,
574            source=self,
575            cache=None,
576            destination=None,
577            expected_streams=[stream],
578        )
579
580        iterator: Iterator[dict[str, Any]] = (
581            StreamRecord.from_record_message(
582                record_message=record.record,
583                stream_record_handler=stream_record_handler,
584            )
585            for record in self._read_with_catalog(
586                catalog=configured_catalog,
587                progress_tracker=progress_tracker,
588            )
589            if record.record
590        )
591        progress_tracker.log_success()
592        return LazyDataset(
593            iterator,
594            stream_metadata=configured_stream,
595        )
596
597    def get_documents(
598        self,
599        stream: str,
600        title_property: str | None = None,
601        content_properties: list[str] | None = None,
602        metadata_properties: list[str] | None = None,
603        *,
604        render_metadata: bool = False,
605    ) -> Iterable[Document]:
606        """Read a stream from the connector and return the records as documents.
607
608        If metadata_properties is not set, all properties that are not content will be added to
609        the metadata.
610
611        If render_metadata is True, metadata will be rendered in the document, as well as
612        the main content.
613        """
614        return self.get_records(stream).to_documents(
615            title_property=title_property,
616            content_properties=content_properties,
617            metadata_properties=metadata_properties,
618            render_metadata=render_metadata,
619        )
620
621    def _get_airbyte_message_iterator(
622        self,
623        *,
624        streams: Literal["*"] | list[str] | None = None,
625        state_provider: StateProviderBase | None = None,
626        progress_tracker: ProgressTracker,
627        force_full_refresh: bool = False,
628    ) -> AirbyteMessageIterator:
629        """Get an AirbyteMessageIterator for this source."""
630        return AirbyteMessageIterator(
631            self._read_with_catalog(
632                catalog=self.get_configured_catalog(streams=streams),
633                state=state_provider if not force_full_refresh else None,
634                progress_tracker=progress_tracker,
635            )
636        )
637
638    def _read_with_catalog(
639        self,
640        catalog: ConfiguredAirbyteCatalog,
641        progress_tracker: ProgressTracker,
642        state: StateProviderBase | None = None,
643    ) -> Generator[AirbyteMessage, None, None]:
644        """Call read on the connector.
645
646        This involves the following steps:
647        * Write the config to a temporary file
648        * execute the connector with read --config <config_file> --catalog <catalog_file>
649        * Listen to the messages and return the AirbyteRecordMessages that come along.
650        * Send out telemetry on the performed sync (with information about which source was used and
651          the type of the cache)
652        """
653        with as_temp_files(
654            [
655                self._config,
656                catalog.model_dump_json(),
657                state.to_state_input_file_text() if state else "[]",
658            ]
659        ) as [
660            config_file,
661            catalog_file,
662            state_file,
663        ]:
664            message_generator = self._execute(
665                [
666                    "read",
667                    "--config",
668                    config_file,
669                    "--catalog",
670                    catalog_file,
671                    "--state",
672                    state_file,
673                ],
674                progress_tracker=progress_tracker,
675            )
676            yield from progress_tracker.tally_records_read(message_generator)
677        progress_tracker.log_read_complete()
678
679    def _peek_airbyte_message(
680        self,
681        message: AirbyteMessage,
682        *,
683        raise_on_error: bool = True,
684    ) -> None:
685        """Process an Airbyte message.
686
687        This method handles reading Airbyte messages and taking action, if needed, based on the
688        message type. For instance, log messages are logged, records are tallied, and errors are
689        raised as exceptions if `raise_on_error` is True.
690
691        Raises:
692            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
693        """
694        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
695
696    def _log_incremental_streams(
697        self,
698        *,
699        incremental_streams: set[str] | None = None,
700    ) -> None:
701        """Log the streams which are using incremental sync mode."""
702        log_message = (
703            "The following streams are currently using incremental sync:\n"
704            f"{incremental_streams}\n"
705            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
706        )
707        print(log_message)
708
709    def read(
710        self,
711        cache: CacheBase | None = None,
712        *,
713        streams: str | list[str] | None = None,
714        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
715        force_full_refresh: bool = False,
716        skip_validation: bool = False,
717    ) -> ReadResult:
718        """Read from the connector and write to the cache.
719
720        Args:
721            cache: The cache to write to. If not set, a default cache will be used.
722            streams: Optional if already set. A list of stream names to select for reading. If set
723                to "*", all streams will be selected.
724            write_strategy: The strategy to use when writing to the cache. If a string, it must be
725                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
726                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
727                WriteStrategy.AUTO.
728            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
729                streams will be read in incremental mode if supported by the connector. This option
730                must be True when using the "replace" strategy.
731            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
732                running the connector. This can be helpful in debugging, when you want to send
733                configurations to the connector that otherwise might be rejected by JSON Schema
734                validation rules.
735        """
736        cache = cache or get_default_cache()
737        progress_tracker = ProgressTracker(
738            source=self,
739            cache=cache,
740            destination=None,
741            expected_streams=None,  # Will be set later
742        )
743
744        # Set up state provider if not in full refresh mode
745        if force_full_refresh:
746            state_provider: StateProviderBase | None = None
747        else:
748            state_provider = cache.get_state_provider(
749                source_name=self._name,
750            )
751        state_writer = cache.get_state_writer(source_name=self._name)
752
753        if streams:
754            self.select_streams(streams)
755
756        if not self._selected_stream_names:
757            raise exc.PyAirbyteNoStreamsSelectedError(
758                connector_name=self.name,
759                available_streams=self.get_available_streams(),
760            )
761
762        try:
763            result = self._read_to_cache(
764                cache=cache,
765                catalog_provider=CatalogProvider(self.configured_catalog),
766                stream_names=self._selected_stream_names,
767                state_provider=state_provider,
768                state_writer=state_writer,
769                write_strategy=write_strategy,
770                force_full_refresh=force_full_refresh,
771                skip_validation=skip_validation,
772                progress_tracker=progress_tracker,
773            )
774        except exc.PyAirbyteInternalError as ex:
775            progress_tracker.log_failure(exception=ex)
776            raise exc.AirbyteConnectorFailedError(
777                connector_name=self.name,
778                log_text=self._last_log_messages,
779            ) from ex
780        except Exception as ex:
781            progress_tracker.log_failure(exception=ex)
782            raise
783
784        progress_tracker.log_success()
785        return result
786
787    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
788        self,
789        cache: CacheBase,
790        *,
791        catalog_provider: CatalogProvider,
792        stream_names: list[str],
793        state_provider: StateProviderBase | None,
794        state_writer: StateWriterBase | None,
795        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
796        force_full_refresh: bool = False,
797        skip_validation: bool = False,
798        progress_tracker: ProgressTracker,
799    ) -> ReadResult:
800        """Internal read method."""
801        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
802            warnings.warn(
803                message=(
804                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
805                    "could result in data loss. "
806                    "To silence this warning, use the following: "
807                    'warnings.filterwarnings("ignore", '
808                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
809                ),
810                category=exc.PyAirbyteDataLossWarning,
811                stacklevel=1,
812            )
813        if isinstance(write_strategy, str):
814            try:
815                write_strategy = WriteStrategy(write_strategy)
816            except ValueError:
817                raise exc.PyAirbyteInputError(
818                    message="Invalid strategy",
819                    context={
820                        "write_strategy": write_strategy,
821                        "available_strategies": [s.value for s in WriteStrategy],
822                    },
823                ) from None
824
825        # Run optional validation step
826        if not skip_validation:
827            self.validate_config()
828
829        # Log incremental streams if incremental streams are known
830        if state_provider and state_provider.known_stream_names:
831            # Retrieve the set of known streams which support incremental sync
832            incremental_streams = (
833                set(self._get_incremental_stream_names())
834                & state_provider.known_stream_names
835                & set(self.get_selected_streams())
836            )
837            if incremental_streams:
838                self._log_incremental_streams(incremental_streams=incremental_streams)
839
840        airbyte_message_iterator = AirbyteMessageIterator(
841            self._read_with_catalog(
842                catalog=catalog_provider.configured_catalog,
843                state=state_provider,
844                progress_tracker=progress_tracker,
845            )
846        )
847        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
848            stdin=airbyte_message_iterator,
849            catalog_provider=catalog_provider,
850            write_strategy=write_strategy,
851            state_writer=state_writer,
852            progress_tracker=progress_tracker,
853        )
854
855        # Flush the WAL, if applicable
856        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
857
858        return ReadResult(
859            source_name=self.name,
860            progress_tracker=progress_tracker,
861            processed_streams=stream_names,
862            cache=cache,
863        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False, cursor_key_overrides: dict[str, str] | None = None, primary_key_overrides: dict[str, str | list[str]] | None = None)
 58    def __init__(
 59        self,
 60        executor: Executor,
 61        name: str,
 62        config: dict[str, Any] | None = None,
 63        *,
 64        config_change_callback: ConfigChangeCallback | None = None,
 65        streams: str | list[str] | None = None,
 66        validate: bool = False,
 67        cursor_key_overrides: dict[str, str] | None = None,
 68        primary_key_overrides: dict[str, str | list[str]] | None = None,
 69    ) -> None:
 70        """Initialize the source.
 71
 72        If config is provided, it will be validated against the spec if validate is True.
 73        """
 74        self._to_be_selected_streams: list[str] | str = []
 75        """Used to hold selection criteria before catalog is known."""
 76
 77        super().__init__(
 78            executor=executor,
 79            name=name,
 80            config=config,
 81            config_change_callback=config_change_callback,
 82            validate=validate,
 83        )
 84        self._config_dict: dict[str, Any] | None = None
 85        self._last_log_messages: list[str] = []
 86        self._discovered_catalog: AirbyteCatalog | None = None
 87        self._selected_stream_names: list[str] = []
 88
 89        self._cursor_key_overrides: dict[str, str] = {}
 90        """A mapping of lower-cased stream names to cursor key overrides."""
 91
 92        self._primary_key_overrides: dict[str, list[str]] = {}
 93        """A mapping of lower-cased stream names to primary key overrides."""
 94
 95        if config is not None:
 96            self.set_config(config, validate=validate)
 97        if streams is not None:
 98            self.select_streams(streams)
 99        if cursor_key_overrides is not None:
100            self.set_cursor_keys(**cursor_key_overrides)
101        if primary_key_overrides is not None:
102            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
104    def set_streams(self, streams: list[str]) -> None:
105        """Deprecated. See select_streams()."""
106        warnings.warn(
107            "The 'set_streams' method is deprecated and will be removed in a future version. "
108            "Please use the 'select_streams' method instead.",
109            DeprecationWarning,
110            stacklevel=2,
111        )
112        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
114    def set_cursor_key(
115        self,
116        stream_name: str,
117        cursor_key: str,
118    ) -> None:
119        """Set the cursor for a single stream.
120
121        Note:
122        - This does not unset previously set cursors.
123        - The cursor key must be a single field name.
124        - Not all streams support custom cursors. If a stream does not support custom cursors,
125          the override may be ignored.
126        - Stream names are case insensitive, while field names are case sensitive.
127        - Stream names are not validated by PyAirbyte. If the stream name
128          does not exist in the catalog, the override may be ignored.
129        """
130        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_cursor_keys(self, **kwargs: str) -> None:
132    def set_cursor_keys(
133        self,
134        **kwargs: str,
135    ) -> None:
136        """Override the cursor key for one or more streams.
137
138        Usage:
139            source.set_cursor_keys(
140                stream1="cursor1",
141                stream2="cursor2",
142            )
143
144        Note:
145        - This does not unset previously set cursors.
146        - The cursor key must be a single field name.
147        - Not all streams support custom cursors. If a stream does not support custom cursors,
148          the override may be ignored.
149        - Stream names are case insensitive, while field names are case sensitive.
150        - Stream names are not validated by PyAirbyte. If the stream name
151          does not exist in the catalog, the override may be ignored.
152        """
153        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

source.set_cursor_keys( stream1="cursor1", stream2="cursor2", )

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
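
A sketch of cursor overrides in practice (stream and field names are illustrative; keys are lower-cased internally, as shown in the source):

    # "source" obtained earlier via get_source(...)
    source.set_cursor_keys(
        users="updated_at",
        orders="modified_at",
    )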
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
155    def set_primary_key(
156        self,
157        stream_name: str,
158        primary_key: str | list[str],
159    ) -> None:
160        """Set the primary key for a single stream.
161
162        Note:
163        - This does not unset previously set primary keys.
164        - The primary key must be a single field name or a list of field names.
165        - Not all streams support overriding primary keys. If a stream does not support overriding
166          primary keys, the override may be ignored.
167        - Stream names are case insensitive, while field names are case sensitive.
168        - Stream names are not validated by PyAirbyte. If the stream name
169          does not exist in the catalog, the override may be ignored.
170        """
171        self._primary_key_overrides[stream_name.lower()] = (
172            primary_key if isinstance(primary_key, list) else [primary_key]
173        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
175    def set_primary_keys(
176        self,
177        **kwargs: str | list[str],
178    ) -> None:
179        """Override the primary keys for one or more streams.
180
181        This does not unset previously set primary keys.
182
183        Usage:
184            source.set_primary_keys(
185                stream1="pk1",
186                stream2=["pk1", "pk2"],
187            )
188
189        Note:
190        - This does not unset previously set primary keys.
191        - The primary key must be a single field name or a list of field names.
192        - Not all streams support overriding primary keys. If a stream does not support overriding
193          primary keys, the override may be ignored.
194        - Stream names are case insensitive, while field names are case sensitive.
195        - Stream names are not validated by PyAirbyte. If the stream name
196          does not exist in the catalog, the override may be ignored.
197        """
198        self._primary_key_overrides.update(
199            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
200        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def select_all_streams(self) -> None:
214    def select_all_streams(self) -> None:
215        """Select all streams.
216
217        This is a more streamlined equivalent to:
218        > source.select_streams(source.get_available_streams()).
219        """
220        if self._config_dict is None:
221            self._to_be_selected_streams = "*"
222            self._log_warning_preselected_stream(self._to_be_selected_streams)
223            return
224
225        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams())

def select_streams(self, streams: str | list[str]) -> None:
227    def select_streams(self, streams: str | list[str]) -> None:
228        """Select the stream names that should be read from the connector.
229
230        Args:
231            streams: A list of stream names to select. If set to "*", all streams will be selected.
232
233        Currently, if this is not set, all streams will be read.
234        """
235        if self._config_dict is None:
236            self._to_be_selected_streams = streams
237            self._log_warning_preselected_stream(streams)
238            return
239
240        if streams == "*":
241            self.select_all_streams()
242            return
243
244        if isinstance(streams, str):
245            # If a single stream is provided, convert it to a one-item list
246            streams = [streams]
247
248        available_streams = self.get_available_streams()
249        for stream in streams:
250            if stream not in available_streams:
251                raise exc.AirbyteStreamNotFoundError(
252                    stream_name=stream,
253                    connector_name=self.name,
254                    available_streams=available_streams,
255                )
256        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.

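For example (the stream names are illustrative):

    source.select_streams(["users", "purchases"])  # explicit subset
    source.select_streams("*")                     # same as select_all_streams()
    # Once a config is set, an unknown name raises AirbyteStreamNotFoundError.
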
def get_selected_streams(self) -> list[str]:
258    def get_selected_streams(self) -> list[str]:
259        """Get the selected streams.
260
261        If no streams are selected, return an empty list.
262        """
263        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
265    def set_config(
266        self,
267        config: dict[str, Any],
268        *,
269        validate: bool = True,
270    ) -> None:
271        """Set the config for the connector.
272
273        If validate is True, raise an exception if the config fails validation.
274
275        If validate is False, validation will be deferred until check() or validate_config()
276        is called.
277        """
278        if validate:
279            self.validate_config(config)
280
281        self._config_dict = config
282
283        if self._to_be_selected_streams:
284            self.select_streams(self._to_be_selected_streams)
285            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.

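A sketch of deferring validation (the config key is a placeholder for your connector's spec):

    source.set_config({"count": 100}, validate=False)  # defer JSON Schema validation
    source.check()  # the config is validated here instead
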
def get_config(self) -> dict[str, typing.Any]:
287    def get_config(self) -> dict[str, Any]:
288        """Get the config for the connector."""
289        return self._config

Get the config for the connector.

def get_available_streams(self) -> list[str]:
318    def get_available_streams(self) -> list[str]:
319        """Get the available streams from the spec."""
320        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the discovered catalog.

config_spec: dict[str, typing.Any]
352    @property
353    def config_spec(self) -> dict[str, Any]:
354        """Generate a configuration spec for this connector, as a JSON Schema definition.
355
356        This function returns the configuration spec for the current
357        connector as a JSON Schema dictionary.
358
359        Returns:
360            dict: The JSON Schema configuration spec as a dictionary.
361        """
362        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function returns the configuration spec for the current connector as a JSON Schema dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
364    def print_config_spec(
365        self,
366        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
367        *,
368        output_file: Path | str | None = None,
369    ) -> None:
370        """Print the configuration spec for this connector.
371
372        Args:
373            format: The format to print the spec in. Must be "yaml" or "json".
374            output_file: Optional. If set, the spec will be written to the given file path.
375                Otherwise, it will be printed to the console.
376        """
377        if format not in {"yaml", "json"}:
378            raise exc.PyAirbyteInputError(
379                message="Invalid format. Expected 'yaml' or 'json'",
380                input_value=format,
381            )
382        if isinstance(output_file, str):
383            output_file = Path(output_file)
384
385        if format == "yaml":
386            content = yaml.dump(self.config_spec, indent=2)
387        elif format == "json":
388            content = json.dumps(self.config_spec, indent=2)
389
390        if output_file:
391            output_file.write_text(content)
392            return
393
394        syntax_highlighted = Syntax(content, format)
395        print(syntax_highlighted)

Print the configuration spec for this connector.

Arguments:
  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
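
For instance, to write the spec to a JSON file rather than the console (the file path is illustrative):

    source.print_config_spec(format="json", output_file="spec.json")
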
docs_url: str
412    @property
413    def docs_url(self) -> str:
414        """Get the URL to the connector's documentation."""
415        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
416            "source-", ""
417        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
419    @property
420    def discovered_catalog(self) -> AirbyteCatalog:
421        """Get the raw catalog for the given streams.
422
423        If the catalog is not yet known, we call discover to get it.
424        """
425        if self._discovered_catalog is None:
426            self._discovered_catalog = self._discover()
427
428        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
430    @property
431    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
432        """Get the configured catalog for the given streams.
433
434        If the raw catalog is not yet known, we call discover to get it.
435
436        If no specific streams are selected, we return a catalog that syncs all available streams.
437
438        TODO: We should consider disabling by default the streams that the connector would
439        disable by default. (For instance, streams that require a premium license are sometimes
440        disabled by default within the connector.)
441        """
442        # Ensure discovered catalog is cached before we start
443        _ = self.discovered_catalog
444
445        # Filter for selected streams if set, otherwise use all available streams:
446        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
447        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
449    def get_configured_catalog(
450        self,
451        streams: Literal["*"] | list[str] | None = None,
452    ) -> ConfiguredAirbyteCatalog:
453        """Get a configured catalog for the given streams.
454
455        If no streams are provided, the selected streams will be used. If no streams are selected,
456        all available streams will be used.
457
458        If '*' is provided, all available streams will be used.
459        """
460        selected_streams: list[str] = []
461        if streams is None:
462            selected_streams = self._selected_stream_names or self.get_available_streams()
463        elif streams == "*":
464            selected_streams = self.get_available_streams()
465        elif isinstance(streams, list):
466            selected_streams = streams
467        else:
468            raise exc.PyAirbyteInputError(
469                message="Invalid streams argument.",
470                input_value=streams,
471            )
472
473        return ConfiguredAirbyteCatalog(
474            streams=[
475                ConfiguredAirbyteStream(
476                    stream=stream,
477                    destination_sync_mode=DestinationSyncMode.overwrite,
478                    sync_mode=SyncMode.incremental,
479                    primary_key=(
480                        [self._primary_key_overrides[stream.name.lower()]]
481                        if stream.name.lower() in self._primary_key_overrides
482                        else stream.source_defined_primary_key
483                    ),
484                    cursor_field=(
485                        [self._cursor_key_overrides[stream.name.lower()]]
486                        if stream.name.lower() in self._cursor_key_overrides
487                        else stream.default_cursor_field
488                    ),
489                    # These are unused in the current implementation:
490                    generation_id=None,
491                    minimum_generation_id=None,
492                    sync_id=None,
493                )
494                for stream in self.discovered_catalog.streams
495                if stream.name in selected_streams
496            ],
497        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.

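A sketch of the three accepted argument shapes:

    catalog = source.get_configured_catalog()                   # selected streams (or all, if none selected)
    catalog = source.get_configured_catalog(streams="*")        # all available streams
    catalog = source.get_configured_catalog(streams=["users"])  # explicit subset ("users" is illustrative)
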
def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
499    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
500        """Return the JSON Schema spec for the specified stream name."""
501        catalog: AirbyteCatalog = self.discovered_catalog
502        found: list[AirbyteStream] = [
503            stream for stream in catalog.streams if stream.name == stream_name
504        ]
505
506        if len(found) == 0:
507            raise exc.PyAirbyteInputError(
508                message="Stream name does not exist in catalog.",
509                input_value=stream_name,
510            )
511
512        if len(found) > 1:
513            raise exc.PyAirbyteInternalError(
514                message="Duplicate streams found with the same name.",
515                context={
516                    "found_streams": found,
517                },
518            )
519
520        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.

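For example (assuming the catalog contains a users stream with a standard object schema):

    schema = source.get_stream_json_schema("users")  # "users" is illustrative
    print(sorted(schema["properties"]))  # declared field names, assuming an object schema
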
def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
522    def get_records(
523        self,
524        stream: str,
525        *,
526        normalize_field_names: bool = False,
527        prune_undeclared_fields: bool = True,
528    ) -> LazyDataset:
529        """Read a stream from the connector.
530
531        Args:
532            stream: The name of the stream to read.
533            normalize_field_names: When `True`, field names will be normalized to lower case, with
534                special characters removed. This matches the behavior of PyAirbyte caches and most
535                Airbyte destinations.
536            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
537                which generally matches the behavior of PyAirbyte caches and most Airbyte
538                destinations, specifically when you expect the catalog may be stale. You can disable
539                this to keep all fields in the records.
540
541        This involves the following steps:
542        * Call discover to get the catalog
543        * Generate a configured catalog that syncs the given stream in full_refresh mode
544        * Write the configured catalog and the config to a temporary file
545        * execute the connector with read --config <config_file> --catalog <catalog_file>
546        * Listen to the messages and return the first AirbyteRecordMessages that come along.
547        * Make sure the subprocess is killed when the function returns.
548        """
549        configured_catalog = self.get_configured_catalog(streams=[stream])
550        if len(configured_catalog.streams) == 0:
551            raise exc.PyAirbyteInputError(
552                message="Requested stream does not exist.",
553                context={
554                    "stream": stream,
555                    "available_streams": self.get_available_streams(),
556                    "connector_name": self.name,
557                },
558            ) from KeyError(stream)
559
560        configured_stream = configured_catalog.streams[0]
561
562        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
563            yield from records
564
565        stream_record_handler = StreamRecordHandler(
566            json_schema=self.get_stream_json_schema(stream),
567            prune_extra_fields=prune_undeclared_fields,
568            normalize_keys=normalize_field_names,
569        )
570
571        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
572        progress_tracker = ProgressTracker(
573            ProgressStyle.PLAIN,
574            source=self,
575            cache=None,
576            destination=None,
577            expected_streams=[stream],
578        )
579
580        iterator: Iterator[dict[str, Any]] = (
581            StreamRecord.from_record_message(
582                record_message=record.record,
583                stream_record_handler=stream_record_handler,
584            )
585            for record in self._read_with_catalog(
586                catalog=configured_catalog,
587                progress_tracker=progress_tracker,
588            )
589            if record.record
590        )
591        progress_tracker.log_success()
592        return LazyDataset(
593            iterator,
594            stream_metadata=configured_stream,
595        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the first AirbyteRecordMessages that come along.
  • Make sure the subprocess is killed when the function returns.
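
A minimal sketch of lazy consumption (the stream name is illustrative):

    # Nothing is read until the LazyDataset is consumed:
    for record in source.get_records("users"):
        print(record)  # each record behaves like a dict
        break
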
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
597    def get_documents(
598        self,
599        stream: str,
600        title_property: str | None = None,
601        content_properties: list[str] | None = None,
602        metadata_properties: list[str] | None = None,
603        *,
604        render_metadata: bool = False,
605    ) -> Iterable[Document]:
606        """Read a stream from the connector and return the records as documents.
607
608        If metadata_properties is not set, all properties that are not content will be added to
609        the metadata.
610
611        If render_metadata is True, metadata will be rendered in the document, as well as
612        the main content.
613        """
614        return self.get_records(stream).to_documents(
615            title_property=title_property,
616            content_properties=content_properties,
617            metadata_properties=metadata_properties,
618            render_metadata=render_metadata,
619        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.

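A sketch wiring a stream into a document pipeline (the property names are placeholders for your stream's fields):

    docs = source.get_documents(
        "users",                     # illustrative stream name
        title_property="name",       # placeholder field
        content_properties=["bio"],  # placeholder field
        render_metadata=True,
    )
    for doc in docs:
        print(doc)
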
def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
709    def read(
710        self,
711        cache: CacheBase | None = None,
712        *,
713        streams: str | list[str] | None = None,
714        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
715        force_full_refresh: bool = False,
716        skip_validation: bool = False,
717    ) -> ReadResult:
718        """Read from the connector and write to the cache.
719
720        Args:
721            cache: The cache to write to. If not set, a default cache will be used.
722            streams: Optional if already set. A list of stream names to select for reading. If set
723                to "*", all streams will be selected.
724            write_strategy: The strategy to use when writing to the cache. If a string, it must be
725                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
726                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
727                WriteStrategy.AUTO.
728            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
729                streams will be read in incremental mode if supported by the connector. This option
730                must be True when using the "replace" strategy.
731            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
732                running the connector. This can be helpful in debugging, when you want to send
733                configurations to the connector that otherwise might be rejected by JSON Schema
734                validation rules.
735        """
736        cache = cache or get_default_cache()
737        progress_tracker = ProgressTracker(
738            source=self,
739            cache=cache,
740            destination=None,
741            expected_streams=None,  # Will be set later
742        )
743
744        # Set up state provider if not in full refresh mode
745        if force_full_refresh:
746            state_provider: StateProviderBase | None = None
747        else:
748            state_provider = cache.get_state_provider(
749                source_name=self._name,
750            )
751        state_writer = cache.get_state_writer(source_name=self._name)
752
753        if streams:
754            self.select_streams(streams)
755
756        if not self._selected_stream_names:
757            raise exc.PyAirbyteNoStreamsSelectedError(
758                connector_name=self.name,
759                available_streams=self.get_available_streams(),
760            )
761
762        try:
763            result = self._read_to_cache(
764                cache=cache,
765                catalog_provider=CatalogProvider(self.configured_catalog),
766                stream_names=self._selected_stream_names,
767                state_provider=state_provider,
768                state_writer=state_writer,
769                write_strategy=write_strategy,
770                force_full_refresh=force_full_refresh,
771                skip_validation=skip_validation,
772                progress_tracker=progress_tracker,
773            )
774        except exc.PyAirbyteInternalError as ex:
775            progress_tracker.log_failure(exception=ex)
776            raise exc.AirbyteConnectorFailedError(
777                connector_name=self.name,
778                log_text=self._last_log_messages,
779            ) from ex
780        except Exception as ex:
781            progress_tracker.log_failure(exception=ex)
782            raise
783
784        progress_tracker.log_success()
785        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
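
A minimal end-to-end sketch (source-faker, its users stream, and the count config key are illustrative; new_local_cache is assumed to create a local DuckDB cache):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_all_streams()
    result = source.read(cache=ab.new_local_cache(), write_strategy="auto")
    # Datasets are keyed by stream name; "users" is assumed to exist:
    print(result["users"].to_pandas().head())
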
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
config_hash
validate_config
connector_version
check
install
uninstall
@dataclass
class ConnectorMetadata:
 73@dataclass
 74class ConnectorMetadata:
 75    """Metadata for a connector."""
 76
 77    name: str
 78    """Connector name. For example, "source-google-sheets"."""
 79
 80    latest_available_version: str | None
 81    """The latest available version of the connector."""
 82
 83    pypi_package_name: str | None
 84    """The name of the PyPI package for the connector, if it exists."""
 85
 86    language: Language | None
 87    """The language of the connector."""
 88
 89    install_types: set[InstallType]
 90    """The supported install types for the connector."""
 91
 92    @property
 93    def default_install_type(self) -> InstallType:
 94        """Return the default install type for the connector."""
 95        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 96            return InstallType.YAML
 97
 98        if InstallType.PYTHON in self.install_types:
 99            return InstallType.PYTHON
100
101        # Else: Java or Docker
102        return InstallType.DOCKER

Metadata for a connector.

ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: airbyte.sources.registry.Language | None, install_types: set[airbyte.sources.registry.InstallType])
name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: airbyte.sources.registry.Language | None

The language of the connector.

install_types: set[airbyte.sources.registry.InstallType]

The supported install types for the connector.

default_install_type: airbyte.sources.registry.InstallType
 92    @property
 93    def default_install_type(self) -> InstallType:
 94        """Return the default install type for the connector."""
 95        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 96            return InstallType.YAML
 97
 98        if InstallType.PYTHON in self.install_types:
 99            return InstallType.PYTHON
100
101        # Else: Java or Docker
102        return InstallType.DOCKER

Return the default install type for the connector.
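
A quick sketch of inspecting metadata for a registered connector (the connector name is illustrative, and the lookup is assumed to return None when the connector is not registered):

    from airbyte.sources import get_connector_metadata

    meta = get_connector_metadata("source-faker")  # illustrative connector name
    if meta is not None:
        print(meta.default_install_type)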