airbyte.sources

Sources connectors module for PyAirbyte.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Sources connectors module for PyAirbyte."""
 3
 4from __future__ import annotations
 5
 6from typing import TYPE_CHECKING
 7
 8from airbyte.registry import (
 9    ConnectorMetadata,
10    get_available_connectors,
11    get_connector_metadata,
12)
13from airbyte.sources.base import Source
14from airbyte.sources.util import (
15    get_benchmark_source,
16    get_source,
17)
18
19
20# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
21if TYPE_CHECKING:
22    # ruff: noqa: TC004  # imports used for more than type checking
23    from airbyte.sources import (
24        base,
25        registry,
26        util,
27    )
28
29__all__ = [
30    # Submodules
31    "base",
32    "registry",
33    "util",
34    # Factories
35    "get_source",
36    "get_benchmark_source",
37    # Helper Functions
38    "get_available_connectors",
39    "get_connector_metadata",
40    # Classes
41    "Source",
42    "ConnectorMetadata",
43]
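A minimal usage sketch of this module's exports, assuming the connector registry is reachable and that source-faker is available (the connector name and config keys are illustrative):

    import airbyte as ab

    # List connectors that can be installed in this environment.
    print(ab.get_available_connectors()[:5])

    # Build a source, verify connectivity, and read into the default cache.
    source = ab.get_source("source-faker", config={"count": 100})
    source.check()
    source.select_all_streams()
    result = source.read()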
def get_source(name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, use_python: bool | pathlib.Path | str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None, no_executor: bool = False) -> Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    use_python: bool | Path | str | None = None,
 56    pip_url: str | None = None,
 57    local_executable: Path | str | None = None,
 58    docker_image: bool | str | None = None,
 59    use_host_network: bool = False,
 60    source_manifest: bool | dict | Path | str | None = None,
 61    install_if_missing: bool = True,
 62    install_root: Path | None = None,
 63    no_executor: bool = False,
 64) -> Source:
 65    """Get a connector by name and version.
 66
 67    If an explicit install or execution method is requested (e.g. `local_executable`,
 68    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 69
 70    Otherwise, an appropriate method will be selected based on the available connector metadata:
 71    1. If the connector is registered and a YAML source manifest is available, the manifest
 72       will be downloaded and used to execute the connector.
 73    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 74    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 75       will be executed using Docker.
 76
 77    Args:
 78        name: connector name
 79        config: connector config - if not provided, you need to set it later via the set_config
 80            method.
 81        config_change_callback: callback function to be called when the connector config changes.
 82        streams: list of stream names to select for reading. If set to "*", all streams will be
 83            selected. If not provided, you can set it later via the `select_streams()` or
 84            `select_all_streams()` method.
 85        version: connector version - if not provided, the currently installed version will be used.
 86            If no version is installed, the latest available version will be used. The version can
 87            also be set to "latest" to force the use of the latest available version.
 88        use_python: (Optional.) Python interpreter specification:
 89            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
 90            - False: Use Docker instead.
 91            - Path: Use interpreter at this path.
 92            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
 93                installed, it will be installed by uv. (This generally adds less than 3 seconds
 94                to install times.)
 95        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 96            connector name.
 97        local_executable: If set, the connector will be assumed to already be installed and will be
 98            executed using this path or executable name. Otherwise, the connector will be installed
 99            automatically in a virtual environment.
100        docker_image: If set, the connector will be executed using Docker. You can specify `True`
101            to use the default image for the connector, or you can specify a custom image name.
102            If `version` is specified and your image name does not already contain a tag
103            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
104        use_host_network: If set, along with docker_image, the connector will be executed using
105            the host network. This is useful for connectors that need to access resources on
106            the host machine, such as a local database. This parameter is ignored when
107            `docker_image` is not set.
108        source_manifest: If set, the connector will be executed based on a declarative YAML
109            source definition. This input can be `True` to attempt to auto-download a YAML spec,
110            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
111            the local file system, or `str` to pull the definition from a web URL.
112        install_if_missing: Whether to install the connector if it is not available locally. This
113            parameter is ignored when `local_executable` or `source_manifest` are set.
114        install_root: (Optional.) The root directory where the virtual environment will be
115            created. If not provided, the current working directory will be used.
116        no_executor: If True, use NoOpExecutor which fetches specs from the registry without
117            local installation. This is useful for scenarios where you need to validate
118            configurations but don't need to run the connector locally (e.g., deploying to Cloud).
119    """
120    executor = get_connector_executor(
121        name=name,
122        version=version,
123        use_python=use_python,
124        pip_url=pip_url,
125        local_executable=local_executable,
126        docker_image=docker_image,
127        use_host_network=use_host_network,
128        source_manifest=source_manifest,
129        install_if_missing=install_if_missing,
130        install_root=install_root,
131        no_executor=no_executor,
132    )
133
134    return Source(
135        name=name,
136        config=config,
137        config_change_callback=config_change_callback,
138        streams=streams,
139        executor=executor,
140    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.

Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • use_python: (Optional.) Python interpreter specification:
    • True: Use current Python interpreter. (Inferred if pip_url is set.)
    • False: Use Docker instead.
    • Path: Use interpreter at this path.
    • str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
  • no_executor: If True, use NoOpExecutor which fetches specs from the registry without local installation. This is useful for scenarios where you need to validate configurations but don't need to run the connector locally (e.g., deploying to Cloud).
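A sketch of the execution-method overrides described above, with illustrative connector names; the docker_image variant assumes Docker is installed:

    from airbyte.sources import get_source

    # Default: let PyAirbyte choose an execution method from registry metadata.
    source = get_source("source-faker", config={"count": 100})

    # Explicit: run the connector's default Docker image on the host network.
    dockerized = get_source(
        "source-faker",
        config={"count": 100},
        docker_image=True,      # True selects the connector's default image
        use_host_network=True,  # ignored unless docker_image is set
    )

    # Spec-only: validate configurations without installing anything locally.
    spec_only = get_source("source-faker", no_executor=True)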
def get_benchmark_source(num_records: int | str = '5e5', *, install_if_missing: bool = True) -> Source:
143def get_benchmark_source(
144    num_records: int | str = "5e5",
145    *,
146    install_if_missing: bool = True,
147) -> Source:
148    """Get a source for benchmarking.
149
150    This source will generate dummy records for performance benchmarking purposes.
151    You can specify the number of records to generate using the `num_records` parameter.
152    The `num_records` parameter can be an integer or a string in scientific notation.
153    For example, `"5e6"` will generate 5 million records. If underscores are provided
154    within a numeric string, they will be ignored.
155
156    Args:
157        num_records: The number of records to generate. Defaults to "5e5", or
158            500,000 records.
159            Can be an integer (`1000`) or a string in scientific notation.
160            For example, `"5e6"` will generate 5 million records.
161        install_if_missing: Whether to install the source if it is not available locally.
162
163    Returns:
164        Source: The source object for benchmarking.
165    """
166    if isinstance(num_records, str):
167        try:
168            num_records = int(Decimal(num_records.replace("_", "")))
169        except InvalidOperation as ex:
170            raise PyAirbyteInputError(
171                message="Invalid number format.",
172                original_exception=ex,
173                input_value=str(num_records),
174            ) from None
175
176    return get_source(
177        name="source-e2e-test",
178        docker_image=True,
179        # docker_image="airbyte/source-e2e-test:latest",
180        config={
181            "type": "BENCHMARK",
182            "schema": "FIVE_STRING_COLUMNS",
183            "terminationCondition": {
184                "type": "MAX_RECORDS",
185                "max": num_records,
186            },
187        },
188        streams="*",
189        install_if_missing=install_if_missing,
190    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
  • install_if_missing: Whether to install the source if it is not available locally.
Returns:
  Source: The source object for benchmarking.
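A usage sketch, assuming Docker is available locally (the factory runs the source-e2e-test connector via its Docker image, per the implementation above):

    from airbyte.sources import get_benchmark_source

    # "5e4" parses to 50,000 records; underscores (e.g. "50_000") are ignored.
    source = get_benchmark_source(num_records="5e4")
    result = source.read()  # streams are preselected to "*" by the factory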

def get_available_connectors(install_type: airbyte.registry.InstallType | str | None = InstallType.INSTALLABLE) -> list[str]:
251def get_available_connectors(
252    install_type: InstallType | str | None = InstallType.INSTALLABLE,
253) -> list[str]:
254    """Return a list of all available connectors.
255
256    Connectors will be returned in alphabetical order, with the standard prefix "source-".
257
258    Args:
259        install_type: The type of installation for the connector.
260            Defaults to `InstallType.INSTALLABLE`.
261    """
262    if install_type is None or install_type == InstallType.INSTALLABLE:
263        # Filter for installable connectors (default behavior).
264        if is_docker_installed():
265            logger.info("Docker is detected. Returning all connectors.")
266            return sorted(_get_registry_cache().keys())
267
268        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
269        return sorted(
270            [
271                connector_name
272                for connector_name, conn_info in _get_registry_cache().items()
273                if conn_info.language in {Language.PYTHON, Language.MANIFEST_ONLY}
274            ]
275        )
276
277    if not isinstance(install_type, InstallType):
278        install_type = InstallType(install_type)
279
280    if install_type == InstallType.PYTHON:
281        return sorted(
282            connector_name
283            for connector_name, conn_info in _get_registry_cache().items()
284            if conn_info.pypi_package_name is not None
285        )
286
287    if install_type == InstallType.JAVA:
288        warnings.warn(
289            message="Java connectors are not yet supported.",
290            stacklevel=2,
291        )
292        return sorted(
293            connector_name
294            for connector_name, conn_info in _get_registry_cache().items()
295            if conn_info.language == Language.JAVA
296        )
297
298    if install_type in {InstallType.DOCKER, InstallType.ANY}:
299        return sorted(_get_registry_cache().keys())
300
301    if install_type == InstallType.YAML:
302        return sorted(
303            conn.name
304            for conn in _get_registry_cache().values()
305            if InstallType.YAML in conn.install_types
306        )
307
308    # pragma: no cover  # Should never be reached.
309    raise exc.PyAirbyteInputError(
310        message="Invalid install type.",
311        context={
312            "install_type": install_type,
313        },
314    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".

Arguments:
  • install_type: The type of installation for the connector. Defaults to InstallType.INSTALLABLE.
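A filtering sketch; the import path for InstallType follows the signature above, and string values are coerced to the enum per the implementation:

    from airbyte.registry import InstallType
    from airbyte.sources import get_available_connectors

    # Default: connectors installable in this environment (all connectors
    # when Docker is detected, otherwise Python and manifest-only ones).
    installable = get_available_connectors()

    # Connectors published as PyPI packages.
    python_only = get_available_connectors(install_type=InstallType.PYTHON)

    # Strings are coerced to the enum (assuming "yaml" is InstallType.YAML's value).
    yaml_only = get_available_connectors(install_type="yaml")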
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
221def get_connector_metadata(name: str) -> ConnectorMetadata | None:
222    """Check the cache for the connector.
223
224    If the cache is empty, populate it by calling update_cache.
225    """
226    registry_url = _get_registry_url()
227
228    if _is_registry_disabled(registry_url):
229        return None
230
231    cache = copy(_get_registry_cache())
232
233    if not cache:
234        raise exc.PyAirbyteInternalError(
235            message="Connector registry could not be loaded.",
236            context={
237                "registry_url": _get_registry_url(),
238            },
239        )
240    if name not in cache:
241        raise exc.AirbyteConnectorNotRegisteredError(
242            connector_name=name,
243            context={
244                "registry_url": _get_registry_url(),
245                "available_connectors": get_available_connectors(),
246            },
247        )
248    return cache[name]

Check the cache for the connector.

If the cache is empty, populate it by calling update_cache.
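A lookup sketch; the exception type is taken from the source above, the airbyte.exceptions import path is assumed, and the connector name is illustrative:

    from airbyte import exceptions as exc
    from airbyte.sources import get_connector_metadata

    try:
        metadata = get_connector_metadata("source-faker")
    except exc.AirbyteConnectorNotRegisteredError:
        metadata = None  # Name not found in the registry cache.

    # Note: returns None (without raising) when the registry is disabled.
    if metadata is not None:
        print(metadata.name)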

class Source(airbyte._connector_base.ConnectorBase):
 68class Source(ConnectorBase):  # noqa: PLR0904
 69    """A class representing a source that can be called."""
 70
 71    connector_type = "source"
 72
 73    def __init__(
 74        self,
 75        executor: Executor,
 76        name: str,
 77        config: dict[str, Any] | None = None,
 78        *,
 79        config_change_callback: ConfigChangeCallback | None = None,
 80        streams: str | list[str] | None = None,
 81        validate: bool = False,
 82        cursor_key_overrides: dict[str, str] | None = None,
 83        primary_key_overrides: dict[str, str | list[str]] | None = None,
 84    ) -> None:
 85        """Initialize the source.
 86
 87        If config is provided, it will be validated against the spec if validate is True.
 88        """
 89        self._to_be_selected_streams: list[str] | str = []
 90        """Used to hold selection criteria before catalog is known."""
 91
 92        super().__init__(
 93            executor=executor,
 94            name=name,
 95            config=config,
 96            config_change_callback=config_change_callback,
 97            validate=validate,
 98        )
 99        self._config_dict: dict[str, Any] | None = None
100        self._last_log_messages: list[str] = []
101        self._discovered_catalog: AirbyteCatalog | None = None
102        self._selected_stream_names: list[str] = []
103
104        self._cursor_key_overrides: dict[str, str] = {}
105        """A mapping of lower-cased stream names to cursor key overrides."""
106
107        self._primary_key_overrides: dict[str, list[str]] = {}
108        """A mapping of lower-cased stream names to primary key overrides."""
109
110        if config is not None:
111            self.set_config(config, validate=validate)
112        if streams is not None:
113            self.select_streams(streams)
114        if cursor_key_overrides is not None:
115            self.set_cursor_keys(**cursor_key_overrides)
116        if primary_key_overrides is not None:
117            self.set_primary_keys(**primary_key_overrides)
118
119    def set_streams(self, streams: list[str]) -> None:
120        """Deprecated. See select_streams()."""
121        warnings.warn(
122            "The 'set_streams' method is deprecated and will be removed in a future version. "
123            "Please use the 'select_streams' method instead.",
124            DeprecationWarning,
125            stacklevel=2,
126        )
127        self.select_streams(streams)
128
129    def set_cursor_key(
130        self,
131        stream_name: str,
132        cursor_key: str,
133    ) -> None:
134        """Set the cursor for a single stream.
135
136        Note:
137        - This does not unset previously set cursors.
138        - The cursor key must be a single field name.
139        - Not all streams support custom cursors. If a stream does not support custom cursors,
140          the override may be ignored.
141        - Stream names are case insensitive, while field names are case sensitive.
142        - Stream names are not validated by PyAirbyte. If the stream name
143          does not exist in the catalog, the override may be ignored.
144        """
145        self._cursor_key_overrides[stream_name.lower()] = cursor_key
146
147    def set_cursor_keys(
148        self,
149        **kwargs: str,
150    ) -> None:
151        """Override the cursor key for one or more streams.
152
153        Usage:
154            source.set_cursor_keys(
155                stream1="cursor1",
156                stream2="cursor2",
157            )
158
159        Note:
160        - This does not unset previously set cursors.
161        - The cursor key must be a single field name.
162        - Not all streams support custom cursors. If a stream does not support custom cursors,
163          the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
169
170    def set_primary_key(
171        self,
172        stream_name: str,
173        primary_key: str | list[str],
174    ) -> None:
175        """Set the primary key for a single stream.
176
177        Note:
178        - This does not unset previously set primary keys.
179        - The primary key must be a single field name or a list of field names.
180        - Not all streams support overriding primary keys. If a stream does not support overriding
181          primary keys, the override may be ignored.
182        - Stream names are case insensitive, while field names are case sensitive.
183        - Stream names are not validated by PyAirbyte. If the stream name
184          does not exist in the catalog, the override may be ignored.
185        """
186        self._primary_key_overrides[stream_name.lower()] = (
187            primary_key if isinstance(primary_key, list) else [primary_key]
188        )
189
190    def set_primary_keys(
191        self,
192        **kwargs: str | list[str],
193    ) -> None:
194        """Override the primary keys for one or more streams.
195
196        This does not unset previously set primary keys.
197
198        Usage:
199            source.set_primary_keys(
200                stream1="pk1",
201                stream2=["pk1", "pk2"],
202            )
203
204        Note:
205        - This does not unset previously set primary keys.
206        - The primary key must be a single field name or a list of field names.
207        - Not all streams support overriding primary keys. If a stream does not support overriding
208          primary keys, the override may be ignored.
209        - Stream names are case insensitive, while field names are case sensitive.
210        - Stream names are not validated by PyAirbyte. If the stream name
211          does not exist in the catalog, the override may be ignored.
212        """
213        self._primary_key_overrides.update(
214            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
215        )
216
217    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
218        """Logs a warning message indicating stream selection which are not selected yet."""
219        if streams == "*":
220            print(
221                "Warning: Config is not set yet. All streams will be selected after config is set.",
222                file=sys.stderr,
223            )
224        else:
225            print(
226                "Warning: Config is not set yet. "
227                f"Streams to be selected after config is set: {streams}",
228                file=sys.stderr,
229            )
230
231    def select_all_streams(self) -> None:
232        """Select all streams.
233
234        This is a more streamlined equivalent to:
235        > source.select_streams(source.get_available_streams()).
236        """
237        if self._config_dict is None:
238            self._to_be_selected_streams = "*"
239            self._log_warning_preselected_stream(self._to_be_selected_streams)
240            return
241
242        self._selected_stream_names = self.get_available_streams()
243
244    def select_streams(self, streams: str | list[str]) -> None:
245        """Select the stream names that should be read from the connector.
246
247        Args:
248            streams: A list of stream names to select. If set to "*", all streams will be selected.
249
250        Currently, if this is not set, all streams will be read.
251        """
252        if self._config_dict is None:
253            self._to_be_selected_streams = streams
254            self._log_warning_preselected_stream(streams)
255            return
256
257        if streams == "*":
258            self.select_all_streams()
259            return
260
261        if isinstance(streams, str):
262            # If a single stream is provided, convert it to a one-item list
263            streams = [streams]
264
265        available_streams = self.get_available_streams()
266        for stream in streams:
267            if stream not in available_streams:
268                raise exc.AirbyteStreamNotFoundError(
269                    stream_name=stream,
270                    connector_name=self.name,
271                    available_streams=available_streams,
272                )
273        self._selected_stream_names = streams
274
275    def get_selected_streams(self) -> list[str]:
276        """Get the selected streams.
277
278        If no streams are selected, return an empty list.
279        """
280        return self._selected_stream_names
281
282    def set_config(
283        self,
284        config: dict[str, Any],
285        *,
286        validate: bool = True,
287    ) -> None:
288        """Set the config for the connector.
289
290        If validate is True, raise an exception if the config fails validation.
291
292        If validate is False, validation will be deferred until check() or validate_config()
293        is called.
294        """
295        if validate:
296            self.validate_config(config)
297
298        self._config_dict = config
299
300        if self._to_be_selected_streams:
301            self.select_streams(self._to_be_selected_streams)
302            self._to_be_selected_streams = []
303
304    def _discover(self) -> AirbyteCatalog:
305        """Call discover on the connector.
306
307        This involves the following steps:
308        - Write the config to a temporary file
309        - execute the connector with discover --config <config_file>
310        - Listen to the messages and return the first AirbyteCatalog that comes along.
311        - Make sure the subprocess is killed when the function returns.
312        """
313        with as_temp_files([self._hydrated_config]) as [config_file]:
314            for msg in self._execute(["discover", "--config", config_file]):
315                if msg.type == Type.CATALOG and msg.catalog:
316                    return msg.catalog
317            raise exc.AirbyteConnectorMissingCatalogError(
318                connector_name=self.name,
319                log_text=self._last_log_messages,
320            )
321
322    def get_available_streams(self) -> list[str]:
323        """Get the available streams from the spec."""
324        return [s.name for s in self.discovered_catalog.streams]
325
326    def _get_incremental_stream_names(self) -> list[str]:
327        """Get the name of streams that support incremental sync."""
328        return [
329            stream.name
330            for stream in self.discovered_catalog.streams
331            if SyncMode.incremental in stream.supported_sync_modes
332        ]
333
334    @override
335    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
336        """Call spec on the connector.
337
338        This involves the following steps:
339        * execute the connector with spec
340        * Listen to the messages and return the first spec that comes along.
341        * Make sure the subprocess is killed when the function returns.
342        """
343        if force_refresh or self._spec is None:
344            for msg in self._execute(["spec"]):
345                if msg.type == Type.SPEC and msg.spec:
346                    self._spec = msg.spec
347                    break
348
349        if self._spec:
350            return self._spec
351
352        raise exc.AirbyteConnectorMissingSpecError(
353            connector_name=self.name,
354            log_text=self._last_log_messages,
355        )
356
357    @property
358    def config_spec(self) -> dict[str, Any]:
359        """Generate a configuration spec for this connector, as a JSON Schema definition.
360
361        This function generates a JSON Schema dictionary with configuration specs for the
362        current connector, as a dictionary.
363
364        Returns:
365            dict: The JSON Schema configuration spec as a dictionary.
366        """
367        return self._get_spec(force_refresh=True).connectionSpecification
368
369    @property
370    def _yaml_spec(self) -> str:
371        """Get the spec as a yaml string.
372
373        For now, the primary use case is for writing and debugging a valid config for a source.
374
375        This is private for now because we probably want better polish before exposing this
376        as a stable interface. This will also get easier when we have docs links with this info
377        for each connector.
378        """
379        spec_obj: ConnectorSpecification = self._get_spec()
380        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
381        # convert to a yaml string
382        return yaml.dump(spec_dict)
383
384    @property
385    def docs_url(self) -> str:
386        """Get the URL to the connector's documentation."""
387        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
388            "source-", ""
389        )
390
391    @property
392    def discovered_catalog(self) -> AirbyteCatalog:
393        """Get the raw catalog for the given streams.
394
395        If the catalog is not yet known, we call discover to get it.
396        """
397        if self._discovered_catalog is None:
398            self._discovered_catalog = self._discover()
399
400        return self._discovered_catalog
401
402    @property
403    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
404        """Get the configured catalog for the given streams.
405
406        If the raw catalog is not yet known, we call discover to get it.
407
408        If no specific streams are selected, we return a catalog that syncs all available streams.
409
410        TODO: We should consider disabling by default the streams that the connector would
411        disable by default. (For instance, streams that require a premium license are sometimes
412        disabled by default within the connector.)
413        """
414        # Ensure discovered catalog is cached before we start
415        _ = self.discovered_catalog
416
417        # Filter for selected streams if set, otherwise use all available streams:
418        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
419        return self.get_configured_catalog(streams=streams_filter)
420
421    def get_configured_catalog(
422        self,
423        streams: Literal["*"] | list[str] | None = None,
424        *,
425        force_full_refresh: bool = False,
426    ) -> ConfiguredAirbyteCatalog:
427        """Get a configured catalog for the given streams.
428
429        If no streams are provided, the selected streams will be used. If no streams are selected,
430        all available streams will be used.
431
432        If '*' is provided, all available streams will be used.
433
434        If force_full_refresh is True, streams will be configured with full_refresh sync mode
435        when supported by the stream. Otherwise, incremental sync mode is used when supported.
436        """
437        selected_streams: list[str] = []
438        if streams is None:
439            selected_streams = self._selected_stream_names or self.get_available_streams()
440        elif streams == "*":
441            selected_streams = self.get_available_streams()
442        elif isinstance(streams, list):
443            selected_streams = streams
444        else:
445            raise exc.PyAirbyteInputError(
446                message="Invalid streams argument.",
447                input_value=streams,
448            )
449
450        def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
451            """Determine the sync mode for a stream based on force_full_refresh and support."""
452            # Use getattr to handle mocks or streams without supported_sync_modes attribute
453            supported_modes = getattr(stream, "supported_sync_modes", None)
454
455            if force_full_refresh:
456                # When force_full_refresh is True, prefer full_refresh if supported
457                if supported_modes and SyncMode.full_refresh in supported_modes:
458                    return SyncMode.full_refresh
459                # Fall back to incremental if full_refresh is not supported
460                return SyncMode.incremental
461
462            # Default behavior: preserve previous semantics (always incremental)
463            return SyncMode.incremental
464
465        return ConfiguredAirbyteCatalog(
466            streams=[
467                ConfiguredAirbyteStream(
468                    stream=stream,
469                    destination_sync_mode=DestinationSyncMode.overwrite,
470                    sync_mode=_get_sync_mode(stream),
471                    primary_key=(
472                        [self._primary_key_overrides[stream.name.lower()]]
473                        if stream.name.lower() in self._primary_key_overrides
474                        else stream.source_defined_primary_key
475                    ),
476                    cursor_field=(
477                        [self._cursor_key_overrides[stream.name.lower()]]
478                        if stream.name.lower() in self._cursor_key_overrides
479                        else stream.default_cursor_field
480                    ),
481                    # These are unused in the current implementation:
482                    generation_id=None,
483                    minimum_generation_id=None,
484                    sync_id=None,
485                )
486                for stream in self.discovered_catalog.streams
487                if stream.name in selected_streams
488            ],
489        )
490
491    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
492        """Return the JSON Schema spec for the specified stream name."""
493        catalog: AirbyteCatalog = self.discovered_catalog
494        found: list[AirbyteStream] = [
495            stream for stream in catalog.streams if stream.name == stream_name
496        ]
497
498        if len(found) == 0:
499            raise exc.PyAirbyteInputError(
500                message="Stream name does not exist in catalog.",
501                input_value=stream_name,
502            )
503
504        if len(found) > 1:
505            raise exc.PyAirbyteInternalError(
506                message="Duplicate streams found with the same name.",
507                context={
508                    "found_streams": found,
509                },
510            )
511
512        return found[0].json_schema
513
514    def get_records(
515        self,
516        stream: str,
517        *,
518        limit: int | None = None,
519        stop_event: threading.Event | None = None,
520        normalize_field_names: bool = False,
521        prune_undeclared_fields: bool = True,
522    ) -> LazyDataset:
523        """Read a stream from the connector.
524
525        Args:
526            stream: The name of the stream to read.
527            limit: The maximum number of records to read. If None, all records will be read.
528            stop_event: If set, the event can be triggered by the caller to stop reading records
529                and terminate the process.
530            normalize_field_names: When `True`, field names will be normalized to lower case, with
531                special characters removed. This matches the behavior of PyAirbyte caches and most
532                Airbyte destinations.
533            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
534                which generally matches the behavior of PyAirbyte caches and most Airbyte
535                destinations, specifically when you expect the catalog may be stale. You can disable
536                this to keep all fields in the records.
537
538        This involves the following steps:
539        * Call discover to get the catalog
540        * Generate a configured catalog that syncs the given stream in full_refresh mode
541        * Write the configured catalog and the config to a temporary file
542        * execute the connector with read --config <config_file> --catalog <catalog_file>
543        * Listen to the messages and return the AirbyteRecordMessages that come along.
544        * Make sure the subprocess is killed when the function returns.
545        """
546        stop_event = stop_event or threading.Event()
547        configured_catalog = self.get_configured_catalog(streams=[stream])
548        if len(configured_catalog.streams) == 0:
549            raise exc.PyAirbyteInputError(
550                message="Requested stream does not exist.",
551                context={
552                    "stream": stream,
553                    "available_streams": self.get_available_streams(),
554                    "connector_name": self.name,
555                },
556            ) from KeyError(stream)
557
558        configured_stream = configured_catalog.streams[0]
559
560        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
561            yield from records
562
563        stream_record_handler = StreamRecordHandler(
564            json_schema=self.get_stream_json_schema(stream),
565            prune_extra_fields=prune_undeclared_fields,
566            normalize_keys=normalize_field_names,
567        )
568
569        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
570        progress_tracker = ProgressTracker(
571            ProgressStyle.PLAIN,
572            source=self,
573            cache=None,
574            destination=None,
575            expected_streams=[stream],
576        )
577
578        iterator: Iterator[dict[str, Any]] = (
579            StreamRecord.from_record_message(
580                record_message=record.record,
581                stream_record_handler=stream_record_handler,
582            )
583            for record in self._read_with_catalog(
584                catalog=configured_catalog,
585                progress_tracker=progress_tracker,
586                stop_event=stop_event,
587            )
588            if record.record
589        )
590        if limit is not None:
591            # Stop the iterator after the limit is reached
592            iterator = islice(iterator, limit)
593
594        return LazyDataset(
595            iterator,
596            stream_metadata=configured_stream,
597            stop_event=stop_event,
598            progress_tracker=progress_tracker,
599        )
600
601    def get_documents(
602        self,
603        stream: str,
604        title_property: str | None = None,
605        content_properties: list[str] | None = None,
606        metadata_properties: list[str] | None = None,
607        *,
608        render_metadata: bool = False,
609    ) -> Iterable[Document]:
610        """Read a stream from the connector and return the records as documents.
611
612        If metadata_properties is not set, all properties that are not content will be added to
613        the metadata.
614
615        If render_metadata is True, metadata will be rendered in the document alongside
616        the main content.
617        """
618        return self.get_records(stream).to_documents(
619            title_property=title_property,
620            content_properties=content_properties,
621            metadata_properties=metadata_properties,
622            render_metadata=render_metadata,
623        )
624
625    def get_samples(
626        self,
627        streams: list[str] | Literal["*"] | None = None,
628        *,
629        limit: int = 5,
630        on_error: Literal["raise", "ignore", "log"] = "raise",
631    ) -> dict[str, InMemoryDataset | None]:
632        """Get a sample of records from the given streams."""
633        if streams == "*":
634            streams = self.get_available_streams()
635        elif streams is None:
636            streams = self.get_selected_streams()
637
638        results: dict[str, InMemoryDataset | None] = {}
639        for stream in streams:
640            stop_event = threading.Event()
641            try:
642                results[stream] = self.get_records(
643                    stream,
644                    limit=limit,
645                    stop_event=stop_event,
646                ).fetch_all()
647                stop_event.set()
648            except Exception as ex:
649                results[stream] = None
650                if on_error == "ignore":
651                    continue
652
653                if on_error == "raise":
654                    raise ex from None
655
656                if on_error == "log":
657                    print(f"Error fetching sample for stream '{stream}': {ex}")
658
659        return results
660
661    def print_samples(
662        self,
663        streams: list[str] | Literal["*"] | None = None,
664        *,
665        limit: int = 5,
666        on_error: Literal["raise", "ignore", "log"] = "log",
667    ) -> None:
668        """Print a sample of records from the given streams."""
669        internal_cols: list[str] = [
670            AB_EXTRACTED_AT_COLUMN,
671            AB_META_COLUMN,
672            AB_RAW_ID_COLUMN,
673        ]
674        col_limit = 10
675        if streams == "*":
676            streams = self.get_available_streams()
677        elif streams is None:
678            streams = self.get_selected_streams()
679
680        console = Console()
681
682        console.print(
683            Markdown(
684                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
685                justify="left",
686            )
687        )
688
689        for stream in streams:
690            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
691            samples = self.get_samples(
692                streams=[stream],
693                limit=limit,
694                on_error=on_error,
695            )
696            dataset = samples[stream]
697
698            table = Table(
699                show_header=True,
700                show_lines=True,
701            )
702            if dataset is None:
703                console.print(
704                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
705                )
706                continue
707
708            if len(dataset.column_names) > col_limit:
709                # We'll pivot the columns so each column is its own row
710                table.add_column("Column Name")
711                for _ in range(len(dataset)):
712                    table.add_column(overflow="fold")
713                for col in dataset.column_names:
714                    table.add_row(
715                        Markdown(f"**`{col}`**"),
716                        *[escape(str(record[col])) for record in dataset],
717                    )
718            else:
719                for col in dataset.column_names:
720                    table.add_column(
721                        Markdown(f"**`{col}`**"),
722                        overflow="fold",
723                    )
724
725                for record in dataset:
726                    table.add_row(
727                        *[
728                            escape(str(val))
729                            for key, val in record.items()
730                            # Exclude internal Airbyte columns.
731                            if key not in internal_cols
732                        ]
733                    )
734
735            console.print(table)
736
737        console.print(Markdown("--------------"))
738
739    def _get_airbyte_message_iterator(
740        self,
741        *,
742        streams: Literal["*"] | list[str] | None = None,
743        state_provider: StateProviderBase | None = None,
744        progress_tracker: ProgressTracker,
745        force_full_refresh: bool = False,
746    ) -> AirbyteMessageIterator:
747        """Get an AirbyteMessageIterator for this source."""
748        return AirbyteMessageIterator(
749            self._read_with_catalog(
750                catalog=self.get_configured_catalog(
751                    streams=streams,
752                    force_full_refresh=force_full_refresh,
753                ),
754                state=state_provider if not force_full_refresh else None,
755                progress_tracker=progress_tracker,
756            )
757        )
758
759    def _read_with_catalog(
760        self,
761        catalog: ConfiguredAirbyteCatalog,
762        progress_tracker: ProgressTracker,
763        *,
764        state: StateProviderBase | None = None,
765        stop_event: threading.Event | None = None,
766    ) -> Generator[AirbyteMessage, None, None]:
767        """Call read on the connector.
768
769        This involves the following steps:
770        * Write the config to a temporary file
771        * execute the connector with read --config <config_file> --catalog <catalog_file>
772        * Listen to the messages and return the AirbyteRecordMessages that come along.
773        * Send out telemetry on the performed sync (with information about which source was used and
774          the type of the cache)
775        """
776        with as_temp_files(
777            [
778                self._hydrated_config,
779                catalog.model_dump_json(exclude_none=True),
780                state.to_state_input_file_text() if state else "[]",
781            ]
782        ) as [
783            config_file,
784            catalog_file,
785            state_file,
786        ]:
787            message_generator = self._execute(
788                [
789                    "read",
790                    "--config",
791                    config_file,
792                    "--catalog",
793                    catalog_file,
794                    "--state",
795                    state_file,
796                ],
797                progress_tracker=progress_tracker,
798            )
799            for message in progress_tracker.tally_records_read(message_generator):
800                if stop_event and stop_event.is_set():
801                    progress_tracker._log_sync_cancel()  # noqa: SLF001
802                    time.sleep(0.1)
803                    return
804
805                yield message
806
807        progress_tracker.log_read_complete()
808
809    def _peek_airbyte_message(
810        self,
811        message: AirbyteMessage,
812        *,
813        raise_on_error: bool = True,
814    ) -> None:
815        """Process an Airbyte message.
816
817        This method handles reading Airbyte messages and taking action, if needed, based on the
818        message type. For instance, log messages are logged, records are tallied, and errors are
819        raised as exceptions if `raise_on_error` is True.
820
821        Raises:
822            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
823        """
824        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
825
826    def _log_incremental_streams(
827        self,
828        *,
829        incremental_streams: set[str] | None = None,
830    ) -> None:
831        """Log the streams which are using incremental sync mode."""
832        log_message = (
833            "The following streams are currently using incremental sync:\n"
834            f"{incremental_streams}\n"
835            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
836        )
837        print(log_message, file=sys.stderr)
838
839    def read(
840        self,
841        cache: CacheBase | None = None,
842        *,
843        streams: str | list[str] | None = None,
844        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
845        force_full_refresh: bool = False,
846        skip_validation: bool = False,
847    ) -> ReadResult:
848        """Read from the connector and write to the cache.
849
850        Args:
851            cache: The cache to write to. If not set, a default cache will be used.
852            streams: Optional if already set. A list of stream names to select for reading. If set
853                to "*", all streams will be selected.
854            write_strategy: The strategy to use when writing to the cache. If a string, it must be
855                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
856                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
857                WriteStrategy.AUTO.
858            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
859                streams will be read in incremental mode if supported by the connector. This option
860                must be True when using the "replace" strategy.
861            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
862                running the connector. This can be helpful in debugging, when you want to send
863                configurations to the connector that otherwise might be rejected by JSON Schema
864                validation rules.
865        """
866        cache = cache or get_default_cache()
867        progress_tracker = ProgressTracker(
868            source=self,
869            cache=cache,
870            destination=None,
871            expected_streams=None,  # Will be set later
872        )
873
874        # Set up state provider if not in full refresh mode
875        if force_full_refresh:
876            state_provider: StateProviderBase | None = None
877        else:
878            state_provider = cache.get_state_provider(
879                source_name=self._name,
880            )
881        state_writer = cache.get_state_writer(source_name=self._name)
882
883        if streams:
884            self.select_streams(streams)
885
886        if not self._selected_stream_names:
887            raise exc.PyAirbyteNoStreamsSelectedError(
888                connector_name=self.name,
889                available_streams=self.get_available_streams(),
890            )
891
892        try:
893            result = self._read_to_cache(
894                cache=cache,
895                catalog_provider=CatalogProvider(
896                    self.get_configured_catalog(force_full_refresh=force_full_refresh)
897                ),
898                stream_names=self._selected_stream_names,
899                state_provider=state_provider,
900                state_writer=state_writer,
901                write_strategy=write_strategy,
902                force_full_refresh=force_full_refresh,
903                skip_validation=skip_validation,
904                progress_tracker=progress_tracker,
905            )
906        except exc.PyAirbyteInternalError as ex:
907            progress_tracker.log_failure(exception=ex)
908            raise exc.AirbyteConnectorFailedError(
909                connector_name=self.name,
910                log_text=self._last_log_messages,
911            ) from ex
912        except Exception as ex:
913            progress_tracker.log_failure(exception=ex)
914            raise
915
916        progress_tracker.log_success()
917        return result
918
919    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
920        self,
921        cache: CacheBase,
922        *,
923        catalog_provider: CatalogProvider,
924        stream_names: list[str],
925        state_provider: StateProviderBase | None,
926        state_writer: StateWriterBase | None,
927        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
928        force_full_refresh: bool = False,
929        skip_validation: bool = False,
930        progress_tracker: ProgressTracker,
931    ) -> ReadResult:
932        """Internal read method."""
933        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
934            warnings.warn(
935                message=(
936                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
937                    "could result in data loss. "
938                    "To silence this warning, use the following: "
939                    'warnings.filterwarnings("ignore", '
940                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
941                ),
942                category=exc.PyAirbyteDataLossWarning,
943                stacklevel=1,
944            )
945        if isinstance(write_strategy, str):
946            try:
947                write_strategy = WriteStrategy(write_strategy)
948            except ValueError:
949                raise exc.PyAirbyteInputError(
950                    message="Invalid strategy",
951                    context={
952                        "write_strategy": write_strategy,
953                        "available_strategies": [
954                            s.value
955                            for s in WriteStrategy  # pyrefly: ignore[not-iterable]
956                        ],
957                    },
958                ) from None
959
960        # Run optional validation step
961        if not skip_validation:
962            self.validate_config()
963
964        # Log incremental streams if incremental streams are known
965        if state_provider and state_provider.known_stream_names:
966            # Retrieve the set of known streams that support incremental sync
967            incremental_streams = (
968                set(self._get_incremental_stream_names())
969                & state_provider.known_stream_names
970                & set(self.get_selected_streams())
971            )
972            if incremental_streams:
973                self._log_incremental_streams(incremental_streams=incremental_streams)
974
975        airbyte_message_iterator = AirbyteMessageIterator(
976            self._read_with_catalog(
977                catalog=catalog_provider.configured_catalog,
978                state=state_provider,
979                progress_tracker=progress_tracker,
980            )
981        )
982        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
983            stdin=airbyte_message_iterator,
984            catalog_provider=catalog_provider,
985            write_strategy=write_strategy,
986            state_writer=state_writer,
987            progress_tracker=progress_tracker,
988        )
989
990        # Flush the WAL, if applicable
991        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
992
993        return ReadResult(
994            source_name=self.name,
995            progress_tracker=progress_tracker,
996            processed_streams=stream_names,
997            cache=cache,
998        )

A class representing a source that can be called.

Source(executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False, cursor_key_overrides: dict[str, str] | None = None, primary_key_overrides: dict[str, str | list[str]] | None = None)
 73    def __init__(
 74        self,
 75        executor: Executor,
 76        name: str,
 77        config: dict[str, Any] | None = None,
 78        *,
 79        config_change_callback: ConfigChangeCallback | None = None,
 80        streams: str | list[str] | None = None,
 81        validate: bool = False,
 82        cursor_key_overrides: dict[str, str] | None = None,
 83        primary_key_overrides: dict[str, str | list[str]] | None = None,
 84    ) -> None:
 85        """Initialize the source.
 86
 87        If config is provided, it will be validated against the spec if validate is True.
 88        """
 89        self._to_be_selected_streams: list[str] | str = []
 90        """Used to hold selection criteria before catalog is known."""
 91
 92        super().__init__(
 93            executor=executor,
 94            name=name,
 95            config=config,
 96            config_change_callback=config_change_callback,
 97            validate=validate,
 98        )
 99        self._config_dict: dict[str, Any] | None = None
100        self._last_log_messages: list[str] = []
101        self._discovered_catalog: AirbyteCatalog | None = None
102        self._selected_stream_names: list[str] = []
103
104        self._cursor_key_overrides: dict[str, str] = {}
105        """A mapping of lower-cased stream names to cursor key overrides."""
106
107        self._primary_key_overrides: dict[str, list[str]] = {}
108        """A mapping of lower-cased stream names to primary key overrides."""
109
110        if config is not None:
111            self.set_config(config, validate=validate)
112        if streams is not None:
113            self.select_streams(streams)
114        if cursor_key_overrides is not None:
115            self.set_cursor_keys(**cursor_key_overrides)
116        if primary_key_overrides is not None:
117            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.
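
Direct construction requires an Executor, so most callers go through get_source() and then apply stream selection and key overrides on the instance. A hedged sketch with illustrative connector, config, stream, and field names:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_streams(["users"])
    source.set_cursor_keys(users="updated_at")  # illustrative field name
    source.set_primary_keys(users="id")         # illustrative field name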

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
119    def set_streams(self, streams: list[str]) -> None:
120        """Deprecated. See select_streams()."""
121        warnings.warn(
122            "The 'set_streams' method is deprecated and will be removed in a future version. "
123            "Please use the 'select_streams' method instead.",
124            DeprecationWarning,
125            stacklevel=2,
126        )
127        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
129    def set_cursor_key(
130        self,
131        stream_name: str,
132        cursor_key: str,
133    ) -> None:
134        """Set the cursor for a single stream.
135
136        Note:
137        - This does not unset previously set cursors.
138        - The cursor key must be a single field name.
139        - Not all streams support custom cursors. If a stream does not support custom cursors,
140          the override may be ignored.
141        - Stream names are case insensitive, while field names are case sensitive.
142        - Stream names are not validated by PyAirbyte. If the stream name
143          does not exist in the catalog, the override may be ignored.
144        """
145        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
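
For example, to override the cursor for a hypothetical users stream:

    source.set_cursor_key("users", "updated_at")  # field name is illustrative
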
def set_cursor_keys(self, **kwargs: str) -> None:
147    def set_cursor_keys(
148        self,
149        **kwargs: str,
150    ) -> None:
151        """Override the cursor key for one or more streams.
152
153        Usage:
154            source.set_cursor_keys(
155                stream1="cursor1",
156                stream2="cursor2",
157            )
158
159        Note:
160        - This does not unset previously set cursors.
161        - The cursor key must be a single field name.
162        - Not all streams support custom cursors. If a stream does not support custom cursors,
163          the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

    source.set_cursor_keys(
        stream1="cursor1",
        stream2="cursor2",
    )

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
170    def set_primary_key(
171        self,
172        stream_name: str,
173        primary_key: str | list[str],
174    ) -> None:
175        """Set the primary key for a single stream.
176
177        Note:
178        - This does not unset previously set primary keys.
179        - The primary key must be a single field name or a list of field names.
180        - Not all streams support overriding primary keys. If a stream does not support overriding
181          primary keys, the override may be ignored.
182        - Stream names are case insensitive, while field names are case sensitive.
183        - Stream names are not validated by PyAirbyte. If the stream name
184          does not exist in the catalog, the override may be ignored.
185        """
186        self._primary_key_overrides[stream_name.lower()] = (
187            primary_key if isinstance(primary_key, list) else [primary_key]
188        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
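
For example, setting a composite key for a hypothetical order_items stream:

    source.set_primary_key("order_items", ["order_id", "line_number"])
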
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
190    def set_primary_keys(
191        self,
192        **kwargs: str | list[str],
193    ) -> None:
194        """Override the primary keys for one or more streams.
195
196        This does not unset previously set primary keys.
197
198        Usage:
199            source.set_primary_keys(
200                stream1="pk1",
201                stream2=["pk1", "pk2"],
202            )
203
204        Note:
205        - This does not unset previously set primary keys.
206        - The primary key must be a single field name or a list of field names.
207        - Not all streams support overriding primary keys. If a stream does not support overriding
208          primary keys, the override may be ignored.
209        - Stream names are case insensitive, while field names are case sensitive.
210        - Stream names are not validated by PyAirbyte. If the stream name
211          does not exist in the catalog, the override may be ignored.
212        """
213        self._primary_key_overrides.update(
214            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
215        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

    source.set_primary_keys(
        stream1="pk1",
        stream2=["pk1", "pk2"],
    )

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def select_all_streams(self) -> None:
231    def select_all_streams(self) -> None:
232        """Select all streams.
233
234        This is a more streamlined equivalent to:
235        > source.select_streams(source.get_available_streams()).
236        """
237        if self._config_dict is None:
238            self._to_be_selected_streams = "*"
239            self._log_warning_preselected_stream(self._to_be_selected_streams)
240            return
241
242        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
244    def select_streams(self, streams: str | list[str]) -> None:
245        """Select the stream names that should be read from the connector.
246
247        Args:
248            streams: A list of stream names to select. If set to "*", all streams will be selected.
249
250        Currently, if this is not set, all streams will be read.
251        """
252        if self._config_dict is None:
253            self._to_be_selected_streams = streams
254            self._log_warning_preselected_stream(streams)
255            return
256
257        if streams == "*":
258            self.select_all_streams()
259            return
260
261        if isinstance(streams, str):
262            # If a single stream is provided, convert it to a one-item list
263            streams = [streams]
264
265        available_streams = self.get_available_streams()
266        for stream in streams:
267            if stream not in available_streams:
268                raise exc.AirbyteStreamNotFoundError(
269                    stream_name=stream,
270                    connector_name=self.name,
271                    available_streams=available_streams,
272                )
273        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
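
For instance, with illustrative stream names:

    source.select_streams(["users", "purchases"])  # explicit subset
    source.select_streams("*")  # or select every available stream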

def get_selected_streams(self) -> list[str]:
275    def get_selected_streams(self) -> list[str]:
276        """Get the selected streams.
277
278        If no streams are selected, return an empty list.
279        """
280        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
282    def set_config(
283        self,
284        config: dict[str, Any],
285        *,
286        validate: bool = True,
287    ) -> None:
288        """Set the config for the connector.
289
290        If validate is True, raise an exception if the config fails validation.
291
292        If validate is False, validation will be deferred until check() or validate_config()
293        is called.
294        """
295        if validate:
296            self.validate_config(config)
297
298        self._config_dict = config
299
300        if self._to_be_selected_streams:
301            self.select_streams(self._to_be_selected_streams)
302            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
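
A deferred-validation sketch (config keys are illustrative):

    source.set_config({"count": 100}, validate=False)
    source.check()  # validation happens here instead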

def get_available_streams(self) -> list[str]:
322    def get_available_streams(self) -> list[str]:
323        """Get the available streams from the spec."""
324        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.

config_spec: dict[str, typing.Any]
357    @property
358    def config_spec(self) -> dict[str, Any]:
359        """Generate a configuration spec for this connector, as a JSON Schema definition.
360
361        This function generates a JSON Schema dictionary with configuration specs for the
362        current connector, as a dictionary.
363
364        Returns:
365            dict: The JSON Schema configuration spec as a dictionary.
366        """
367        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.
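
For example, to see which properties the connector declares (key names vary by connector):

    spec = source.config_spec
    print(spec.get("required", []))
    print(list(spec.get("properties", {})))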

docs_url: str
384    @property
385    def docs_url(self) -> str:
386        """Get the URL to the connector's documentation."""
387        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
388            "source-", ""
389        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
391    @property
392    def discovered_catalog(self) -> AirbyteCatalog:
393        """Get the raw catalog for the given streams.
394
395        If the catalog is not yet known, we call discover to get it.
396        """
397        if self._discovered_catalog is None:
398            self._discovered_catalog = self._discover()
399
400        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
402    @property
403    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
404        """Get the configured catalog for the given streams.
405
406        If the raw catalog is not yet known, we call discover to get it.
407
408        If no specific streams are selected, we return a catalog that syncs all available streams.
409
410        TODO: We should consider disabling by default the streams that the connector would
411        disable by default. (For instance, streams that require a premium license are sometimes
412        disabled by default within the connector.)
413        """
414        # Ensure discovered catalog is cached before we start
415        _ = self.discovered_catalog
416
417        # Filter for selected streams if set, otherwise use all available streams:
418        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
419        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, force_full_refresh: bool = False) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
421    def get_configured_catalog(
422        self,
423        streams: Literal["*"] | list[str] | None = None,
424        *,
425        force_full_refresh: bool = False,
426    ) -> ConfiguredAirbyteCatalog:
427        """Get a configured catalog for the given streams.
428
429        If no streams are provided, the selected streams will be used. If no streams are selected,
430        all available streams will be used.
431
432        If '*' is provided, all available streams will be used.
433
434        If force_full_refresh is True, streams will be configured with full_refresh sync mode
435        when supported by the stream. Otherwise, incremental sync mode is used when supported.
436        """
437        selected_streams: list[str] = []
438        if streams is None:
439            selected_streams = self._selected_stream_names or self.get_available_streams()
440        elif streams == "*":
441            selected_streams = self.get_available_streams()
442        elif isinstance(streams, list):
443            selected_streams = streams
444        else:
445            raise exc.PyAirbyteInputError(
446                message="Invalid streams argument.",
447                input_value=streams,
448            )
449
450        def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
451            """Determine the sync mode for a stream based on force_full_refresh and support."""
452            # Use getattr to handle mocks or streams without supported_sync_modes attribute
453            supported_modes = getattr(stream, "supported_sync_modes", None)
454
455            if force_full_refresh:
456                # When force_full_refresh is True, prefer full_refresh if supported
457                if supported_modes and SyncMode.full_refresh in supported_modes:
458                    return SyncMode.full_refresh
459                # Fall back to incremental if full_refresh is not supported
460                return SyncMode.incremental
461
462            # Default behavior: preserve previous semantics (always incremental)
463            return SyncMode.incremental
464
465        return ConfiguredAirbyteCatalog(
466            streams=[
467                ConfiguredAirbyteStream(
468                    stream=stream,
469                    destination_sync_mode=DestinationSyncMode.overwrite,
470                    sync_mode=_get_sync_mode(stream),
471                    primary_key=(
472                        [self._primary_key_overrides[stream.name.lower()]]
473                        if stream.name.lower() in self._primary_key_overrides
474                        else stream.source_defined_primary_key
475                    ),
476                    cursor_field=(
477                        [self._cursor_key_overrides[stream.name.lower()]]
478                        if stream.name.lower() in self._cursor_key_overrides
479                        else stream.default_cursor_field
480                    ),
481                    # These are unused in the current implementation:
482                    generation_id=None,
483                    minimum_generation_id=None,
484                    sync_id=None,
485                )
486                for stream in self.discovered_catalog.streams
487                if stream.name in selected_streams
488            ],
489        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.

If force_full_refresh is True, streams will be configured with full_refresh sync mode when supported by the stream. Otherwise, incremental sync mode is used when supported.
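
A sketch that requests a full-refresh catalog for a subset of streams (stream name illustrative):

    catalog = source.get_configured_catalog(
        streams=["users"],
        force_full_refresh=True,
    )
    print([s.stream.name for s in catalog.streams])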

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
491    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
492        """Return the JSON Schema spec for the specified stream name."""
493        catalog: AirbyteCatalog = self.discovered_catalog
494        found: list[AirbyteStream] = [
495            stream for stream in catalog.streams if stream.name == stream_name
496        ]
497
498        if len(found) == 0:
499            raise exc.PyAirbyteInputError(
500                message="Stream name does not exist in catalog.",
501                input_value=stream_name,
502            )
503
504        if len(found) > 1:
505            raise exc.PyAirbyteInternalError(
506                message="Duplicate streams found with the same name.",
507                context={
508                    "found_streams": found,
509                },
510            )
511
512        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
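
For example, listing a stream's declared fields (stream name illustrative):

    schema = source.get_stream_json_schema("users")
    print(list(schema.get("properties", {})))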

def get_records( self, stream: str, *, limit: int | None = None, stop_event: threading.Event | None = None, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
514    def get_records(
515        self,
516        stream: str,
517        *,
518        limit: int | None = None,
519        stop_event: threading.Event | None = None,
520        normalize_field_names: bool = False,
521        prune_undeclared_fields: bool = True,
522    ) -> LazyDataset:
523        """Read a stream from the connector.
524
525        Args:
526            stream: The name of the stream to read.
527            limit: The maximum number of records to read. If None, all records will be read.
528            stop_event: If set, the event can be triggered by the caller to stop reading records
529                and terminate the process.
530            normalize_field_names: When `True`, field names will be normalized to lower case, with
531                special characters removed. This matches the behavior of PyAirbyte caches and most
532                Airbyte destinations.
533            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
534                which generally matches the behavior of PyAirbyte caches and most Airbyte
535                destinations, specifically when you expect the catalog may be stale. You can disable
536                this to keep all fields in the records.
537
538        This involves the following steps:
539        * Call discover to get the catalog
540        * Generate a configured catalog that syncs the given stream in full_refresh mode
541        * Write the configured catalog and the config to a temporary file
542        * execute the connector with read --config <config_file> --catalog <catalog_file>
543        * Listen to the messages and return the first AirbyteRecordMessages that come along.
544        * Make sure the subprocess is killed when the function returns.
545        """
546        stop_event = stop_event or threading.Event()
547        configured_catalog = self.get_configured_catalog(streams=[stream])
548        if len(configured_catalog.streams) == 0:
549            raise exc.PyAirbyteInputError(
550                message="Requested stream does not exist.",
551                context={
552                    "stream": stream,
553                    "available_streams": self.get_available_streams(),
554                    "connector_name": self.name,
555                },
556            ) from KeyError(stream)
557
558        configured_stream = configured_catalog.streams[0]
559
560        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
561            yield from records
562
563        stream_record_handler = StreamRecordHandler(
564            json_schema=self.get_stream_json_schema(stream),
565            prune_extra_fields=prune_undeclared_fields,
566            normalize_keys=normalize_field_names,
567        )
568
569        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
570        progress_tracker = ProgressTracker(
571            ProgressStyle.PLAIN,
572            source=self,
573            cache=None,
574            destination=None,
575            expected_streams=[stream],
576        )
577
578        iterator: Iterator[dict[str, Any]] = (
579            StreamRecord.from_record_message(
580                record_message=record.record,
581                stream_record_handler=stream_record_handler,
582            )
583            for record in self._read_with_catalog(
584                catalog=configured_catalog,
585                progress_tracker=progress_tracker,
586                stop_event=stop_event,
587            )
588            if record.record
589        )
590        if limit is not None:
591            # Stop the iterator after the limit is reached
592            iterator = islice(iterator, limit)
593
594        return LazyDataset(
595            iterator,
596            stream_metadata=configured_stream,
597            stop_event=stop_event,
598            progress_tracker=progress_tracker,
599        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • limit: The maximum number of records to read. If None, all records will be read.
  • stop_event: If set, the event can be triggered by the caller to stop reading records and terminate the process.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the first AirbyteRecordMessages that come along.
  • Make sure the subprocess is killed when the function returns.
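
A bounded-read sketch with an illustrative stream name:

    import threading

    stop = threading.Event()
    for record in source.get_records("users", limit=10, stop_event=stop):
        print(record)
    stop.set()  # optional: signal the connector subprocess to stop early
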
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
601    def get_documents(
602        self,
603        stream: str,
604        title_property: str | None = None,
605        content_properties: list[str] | None = None,
606        metadata_properties: list[str] | None = None,
607        *,
608        render_metadata: bool = False,
609    ) -> Iterable[Document]:
610        """Read a stream from the connector and return the records as documents.
611
612        If metadata_properties is not set, all properties that are not content will be added to
613        the metadata.
614
615        If render_metadata is True, metadata will be rendered in the document, as well as the
616        main content.
617        """
618        return self.get_records(stream).to_documents(
619            title_property=title_property,
620            content_properties=content_properties,
621            metadata_properties=metadata_properties,
622            render_metadata=render_metadata,
623        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
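
A sketch for a document-oriented workflow (stream and property names are illustrative):

    docs = source.get_documents(
        "users",
        title_property="name",
        content_properties=["bio"],
        render_metadata=True,
    )
    for doc in docs:
        print(doc)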

def get_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'raise') -> dict[str, airbyte.datasets._inmemory.InMemoryDataset | None]:
625    def get_samples(
626        self,
627        streams: list[str] | Literal["*"] | None = None,
628        *,
629        limit: int = 5,
630        on_error: Literal["raise", "ignore", "log"] = "raise",
631    ) -> dict[str, InMemoryDataset | None]:
632        """Get a sample of records from the given streams."""
633        if streams == "*":
634            streams = self.get_available_streams()
635        elif streams is None:
636            streams = self.get_selected_streams()
637
638        results: dict[str, InMemoryDataset | None] = {}
639        for stream in streams:
640            stop_event = threading.Event()
641            try:
642                results[stream] = self.get_records(
643                    stream,
644                    limit=limit,
645                    stop_event=stop_event,
646                ).fetch_all()
647                stop_event.set()
648            except Exception as ex:
649                results[stream] = None
650                if on_error == "ignore":
651                    continue
652
653                if on_error == "raise":
654                    raise ex from None
655
656                if on_error == "log":
657                    print(f"Error fetching sample for stream '{stream}': {ex}")
658
659        return results

Get a sample of records from the given streams.
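
For instance, sampling every available stream:

    samples = source.get_samples(streams="*", limit=3, on_error="log")
    for name, dataset in samples.items():
        if dataset is not None:
            print(name, len(dataset))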

def print_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'log') -> None:
661    def print_samples(
662        self,
663        streams: list[str] | Literal["*"] | None = None,
664        *,
665        limit: int = 5,
666        on_error: Literal["raise", "ignore", "log"] = "log",
667    ) -> None:
668        """Print a sample of records from the given streams."""
669        internal_cols: list[str] = [
670            AB_EXTRACTED_AT_COLUMN,
671            AB_META_COLUMN,
672            AB_RAW_ID_COLUMN,
673        ]
674        col_limit = 10
675        if streams == "*":
676            streams = self.get_available_streams()
677        elif streams is None:
678            streams = self.get_selected_streams()
679
680        console = Console()
681
682        console.print(
683            Markdown(
684                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
685                justify="left",
686            )
687        )
688
689        for stream in streams:
690            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
691            samples = self.get_samples(
692                streams=[stream],
693                limit=limit,
694                on_error=on_error,
695            )
696            dataset = samples[stream]
697
698            table = Table(
699                show_header=True,
700                show_lines=True,
701            )
702            if dataset is None:
703                console.print(
704                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
705                )
706                continue
707
708            if len(dataset.column_names) > col_limit:
709                # We'll pivot the columns so each column is its own row
710                table.add_column("Column Name")
711                for _ in range(len(dataset)):
712                    table.add_column(overflow="fold")
713                for col in dataset.column_names:
714                    table.add_row(
715                        Markdown(f"**`{col}`**"),
716                        *[escape(str(record[col])) for record in dataset],
717                    )
718            else:
719                for col in dataset.column_names:
720                    table.add_column(
721                        Markdown(f"**`{col}`**"),
722                        overflow="fold",
723                    )
724
725                for record in dataset:
726                    table.add_row(
727                        *[
728                            escape(str(val))
729                            for key, val in record.items()
730                            # Exclude internal Airbyte columns.
731                            if key not in internal_cols
732                        ]
733                    )
734
735            console.print(table)
736
737        console.print(Markdown("--------------"))

Print a sample of records from the given streams.
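
For example:

    source.print_samples(streams="*", limit=3)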

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
839    def read(
840        self,
841        cache: CacheBase | None = None,
842        *,
843        streams: str | list[str] | None = None,
844        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
845        force_full_refresh: bool = False,
846        skip_validation: bool = False,
847    ) -> ReadResult:
848        """Read from the connector and write to the cache.
849
850        Args:
851            cache: The cache to write to. If not set, a default cache will be used.
852            streams: Optional if already set. A list of stream names to select for reading. If set
853                to "*", all streams will be selected.
854            write_strategy: The strategy to use when writing to the cache. If a string, it must be
855                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
856                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
857                WriteStrategy.AUTO.
858            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
859                streams will be read in incremental mode if supported by the connector. This option
860                must be True when using the "replace" strategy.
861            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
862                running the connector. This can be helpful in debugging, when you want to send
863                configurations to the connector that otherwise might be rejected by JSON Schema
864                validation rules.
865        """
866        cache = cache or get_default_cache()
867        progress_tracker = ProgressTracker(
868            source=self,
869            cache=cache,
870            destination=None,
871            expected_streams=None,  # Will be set later
872        )
873
874        # Set up state provider if not in full refresh mode
875        if force_full_refresh:
876            state_provider: StateProviderBase | None = None
877        else:
878            state_provider = cache.get_state_provider(
879                source_name=self._name,
880            )
881        state_writer = cache.get_state_writer(source_name=self._name)
882
883        if streams:
884            self.select_streams(streams)
885
886        if not self._selected_stream_names:
887            raise exc.PyAirbyteNoStreamsSelectedError(
888                connector_name=self.name,
889                available_streams=self.get_available_streams(),
890            )
891
892        try:
893            result = self._read_to_cache(
894                cache=cache,
895                catalog_provider=CatalogProvider(
896                    self.get_configured_catalog(force_full_refresh=force_full_refresh)
897                ),
898                stream_names=self._selected_stream_names,
899                state_provider=state_provider,
900                state_writer=state_writer,
901                write_strategy=write_strategy,
902                force_full_refresh=force_full_refresh,
903                skip_validation=skip_validation,
904                progress_tracker=progress_tracker,
905            )
906        except exc.PyAirbyteInternalError as ex:
907            progress_tracker.log_failure(exception=ex)
908            raise exc.AirbyteConnectorFailedError(
909                connector_name=self.name,
910                log_text=self._last_log_messages,
911            ) from ex
912        except Exception as ex:
913            progress_tracker.log_failure(exception=ex)
914            raise
915
916        progress_tracker.log_success()
917        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
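
An end-to-end sketch, assuming the default cache and illustrative connector details:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_streams("*")
    result = source.read(write_strategy="merge")
    for name, dataset in result.streams.items():
        print(name, len(dataset))
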
class ConnectorMetadata(pydantic.main.BaseModel):
 78class ConnectorMetadata(BaseModel):
 79    """Metadata for a connector."""
 80
 81    name: str
 82    """Connector name. For example, "source-google-sheets"."""
 83
 84    latest_available_version: str | None
 85    """The latest available version of the connector."""
 86
 87    pypi_package_name: str | None
 88    """The name of the PyPI package for the connector, if it exists."""
 89
 90    language: Language | None
 91    """The language of the connector."""
 92
 93    install_types: set[InstallType]
 94    """The supported install types for the connector."""
 95
 96    suggested_streams: list[str] | None = None
 97    """A list of suggested streams for the connector, if available."""
 98
 99    @property
100    def default_install_type(self) -> InstallType:
101        """Return the default install type for the connector."""
102        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
103            return InstallType.YAML
104
105        if InstallType.PYTHON in self.install_types:
106            return InstallType.PYTHON
107
108        # Else: Java or Docker
109        return InstallType.DOCKER

Metadata for a connector.

name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: airbyte.registry.Language | None

The language of the connector.

install_types: set[airbyte.registry.InstallType]

The supported install types for the connector.

suggested_streams: list[str] | None = None

A list of suggested streams for the connector, if available.

default_install_type: airbyte.registry.InstallType
 99    @property
100    def default_install_type(self) -> InstallType:
101        """Return the default install type for the connector."""
102        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
103            return InstallType.YAML
104
105        if InstallType.PYTHON in self.install_types:
106            return InstallType.PYTHON
107
108        # Else: Java or Docker
109        return InstallType.DOCKER

Return the default install type for the connector.
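
A registry lookup sketch using the helpers exported by this module (the None check is a defensive assumption about the return value):

    from airbyte.sources import get_connector_metadata

    meta = get_connector_metadata("source-faker")
    if meta is not None:
        print(meta.latest_available_version)
        print(meta.default_install_type)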