airbyte.sources

Sources connectors module for PyAirbyte.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Sources connectors module for PyAirbyte."""
 3
 4from __future__ import annotations
 5
 6from typing import TYPE_CHECKING
 7
 8from airbyte.sources.base import Source
 9from airbyte.sources.registry import (
10    ConnectorMetadata,
11    get_available_connectors,
12    get_connector_metadata,
13)
14from airbyte.sources.util import (
15    get_benchmark_source,
16    get_source,
17)
18
19
20# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
21if TYPE_CHECKING:
22    # ruff: noqa: TC004  # imports used for more than type checking
23    from airbyte.sources import (
24        base,
25        registry,
26        util,
27    )
28
29__all__ = [
30    # Submodules
31    "base",
32    "registry",
33    "util",
34    # Factories
35    "get_source",
36    "get_benchmark_source",
37    # Helper Functions
38    "get_available_connectors",
39    "get_connector_metadata",
40    # Classes
41    "Source",
42    "ConnectorMetadata",
43]
def get_source(
    name: str,
    config: dict[str, typing.Any] | None = None,
    *,
    config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    use_python: bool | pathlib.Path | str | None = None,
    pip_url: str | None = None,
    local_executable: pathlib.Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | pathlib.Path | str | None = None,
    install_if_missing: bool = True,
    install_root: pathlib.Path | None = None,
) -> Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    use_python: bool | Path | str | None = None,
 56    pip_url: str | None = None,
 57    local_executable: Path | str | None = None,
 58    docker_image: bool | str | None = None,
 59    use_host_network: bool = False,
 60    source_manifest: bool | dict | Path | str | None = None,
 61    install_if_missing: bool = True,
 62    install_root: Path | None = None,
 63) -> Source:
 64    """Get a connector by name and version.
 65
 66    If an explicit install or execution method is requested (e.g. `local_executable`,
 67    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 68
 69    Otherwise, an appropriate method will be selected based on the available connector metadata:
 70    1. If the connector is registered and a YAML source manifest is available, the YAML
 71       manifest will be downloaded and used to execute the connector.
 72    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 73    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 74       will be executed using Docker.
 75
 76    Args:
 77        name: connector name
 78        config: connector config - if not provided, you need to set it later via the set_config
 79            method.
 80        config_change_callback: callback function to be called when the connector config changes.
 81        streams: list of stream names to select for reading. If set to "*", all streams will be
 82            selected. If not provided, you can set it later via the `select_streams()` or
 83            `select_all_streams()` method.
 84        version: connector version - if not provided, the currently installed version will be used.
 85            If no version is installed, the latest available version will be used. The version can
 86            also be set to "latest" to force the use of the latest available version.
 87        use_python: (Optional.) Python interpreter specification:
 88            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
 89            - False: Use Docker instead.
 90            - Path: Use interpreter at this path.
 91            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
 92                installed, it will be installed by uv. (This generally adds less than 3 seconds
 93                to install times.)
 94        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 95            connector name.
 96        local_executable: If set, the connector will be assumed to already be installed and will be
 97            executed using this path or executable name. Otherwise, the connector will be installed
 98            automatically in a virtual environment.
 99        docker_image: If set, the connector will be executed using Docker. You can specify `True`
100            to use the default image for the connector, or you can specify a custom image name.
101            If `version` is specified and your image name does not already contain a tag
102            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
103        use_host_network: If set, along with docker_image, the connector will be executed using
104            the host network. This is useful for connectors that need to access resources on
105            the host machine, such as a local database. This parameter is ignored when
106            `docker_image` is not set.
107        source_manifest: If set, the connector will be executed based on a declarative YAML
108            source definition. This input can be `True` to attempt to auto-download a YAML spec,
109            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
110            the local file system, or `str` to pull the definition from a web URL.
111        install_if_missing: Whether to install the connector if it is not available locally. This
112            parameter is ignored when `local_executable` or `source_manifest` are set.
113        install_root: (Optional.) The root directory where the virtual environment will be
114            created. If not provided, the current working directory will be used.
115    """
116    return Source(
117        name=name,
118        config=config,
119        config_change_callback=config_change_callback,
120        streams=streams,
121        executor=get_connector_executor(
122            name=name,
123            version=version,
124            use_python=use_python,
125            pip_url=pip_url,
126            local_executable=local_executable,
127            docker_image=docker_image,
128            use_host_network=use_host_network,
129            source_manifest=source_manifest,
130            install_if_missing=install_if_missing,
131            install_root=install_root,
132        ),
133    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • use_python: (Optional.) Python interpreter specification:
    • True: Use current Python interpreter. (Inferred if pip_url is set.)
    • False: Use Docker instead.
    • Path: Use interpreter at this path.
    • str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
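
Example (a minimal sketch; the connector name `source-faker` and its `count` option are assumptions based on the public connector catalog, not part of this API):

    import airbyte as ab

    # Install the connector if needed, set its config, and select all streams.
    source = ab.get_source(
        "source-faker",
        config={"count": 1_000},  # "count" is specific to source-faker's spec.
        streams="*",
    )
    source.check()  # Validate the config and connectivity before reading.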
def get_benchmark_source(num_records: int | str = '5e5', *, install_if_missing: bool = True) -> Source:
136def get_benchmark_source(
137    num_records: int | str = "5e5",
138    *,
139    install_if_missing: bool = True,
140) -> Source:
141    """Get a source for benchmarking.
142
143    This source will generate dummy records for performance benchmarking purposes.
144    You can specify the number of records to generate using the `num_records` parameter.
145    The `num_records` parameter can be an integer or a string in scientific notation.
146    For example, `"5e6"` will generate 5 million records. If underscores are provided
147    within a numeric string, they will be ignored.
148
149    Args:
150        num_records: The number of records to generate. Defaults to "5e5", or
151            500,000 records.
152            Can be an integer (`1000`) or a string in scientific notation.
153            For example, `"5e6"` will generate 5 million records.
154        install_if_missing: Whether to install the source if it is not available locally.
155
156    Returns:
157        Source: The source object for benchmarking.
158    """
159    if isinstance(num_records, str):
160        try:
161            num_records = int(Decimal(num_records.replace("_", "")))
162        except InvalidOperation as ex:
163            raise PyAirbyteInputError(
164                message="Invalid number format.",
165                original_exception=ex,
166                input_value=str(num_records),
167            ) from None
168
169    return get_source(
170        name="source-e2e-test",
171        docker_image=True,
172        # docker_image="airbyte/source-e2e-test:latest",
173        config={
174            "type": "BENCHMARK",
175            "schema": "FIVE_STRING_COLUMNS",
176            "terminationCondition": {
177                "type": "MAX_RECORDS",
178                "max": num_records,
179            },
180        },
181        streams="*",
182        install_if_missing=install_if_missing,
183    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
  • install_if_missing: Whether to install the source if it is not available locally.
Returns:

Source: The source object for benchmarking.
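
Example (a minimal sketch; assumes Docker is available, since the benchmark connector runs from its Docker image):

    from airbyte.sources import get_benchmark_source

    # Generate one million dummy records ("1e6" in scientific notation).
    source = get_benchmark_source(num_records="1e6")
    result = source.read()  # Reads into the default cache; useful for timing runs.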

def get_available_connectors(install_type: airbyte.sources.registry.InstallType | str | None = None) -> list[str]:
222def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
223    """Return a list of all available connectors.
224
225    Connectors will be returned in alphabetical order, with the standard prefix "source-".
226    """
227    if install_type is None:
228        # No install type specified. Filter for whatever is runnable.
229        if is_docker_installed():
230            logger.info("Docker is detected. Returning all connectors.")
231            # If Docker is available, return all connectors.
232            return sorted(conn.name for conn in _get_registry_cache().values())
233
234        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
235
236        # If Docker is not available, return only Python and Manifest-based connectors.
237        return sorted(
238            conn.name
239            for conn in _get_registry_cache().values()
240            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
241        )
242
243    if not isinstance(install_type, InstallType):
244        install_type = InstallType(install_type)
245
246    if install_type == InstallType.PYTHON:
247        return sorted(
248            conn.name
249            for conn in _get_registry_cache().values()
250            if conn.pypi_package_name is not None
251        )
252
253    if install_type == InstallType.JAVA:
254        warnings.warn(
255            message="Java connectors are not yet supported.",
256            stacklevel=2,
257        )
258        return sorted(
259            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
260        )
261
262    if install_type == InstallType.DOCKER:
263        return sorted(conn.name for conn in _get_registry_cache().values())
264
265    if install_type == InstallType.YAML:
266        return sorted(
267            conn.name
268            for conn in _get_registry_cache().values()
269            if InstallType.YAML in conn.install_types
270        )
271
272    # pragma: no cover  # Should never be reached.
273    raise exc.PyAirbyteInputError(
274        message="Invalid install type.",
275        context={
276            "install_type": install_type,
277        },
278    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".
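
Example (a minimal sketch; the string form assumes the enum values match the lowercase type names, e.g. "python"):

    from airbyte.sources import get_available_connectors
    from airbyte.sources.registry import InstallType

    # All connectors runnable in the current environment (Docker-aware filtering).
    print(get_available_connectors())

    # Only connectors installable as PyPI packages; plain strings are coerced
    # to InstallType, so install_type="python" is assumed to be equivalent.
    print(get_available_connectors(install_type=InstallType.PYTHON))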

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
192def get_connector_metadata(name: str) -> ConnectorMetadata | None:
193    """Check the cache for the connector.
194
195    If the cache is empty, populate by calling update_cache.
196    """
197    registry_url = _get_registry_url()
198
199    if _is_registry_disabled(registry_url):
200        return None
201
202    cache = copy(_get_registry_cache())
203
204    if not cache:
205        raise exc.PyAirbyteInternalError(
206            message="Connector registry could not be loaded.",
207            context={
208                "registry_url": _get_registry_url(),
209            },
210        )
211    if name not in cache:
212        raise exc.AirbyteConnectorNotRegisteredError(
213            connector_name=name,
214            context={
215                "registry_url": _get_registry_url(),
216                "available_connectors": get_available_connectors(),
217            },
218        )
219    return cache[name]

Check the cache for the connector.

If the cache is empty, populate by calling update_cache.
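
Example (a minimal sketch; only the `name` attribute of `ConnectorMetadata` is assumed here, since it is referenced elsewhere in this module):

    from airbyte.sources import get_connector_metadata

    metadata = get_connector_metadata("source-faker")
    if metadata is not None:  # None is returned when the registry is disabled.
        print(metadata.name)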

class Source(airbyte._connector_base.ConnectorBase):
 68class Source(ConnectorBase):  # noqa: PLR0904
 69    """A class representing a source that can be called."""
 70
 71    connector_type = "source"
 72
 73    def __init__(
 74        self,
 75        executor: Executor,
 76        name: str,
 77        config: dict[str, Any] | None = None,
 78        *,
 79        config_change_callback: ConfigChangeCallback | None = None,
 80        streams: str | list[str] | None = None,
 81        validate: bool = False,
 82        cursor_key_overrides: dict[str, str] | None = None,
 83        primary_key_overrides: dict[str, str | list[str]] | None = None,
 84    ) -> None:
 85        """Initialize the source.
 86
 87        If config is provided, it will be validated against the spec if validate is True.
 88        """
 89        self._to_be_selected_streams: list[str] | str = []
 90        """Used to hold selection criteria before catalog is known."""
 91
 92        super().__init__(
 93            executor=executor,
 94            name=name,
 95            config=config,
 96            config_change_callback=config_change_callback,
 97            validate=validate,
 98        )
 99        self._config_dict: dict[str, Any] | None = None
100        self._last_log_messages: list[str] = []
101        self._discovered_catalog: AirbyteCatalog | None = None
102        self._selected_stream_names: list[str] = []
103
104        self._cursor_key_overrides: dict[str, str] = {}
105        """A mapping of lower-cased stream names to cursor key overrides."""
106
107        self._primary_key_overrides: dict[str, list[str]] = {}
108        """A mapping of lower-cased stream names to primary key overrides."""
109
110        if config is not None:
111            self.set_config(config, validate=validate)
112        if streams is not None:
113            self.select_streams(streams)
114        if cursor_key_overrides is not None:
115            self.set_cursor_keys(**cursor_key_overrides)
116        if primary_key_overrides is not None:
117            self.set_primary_keys(**primary_key_overrides)
118
119    def set_streams(self, streams: list[str]) -> None:
120        """Deprecated. See select_streams()."""
121        warnings.warn(
122            "The 'set_streams' method is deprecated and will be removed in a future version. "
123            "Please use the 'select_streams' method instead.",
124            DeprecationWarning,
125            stacklevel=2,
126        )
127        self.select_streams(streams)
128
129    def set_cursor_key(
130        self,
131        stream_name: str,
132        cursor_key: str,
133    ) -> None:
134        """Set the cursor for a single stream.
135
136        Note:
137        - This does not unset previously set cursors.
138        - The cursor key must be a single field name.
139        - Not all streams support custom cursors. If a stream does not support custom cursors,
140          the override may be ignored.
141        - Stream names are case insensitive, while field names are case sensitive.
142        - Stream names are not validated by PyAirbyte. If the stream name
143          does not exist in the catalog, the override may be ignored.
144        """
145        self._cursor_key_overrides[stream_name.lower()] = cursor_key
146
147    def set_cursor_keys(
148        self,
149        **kwargs: str,
150    ) -> None:
151        """Override the cursor key for one or more streams.
152
153        Usage:
154            source.set_cursor_keys(
155                stream1="cursor1",
156                stream2="cursor2",
157            )
158
159        Note:
160        - This does not unset previously set cursors.
161        - The cursor key must be a single field name.
162        - Not all streams support custom cursors. If a stream does not support custom cursors,
163          the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})
169
170    def set_primary_key(
171        self,
172        stream_name: str,
173        primary_key: str | list[str],
174    ) -> None:
175        """Set the primary key for a single stream.
176
177        Note:
178        - This does not unset previously set primary keys.
179        - The primary key must be a single field name or a list of field names.
180        - Not all streams support overriding primary keys. If a stream does not support overriding
181          primary keys, the override may be ignored.
182        - Stream names are case insensitive, while field names are case sensitive.
183        - Stream names are not validated by PyAirbyte. If the stream name
184          does not exist in the catalog, the override may be ignored.
185        """
186        self._primary_key_overrides[stream_name.lower()] = (
187            primary_key if isinstance(primary_key, list) else [primary_key]
188        )
189
190    def set_primary_keys(
191        self,
192        **kwargs: str | list[str],
193    ) -> None:
194        """Override the primary keys for one or more streams.
195
196        This does not unset previously set primary keys.
197
198        Usage:
199            source.set_primary_keys(
200                stream1="pk1",
201                stream2=["pk1", "pk2"],
202            )
203
204        Note:
205        - This does not unset previously set primary keys.
206        - The primary key must be a single field name or a list of field names.
207        - Not all streams support overriding primary keys. If a stream does not support overriding
208          primary keys, the override may be ignored.
209        - Stream names are case insensitive, while field names are case sensitive.
210        - Stream names are not validated by PyAirbyte. If the stream name
211          does not exist in the catalog, the override may be ignored.
212        """
213        self._primary_key_overrides.update(
214            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
215        )
216
217    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
218        """Logs a warning message indicating stream selection which are not selected yet."""
219        if streams == "*":
220            print(
221                "Warning: Config is not set yet. All streams will be selected after config is set.",
222                file=sys.stderr,
223            )
224        else:
225            print(
226                "Warning: Config is not set yet. "
227                f"Streams to be selected after config is set: {streams}",
228                file=sys.stderr,
229            )
230
231    def select_all_streams(self) -> None:
232        """Select all streams.
233
234        This is a more streamlined equivalent to:
235        > source.select_streams(source.get_available_streams()).
236        """
237        if self._config_dict is None:
238            self._to_be_selected_streams = "*"
239            self._log_warning_preselected_stream(self._to_be_selected_streams)
240            return
241
242        self._selected_stream_names = self.get_available_streams()
243
244    def select_streams(self, streams: str | list[str]) -> None:
245        """Select the stream names that should be read from the connector.
246
247        Args:
248            streams: A list of stream names to select. If set to "*", all streams will be selected.
249
250        Currently, if this is not set, all streams will be read.
251        """
252        if self._config_dict is None:
253            self._to_be_selected_streams = streams
254            self._log_warning_preselected_stream(streams)
255            return
256
257        if streams == "*":
258            self.select_all_streams()
259            return
260
261        if isinstance(streams, str):
262            # If a single stream is provided, convert it to a one-item list
263            streams = [streams]
264
265        available_streams = self.get_available_streams()
266        for stream in streams:
267            if stream not in available_streams:
268                raise exc.AirbyteStreamNotFoundError(
269                    stream_name=stream,
270                    connector_name=self.name,
271                    available_streams=available_streams,
272                )
273        self._selected_stream_names = streams
274
275    def get_selected_streams(self) -> list[str]:
276        """Get the selected streams.
277
278        If no streams are selected, return an empty list.
279        """
280        return self._selected_stream_names
281
282    def set_config(
283        self,
284        config: dict[str, Any],
285        *,
286        validate: bool = True,
287    ) -> None:
288        """Set the config for the connector.
289
290        If validate is True, raise an exception if the config fails validation.
291
292        If validate is False, validation will be deferred until check() or validate_config()
293        is called.
294        """
295        if validate:
296            self.validate_config(config)
297
298        self._config_dict = config
299
300        if self._to_be_selected_streams:
301            self.select_streams(self._to_be_selected_streams)
302            self._to_be_selected_streams = []
303
304    def _discover(self) -> AirbyteCatalog:
305        """Call discover on the connector.
306
307        This involves the following steps:
308        - Write the config to a temporary file
309        - execute the connector with discover --config <config_file>
310        - Listen to the messages and return the first AirbyteCatalog that comes along.
311        - Make sure the subprocess is killed when the function returns.
312        """
313        with as_temp_files([self._hydrated_config]) as [config_file]:
314            for msg in self._execute(["discover", "--config", config_file]):
315                if msg.type == Type.CATALOG and msg.catalog:
316                    return msg.catalog
317            raise exc.AirbyteConnectorMissingCatalogError(
318                connector_name=self.name,
319                log_text=self._last_log_messages,
320            )
321
322    def get_available_streams(self) -> list[str]:
323        """Get the available streams from the spec."""
324        return [s.name for s in self.discovered_catalog.streams]
325
326    def _get_incremental_stream_names(self) -> list[str]:
327        """Get the name of streams that support incremental sync."""
328        return [
329            stream.name
330            for stream in self.discovered_catalog.streams
331            if SyncMode.incremental in stream.supported_sync_modes
332        ]
333
334    @override
335    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
336        """Call spec on the connector.
337
338        This involves the following steps:
339        * execute the connector with spec
340        * Listen to the messages and return the first spec that comes along.
341        * Make sure the subprocess is killed when the function returns.
342        """
343        if force_refresh or self._spec is None:
344            for msg in self._execute(["spec"]):
345                if msg.type == Type.SPEC and msg.spec:
346                    self._spec = msg.spec
347                    break
348
349        if self._spec:
350            return self._spec
351
352        raise exc.AirbyteConnectorMissingSpecError(
353            connector_name=self.name,
354            log_text=self._last_log_messages,
355        )
356
357    @property
358    def config_spec(self) -> dict[str, Any]:
359        """Generate a configuration spec for this connector, as a JSON Schema definition.
360
361        This function generates a JSON Schema dictionary with configuration specs for the
362        current connector, as a dictionary.
363
364        Returns:
365            dict: The JSON Schema configuration spec as a dictionary.
366        """
367        return self._get_spec(force_refresh=True).connectionSpecification
368
369    @property
370    def _yaml_spec(self) -> str:
371        """Get the spec as a yaml string.
372
373        For now, the primary use case is for writing and debugging a valid config for a source.
374
375        This is private for now because we probably want better polish before exposing this
376        as a stable interface. This will also get easier when we have docs links with this info
377        for each connector.
378        """
379        spec_obj: ConnectorSpecification = self._get_spec()
380        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
381        # convert to a yaml string
382        return yaml.dump(spec_dict)
383
384    @property
385    def docs_url(self) -> str:
386        """Get the URL to the connector's documentation."""
387        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
388            "source-", ""
389        )
390
391    @property
392    def discovered_catalog(self) -> AirbyteCatalog:
393        """Get the raw catalog for the given streams.
394
395        If the catalog is not yet known, we call discover to get it.
396        """
397        if self._discovered_catalog is None:
398            self._discovered_catalog = self._discover()
399
400        return self._discovered_catalog
401
402    @property
403    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
404        """Get the configured catalog for the given streams.
405
406        If the raw catalog is not yet known, we call discover to get it.
407
408        If no specific streams are selected, we return a catalog that syncs all available streams.
409
410        TODO: We should consider disabling by default the streams that the connector would
411        disable by default. (For instance, streams that require a premium license are sometimes
412        disabled by default within the connector.)
413        """
414        # Ensure discovered catalog is cached before we start
415        _ = self.discovered_catalog
416
417        # Filter for selected streams if set, otherwise use all available streams:
418        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
419        return self.get_configured_catalog(streams=streams_filter)
420
421    def get_configured_catalog(
422        self,
423        streams: Literal["*"] | list[str] | None = None,
424    ) -> ConfiguredAirbyteCatalog:
425        """Get a configured catalog for the given streams.
426
427        If no streams are provided, the selected streams will be used. If no streams are selected,
428        all available streams will be used.
429
430        If '*' is provided, all available streams will be used.
431        """
432        selected_streams: list[str] = []
433        if streams is None:
434            selected_streams = self._selected_stream_names or self.get_available_streams()
435        elif streams == "*":
436            selected_streams = self.get_available_streams()
437        elif isinstance(streams, list):
438            selected_streams = streams
439        else:
440            raise exc.PyAirbyteInputError(
441                message="Invalid streams argument.",
442                input_value=streams,
443            )
444
445        return ConfiguredAirbyteCatalog(
446            streams=[
447                ConfiguredAirbyteStream(
448                    stream=stream,
449                    destination_sync_mode=DestinationSyncMode.overwrite,
450                    sync_mode=SyncMode.incremental,
451                    primary_key=(
452                        [self._primary_key_overrides[stream.name.lower()]]
453                        if stream.name.lower() in self._primary_key_overrides
454                        else stream.source_defined_primary_key
455                    ),
456                    cursor_field=(
457                        [self._cursor_key_overrides[stream.name.lower()]]
458                        if stream.name.lower() in self._cursor_key_overrides
459                        else stream.default_cursor_field
460                    ),
461                    # These are unused in the current implementation:
462                    generation_id=None,
463                    minimum_generation_id=None,
464                    sync_id=None,
465                )
466                for stream in self.discovered_catalog.streams
467                if stream.name in selected_streams
468            ],
469        )
470
471    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
472        """Return the JSON Schema spec for the specified stream name."""
473        catalog: AirbyteCatalog = self.discovered_catalog
474        found: list[AirbyteStream] = [
475            stream for stream in catalog.streams if stream.name == stream_name
476        ]
477
478        if len(found) == 0:
479            raise exc.PyAirbyteInputError(
480                message="Stream name does not exist in catalog.",
481                input_value=stream_name,
482            )
483
484        if len(found) > 1:
485            raise exc.PyAirbyteInternalError(
486                message="Duplicate streams found with the same name.",
487                context={
488                    "found_streams": found,
489                },
490            )
491
492        return found[0].json_schema
493
494    def get_records(
495        self,
496        stream: str,
497        *,
498        limit: int | None = None,
499        stop_event: threading.Event | None = None,
500        normalize_field_names: bool = False,
501        prune_undeclared_fields: bool = True,
502    ) -> LazyDataset:
503        """Read a stream from the connector.
504
505        Args:
506            stream: The name of the stream to read.
507            limit: The maximum number of records to read. If None, all records will be read.
508            stop_event: If set, the event can be triggered by the caller to stop reading records
509                and terminate the process.
510            normalize_field_names: When `True`, field names will be normalized to lower case, with
511                special characters removed. This matches the behavior of PyAirbyte caches and most
512                Airbyte destinations.
513            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
514                which generally matches the behavior of PyAirbyte caches and most Airbyte
515                destinations, specifically when you expect the catalog may be stale. You can disable
516                this to keep all fields in the records.
517
518        This involves the following steps:
519        * Call discover to get the catalog
520        * Generate a configured catalog that syncs the given stream in full_refresh mode
521        * Write the configured catalog and the config to a temporary file
522        * execute the connector with read --config <config_file> --catalog <catalog_file>
523        * Listen to the messages and return the first AirbyteRecordMessages that come along.
524        * Make sure the subprocess is killed when the function returns.
525        """
526        stop_event = stop_event or threading.Event()
527        configured_catalog = self.get_configured_catalog(streams=[stream])
528        if len(configured_catalog.streams) == 0:
529            raise exc.PyAirbyteInputError(
530                message="Requested stream does not exist.",
531                context={
532                    "stream": stream,
533                    "available_streams": self.get_available_streams(),
534                    "connector_name": self.name,
535                },
536            ) from KeyError(stream)
537
538        configured_stream = configured_catalog.streams[0]
539
540        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
541            yield from records
542
543        stream_record_handler = StreamRecordHandler(
544            json_schema=self.get_stream_json_schema(stream),
545            prune_extra_fields=prune_undeclared_fields,
546            normalize_keys=normalize_field_names,
547        )
548
549        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
550        progress_tracker = ProgressTracker(
551            ProgressStyle.PLAIN,
552            source=self,
553            cache=None,
554            destination=None,
555            expected_streams=[stream],
556        )
557
558        iterator: Iterator[dict[str, Any]] = (
559            StreamRecord.from_record_message(
560                record_message=record.record,
561                stream_record_handler=stream_record_handler,
562            )
563            for record in self._read_with_catalog(
564                catalog=configured_catalog,
565                progress_tracker=progress_tracker,
566                stop_event=stop_event,
567            )
568            if record.record
569        )
570        if limit is not None:
571            # Stop the iterator after the limit is reached
572            iterator = islice(iterator, limit)
573
574        return LazyDataset(
575            iterator,
576            stream_metadata=configured_stream,
577            stop_event=stop_event,
578            progress_tracker=progress_tracker,
579        )
580
581    def get_documents(
582        self,
583        stream: str,
584        title_property: str | None = None,
585        content_properties: list[str] | None = None,
586        metadata_properties: list[str] | None = None,
587        *,
588        render_metadata: bool = False,
589    ) -> Iterable[Document]:
590        """Read a stream from the connector and return the records as documents.
591
592        If metadata_properties is not set, all properties that are not content will be added to
593        the metadata.
594
595        If render_metadata is True, metadata will be rendered in the document, as well as
596        the main content.
597        """
598        return self.get_records(stream).to_documents(
599            title_property=title_property,
600            content_properties=content_properties,
601            metadata_properties=metadata_properties,
602            render_metadata=render_metadata,
603        )
604
605    def get_samples(
606        self,
607        streams: list[str] | Literal["*"] | None = None,
608        *,
609        limit: int = 5,
610        on_error: Literal["raise", "ignore", "log"] = "raise",
611    ) -> dict[str, InMemoryDataset | None]:
612        """Get a sample of records from the given streams."""
613        if streams == "*":
614            streams = self.get_available_streams()
615        elif streams is None:
616            streams = self.get_selected_streams()
617
618        results: dict[str, InMemoryDataset | None] = {}
619        for stream in streams:
620            stop_event = threading.Event()
621            try:
622                results[stream] = self.get_records(
623                    stream,
624                    limit=limit,
625                    stop_event=stop_event,
626                ).fetch_all()
627                stop_event.set()
628            except Exception as ex:
629                results[stream] = None
630                if on_error == "ignore":
631                    continue
632
633                if on_error == "raise":
634                    raise ex from None
635
636                if on_error == "log":
637                    print(f"Error fetching sample for stream '{stream}': {ex}")
638
639        return results
640
641    def print_samples(
642        self,
643        streams: list[str] | Literal["*"] | None = None,
644        *,
645        limit: int = 5,
646        on_error: Literal["raise", "ignore", "log"] = "log",
647    ) -> None:
648        """Print a sample of records from the given streams."""
649        internal_cols: list[str] = [
650            AB_EXTRACTED_AT_COLUMN,
651            AB_META_COLUMN,
652            AB_RAW_ID_COLUMN,
653        ]
654        col_limit = 10
655        if streams == "*":
656            streams = self.get_available_streams()
657        elif streams is None:
658            streams = self.get_selected_streams()
659
660        console = Console()
661
662        console.print(
663            Markdown(
664                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
665                justify="left",
666            )
667        )
668
669        for stream in streams:
670            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
671            samples = self.get_samples(
672                streams=[stream],
673                limit=limit,
674                on_error=on_error,
675            )
676            dataset = samples[stream]
677
678            table = Table(
679                show_header=True,
680                show_lines=True,
681            )
682            if dataset is None:
683                console.print(
684                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
685                )
686                continue
687
688            if len(dataset.column_names) > col_limit:
689                # We'll pivot the columns so each column is its own row
690                table.add_column("Column Name")
691                for _ in range(len(dataset)):
692                    table.add_column(overflow="fold")
693                for col in dataset.column_names:
694                    table.add_row(
695                        Markdown(f"**`{col}`**"),
696                        *[escape(str(record[col])) for record in dataset],
697                    )
698            else:
699                for col in dataset.column_names:
700                    table.add_column(
701                        Markdown(f"**`{col}`**"),
702                        overflow="fold",
703                    )
704
705                for record in dataset:
706                    table.add_row(
707                        *[
708                            escape(str(val))
709                            for key, val in record.items()
710                            # Exclude internal Airbyte columns.
711                            if key not in internal_cols
712                        ]
713                    )
714
715            console.print(table)
716
717        console.print(Markdown("--------------"))
718
719    def _get_airbyte_message_iterator(
720        self,
721        *,
722        streams: Literal["*"] | list[str] | None = None,
723        state_provider: StateProviderBase | None = None,
724        progress_tracker: ProgressTracker,
725        force_full_refresh: bool = False,
726    ) -> AirbyteMessageIterator:
727        """Get an AirbyteMessageIterator for this source."""
728        return AirbyteMessageIterator(
729            self._read_with_catalog(
730                catalog=self.get_configured_catalog(streams=streams),
731                state=state_provider if not force_full_refresh else None,
732                progress_tracker=progress_tracker,
733            )
734        )
735
736    def _read_with_catalog(
737        self,
738        catalog: ConfiguredAirbyteCatalog,
739        progress_tracker: ProgressTracker,
740        *,
741        state: StateProviderBase | None = None,
742        stop_event: threading.Event | None = None,
743    ) -> Generator[AirbyteMessage, None, None]:
744        """Call read on the connector.
745
746        This involves the following steps:
747        * Write the config to a temporary file
748        * execute the connector with read --config <config_file> --catalog <catalog_file>
749        * Listen to the messages and return the AirbyteRecordMessages that come along.
750        * Send out telemetry on the performed sync (with information about which source was used and
751          the type of the cache)
752        """
753        with as_temp_files(
754            [
755                self._hydrated_config,
756                catalog.model_dump_json(),
757                state.to_state_input_file_text() if state else "[]",
758            ]
759        ) as [
760            config_file,
761            catalog_file,
762            state_file,
763        ]:
764            message_generator = self._execute(
765                [
766                    "read",
767                    "--config",
768                    config_file,
769                    "--catalog",
770                    catalog_file,
771                    "--state",
772                    state_file,
773                ],
774                progress_tracker=progress_tracker,
775            )
776            for message in progress_tracker.tally_records_read(message_generator):
777                if stop_event and stop_event.is_set():
778                    progress_tracker._log_sync_cancel()  # noqa: SLF001
779                    time.sleep(0.1)
780                    return
781
782                yield message
783
784        progress_tracker.log_read_complete()
785
786    def _peek_airbyte_message(
787        self,
788        message: AirbyteMessage,
789        *,
790        raise_on_error: bool = True,
791    ) -> None:
792        """Process an Airbyte message.
793
794        This method handles reading Airbyte messages and taking action, if needed, based on the
795        message type. For instance, log messages are logged, records are tallied, and errors are
796        raised as exceptions if `raise_on_error` is True.
797
798        Raises:
799            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
800        """
801        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
802
803    def _log_incremental_streams(
804        self,
805        *,
806        incremental_streams: set[str] | None = None,
807    ) -> None:
808        """Log the streams which are using incremental sync mode."""
809        log_message = (
810            "The following streams are currently using incremental sync:\n"
811            f"{incremental_streams}\n"
812            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
813        )
814        print(log_message, file=sys.stderr)
815
816    def read(
817        self,
818        cache: CacheBase | None = None,
819        *,
820        streams: str | list[str] | None = None,
821        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
822        force_full_refresh: bool = False,
823        skip_validation: bool = False,
824    ) -> ReadResult:
825        """Read from the connector and write to the cache.
826
827        Args:
828            cache: The cache to write to. If not set, a default cache will be used.
829            streams: Optional if already set. A list of stream names to select for reading. If set
830                to "*", all streams will be selected.
831            write_strategy: The strategy to use when writing to the cache. If a string, it must be
832                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
833                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
834                WriteStrategy.AUTO.
835            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
836                streams will be read in incremental mode if supported by the connector. This option
837                must be True when using the "replace" strategy.
838            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
839                running the connector. This can be helpful in debugging, when you want to send
840                configurations to the connector that otherwise might be rejected by JSON Schema
841                validation rules.
842        """
843        cache = cache or get_default_cache()
844        progress_tracker = ProgressTracker(
845            source=self,
846            cache=cache,
847            destination=None,
848            expected_streams=None,  # Will be set later
849        )
850
851        # Set up state provider if not in full refresh mode
852        if force_full_refresh:
853            state_provider: StateProviderBase | None = None
854        else:
855            state_provider = cache.get_state_provider(
856                source_name=self._name,
857            )
858        state_writer = cache.get_state_writer(source_name=self._name)
859
860        if streams:
861            self.select_streams(streams)
862
863        if not self._selected_stream_names:
864            raise exc.PyAirbyteNoStreamsSelectedError(
865                connector_name=self.name,
866                available_streams=self.get_available_streams(),
867            )
868
869        try:
870            result = self._read_to_cache(
871                cache=cache,
872                catalog_provider=CatalogProvider(self.configured_catalog),
873                stream_names=self._selected_stream_names,
874                state_provider=state_provider,
875                state_writer=state_writer,
876                write_strategy=write_strategy,
877                force_full_refresh=force_full_refresh,
878                skip_validation=skip_validation,
879                progress_tracker=progress_tracker,
880            )
881        except exc.PyAirbyteInternalError as ex:
882            progress_tracker.log_failure(exception=ex)
883            raise exc.AirbyteConnectorFailedError(
884                connector_name=self.name,
885                log_text=self._last_log_messages,
886            ) from ex
887        except Exception as ex:
888            progress_tracker.log_failure(exception=ex)
889            raise
890
891        progress_tracker.log_success()
892        return result
893
894    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
895        self,
896        cache: CacheBase,
897        *,
898        catalog_provider: CatalogProvider,
899        stream_names: list[str],
900        state_provider: StateProviderBase | None,
901        state_writer: StateWriterBase | None,
902        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
903        force_full_refresh: bool = False,
904        skip_validation: bool = False,
905        progress_tracker: ProgressTracker,
906    ) -> ReadResult:
907        """Internal read method."""
908        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
909            warnings.warn(
910                message=(
911                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
912                    "could result in data loss. "
913                    "To silence this warning, use the following: "
914                    'warnings.filterwarnings("ignore", '
915                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
916                ),
917                category=exc.PyAirbyteDataLossWarning,
918                stacklevel=1,
919            )
920        if isinstance(write_strategy, str):
921            try:
922                write_strategy = WriteStrategy(write_strategy)
923            except ValueError:
924                raise exc.PyAirbyteInputError(
925                    message="Invalid strategy",
926                    context={
927                        "write_strategy": write_strategy,
928                        "available_strategies": [s.value for s in WriteStrategy],
929                    },
930                ) from None
931
932        # Run optional validation step
933        if not skip_validation:
934            self.validate_config()
935
936        # Log incremental streams if any are known
937        if state_provider and state_provider.known_stream_names:
938            # Retrieve the set of known streams that support incremental sync
939            incremental_streams = (
940                set(self._get_incremental_stream_names())
941                & state_provider.known_stream_names
942                & set(self.get_selected_streams())
943            )
944            if incremental_streams:
945                self._log_incremental_streams(incremental_streams=incremental_streams)
946
947        airbyte_message_iterator = AirbyteMessageIterator(
948            self._read_with_catalog(
949                catalog=catalog_provider.configured_catalog,
950                state=state_provider,
951                progress_tracker=progress_tracker,
952            )
953        )
954        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
955            stdin=airbyte_message_iterator,
956            catalog_provider=catalog_provider,
957            write_strategy=write_strategy,
958            state_writer=state_writer,
959            progress_tracker=progress_tracker,
960        )
961
962        # Flush the WAL, if applicable
963        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
964
965        return ReadResult(
966            source_name=self.name,
967            progress_tracker=progress_tracker,
968            processed_streams=stream_names,
969            cache=cache,
970        )

A class representing a source that can be called.

Source(
    executor: airbyte._executors.base.Executor,
    name: str,
    config: dict[str, typing.Any] | None = None,
    *,
    config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None,
    streams: str | list[str] | None = None,
    validate: bool = False,
    cursor_key_overrides: dict[str, str] | None = None,
    primary_key_overrides: dict[str, str | list[str]] | None = None,
)
 73    def __init__(
 74        self,
 75        executor: Executor,
 76        name: str,
 77        config: dict[str, Any] | None = None,
 78        *,
 79        config_change_callback: ConfigChangeCallback | None = None,
 80        streams: str | list[str] | None = None,
 81        validate: bool = False,
 82        cursor_key_overrides: dict[str, str] | None = None,
 83        primary_key_overrides: dict[str, str | list[str]] | None = None,
 84    ) -> None:
 85        """Initialize the source.
 86
 87        If config is provided, it will be validated against the spec if validate is True.
 88        """
 89        self._to_be_selected_streams: list[str] | str = []
 90        """Used to hold selection criteria before catalog is known."""
 91
 92        super().__init__(
 93            executor=executor,
 94            name=name,
 95            config=config,
 96            config_change_callback=config_change_callback,
 97            validate=validate,
 98        )
 99        self._config_dict: dict[str, Any] | None = None
100        self._last_log_messages: list[str] = []
101        self._discovered_catalog: AirbyteCatalog | None = None
102        self._selected_stream_names: list[str] = []
103
104        self._cursor_key_overrides: dict[str, str] = {}
105        """A mapping of lower-cased stream names to cursor key overrides."""
106
107        self._primary_key_overrides: dict[str, list[str]] = {}
108        """A mapping of lower-cased stream names to primary key overrides."""
109
110        if config is not None:
111            self.set_config(config, validate=validate)
112        if streams is not None:
113            self.select_streams(streams)
114        if cursor_key_overrides is not None:
115            self.set_cursor_keys(**cursor_key_overrides)
116        if primary_key_overrides is not None:
117            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.
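
In practice a Source is usually obtained via `get_source()` rather than constructed directly, with the setter methods below applied afterward. A sketch with hypothetical stream and field names:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_streams(["users", "products"])  # hypothetical stream names
    source.set_cursor_keys(users="updated_at")    # stream names matched case-insensitively
    source.set_primary_keys(users="id")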

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
119    def set_streams(self, streams: list[str]) -> None:
120        """Deprecated. See select_streams()."""
121        warnings.warn(
122            "The 'set_streams' method is deprecated and will be removed in a future version. "
123            "Please use the 'select_streams' method instead.",
124            DeprecationWarning,
125            stacklevel=2,
126        )
127        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
129    def set_cursor_key(
130        self,
131        stream_name: str,
132        cursor_key: str,
133    ) -> None:
134        """Set the cursor for a single stream.
135
136        Note:
137        - This does not unset previously set cursors.
138        - The cursor key must be a single field name.
139        - Not all streams support custom cursors. If a stream does not support custom cursors,
140          the override may be ignored.
141        - Stream names are case insensitive, while field names are case sensitive.
142        - Stream names are not validated by PyAirbyte. If the stream name
143          does not exist in the catalog, the override may be ignored.
144        """
145        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
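A minimal sketch, using the illustrative source-faker connector and hypothetical stream/field names (substitute entries from your own catalog):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    # Stream name matching is case insensitive; the field name is case sensitive.
    source.set_cursor_key("users", "updated_at")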
def set_cursor_keys(self, **kwargs: str) -> None:
147    def set_cursor_keys(
148        self,
149        **kwargs: str,
150    ) -> None:
151        """Override the cursor key for one or more streams.
152
153        Usage:
154            source.set_cursor_keys(
155                stream1="cursor1",
156                stream2="cursor2",
157            )
158
159        Note:
160        - This does not unset previously set cursors.
161        - The cursor key must be a single field name.
162        - Not all streams support custom cursors. If a stream does not support custom cursors,
163          the override may be ignored.
164        - Stream names are case insensitive, while field names are case sensitive.
165        - Stream names are not validated by PyAirbyte. If the stream name
166          does not exist in the catalog, the override may be ignored.
167        """
168        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
170    def set_primary_key(
171        self,
172        stream_name: str,
173        primary_key: str | list[str],
174    ) -> None:
175        """Set the primary key for a single stream.
176
177        Note:
178        - This does not unset previously set primary keys.
179        - The primary key must be a single field name or a list of field names.
180        - Not all streams support overriding primary keys. If a stream does not support overriding
181          primary keys, the override may be ignored.
182        - Stream names are case insensitive, while field names are case sensitive.
183        - Stream names are not validated by PyAirbyte. If the stream name
184          does not exist in the catalog, the override may be ignored.
185        """
186        self._primary_key_overrides[stream_name.lower()] = (
187            primary_key if isinstance(primary_key, list) else [primary_key]
188        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
190    def set_primary_keys(
191        self,
192        **kwargs: str | list[str],
193    ) -> None:
194        """Override the primary keys for one or more streams.
195
196        This does not unset previously set primary keys.
197
198        Usage:
199            source.set_primary_keys(
200                stream1="pk1",
201                stream2=["pk1", "pk2"],
202            )
203
204        Note:
205        - This does not unset previously set primary keys.
206        - The primary key must be a single field name or a list of field names.
207        - Not all streams support overriding primary keys. If a stream does not support overriding
208          primary keys, the override may be ignored.
209        - Stream names are case insensitive, while field names are case sensitive.
210        - Stream names are not validated by PyAirbyte. If the stream name
211          does not exist in the catalog, the override may be ignored.
212        """
213        self._primary_key_overrides.update(
214            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
215        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

source.set_primary_keys(
    stream1="pk1",
    stream2=["pk1", "pk2"],
)

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
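As a sketch (connector, stream, and field names are illustrative), a composite key is passed as a list while a simple key can stay a plain string:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.set_primary_keys(
        users="id",                           # simple key
        purchases=["user_id", "product_id"],  # composite key
    )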
def select_all_streams(self) -> None:
231    def select_all_streams(self) -> None:
232        """Select all streams.
233
234        This is a more streamlined equivalent to:
235        > source.select_streams(source.get_available_streams()).
236        """
237        if self._config_dict is None:
238            self._to_be_selected_streams = "*"
239            self._log_warning_preselected_stream(self._to_be_selected_streams)
240            return
241
242        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

    source.select_streams(source.get_available_streams())

def select_streams(self, streams: str | list[str]) -> None:
244    def select_streams(self, streams: str | list[str]) -> None:
245        """Select the stream names that should be read from the connector.
246
247        Args:
248            streams: A list of stream names to select. If set to "*", all streams will be selected.
249
250        Currently, if this is not set, all streams will be read.
251        """
252        if self._config_dict is None:
253            self._to_be_selected_streams = streams
254            self._log_warning_preselected_stream(streams)
255            return
256
257        if streams == "*":
258            self.select_all_streams()
259            return
260
261        if isinstance(streams, str):
262            # If a single stream is provided, convert it to a one-item list
263            streams = [streams]
264
265        available_streams = self.get_available_streams()
266        for stream in streams:
267            if stream not in available_streams:
268                raise exc.AirbyteStreamNotFoundError(
269                    stream_name=stream,
270                    connector_name=self.name,
271                    available_streams=available_streams,
272                )
273        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.

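A short sketch, assuming the illustrative source-faker connector and that the error type above lives in the airbyte.exceptions module:

    import airbyte as ab
    from airbyte import exceptions as exc

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_streams("*")        # everything, or:
    source.select_streams(["users"])  # an explicit subset

    try:
        source.select_streams(["no_such_stream"])
    except exc.AirbyteStreamNotFoundError:
        print(source.get_available_streams())  # list what actually exists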
def get_selected_streams(self) -> list[str]:
275    def get_selected_streams(self) -> list[str]:
276        """Get the selected streams.
277
278        If no streams are selected, return an empty list.
279        """
280        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
282    def set_config(
283        self,
284        config: dict[str, Any],
285        *,
286        validate: bool = True,
287    ) -> None:
288        """Set the config for the connector.
289
290        If validate is True, raise an exception if the config fails validation.
291
292        If validate is False, validation will be deferred until check() or validate_config()
293        is called.
294        """
295        if validate:
296            self.validate_config(config)
297
298        self._config_dict = config
299
300        if self._to_be_selected_streams:
301            self.select_streams(self._to_be_selected_streams)
302            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.

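For example, deferring validation until check() (a sketch using the illustrative source-faker connector and its count option):

    import airbyte as ab

    source = ab.get_source("source-faker")             # config set later
    source.set_config({"count": 100}, validate=False)  # validation deferred
    source.check()  # config is validated here, along with connectivity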
def get_available_streams(self) -> list[str]:
322    def get_available_streams(self) -> list[str]:
323        """Get the available streams from the discovered catalog."""
324        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the discovered catalog.

config_spec: dict[str, typing.Any]
357    @property
358    def config_spec(self) -> dict[str, Any]:
359        """Generate a configuration spec for this connector, as a JSON Schema definition.
360
361        This property returns the configuration spec for the current
362        connector as a JSON Schema dictionary.
363
364        Returns:
365            dict: The JSON Schema configuration spec as a dictionary.
366        """
367        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This property returns the configuration spec for the current connector as a JSON Schema dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.

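Because the returned value is a plain JSON Schema dictionary, it can be inspected with ordinary dict operations; the key names below follow JSON Schema conventions rather than any PyAirbyte-specific API, and the connector is illustrative:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    spec = source.config_spec
    print(spec.get("required", []))            # required config fields
    print(sorted(spec.get("properties", {})))  # all known config keys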
docs_url: str
384    @property
385    def docs_url(self) -> str:
386        """Get the URL to the connector's documentation."""
387        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
388            "source-", ""
389        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
391    @property
392    def discovered_catalog(self) -> AirbyteCatalog:
393        """Get the raw catalog for the given streams.
394
395        If the catalog is not yet known, we call discover to get it.
396        """
397        if self._discovered_catalog is None:
398            self._discovered_catalog = self._discover()
399
400        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
402    @property
403    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
404        """Get the configured catalog for the given streams.
405
406        If the raw catalog is not yet known, we call discover to get it.
407
408        If no specific streams are selected, we return a catalog that syncs all available streams.
409
410        TODO: We should consider disabling by default the streams that the connector would
411        disable by default. (For instance, streams that require a premium license are sometimes
412        disabled by default within the connector.)
413        """
414        # Ensure discovered catalog is cached before we start
415        _ = self.discovered_catalog
416
417        # Filter for selected streams if set, otherwise use all available streams:
418        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
419        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
421    def get_configured_catalog(
422        self,
423        streams: Literal["*"] | list[str] | None = None,
424    ) -> ConfiguredAirbyteCatalog:
425        """Get a configured catalog for the given streams.
426
427        If no streams are provided, the selected streams will be used. If no streams are selected,
428        all available streams will be used.
429
430        If '*' is provided, all available streams will be used.
431        """
432        selected_streams: list[str] = []
433        if streams is None:
434            selected_streams = self._selected_stream_names or self.get_available_streams()
435        elif streams == "*":
436            selected_streams = self.get_available_streams()
437        elif isinstance(streams, list):
438            selected_streams = streams
439        else:
440            raise exc.PyAirbyteInputError(
441                message="Invalid streams argument.",
442                input_value=streams,
443            )
444
445        return ConfiguredAirbyteCatalog(
446            streams=[
447                ConfiguredAirbyteStream(
448                    stream=stream,
449                    destination_sync_mode=DestinationSyncMode.overwrite,
450                    sync_mode=SyncMode.incremental,
451                    primary_key=(
452                        [self._primary_key_overrides[stream.name.lower()]]
453                        if stream.name.lower() in self._primary_key_overrides
454                        else stream.source_defined_primary_key
455                    ),
456                    cursor_field=(
457                        [self._cursor_key_overrides[stream.name.lower()]]
458                        if stream.name.lower() in self._cursor_key_overrides
459                        else stream.default_cursor_field
460                    ),
461                    # These are unused in the current implementation:
462                    generation_id=None,
463                    minimum_generation_id=None,
464                    sync_id=None,
465                )
466                for stream in self.discovered_catalog.streams
467                if stream.name in selected_streams
468            ],
469        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.

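A brief sketch of inspecting the result; attribute names follow the Airbyte protocol models used in the source above, and the connector and stream name are illustrative:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    catalog = source.get_configured_catalog(streams=["users"])
    for configured_stream in catalog.streams:
        print(
            configured_stream.stream.name,
            configured_stream.sync_mode,
            configured_stream.cursor_field,
            configured_stream.primary_key,
        )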
def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
471    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
472        """Return the JSON Schema spec for the specified stream name."""
473        catalog: AirbyteCatalog = self.discovered_catalog
474        found: list[AirbyteStream] = [
475            stream for stream in catalog.streams if stream.name == stream_name
476        ]
477
478        if len(found) == 0:
479            raise exc.PyAirbyteInputError(
480                message="Stream name does not exist in catalog.",
481                input_value=stream_name,
482            )
483
484        if len(found) > 1:
485            raise exc.PyAirbyteInternalError(
486                message="Duplicate streams found with the same name.",
487                context={
488                    "found_streams": found,
489                },
490            )
491
492        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.

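For instance, listing a stream's declared fields and their types (assuming the usual JSON Schema layout with a top-level properties object; the connector and stream name are hypothetical):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    schema = source.get_stream_json_schema("users")
    for field_name, field_schema in schema.get("properties", {}).items():
        print(field_name, field_schema.get("type"))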
def get_records( self, stream: str, *, limit: int | None = None, stop_event: threading.Event | None = None, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
494    def get_records(
495        self,
496        stream: str,
497        *,
498        limit: int | None = None,
499        stop_event: threading.Event | None = None,
500        normalize_field_names: bool = False,
501        prune_undeclared_fields: bool = True,
502    ) -> LazyDataset:
503        """Read a stream from the connector.
504
505        Args:
506            stream: The name of the stream to read.
507            limit: The maximum number of records to read. If None, all records will be read.
508            stop_event: If set, the event can be triggered by the caller to stop reading records
509                and terminate the process.
510            normalize_field_names: When `True`, field names will be normalized to lower case, with
511                special characters removed. This matches the behavior of PyAirbyte caches and most
512                Airbyte destinations.
513            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
514                which generally matches the behavior of PyAirbyte caches and most Airbyte
515                destinations, specifically when you expect the catalog may be stale. You can disable
516                this to keep all fields in the records.
517
518        This involves the following steps:
519        * Call discover to get the catalog
520        * Generate a configured catalog that syncs the given stream in full_refresh mode
521        * Write the configured catalog and the config to a temporary file
522        * execute the connector with read --config <config_file> --catalog <catalog_file>
523        * Listen to the messages and return the first AirbyteRecordMessages that come along.
524        * Make sure the subprocess is killed when the function returns.
525        """
526        stop_event = stop_event or threading.Event()
527        configured_catalog = self.get_configured_catalog(streams=[stream])
528        if len(configured_catalog.streams) == 0:
529            raise exc.PyAirbyteInputError(
530                message="Requested stream does not exist.",
531                context={
532                    "stream": stream,
533                    "available_streams": self.get_available_streams(),
534                    "connector_name": self.name,
535                },
536            ) from KeyError(stream)
537
538        configured_stream = configured_catalog.streams[0]
539
540        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
541            yield from records
542
543        stream_record_handler = StreamRecordHandler(
544            json_schema=self.get_stream_json_schema(stream),
545            prune_extra_fields=prune_undeclared_fields,
546            normalize_keys=normalize_field_names,
547        )
548
549        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
550        progress_tracker = ProgressTracker(
551            ProgressStyle.PLAIN,
552            source=self,
553            cache=None,
554            destination=None,
555            expected_streams=[stream],
556        )
557
558        iterator: Iterator[dict[str, Any]] = (
559            StreamRecord.from_record_message(
560                record_message=record.record,
561                stream_record_handler=stream_record_handler,
562            )
563            for record in self._read_with_catalog(
564                catalog=configured_catalog,
565                progress_tracker=progress_tracker,
566                stop_event=stop_event,
567            )
568            if record.record
569        )
570        if limit is not None:
571            # Stop the iterator after the limit is reached
572            iterator = islice(iterator, limit)
573
574        return LazyDataset(
575            iterator,
576            stream_metadata=configured_stream,
577            stop_event=stop_event,
578            progress_tracker=progress_tracker,
579        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • limit: The maximum number of records to read. If None, all records will be read.
  • stop_event: If set, the event can be triggered by the caller to stop reading records and terminate the process.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the first AirbyteRecordMessages that come along.
  • Make sure the subprocess is killed when the function returns.
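A usage sketch with an early-exit event (the connector and stream name are illustrative; the event can be set from another thread to abort the read):

    import threading

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    stop_event = threading.Event()
    dataset = source.get_records("users", limit=10, stop_event=stop_event)
    for record in dataset:  # records are yielded lazily
        print(record)
    # stop_event.set() would end the underlying read early.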
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
581    def get_documents(
582        self,
583        stream: str,
584        title_property: str | None = None,
585        content_properties: list[str] | None = None,
586        metadata_properties: list[str] | None = None,
587        *,
588        render_metadata: bool = False,
589    ) -> Iterable[Document]:
590        """Read a stream from the connector and return the records as documents.
591
592        If metadata_properties is not set, all properties that are not content will be added to
593        the metadata.
594
595        If render_metadata is True, metadata will be rendered in the document, as well as the
596        main content.
597        """
598        return self.get_records(stream).to_documents(
599            title_property=title_property,
600            content_properties=content_properties,
601            metadata_properties=metadata_properties,
602            render_metadata=render_metadata,
603        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.

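A sketch for document-oriented consumers such as LLM pipelines; the connector is illustrative and the property names are hypothetical, and they must exist in the stream's schema:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    documents = source.get_documents(
        stream="products",
        title_property="name",
        content_properties=["description"],
        render_metadata=True,
    )
    for document in documents:
        print(document)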
def get_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'raise') -> dict[str, airbyte.datasets._inmemory.InMemoryDataset | None]:
605    def get_samples(
606        self,
607        streams: list[str] | Literal["*"] | None = None,
608        *,
609        limit: int = 5,
610        on_error: Literal["raise", "ignore", "log"] = "raise",
611    ) -> dict[str, InMemoryDataset | None]:
612        """Get a sample of records from the given streams."""
613        if streams == "*":
614            streams = self.get_available_streams()
615        elif streams is None:
616            streams = self.get_selected_streams()
617
618        results: dict[str, InMemoryDataset | None] = {}
619        for stream in streams:
620            stop_event = threading.Event()
621            try:
622                results[stream] = self.get_records(
623                    stream,
624                    limit=limit,
625                    stop_event=stop_event,
626                ).fetch_all()
627                stop_event.set()
628            except Exception as ex:
629                results[stream] = None
630                if on_error == "ignore":
631                    continue
632
633                if on_error == "raise":
634                    raise ex from None
635
636                if on_error == "log":
637                    print(f"Error fetching sample for stream '{stream}': {ex}")
638
639        return results

Get a sample of records from the given streams.

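For example (a None value marks a stream whose sample failed when on_error is not "raise"; the connector is illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    samples = source.get_samples(streams="*", limit=3, on_error="log")
    for stream_name, dataset in samples.items():
        if dataset is None:
            print(f"{stream_name}: sampling failed")
            continue
        for record in dataset:
            print(stream_name, record)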
def print_samples( self, streams: Union[list[str], Literal['*'], NoneType] = None, *, limit: int = 5, on_error: Literal['raise', 'ignore', 'log'] = 'log') -> None:
641    def print_samples(
642        self,
643        streams: list[str] | Literal["*"] | None = None,
644        *,
645        limit: int = 5,
646        on_error: Literal["raise", "ignore", "log"] = "log",
647    ) -> None:
648        """Print a sample of records from the given streams."""
649        internal_cols: list[str] = [
650            AB_EXTRACTED_AT_COLUMN,
651            AB_META_COLUMN,
652            AB_RAW_ID_COLUMN,
653        ]
654        col_limit = 10
655        if streams == "*":
656            streams = self.get_available_streams()
657        elif streams is None:
658            streams = self.get_selected_streams()
659
660        console = Console()
661
662        console.print(
663            Markdown(
664                f"# Sample Records from `{self.name}` ({len(streams)} selected streams)",
665                justify="left",
666            )
667        )
668
669        for stream in streams:
670            console.print(Markdown(f"## `{stream}` Stream Sample", justify="left"))
671            samples = self.get_samples(
672                streams=[stream],
673                limit=limit,
674                on_error=on_error,
675            )
676            dataset = samples[stream]
677
678            table = Table(
679                show_header=True,
680                show_lines=True,
681            )
682            if dataset is None:
683                console.print(
684                    Markdown("**⚠️ `Error fetching sample records.` ⚠️**"),
685                )
686                continue
687
688            if len(dataset.column_names) > col_limit:
689                # We'll pivot the columns so each column is its own row
690                table.add_column("Column Name")
691                for _ in range(len(dataset)):
692                    table.add_column(overflow="fold")
693                for col in dataset.column_names:
694                    table.add_row(
695                        Markdown(f"**`{col}`**"),
696                        *[escape(str(record[col])) for record in dataset],
697                    )
698            else:
699                for col in dataset.column_names:
700                    table.add_column(
701                        Markdown(f"**`{col}`**"),
702                        overflow="fold",
703                    )
704
705                for record in dataset:
706                    table.add_row(
707                        *[
708                            escape(str(val))
709                            for key, val in record.items()
710                            # Exclude internal Airbyte columns.
711                            if key not in internal_cols
712                        ]
713                    )
714
715            console.print(table)
716
717        console.print(Markdown("--------------"))

Print a sample of records from the given streams.

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
816    def read(
817        self,
818        cache: CacheBase | None = None,
819        *,
820        streams: str | list[str] | None = None,
821        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
822        force_full_refresh: bool = False,
823        skip_validation: bool = False,
824    ) -> ReadResult:
825        """Read from the connector and write to the cache.
826
827        Args:
828            cache: The cache to write to. If not set, a default cache will be used.
829            streams: Optional if already set. A list of stream names to select for reading. If set
830                to "*", all streams will be selected.
831            write_strategy: The strategy to use when writing to the cache. If a string, it must be
832                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
833                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
834                WriteStrategy.AUTO.
835            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
836                streams will be read in incremental mode if supported by the connector. This option
837                must be True when using the "replace" strategy.
838            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
839                running the connector. This can be helpful in debugging, when you want to send
840                configurations to the connector that otherwise might be rejected by JSON Schema
841                validation rules.
842        """
843        cache = cache or get_default_cache()
844        progress_tracker = ProgressTracker(
845            source=self,
846            cache=cache,
847            destination=None,
848            expected_streams=None,  # Will be set later
849        )
850
851        # Set up state provider if not in full refresh mode
852        if force_full_refresh:
853            state_provider: StateProviderBase | None = None
854        else:
855            state_provider = cache.get_state_provider(
856                source_name=self._name,
857            )
858        state_writer = cache.get_state_writer(source_name=self._name)
859
860        if streams:
861            self.select_streams(streams)
862
863        if not self._selected_stream_names:
864            raise exc.PyAirbyteNoStreamsSelectedError(
865                connector_name=self.name,
866                available_streams=self.get_available_streams(),
867            )
868
869        try:
870            result = self._read_to_cache(
871                cache=cache,
872                catalog_provider=CatalogProvider(self.configured_catalog),
873                stream_names=self._selected_stream_names,
874                state_provider=state_provider,
875                state_writer=state_writer,
876                write_strategy=write_strategy,
877                force_full_refresh=force_full_refresh,
878                skip_validation=skip_validation,
879                progress_tracker=progress_tracker,
880            )
881        except exc.PyAirbyteInternalError as ex:
882            progress_tracker.log_failure(exception=ex)
883            raise exc.AirbyteConnectorFailedError(
884                connector_name=self.name,
885                log_text=self._last_log_messages,
886            ) from ex
887        except Exception as ex:
888            progress_tracker.log_failure(exception=ex)
889            raise
890
891        progress_tracker.log_success()
892        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
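An end-to-end sketch using the illustrative source-faker connector; treating result.streams as a name-to-dataset mapping is an assumption, not confirmed by the source shown above:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100}, streams="*")
    cache = ab.get_default_cache()  # the default local cache
    result = source.read(cache=cache, write_strategy="auto")
    # Assumed access pattern for the synced data:
    for stream_name, dataset in result.streams.items():
        print(stream_name, dataset)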
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
get_config
config_hash
validate_config
print_config_spec
connector_version
check
install
uninstall
class ConnectorMetadata(pydantic.main.BaseModel):
58class ConnectorMetadata(BaseModel):
59    """Metadata for a connector."""
60
61    name: str
62    """Connector name. For example, "source-google-sheets"."""
63
64    latest_available_version: str | None
65    """The latest available version of the connector."""
66
67    pypi_package_name: str | None
68    """The name of the PyPI package for the connector, if it exists."""
69
70    language: Language | None
71    """The language of the connector."""
72
73    install_types: set[InstallType]
74    """The supported install types for the connector."""
75
76    suggested_streams: list[str] | None = None
77    """A list of suggested streams for the connector, if available."""
78
79    @property
80    def default_install_type(self) -> InstallType:
81        """Return the default install type for the connector."""
82        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
83            return InstallType.YAML
84
85        if InstallType.PYTHON in self.install_types:
86            return InstallType.PYTHON
87
88        # Else: Java or Docker
89        return InstallType.DOCKER

Metadata for a connector.

name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

suggested_streams: list[str] | None

A list of suggested streams for the connector, if available.

default_install_type: airbyte.sources.registry.InstallType
79    @property
80    def default_install_type(self) -> InstallType:
81        """Return the default install type for the connector."""
82        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
83            return InstallType.YAML
84
85        if InstallType.PYTHON in self.install_types:
86            return InstallType.PYTHON
87
88        # Else: Java or Docker
89        return InstallType.DOCKER

Return the default install type for the connector.

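For instance, metadata can be fetched through the registry helper exported by this module; the None check hedges against the lookup not finding the given connector name:

    from airbyte.sources import get_connector_metadata

    metadata = get_connector_metadata("source-faker")
    if metadata is not None:
        print(metadata.latest_available_version)
        print(metadata.default_install_type)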
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic's ConfigDict.

Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields