airbyte.sources

Sources connectors module for PyAirbyte.

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sources connectors module for PyAirbyte."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.sources.base import Source
from airbyte.sources.registry import (
    ConnectorMetadata,
    get_available_connectors,
    get_connector_metadata,
)
from airbyte.sources.util import (
    get_benchmark_source,
    get_source,
)


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004  # imports used for more than type checking
    from airbyte.sources import (
        base,
        registry,
        util,
    )

__all__ = [
    # Submodules
    "base",
    "registry",
    "util",
    # Factories
    "get_source",
    "get_benchmark_source",
    # Helper Functions
    "get_available_connectors",
    "get_connector_metadata",
    # Classes
    "Source",
    "ConnectorMetadata",
]
def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> Source:
def get_source(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    streams: str | list[str] | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: bool | str | None = None,
    use_host_network: bool = False,
    source_manifest: bool | dict | Path | str | None = None,
    install_if_missing: bool = True,
    install_root: Path | None = None,
) -> Source:
    """Get a connector by name and version.

    If an explicit install or execution method is requested (e.g. `local_executable`,
    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.

    Otherwise, an appropriate method will be selected based on the available connector metadata:
    1. If the connector is registered and a YAML source manifest is available, the YAML manifest
       will be downloaded and used to execute the connector.
    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
       will be executed using Docker.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        streams: list of stream names to select for reading. If set to "*", all streams will be
            selected. If not provided, you can set it later via the `select_streams()` or
            `select_all_streams()` method.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        source_manifest: If set, the connector will be executed based on a declarative YAML
            source definition. This input can be `True` to attempt to auto-download a YAML spec,
            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
            the local file system, or `str` to pull the definition from a web URL.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when `local_executable` or `source_manifest` are set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
    """
    return Source(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        streams=streams,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            source_manifest=source_manifest,
            install_if_missing=install_if_missing,
            install_root=install_root,
        ),
    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the YAML manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
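
As a sketch of the selection logic described above (the connector name and config keys are illustrative):

from airbyte.sources import get_source

# Let PyAirbyte choose the execution method from registry metadata:
source = get_source(
    "source-faker",
    config={"count": 10_000},
    streams="*",
)

# Or pin an explicit method instead, e.g. Docker with the default image:
docker_source = get_source(
    "source-faker",
    docker_image=True,
    version="latest",
)
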
def get_benchmark_source( num_records: int | str = '5e5', *, install_if_missing: bool = True) -> Source:
def get_benchmark_source(
    num_records: int | str = "5e5",
    *,
    install_if_missing: bool = True,
) -> Source:
    """Get a source for benchmarking.

    This source will generate dummy records for performance benchmarking purposes.
    You can specify the number of records to generate using the `num_records` parameter.
    The `num_records` parameter can be an integer or a string in scientific notation.
    For example, `"5e6"` will generate 5 million records. If underscores are provided
    within a numeric string, they will be ignored.

    Args:
        num_records: The number of records to generate. Defaults to "5e5", or
            500,000 records.
            Can be an integer (`1000`) or a string in scientific notation.
            For example, `"5e6"` will generate 5 million records.
        install_if_missing: Whether to install the source if it is not available locally.

    Returns:
        Source: The source object for benchmarking.
    """
    if isinstance(num_records, str):
        try:
            num_records = int(Decimal(num_records.replace("_", "")))
        except InvalidOperation as ex:
            raise PyAirbyteInputError(
                message="Invalid number format.",
                original_exception=ex,
                input_value=str(num_records),
            ) from None

    return get_source(
        name="source-e2e-test",
        docker_image=True,
        # docker_image="airbyte/source-e2e-test:latest",
        config={
            "type": "BENCHMARK",
            "schema": "FIVE_STRING_COLUMNS",
            "terminationCondition": {
                "type": "MAX_RECORDS",
                "max": num_records,
            },
        },
        streams="*",
        install_if_missing=install_if_missing,
    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records: The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
  • install_if_missing: Whether to install the source if it is not available locally.
Returns:
  Source: The source object for benchmarking.
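
A usage sketch (this path runs the connector via Docker, per `docker_image=True` in the implementation above, so Docker must be available):

from airbyte.sources.util import get_benchmark_source

# Generate 5 million dummy records for a performance test:
source = get_benchmark_source(num_records="5e6")
result = source.read()
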

def get_available_connectors( install_type: airbyte.sources.registry.InstallType | str | None = None) -> list[str]:
def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
    """Return a list of all available connectors.

    Connectors will be returned in alphabetical order, with the standard prefix "source-".
    """
    if install_type is None:
        # No install type specified. Filter for whatever is runnable.
        if is_docker_installed():
            logger.info("Docker is detected. Returning all connectors.")
            # If Docker is available, return all connectors.
            return sorted(conn.name for conn in _get_registry_cache().values())

        logger.info("Docker was not detected. Returning only Python and Manifest-only connectors.")

        # If Docker is not available, return only Python and Manifest-based connectors.
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
        )

    if not isinstance(install_type, InstallType):
        install_type = InstallType(install_type)

    if install_type == InstallType.PYTHON:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if conn.pypi_package_name is not None
        )

    if install_type == InstallType.JAVA:
        warnings.warn(
            message="Java connectors are not yet supported.",
            stacklevel=2,
        )
        return sorted(
            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
        )

    if install_type == InstallType.DOCKER:
        return sorted(conn.name for conn in _get_registry_cache().values())

    if install_type == InstallType.YAML:
        return sorted(
            conn.name
            for conn in _get_registry_cache().values()
            if InstallType.YAML in conn.install_types
            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
        )

    # pragma: no cover  # Should never be reached.
    raise exc.PyAirbyteInputError(
        message="Invalid install type.",
        context={
            "install_type": install_type,
        },
    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".
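
For instance (a brief sketch; the output depends on whether Docker is detected):

from airbyte.sources.registry import InstallType, get_available_connectors

# Whatever is runnable locally: all connectors if Docker is detected,
# otherwise only Python and manifest-only connectors.
print(get_available_connectors())

# Only connectors installable as PyPI packages:
print(get_available_connectors(InstallType.PYTHON))
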

def get_connector_metadata(name: str) -> ConnectorMetadata | None:
def get_connector_metadata(name: str) -> ConnectorMetadata | None:
    """Check the cache for the connector.

    If the cache is empty, populate by calling update_cache.
    """
    registry_url = _get_registry_url()

    if _is_registry_disabled(registry_url):
        return None

    cache = copy(_get_registry_cache())

    if not cache:
        raise exc.PyAirbyteInternalError(
            message="Connector registry could not be loaded.",
            context={
                "registry_url": _get_registry_url(),
            },
        )
    if name not in cache:
        raise exc.AirbyteConnectorNotRegisteredError(
            connector_name=name,
            context={
                "registry_url": _get_registry_url(),
                "available_connectors": get_available_connectors(),
            },
        )
    return cache[name]

Check the cache for the connector.

If the cache is empty, populate by calling update_cache.
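
A small sketch of the lookup behavior described above (the connector name is illustrative):

from airbyte.sources.registry import get_connector_metadata

# Returns None if the registry is disabled; raises
# AirbyteConnectorNotRegisteredError for unknown connector names.
metadata = get_connector_metadata("source-faker")
if metadata is not None:
    print(metadata.name)
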

class Source(airbyte._connector_base.ConnectorBase):
class Source(ConnectorBase):  # noqa: PLR0904
    """A class representing a source that can be called."""

    connector_type = "source"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)

    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key

    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support overriding
          primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )

    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        This does not unset previously set primary keys.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support overriding
          primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )

    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
        """Log a warning about streams that were selected before the config was set."""
        if streams == "*":
            print(
                "Warning: Config is not set yet. All streams will be selected after config is set."
            )
        else:
            print(
                "Warning: Config is not set yet. "
                f"Streams to be selected after config is set: {streams}"
            )

    def select_all_streams(self) -> None:
        """Select all streams.

        This is a more streamlined equivalent to:
        > source.select_streams(source.get_available_streams()).
        """
        if self._config_dict is None:
            self._to_be_selected_streams = "*"
            self._log_warning_preselected_stream(self._to_be_selected_streams)
            return

        self._selected_stream_names = self.get_available_streams()

    def select_streams(self, streams: str | list[str]) -> None:
        """Select the stream names that should be read from the connector.

        Args:
            streams: A list of stream names to select. If set to "*", all streams will be selected.

        Currently, if this is not set, all streams will be read.
        """
        if self._config_dict is None:
            self._to_be_selected_streams = streams
            self._log_warning_preselected_stream(streams)
            return

        if streams == "*":
            self.select_all_streams()
            return

        if isinstance(streams, str):
            # If a single stream is provided, convert it to a one-item list
            streams = [streams]

        available_streams = self.get_available_streams()
        for stream in streams:
            if stream not in available_streams:
                raise exc.AirbyteStreamNotFoundError(
                    stream_name=stream,
                    connector_name=self.name,
                    available_streams=available_streams,
                )
        self._selected_stream_names = streams

    def get_selected_streams(self) -> list[str]:
        """Get the selected streams.

        If no streams are selected, return an empty list.
        """
        return self._selected_stream_names

    def set_config(
        self,
        config: dict[str, Any],
        *,
        validate: bool = True,
    ) -> None:
        """Set the config for the connector.

        If validate is True, raise an exception if the config fails validation.

        If validate is False, validation will be deferred until check() or validate_config()
        is called.
        """
        if validate:
            self.validate_config(config)

        self._config_dict = config

        if self._to_be_selected_streams:
            self.select_streams(self._to_be_selected_streams)
            self._to_be_selected_streams = []

    def _discover(self) -> AirbyteCatalog:
        """Call discover on the connector.

        This involves the following steps:
        - Write the config to a temporary file
        - execute the connector with discover --config <config_file>
        - Listen to the messages and return the first AirbyteCatalog that comes along.
        - Make sure the subprocess is killed when the function returns.
        """
        with as_temp_files([self._hydrated_config]) as [config_file]:
            for msg in self._execute(["discover", "--config", config_file]):
                if msg.type == Type.CATALOG and msg.catalog:
                    return msg.catalog
            raise exc.AirbyteConnectorMissingCatalogError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            )

    def get_available_streams(self) -> list[str]:
        """Get the available streams from the discovered catalog."""
        return [s.name for s in self.discovered_catalog.streams]

    def _get_incremental_stream_names(self) -> list[str]:
        """Get the names of streams that support incremental sync."""
        return [
            stream.name
            for stream in self.discovered_catalog.streams
            if SyncMode.incremental in stream.supported_sync_modes
        ]

    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
        """Call spec on the connector.

        This involves the following steps:
        * execute the connector with spec
        * Listen to the messages and return the first spec that comes along.
        * Make sure the subprocess is killed when the function returns.
        """
        if force_refresh or self._spec is None:
            for msg in self._execute(["spec"]):
                if msg.type == Type.SPEC and msg.spec:
                    self._spec = msg.spec
                    break

        if self._spec:
            return self._spec

        raise exc.AirbyteConnectorMissingSpecError(
            connector_name=self.name,
            log_text=self._last_log_messages,
        )

    @property
    def config_spec(self) -> dict[str, Any]:
        """Generate a configuration spec for this connector, as a JSON Schema definition.

        This function returns a JSON Schema dictionary describing the configuration spec
        for the current connector.

        Returns:
            dict: The JSON Schema configuration spec as a dictionary.
        """
        return self._get_spec(force_refresh=True).connectionSpecification

    def print_config_spec(
        self,
        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
        *,
        output_file: Path | str | None = None,
    ) -> None:
        """Print the configuration spec for this connector.

        Args:
            format: The format to print the spec in. Must be "yaml" or "json".
            output_file: Optional. If set, the spec will be written to the given file path.
                Otherwise, it will be printed to the console.
        """
        if format not in {"yaml", "json"}:
            raise exc.PyAirbyteInputError(
                message="Invalid format. Expected 'yaml' or 'json'",
                input_value=format,
            )
        if isinstance(output_file, str):
            output_file = Path(output_file)

        if format == "yaml":
            content = yaml.dump(self.config_spec, indent=2)
        elif format == "json":
            content = json.dumps(self.config_spec, indent=2)

        if output_file:
            output_file.write_text(content)
            return

        syntax_highlighted = Syntax(content, format)
        print(syntax_highlighted)

    @property
    def _yaml_spec(self) -> str:
        """Get the spec as a yaml string.

        For now, the primary use case is for writing and debugging a valid config for a source.

        This is private for now because we probably want better polish before exposing this
        as a stable interface. This will also get easier when we have docs links with this info
        for each connector.
        """
        spec_obj: ConnectorSpecification = self._get_spec()
        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
        # convert to a yaml string
        return yaml.dump(spec_dict)

    @property
    def docs_url(self) -> str:
        """Get the URL to the connector's documentation."""
        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
            "source-", ""
        )

    @property
    def discovered_catalog(self) -> AirbyteCatalog:
        """Get the raw catalog for the given streams.

        If the catalog is not yet known, we call discover to get it.
        """
        if self._discovered_catalog is None:
            self._discovered_catalog = self._discover()

        return self._discovered_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Get the configured catalog for the given streams.

        If the raw catalog is not yet known, we call discover to get it.

        If no specific streams are selected, we return a catalog that syncs all available streams.

        TODO: We should consider disabling by default the streams that the connector would
        disable by default. (For instance, streams that require a premium license are sometimes
        disabled by default within the connector.)
        """
        # Ensure discovered catalog is cached before we start
        _ = self.discovered_catalog

        # Filter for selected streams if set, otherwise use all available streams:
        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
        return self.get_configured_catalog(streams=streams_filter)

    def get_configured_catalog(
        self,
        streams: Literal["*"] | list[str] | None = None,
    ) -> ConfiguredAirbyteCatalog:
        """Get a configured catalog for the given streams.

        If no streams are provided, the selected streams will be used. If no streams are selected,
        all available streams will be used.

        If '*' is provided, all available streams will be used.
        """
        selected_streams: list[str] = []
        if streams is None:
            selected_streams = self._selected_stream_names or self.get_available_streams()
        elif streams == "*":
            selected_streams = self.get_available_streams()
        elif isinstance(streams, list):
            selected_streams = streams
        else:
            raise exc.PyAirbyteInputError(
                message="Invalid streams argument.",
                input_value=streams,
            )

        return ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    destination_sync_mode=DestinationSyncMode.overwrite,
                    sync_mode=SyncMode.incremental,
                    primary_key=(
                        [self._primary_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._primary_key_overrides
                        else stream.source_defined_primary_key
                    ),
                    cursor_field=(
                        [self._cursor_key_overrides[stream.name.lower()]]
                        if stream.name.lower() in self._cursor_key_overrides
                        else stream.default_cursor_field
                    ),
                    # These are unused in the current implementation:
                    generation_id=None,
                    minimum_generation_id=None,
                    sync_id=None,
                )
                for stream in self.discovered_catalog.streams
                if stream.name in selected_streams
            ],
        )

    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
        """Return the JSON Schema spec for the specified stream name."""
        catalog: AirbyteCatalog = self.discovered_catalog
        found: list[AirbyteStream] = [
            stream for stream in catalog.streams if stream.name == stream_name
        ]

        if len(found) == 0:
            raise exc.PyAirbyteInputError(
                message="Stream name does not exist in catalog.",
                input_value=stream_name,
            )

        if len(found) > 1:
            raise exc.PyAirbyteInternalError(
                message="Duplicate streams found with the same name.",
                context={
                    "found_streams": found,
                },
            )

        return found[0].json_schema

    def get_records(
        self,
        stream: str,
        *,
        normalize_field_names: bool = False,
        prune_undeclared_fields: bool = True,
    ) -> LazyDataset:
        """Read a stream from the connector.

        Args:
            stream: The name of the stream to read.
            normalize_field_names: When `True`, field names will be normalized to lower case, with
                special characters removed. This matches the behavior of PyAirbyte caches and most
                Airbyte destinations.
            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
                which generally matches the behavior of PyAirbyte caches and most Airbyte
                destinations, specifically when you expect the catalog may be stale. You can disable
                this to keep all fields in the records.

        This involves the following steps:
        * Call discover to get the catalog
        * Generate a configured catalog that syncs the given stream in full_refresh mode
        * Write the configured catalog and the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Make sure the subprocess is killed when the function returns.
        """
        configured_catalog = self.get_configured_catalog(streams=[stream])
        if len(configured_catalog.streams) == 0:
            raise exc.PyAirbyteInputError(
                message="Requested stream does not exist.",
                context={
                    "stream": stream,
                    "available_streams": self.get_available_streams(),
                    "connector_name": self.name,
                },
            ) from KeyError(stream)

        configured_stream = configured_catalog.streams[0]

        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
            yield from records

        stream_record_handler = StreamRecordHandler(
            json_schema=self.get_stream_json_schema(stream),
            prune_extra_fields=prune_undeclared_fields,
            normalize_keys=normalize_field_names,
        )

        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
        progress_tracker = ProgressTracker(
            ProgressStyle.PLAIN,
            source=self,
            cache=None,
            destination=None,
            expected_streams=[stream],
        )

        iterator: Iterator[dict[str, Any]] = (
            StreamRecord.from_record_message(
                record_message=record.record,
                stream_record_handler=stream_record_handler,
            )
            for record in self._read_with_catalog(
                catalog=configured_catalog,
                progress_tracker=progress_tracker,
            )
            if record.record
        )
        progress_tracker.log_success()
        return LazyDataset(
            iterator,
            stream_metadata=configured_stream,
        )

    def get_documents(
        self,
        stream: str,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Read a stream from the connector and return the records as documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as
        the main content.
        """
        return self.get_records(stream).to_documents(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )

    def _get_airbyte_message_iterator(
        self,
        *,
        streams: Literal["*"] | list[str] | None = None,
        state_provider: StateProviderBase | None = None,
        progress_tracker: ProgressTracker,
        force_full_refresh: bool = False,
    ) -> AirbyteMessageIterator:
        """Get an AirbyteMessageIterator for this source."""
        return AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=self.get_configured_catalog(streams=streams),
                state=state_provider if not force_full_refresh else None,
                progress_tracker=progress_tracker,
            )
        )

    def _read_with_catalog(
        self,
        catalog: ConfiguredAirbyteCatalog,
        progress_tracker: ProgressTracker,
        state: StateProviderBase | None = None,
    ) -> Generator[AirbyteMessage, None, None]:
        """Call read on the connector.

        This involves the following steps:
        * Write the config to a temporary file
        * execute the connector with read --config <config_file> --catalog <catalog_file>
        * Listen to the messages and return the AirbyteRecordMessages that come along.
        * Send out telemetry on the performed sync (with information about which source was used and
          the type of the cache)
        """
        with as_temp_files(
            [
                self._hydrated_config,
                catalog.model_dump_json(),
                state.to_state_input_file_text() if state else "[]",
            ]
        ) as [
            config_file,
            catalog_file,
            state_file,
        ]:
            message_generator = self._execute(
                [
                    "read",
                    "--config",
                    config_file,
                    "--catalog",
                    catalog_file,
                    "--state",
                    state_file,
                ],
                progress_tracker=progress_tracker,
            )
            yield from progress_tracker.tally_records_read(message_generator)
        progress_tracker.log_read_complete()

    def _peek_airbyte_message(
        self,
        message: AirbyteMessage,
        *,
        raise_on_error: bool = True,
    ) -> None:
        """Process an Airbyte message.

        This method handles reading Airbyte messages and taking action, if needed, based on the
        message type. For instance, log messages are logged, records are tallied, and errors are
        raised as exceptions if `raise_on_error` is True.

        Raises:
            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
        """
        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)

    def _log_incremental_streams(
        self,
        *,
        incremental_streams: set[str] | None = None,
    ) -> None:
        """Log the streams which are using incremental sync mode."""
        log_message = (
            "The following streams are currently using incremental sync:\n"
            f"{incremental_streams}\n"
            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
        )
        print(log_message)

    def read(
        self,
        cache: CacheBase | None = None,
        *,
        streams: str | list[str] | None = None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
    ) -> ReadResult:
        """Read from the connector and write to the cache.

        Args:
            cache: The cache to write to. If not set, a default cache will be used.
            streams: Optional if already set. A list of stream names to select for reading. If set
                to "*", all streams will be selected.
            write_strategy: The strategy to use when writing to the cache. If a string, it must be
                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
                WriteStrategy.AUTO.
            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
                streams will be read in incremental mode if supported by the connector. This option
                must be True when using the "replace" strategy.
            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
                running the connector. This can be helpful in debugging, when you want to send
                configurations to the connector that otherwise might be rejected by JSON Schema
                validation rules.
        """
        cache = cache or get_default_cache()
        progress_tracker = ProgressTracker(
            source=self,
            cache=cache,
            destination=None,
            expected_streams=None,  # Will be set later
        )

        # Set up state provider if not in full refresh mode
        if force_full_refresh:
            state_provider: StateProviderBase | None = None
        else:
            state_provider = cache.get_state_provider(
                source_name=self._name,
            )
        state_writer = cache.get_state_writer(source_name=self._name)

        if streams:
            self.select_streams(streams)

        if not self._selected_stream_names:
            raise exc.PyAirbyteNoStreamsSelectedError(
                connector_name=self.name,
                available_streams=self.get_available_streams(),
            )

        try:
            result = self._read_to_cache(
                cache=cache,
                catalog_provider=CatalogProvider(self.configured_catalog),
                stream_names=self._selected_stream_names,
                state_provider=state_provider,
                state_writer=state_writer,
                write_strategy=write_strategy,
                force_full_refresh=force_full_refresh,
                skip_validation=skip_validation,
                progress_tracker=progress_tracker,
            )
        except exc.PyAirbyteInternalError as ex:
            progress_tracker.log_failure(exception=ex)
            raise exc.AirbyteConnectorFailedError(
                connector_name=self.name,
                log_text=self._last_log_messages,
            ) from ex
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise

        progress_tracker.log_success()
        return result

    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
        self,
        cache: CacheBase,
        *,
        catalog_provider: CatalogProvider,
        stream_names: list[str],
        state_provider: StateProviderBase | None,
        state_writer: StateWriterBase | None,
        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
        skip_validation: bool = False,
        progress_tracker: ProgressTracker,
    ) -> ReadResult:
        """Internal read method."""
        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
            warnings.warn(
                message=(
                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
                    "could result in data loss. "
                    "To silence this warning, use the following: "
                    'warnings.filterwarnings("ignore", '
                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
                ),
                category=exc.PyAirbyteDataLossWarning,
                stacklevel=1,
            )
        if isinstance(write_strategy, str):
            try:
                write_strategy = WriteStrategy(write_strategy)
            except ValueError:
                raise exc.PyAirbyteInputError(
                    message="Invalid strategy",
                    context={
                        "write_strategy": write_strategy,
                        "available_strategies": [s.value for s in WriteStrategy],
                    },
                ) from None

        # Run optional validation step
        if not skip_validation:
            self.validate_config()

        # Log incremental streams if any are known
        if state_provider and state_provider.known_stream_names:
            # Retrieve the set of known streams which support incremental sync
            incremental_streams = (
                set(self._get_incremental_stream_names())
                & state_provider.known_stream_names
                & set(self.get_selected_streams())
            )
            if incremental_streams:
                self._log_incremental_streams(incremental_streams=incremental_streams)

        airbyte_message_iterator = AirbyteMessageIterator(
            self._read_with_catalog(
                catalog=catalog_provider.configured_catalog,
                state=state_provider,
                progress_tracker=progress_tracker,
            )
        )
        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
            stdin=airbyte_message_iterator,
            catalog_provider=catalog_provider,
            write_strategy=write_strategy,
            state_writer=state_writer,
            progress_tracker=progress_tracker,
        )

        # Flush the WAL, if applicable
        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API

        return ReadResult(
            source_name=self.name,
            progress_tracker=progress_tracker,
            processed_streams=stream_names,
            cache=cache,
        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False, cursor_key_overrides: dict[str, str] | None = None, primary_key_overrides: dict[str, str | list[str]] | None = None)
    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        streams: str | list[str] | None = None,
        validate: bool = False,
        cursor_key_overrides: dict[str, str] | None = None,
        primary_key_overrides: dict[str, str | list[str]] | None = None,
    ) -> None:
        """Initialize the source.

        If config is provided, it will be validated against the spec if validate is True.
        """
        self._to_be_selected_streams: list[str] | str = []
        """Used to hold selection criteria before catalog is known."""

        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
        self._config_dict: dict[str, Any] | None = None
        self._last_log_messages: list[str] = []
        self._discovered_catalog: AirbyteCatalog | None = None
        self._selected_stream_names: list[str] = []

        self._cursor_key_overrides: dict[str, str] = {}
        """A mapping of lower-cased stream names to cursor key overrides."""

        self._primary_key_overrides: dict[str, list[str]] = {}
        """A mapping of lower-cased stream names to primary key overrides."""

        if config is not None:
            self.set_config(config, validate=validate)
        if streams is not None:
            self.select_streams(streams)
        if cursor_key_overrides is not None:
            self.set_cursor_keys(**cursor_key_overrides)
        if primary_key_overrides is not None:
            self.set_primary_keys(**primary_key_overrides)

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.
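
In practice, a Source is usually obtained via `get_source()` rather than constructed directly, since `get_source()` builds the `Executor` for you. A hedged sketch (the connector name and config key are illustrative):

from airbyte.sources import get_source

source = get_source("source-faker", config={"count": 100})
source.check()  # validate the config against the connector's spec
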

connector_type = 'source'
def set_streams(self, streams: list[str]) -> None:
    def set_streams(self, streams: list[str]) -> None:
        """Deprecated. See select_streams()."""
        warnings.warn(
            "The 'set_streams' method is deprecated and will be removed in a future version. "
            "Please use the 'select_streams' method instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.select_streams(streams)

Deprecated. See select_streams().

def set_cursor_key(self, stream_name: str, cursor_key: str) -> None:
    def set_cursor_key(
        self,
        stream_name: str,
        cursor_key: str,
    ) -> None:
        """Set the cursor for a single stream.

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides[stream_name.lower()] = cursor_key

Set the cursor for a single stream.

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
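
For example (the stream and field names below are illustrative):

source.set_cursor_key("users", "updated_at")  # stream name, then cursor field
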
def set_cursor_keys(self, **kwargs: str) -> None:
    def set_cursor_keys(
        self,
        **kwargs: str,
    ) -> None:
        """Override the cursor key for one or more streams.

        Usage:
            source.set_cursor_keys(
                stream1="cursor1",
                stream2="cursor2",
            )

        Note:
        - This does not unset previously set cursors.
        - The cursor key must be a single field name.
        - Not all streams support custom cursors. If a stream does not support custom cursors,
          the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._cursor_key_overrides.update({k.lower(): v for k, v in kwargs.items()})

Override the cursor key for one or more streams.

Usage:

source.set_cursor_keys(
    stream1="cursor1",
    stream2="cursor2",
)

Note:

  • This does not unset previously set cursors.
  • The cursor key must be a single field name.
  • Not all streams support custom cursors. If a stream does not support custom cursors, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
def set_primary_key(self, stream_name: str, primary_key: str | list[str]) -> None:
    def set_primary_key(
        self,
        stream_name: str,
        primary_key: str | list[str],
    ) -> None:
        """Set the primary key for a single stream.

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support overriding
          primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides[stream_name.lower()] = (
            primary_key if isinstance(primary_key, list) else [primary_key]
        )

Set the primary key for a single stream.

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
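
For example (the names are illustrative; a composite key is passed as a list):

source.set_primary_key("users", "id")
source.set_primary_key("orders", ["order_id", "line_number"])
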
def set_primary_keys(self, **kwargs: str | list[str]) -> None:
    def set_primary_keys(
        self,
        **kwargs: str | list[str],
    ) -> None:
        """Override the primary keys for one or more streams.

        This does not unset previously set primary keys.

        Usage:
            source.set_primary_keys(
                stream1="pk1",
                stream2=["pk1", "pk2"],
            )

        Note:
        - This does not unset previously set primary keys.
        - The primary key must be a single field name or a list of field names.
        - Not all streams support overriding primary keys. If a stream does not support overriding
          primary keys, the override may be ignored.
        - Stream names are case insensitive, while field names are case sensitive.
        - Stream names are not validated by PyAirbyte. If the stream name
          does not exist in the catalog, the override may be ignored.
        """
        self._primary_key_overrides.update(
            {k.lower(): v if isinstance(v, list) else [v] for k, v in kwargs.items()}
        )

Override the primary keys for one or more streams.

This does not unset previously set primary keys.

Usage:

    source.set_primary_keys(
        stream1="pk1",
        stream2=["pk1", "pk2"],
    )

Note:

  • This does not unset previously set primary keys.
  • The primary key must be a single field name or a list of field names.
  • Not all streams support overriding primary keys. If a stream does not support overriding primary keys, the override may be ignored.
  • Stream names are case insensitive, while field names are case sensitive.
  • Stream names are not validated by PyAirbyte. If the stream name does not exist in the catalog, the override may be ignored.
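
A short sketch of both the single-stream and multi-stream forms, again assuming source-faker with illustrative stream and field names:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    # A single string sets a simple key; a list sets a composite key.
    source.set_primary_key("users", "id")
    source.set_primary_keys(
        users="id",
        purchases=["user_id", "product_id"],
    )
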
def select_all_streams(self) -> None:
214    def select_all_streams(self) -> None:
215        """Select all streams.
216
217        This is a more streamlined equivalent to:
218        > source.select_streams(source.get_available_streams()).
219        """
220        if self._config_dict is None:
221            self._to_be_selected_streams = "*"
222            self._log_warning_preselected_stream(self._to_be_selected_streams)
223            return
224
225        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

    source.select_streams(source.get_available_streams())
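
For example, assuming the source-faker connector:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_all_streams()
    print(source.get_selected_streams())  # every stream in the discovered catalog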

def select_streams(self, streams: str | list[str]) -> None:
227    def select_streams(self, streams: str | list[str]) -> None:
228        """Select the stream names that should be read from the connector.
229
230        Args:
231            streams: A list of stream names to select. If set to "*", all streams will be selected.
232
233        Currently, if this is not set, all streams will be read.
234        """
235        if self._config_dict is None:
236            self._to_be_selected_streams = streams
237            self._log_warning_preselected_stream(streams)
238            return
239
240        if streams == "*":
241            self.select_all_streams()
242            return
243
244        if isinstance(streams, str):
245            # If a single stream is provided, convert it to a one-item list
246            streams = [streams]
247
248        available_streams = self.get_available_streams()
249        for stream in streams:
250            if stream not in available_streams:
251                raise exc.AirbyteStreamNotFoundError(
252                    stream_name=stream,
253                    connector_name=self.name,
254                    available_streams=available_streams,
255                )
256        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
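
A sketch with illustrative stream names; once the config is set, an unknown stream name raises AirbyteStreamNotFoundError:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_streams(["users", "products"])  # a subset, by name
    source.select_streams("users")                # a single stream also works
    source.select_streams("*")                    # equivalent to select_all_streams()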

def get_selected_streams(self) -> list[str]:
258    def get_selected_streams(self) -> list[str]:
259        """Get the selected streams.
260
261        If no streams are selected, return an empty list.
262        """
263        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
265    def set_config(
266        self,
267        config: dict[str, Any],
268        *,
269        validate: bool = True,
270    ) -> None:
271        """Set the config for the connector.
272
273        If validate is True, raise an exception if the config fails validation.
274
275        If validate is False, validation will be deferred until check() or validate_config()
276        is called.
277        """
278        if validate:
279            self.validate_config(config)
280
281        self._config_dict = config
282
283        if self._to_be_selected_streams:
284            self.select_streams(self._to_be_selected_streams)
285            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
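
For instance, to defer validation until check() (connector name illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker")
    # Defer JSON Schema validation; it runs later in check() or validate_config().
    source.set_config({"count": 100}, validate=False)
    source.check()  # validates the config and tests connectivity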

def get_available_streams(self) -> list[str]:
305    def get_available_streams(self) -> list[str]:
306        """Get the available streams from the spec."""
307        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the discovered catalog.

config_spec: dict[str, typing.Any]
339    @property
340    def config_spec(self) -> dict[str, Any]:
341        """Generate a configuration spec for this connector, as a JSON Schema definition.
342
343        This property generates a JSON Schema dictionary describing the
344        configuration options for the current connector.
345
346        Returns:
347            dict: The JSON Schema configuration spec as a dictionary.
348        """
349        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This property generates a JSON Schema dictionary describing the configuration options for the current connector.

Returns:

dict: The JSON Schema configuration spec as a dictionary.

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
351    def print_config_spec(
352        self,
353        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
354        *,
355        output_file: Path | str | None = None,
356    ) -> None:
357        """Print the configuration spec for this connector.
358
359        Args:
360            format: The format to print the spec in. Must be "yaml" or "json".
361            output_file: Optional. If set, the spec will be written to the given file path.
362                Otherwise, it will be printed to the console.
363        """
364        if format not in {"yaml", "json"}:
365            raise exc.PyAirbyteInputError(
366                message="Invalid format. Expected 'yaml' or 'json'",
367                input_value=format,
368            )
369        if isinstance(output_file, str):
370            output_file = Path(output_file)
371
372        if format == "yaml":
373            content = yaml.dump(self.config_spec, indent=2)
374        elif format == "json":
375            content = json.dumps(self.config_spec, indent=2)
376
377        if output_file:
378            output_file.write_text(content)
379            return
380
381        syntax_highlighted = Syntax(content, format)
382        print(syntax_highlighted)

Print the configuration spec for this connector.

Arguments:
  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
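
For example:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.print_config_spec()  # syntax-highlighted YAML on the console
    source.print_config_spec("json", output_file="spec.json")  # write JSON to a file
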
docs_url: str
399    @property
400    def docs_url(self) -> str:
401        """Get the URL to the connector's documentation."""
402        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
403            "source-", ""
404        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
406    @property
407    def discovered_catalog(self) -> AirbyteCatalog:
408        """Get the raw catalog for the given streams.
409
410        If the catalog is not yet known, we call discover to get it.
411        """
412        if self._discovered_catalog is None:
413            self._discovered_catalog = self._discover()
414
415        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
417    @property
418    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
419        """Get the configured catalog for the given streams.
420
421        If the raw catalog is not yet known, we call discover to get it.
422
423        If no specific streams are selected, we return a catalog that syncs all available streams.
424
425        TODO: We should consider disabling by default the streams that the connector would
426        disable by default. (For instance, streams that require a premium license are sometimes
427        disabled by default within the connector.)
428        """
429        # Ensure discovered catalog is cached before we start
430        _ = self.discovered_catalog
431
432        # Filter for selected streams if set, otherwise use all available streams:
433        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
434        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
436    def get_configured_catalog(
437        self,
438        streams: Literal["*"] | list[str] | None = None,
439    ) -> ConfiguredAirbyteCatalog:
440        """Get a configured catalog for the given streams.
441
442        If no streams are provided, the selected streams will be used. If no streams are selected,
443        all available streams will be used.
444
445        If '*' is provided, all available streams will be used.
446        """
447        selected_streams: list[str] = []
448        if streams is None:
449            selected_streams = self._selected_stream_names or self.get_available_streams()
450        elif streams == "*":
451            selected_streams = self.get_available_streams()
452        elif isinstance(streams, list):
453            selected_streams = streams
454        else:
455            raise exc.PyAirbyteInputError(
456                message="Invalid streams argument.",
457                input_value=streams,
458            )
459
460        return ConfiguredAirbyteCatalog(
461            streams=[
462                ConfiguredAirbyteStream(
463                    stream=stream,
464                    destination_sync_mode=DestinationSyncMode.overwrite,
465                    sync_mode=SyncMode.incremental,
466                    primary_key=(
467                        [self._primary_key_overrides[stream.name.lower()]]
468                        if stream.name.lower() in self._primary_key_overrides
469                        else stream.source_defined_primary_key
470                    ),
471                    cursor_field=(
472                        [self._cursor_key_overrides[stream.name.lower()]]
473                        if stream.name.lower() in self._cursor_key_overrides
474                        else stream.default_cursor_field
475                    ),
476                    # These are unused in the current implementation:
477                    generation_id=None,
478                    minimum_generation_id=None,
479                    sync_id=None,
480                )
481                for stream in self.discovered_catalog.streams
482                if stream.name in selected_streams
483            ],
484        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.
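
A sketch assuming source-faker; each entry in the result is a ConfiguredAirbyteStream, with any primary-key or cursor overrides already applied:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    catalog = source.get_configured_catalog(streams=["users"])
    for configured_stream in catalog.streams:
        print(configured_stream.stream.name, configured_stream.sync_mode)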

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
486    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
487        """Return the JSON Schema spec for the specified stream name."""
488        catalog: AirbyteCatalog = self.discovered_catalog
489        found: list[AirbyteStream] = [
490            stream for stream in catalog.streams if stream.name == stream_name
491        ]
492
493        if len(found) == 0:
494            raise exc.PyAirbyteInputError(
495                message="Stream name does not exist in catalog.",
496                input_value=stream_name,
497            )
498
499        if len(found) > 1:
500            raise exc.PyAirbyteInternalError(
501                message="Duplicate streams found with the same name.",
502                context={
503                    "found_streams": found,
504                },
505            )
506
507        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
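
For example, to inspect the declared fields of a stream (stream name illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    schema = source.get_stream_json_schema("users")
    # A stream schema is a JSON Schema object; its "properties" map the fields.
    print(sorted(schema.get("properties", {})))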

def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
509    def get_records(
510        self,
511        stream: str,
512        *,
513        normalize_field_names: bool = False,
514        prune_undeclared_fields: bool = True,
515    ) -> LazyDataset:
516        """Read a stream from the connector.
517
518        Args:
519            stream: The name of the stream to read.
520            normalize_field_names: When `True`, field names will be normalized to lower case, with
521                special characters removed. This matches the behavior of PyAirbyte caches and most
522                Airbyte destinations.
523            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
524                which generally matches the behavior of PyAirbyte caches and most Airbyte
525                destinations, specifically when you expect the catalog may be stale. You can disable
526                this to keep all fields in the records.
527
528        This involves the following steps:
529        * Call discover to get the catalog
530        * Generate a configured catalog that syncs the given stream in full_refresh mode
531        * Write the configured catalog and the config to a temporary file
532        * execute the connector with read --config <config_file> --catalog <catalog_file>
533        * Listen to the messages and return the first AirbyteRecordMessages that come along.
534        * Make sure the subprocess is killed when the function returns.
535        """
536        configured_catalog = self.get_configured_catalog(streams=[stream])
537        if len(configured_catalog.streams) == 0:
538            raise exc.PyAirbyteInputError(
539                message="Requested stream does not exist.",
540                context={
541                    "stream": stream,
542                    "available_streams": self.get_available_streams(),
543                    "connector_name": self.name,
544                },
545            ) from KeyError(stream)
546
547        configured_stream = configured_catalog.streams[0]
548
549        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
550            yield from records
551
552        stream_record_handler = StreamRecordHandler(
553            json_schema=self.get_stream_json_schema(stream),
554            prune_extra_fields=prune_undeclared_fields,
555            normalize_keys=normalize_field_names,
556        )
557
558        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
559        progress_tracker = ProgressTracker(
560            ProgressStyle.PLAIN,
561            source=self,
562            cache=None,
563            destination=None,
564            expected_streams=[stream],
565        )
566
567        iterator: Iterator[dict[str, Any]] = (
568            StreamRecord.from_record_message(
569                record_message=record.record,
570                stream_record_handler=stream_record_handler,
571            )
572            for record in self._read_with_catalog(
573                catalog=configured_catalog,
574                progress_tracker=progress_tracker,
575            )
576            if record.record
577        )
578        progress_tracker.log_success()
579        return LazyDataset(
580            iterator,
581            stream_metadata=configured_stream,
582        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the AirbyteRecordMessages as they come along.
  • Make sure the subprocess is killed when the function returns.
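
A minimal sketch; nothing is read from the connector until the returned LazyDataset is iterated:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    dataset = source.get_records("users")  # returns a LazyDataset
    for record in dataset:
        print(record)  # a dict-like StreamRecord
        break
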
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
584    def get_documents(
585        self,
586        stream: str,
587        title_property: str | None = None,
588        content_properties: list[str] | None = None,
589        metadata_properties: list[str] | None = None,
590        *,
591        render_metadata: bool = False,
592    ) -> Iterable[Document]:
593        """Read a stream from the connector and return the records as documents.
594
595        If metadata_properties is not set, all properties that are not content will be added to
596        the metadata.
597
598        If render_metadata is True, metadata will be rendered in the document, as well
599        as the main content.
600        """
601        return self.get_records(stream).to_documents(
602            title_property=title_property,
603            content_properties=content_properties,
604            metadata_properties=metadata_properties,
605            render_metadata=render_metadata,
606        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
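
A sketch with illustrative property names; which properties exist depends on the stream's JSON schema:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    documents = source.get_documents(
        stream="products",
        title_property="id",                    # illustrative
        content_properties=["make", "model"],   # illustrative
        render_metadata=True,
    )
    for doc in documents:
        print(doc)
        break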

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
696    def read(
697        self,
698        cache: CacheBase | None = None,
699        *,
700        streams: str | list[str] | None = None,
701        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
702        force_full_refresh: bool = False,
703        skip_validation: bool = False,
704    ) -> ReadResult:
705        """Read from the connector and write to the cache.
706
707        Args:
708            cache: The cache to write to. If not set, a default cache will be used.
709            streams: Optional if already set. A list of stream names to select for reading. If set
710                to "*", all streams will be selected.
711            write_strategy: The strategy to use when writing to the cache. If a string, it must be
712                one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one
713                of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or
714                WriteStrategy.AUTO.
715            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
716                streams will be read in incremental mode if supported by the connector. This option
717                must be True when using the "replace" strategy.
718            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
719                running the connector. This can be helpful in debugging, when you want to send
720                configurations to the connector that otherwise might be rejected by JSON Schema
721                validation rules.
722        """
723        cache = cache or get_default_cache()
724        progress_tracker = ProgressTracker(
725            source=self,
726            cache=cache,
727            destination=None,
728            expected_streams=None,  # Will be set later
729        )
730
731        # Set up state provider if not in full refresh mode
732        if force_full_refresh:
733            state_provider: StateProviderBase | None = None
734        else:
735            state_provider = cache.get_state_provider(
736                source_name=self._name,
737            )
738        state_writer = cache.get_state_writer(source_name=self._name)
739
740        if streams:
741            self.select_streams(streams)
742
743        if not self._selected_stream_names:
744            raise exc.PyAirbyteNoStreamsSelectedError(
745                connector_name=self.name,
746                available_streams=self.get_available_streams(),
747            )
748
749        try:
750            result = self._read_to_cache(
751                cache=cache,
752                catalog_provider=CatalogProvider(self.configured_catalog),
753                stream_names=self._selected_stream_names,
754                state_provider=state_provider,
755                state_writer=state_writer,
756                write_strategy=write_strategy,
757                force_full_refresh=force_full_refresh,
758                skip_validation=skip_validation,
759                progress_tracker=progress_tracker,
760            )
761        except exc.PyAirbyteInternalError as ex:
762            progress_tracker.log_failure(exception=ex)
763            raise exc.AirbyteConnectorFailedError(
764                connector_name=self.name,
765                log_text=self._last_log_messages,
766            ) from ex
767        except Exception as ex:
768            progress_tracker.log_failure(exception=ex)
769            raise
770
771        progress_tracker.log_success()
772        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "merge", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.MERGE, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
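
A minimal end-to-end sketch using the default cache, following the quickstart pattern from the PyAirbyte README (connector name illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_all_streams()

    result = source.read()  # default cache, "auto" write strategy
    for name, records in result.streams.items():
        print(f"Stream {name}: {len(list(records))} records")
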
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
get_config
config_hash
validate_config
connector_version
check
install
uninstall
class ConnectorMetadata(pydantic.main.BaseModel):
 73class ConnectorMetadata(BaseModel):
 74    """Metadata for a connector."""
 75
 76    name: str
 77    """Connector name. For example, "source-google-sheets"."""
 78
 79    latest_available_version: str | None
 80    """The latest available version of the connector."""
 81
 82    pypi_package_name: str | None
 83    """The name of the PyPI package for the connector, if it exists."""
 84
 85    language: Language | None
 86    """The language of the connector."""
 87
 88    install_types: set[InstallType]
 89    """The supported install types for the connector."""
 90
 91    suggested_streams: list[str] | None = None
 92    """A list of suggested streams for the connector, if available."""
 93
 94    @property
 95    def default_install_type(self) -> InstallType:
 96        """Return the default install type for the connector."""
 97        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 98            return InstallType.YAML
 99
100        if InstallType.PYTHON in self.install_types:
101            return InstallType.PYTHON
102
103        # Else: Java or Docker
104        return InstallType.DOCKER

Metadata for a connector.

name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: Language | None

The language of the connector.

install_types: set[InstallType]

The supported install types for the connector.

suggested_streams: list[str] | None

A list of suggested streams for the connector, if available.

default_install_type: airbyte.sources.registry.InstallType
 94    @property
 95    def default_install_type(self) -> InstallType:
 96        """Return the default install type for the connector."""
 97        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
 98            return InstallType.YAML
 99
100        if InstallType.PYTHON in self.install_types:
101            return InstallType.PYTHON
102
103        # Else: Java or Docker
104        return InstallType.DOCKER

Return the default install type for the connector.
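
For example, using the get_connector_metadata helper from this module; the None guard is a precaution in case no metadata is available for the given name:

    from airbyte.sources import get_connector_metadata

    metadata = get_connector_metadata("source-github")
    if metadata is not None:
        print(metadata.latest_available_version)
        print(metadata.default_install_type)  # e.g. InstallType.PYTHON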

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
