airbyte.sources

Sources connectors module for PyAirbyte.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Sources connectors module for PyAirbyte."""
 3
 4from __future__ import annotations
 5
 6from airbyte.sources import base, util
 7from airbyte.sources.base import Source
 8from airbyte.sources.registry import (
 9    ConnectorMetadata,
10    get_available_connectors,
11    get_connector_metadata,
12)
13from airbyte.sources.util import (
14    get_benchmark_source,
15    get_source,
16)
17
18
19__all__ = [
20    # Submodules
21    "base",
22    "util",
23    # Factories
24    "get_source",
25    "get_benchmark_source",
26    # Helper Functions
27    "get_available_connectors",
28    "get_connector_metadata",
29    # Classes
30    "Source",
31    "ConnectorMetadata",
32]
def get_source( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: bool | str | None = None, use_host_network: bool = False, source_manifest: bool | dict | pathlib.Path | str | None = None, install_if_missing: bool = True, install_root: pathlib.Path | None = None) -> Source:
 48def get_source(  # noqa: PLR0913 # Too many arguments
 49    name: str,
 50    config: dict[str, Any] | None = None,
 51    *,
 52    config_change_callback: ConfigChangeCallback | None = None,
 53    streams: str | list[str] | None = None,
 54    version: str | None = None,
 55    pip_url: str | None = None,
 56    local_executable: Path | str | None = None,
 57    docker_image: bool | str | None = None,
 58    use_host_network: bool = False,
 59    source_manifest: bool | dict | Path | str | None = None,
 60    install_if_missing: bool = True,
 61    install_root: Path | None = None,
 62) -> Source:
 63    """Get a connector by name and version.
 64
 65    If an explicit install or execution method is requested (e.g. `local_executable`,
 66    `docker_image`, `pip_url`, `source_manifest`), the connector will be executed using this method.
 67
 68    Otherwise, an appropriate method will be selected based on the available connector metadata:
 69    1. If the connector is registered and a YAML source manifest is available, the manifest
 70       will be downloaded and used to execute the connector.
 71    2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
 72    3. Else, if the connector is registered and has a Docker image, and if Docker is available, it
 73       will be executed using Docker.
 74
 75    Args:
 76        name: connector name
 77        config: connector config - if not provided, you need to set it later via the set_config
 78            method.
 79        config_change_callback: callback function to be called when the connector config changes.
 80        streams: list of stream names to select for reading. If set to "*", all streams will be
 81            selected. If not provided, you can set it later via the `select_streams()` or
 82            `select_all_streams()` method.
 83        version: connector version - if not provided, the currently installed version will be used.
 84            If no version is installed, the latest available version will be used. The version can
 85            also be set to "latest" to force the use of the latest available version.
 86        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
 87            connector name.
 88        local_executable: If set, the connector will be assumed to already be installed and will be
 89            executed using this path or executable name. Otherwise, the connector will be installed
 90            automatically in a virtual environment.
 91        docker_image: If set, the connector will be executed using Docker. You can specify `True`
 92            to use the default image for the connector, or you can specify a custom image name.
 93            If `version` is specified and your image name does not already contain a tag
 94            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
 95        use_host_network: If set, along with docker_image, the connector will be executed using
 96            the host network. This is useful for connectors that need to access resources on
 97            the host machine, such as a local database. This parameter is ignored when
 98            `docker_image` is not set.
 99        source_manifest: If set, the connector will be executed based on a declarative YAML
100            source definition. This input can be `True` to attempt to auto-download a YAML spec,
101            `dict` to accept a Python dictionary as the manifest, `Path` to pull a manifest from
102            the local file system, or `str` to pull the definition from a web URL.
103        install_if_missing: Whether to install the connector if it is not available locally. This
104            parameter is ignored when `local_executable` or `source_manifest` are set.
105        install_root: (Optional.) The root directory where the virtual environment will be
106            created. If not provided, the current working directory will be used.
107    """
108    return Source(
109        name=name,
110        config=config,
111        config_change_callback=config_change_callback,
112        streams=streams,
113        executor=get_connector_executor(
114            name=name,
115            version=version,
116            pip_url=pip_url,
117            local_executable=local_executable,
118            docker_image=docker_image,
119            use_host_network=use_host_network,
120            source_manifest=source_manifest,
121            install_if_missing=install_if_missing,
122            install_root=install_root,
123        ),
124    )

Get a connector by name and version.

If an explicit install or execution method is requested (e.g. local_executable, docker_image, pip_url, source_manifest), the connector will be executed using this method.

Otherwise, an appropriate method will be selected based on the available connector metadata:

  1. If the connector is registered and a YAML source manifest is available, the manifest will be downloaded and used to execute the connector.
  2. Else, if the connector is registered and has a PyPI package, it will be installed via pip.
  3. Else, if the connector is registered and has a Docker image, and if Docker is available, it will be executed using Docker.
Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • source_manifest: If set, the connector will be executed based on a declarative YAML source definition. This input can be True to attempt to auto-download a YAML spec, dict to accept a Python dictionary as the manifest, Path to pull a manifest from the local file system, or str to pull the definition from a web URL.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable or source_manifest are set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
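
A minimal usage sketch (the "source-faker" connector and its "count" option are illustrative assumptions; substitute your own connector and config):

    import airbyte as ab

    # Assumes "source-faker" is available in the connector registry.
    source = ab.get_source(
        "source-faker",
        config={"count": 1_000},  # hypothetical config key for this connector
        streams="*",
    )
    source.check()  # verify config and connectivity before reading
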
def get_benchmark_source(num_records: int | str = '5e5') -> Source:
127def get_benchmark_source(
128    num_records: int | str = "5e5",
129) -> Source:
130    """Get a source for benchmarking.
131
132    This source will generate dummy records for performance benchmarking purposes.
133    You can specify the number of records to generate using the `num_records` parameter.
134    The `num_records` parameter can be an integer or a string in scientific notation.
135    For example, `"5e6"` will generate 5 million records. If underscores are provided
136    within a numeric string, they will be ignored.
137
138    Args:
139        num_records (int | str): The number of records to generate. Defaults to "5e5", or
140            500,000 records.
141            Can be an integer (`1000`) or a string in scientific notation.
142            For example, `"5e6"` will generate 5 million records.
143
144    Returns:
145        Source: The source object for benchmarking.
146    """
147    if isinstance(num_records, str):
148        try:
149            num_records = int(Decimal(num_records.replace("_", "")))
150        except InvalidOperation as ex:
151            raise PyAirbyteInputError(
152                message="Invalid number format.",
153                original_exception=ex,
154                input_value=str(num_records),
155            ) from None
156
157    return get_source(
158        name="source-e2e-test",
159        docker_image=True,
160        # docker_image="airbyte/source-e2e-test:latest",
161        config={
162            "type": "BENCHMARK",
163            "schema": "FIVE_STRING_COLUMNS",
164            "terminationCondition": {
165                "type": "MAX_RECORDS",
166                "max": num_records,
167            },
168        },
169        streams="*",
170    )

Get a source for benchmarking.

This source will generate dummy records for performance benchmarking purposes. You can specify the number of records to generate using the num_records parameter. The num_records parameter can be an integer or a string in scientific notation. For example, "5e6" will generate 5 million records. If underscores are provided within a numeric string, they will be ignored.

Arguments:
  • num_records (int | str): The number of records to generate. Defaults to "5e5", or 500,000 records. Can be an integer (1000) or a string in scientific notation. For example, "5e6" will generate 5 million records.
Returns:
  Source: The source object for benchmarking.
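
A brief usage sketch (assumes Docker is available, since the benchmark source runs as a Docker image):

    from airbyte.sources import get_benchmark_source

    # Generate 5 million dummy records and read them into the default cache.
    benchmark = get_benchmark_source(num_records="5e6")
    result = benchmark.read()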

def get_available_connectors( install_type: airbyte.sources.registry.InstallType | str | None = None) -> list[str]:
311def get_available_connectors(install_type: InstallType | str | None = None) -> list[str]:
312    """Return a list of all available connectors.
313
314    Connectors will be returned in alphabetical order, with the standard prefix "source-".
315    """
316    if install_type is None:
317        # No install type specified. Filter for whatever is runnable.
318        if is_docker_installed():
319            logging.info("Docker is detected. Returning all connectors.")
320            # If Docker is available, return all connectors.
321            return sorted(conn.name for conn in _get_registry_cache().values())
322
323        logging.info("Docker was not detected. Returning only Python and Manifest-only connectors.")
324
325        # If Docker is not available, return only Python and Manifest-based connectors.
326        return sorted(
327            conn.name
328            for conn in _get_registry_cache().values()
329            if conn.language in {Language.PYTHON, Language.MANIFEST_ONLY}
330        )
331
332    if not isinstance(install_type, InstallType):
333        install_type = InstallType(install_type)
334
335    if install_type == InstallType.PYTHON:
336        return sorted(
337            conn.name
338            for conn in _get_registry_cache().values()
339            if conn.pypi_package_name is not None
340        )
341
342    if install_type == InstallType.JAVA:
343        warnings.warn(
344            message="Java connectors are not yet supported.",
345            stacklevel=2,
346        )
347        return sorted(
348            conn.name for conn in _get_registry_cache().values() if conn.language == Language.JAVA
349        )
350
351    if install_type == InstallType.DOCKER:
352        return sorted(conn.name for conn in _get_registry_cache().values())
353
354    if install_type == InstallType.YAML:
355        return sorted(
356            conn.name
357            for conn in _get_registry_cache().values()
358            if InstallType.YAML in conn.install_types
359            and conn.name not in _LOWCODE_CONNECTORS_EXCLUDED
360        )
361
362    # pragma: no cover  # Should never be reached.
363    raise exc.PyAirbyteInputError(
364        message="Invalid install type.",
365        context={
366            "install_type": install_type,
367        },
368    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".
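
A small sketch (the "yaml" string is assumed to map to InstallType.YAML; results depend on registry access and whether Docker is detected):

    from airbyte.sources import get_available_connectors

    runnable = get_available_connectors()  # filtered by what can run locally
    yaml_only = get_available_connectors(install_type="yaml")
    print(len(runnable), len(yaml_only))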

def get_connector_metadata(name: str) -> None | ConnectorMetadata:
281def get_connector_metadata(name: str) -> None | ConnectorMetadata:
282    """Check the cache for the connector.
283
284    If the cache is empty, populate by calling update_cache.
285    """
286    registry_url = _get_registry_url()
287
288    if _is_registry_disabled(registry_url):
289        return None
290
291    cache = copy(_get_registry_cache())
292
293    if not cache:
294        raise exc.PyAirbyteInternalError(
295            message="Connector registry could not be loaded.",
296            context={
297                "registry_url": _get_registry_url(),
298            },
299        )
300    if name not in cache:
301        raise exc.AirbyteConnectorNotRegisteredError(
302            connector_name=name,
303            context={
304                "registry_url": _get_registry_url(),
305                "available_connectors": get_available_connectors(),
306            },
307        )
308    return cache[name]

Check the cache for the connector.

If the cache is empty, populate by calling update_cache.
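
A usage sketch (the "source-github" name is illustrative; an unregistered name raises AirbyteConnectorNotRegisteredError):

    from airbyte.sources import get_connector_metadata

    metadata = get_connector_metadata("source-github")
    if metadata is not None:  # None is returned when the registry is disabled
        print(metadata.name)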

class Source(airbyte._connector_base.ConnectorBase):
 53class Source(ConnectorBase):
 54    """A class representing a source that can be called."""
 55
 56    connector_type: Literal["source"] = "source"
 57
 58    def __init__(
 59        self,
 60        executor: Executor,
 61        name: str,
 62        config: dict[str, Any] | None = None,
 63        *,
 64        config_change_callback: ConfigChangeCallback | None = None,
 65        streams: str | list[str] | None = None,
 66        validate: bool = False,
 67    ) -> None:
 68        """Initialize the source.
 69
 70        If config is provided, it will be validated against the spec if validate is True.
 71        """
 72        self._to_be_selected_streams: list[str] | str = []
 73        """Used to hold selection criteria before catalog is known."""
 74
 75        super().__init__(
 76            executor=executor,
 77            name=name,
 78            config=config,
 79            config_change_callback=config_change_callback,
 80            validate=validate,
 81        )
 82        self._config_dict: dict[str, Any] | None = None
 83        self._last_log_messages: list[str] = []
 84        self._discovered_catalog: AirbyteCatalog | None = None
 85        self._selected_stream_names: list[str] = []
 86        if config is not None:
 87            self.set_config(config, validate=validate)
 88        if streams is not None:
 89            self.select_streams(streams)
 90
 91        self._deployed_api_root: str | None = None
 92        self._deployed_workspace_id: str | None = None
 93        self._deployed_source_id: str | None = None
 94
 95    def set_streams(self, streams: list[str]) -> None:
 96        """Deprecated. See select_streams()."""
 97        warnings.warn(
 98            "The 'set_streams' method is deprecated and will be removed in a future version. "
 99            "Please use the 'select_streams' method instead.",
100            DeprecationWarning,
101            stacklevel=2,
102        )
103        self.select_streams(streams)
104
105    def _log_warning_preselected_stream(self, streams: str | list[str]) -> None:
106        """Logs a warning message indicating stream selection which are not selected yet."""
107        if streams == "*":
108            print(
109                "Warning: Config is not set yet. All streams will be selected after config is set."
110            )
111        else:
112            print(
113                "Warning: Config is not set yet. "
114                f"Streams to be selected after config is set: {streams}"
115            )
116
117    def select_all_streams(self) -> None:
118        """Select all streams.
119
120        This is a more streamlined equivalent to:
121        > source.select_streams(source.get_available_streams()).
122        """
123        if self._config_dict is None:
124            self._to_be_selected_streams = "*"
125            self._log_warning_preselected_stream(self._to_be_selected_streams)
126            return
127
128        self._selected_stream_names = self.get_available_streams()
129
130    def select_streams(self, streams: str | list[str]) -> None:
131        """Select the stream names that should be read from the connector.
132
133        Args:
134            streams: A list of stream names to select. If set to "*", all streams will be selected.
135
136        Currently, if this is not set, all streams will be read.
137        """
138        if self._config_dict is None:
139            self._to_be_selected_streams = streams
140            self._log_warning_preselected_stream(streams)
141            return
142
143        if streams == "*":
144            self.select_all_streams()
145            return
146
147        if isinstance(streams, str):
148            # If a single stream is provided, convert it to a one-item list
149            streams = [streams]
150
151        available_streams = self.get_available_streams()
152        for stream in streams:
153            if stream not in available_streams:
154                raise exc.AirbyteStreamNotFoundError(
155                    stream_name=stream,
156                    connector_name=self.name,
157                    available_streams=available_streams,
158                )
159        self._selected_stream_names = streams
160
161    def get_selected_streams(self) -> list[str]:
162        """Get the selected streams.
163
164        If no streams are selected, return an empty list.
165        """
166        return self._selected_stream_names
167
168    def set_config(
169        self,
170        config: dict[str, Any],
171        *,
172        validate: bool = True,
173    ) -> None:
174        """Set the config for the connector.
175
176        If validate is True, raise an exception if the config fails validation.
177
178        If validate is False, validation will be deferred until check() or validate_config()
179        is called.
180        """
181        if validate:
182            self.validate_config(config)
183
184        self._config_dict = config
185
186        if self._to_be_selected_streams:
187            self.select_streams(self._to_be_selected_streams)
188            self._to_be_selected_streams = []
189
190    def get_config(self) -> dict[str, Any]:
191        """Get the config for the connector."""
192        return self._config
193
194    @property
195    def _config(self) -> dict[str, Any]:
196        if self._config_dict is None:
197            raise exc.AirbyteConnectorConfigurationMissingError(
198                connector_name=self.name,
199                guidance="Provide via get_source() or set_config()",
200            )
201        return self._config_dict
202
203    def _discover(self) -> AirbyteCatalog:
204        """Call discover on the connector.
205
206        This involves the following steps:
207        - Write the config to a temporary file
208        - execute the connector with discover --config <config_file>
209        - Listen to the messages and return the first AirbyteCatalog that comes along.
210        - Make sure the subprocess is killed when the function returns.
211        """
212        with as_temp_files([self._config]) as [config_file]:
213            for msg in self._execute(["discover", "--config", config_file]):
214                if msg.type == Type.CATALOG and msg.catalog:
215                    return msg.catalog
216            raise exc.AirbyteConnectorMissingCatalogError(
217                connector_name=self.name,
218                log_text=self._last_log_messages,
219            )
220
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]
224
225    def _get_incremental_stream_names(self) -> list[str]:
226        """Get the name of streams that support incremental sync."""
227        return [
228            stream.name
229            for stream in self.discovered_catalog.streams
230            if SyncMode.incremental in stream.supported_sync_modes
231        ]
232
233    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
234        """Call spec on the connector.
235
236        This involves the following steps:
237        * execute the connector with spec
238        * Listen to the messages and return the first AirbyteCatalog that comes along.
239        * Make sure the subprocess is killed when the function returns.
240        """
241        if force_refresh or self._spec is None:
242            for msg in self._execute(["spec"]):
243                if msg.type == Type.SPEC and msg.spec:
244                    self._spec = msg.spec
245                    break
246
247        if self._spec:
248            return self._spec
249
250        raise exc.AirbyteConnectorMissingSpecError(
251            connector_name=self.name,
252            log_text=self._last_log_messages,
253        )
254
255    @property
256    def config_spec(self) -> dict[str, Any]:
257        """Generate a configuration spec for this connector, as a JSON Schema definition.
258
259        This function generates a JSON Schema dictionary with configuration specs for the
260        current connector, as a dictionary.
261
262        Returns:
263            dict: The JSON Schema configuration spec as a dictionary.
264        """
265        return self._get_spec(force_refresh=True).connectionSpecification
266
267    def print_config_spec(
268        self,
269        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
270        *,
271        output_file: Path | str | None = None,
272    ) -> None:
273        """Print the configuration spec for this connector.
274
275        Args:
276            format: The format to print the spec in. Must be "yaml" or "json".
277            output_file: Optional. If set, the spec will be written to the given file path.
278                Otherwise, it will be printed to the console.
279        """
280        if format not in {"yaml", "json"}:
281            raise exc.PyAirbyteInputError(
282                message="Invalid format. Expected 'yaml' or 'json'",
283                input_value=format,
284            )
285        if isinstance(output_file, str):
286            output_file = Path(output_file)
287
288        if format == "yaml":
289            content = yaml.dump(self.config_spec, indent=2)
290        elif format == "json":
291            content = json.dumps(self.config_spec, indent=2)
292
293        if output_file:
294            output_file.write_text(content)
295            return
296
297        syntax_highlighted = Syntax(content, format)
298        print(syntax_highlighted)
299
300    @property
301    def _yaml_spec(self) -> str:
302        """Get the spec as a yaml string.
303
304        For now, the primary use case is for writing and debugging a valid config for a source.
305
306        This is private for now because we probably want better polish before exposing this
307        as a stable interface. This will also get easier when we have docs links with this info
308        for each connector.
309        """
310        spec_obj: ConnectorSpecification = self._get_spec()
311        spec_dict: dict[str, Any] = spec_obj.model_dump(exclude_unset=True)
312        # convert to a yaml string
313        return yaml.dump(spec_dict)
314
315    @property
316    def docs_url(self) -> str:
317        """Get the URL to the connector's documentation."""
318        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
319            "source-", ""
320        )
321
322    @property
323    def discovered_catalog(self) -> AirbyteCatalog:
324        """Get the raw catalog for the given streams.
325
326        If the catalog is not yet known, we call discover to get it.
327        """
328        if self._discovered_catalog is None:
329            self._discovered_catalog = self._discover()
330
331        return self._discovered_catalog
332
333    @property
334    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
335        """Get the configured catalog for the given streams.
336
337        If the raw catalog is not yet known, we call discover to get it.
338
339        If no specific streams are selected, we return a catalog that syncs all available streams.
340
341        TODO: We should consider disabling by default the streams that the connector would
342        disable by default. (For instance, streams that require a premium license are sometimes
343        disabled by default within the connector.)
344        """
345        # Ensure discovered catalog is cached before we start
346        _ = self.discovered_catalog
347
348        # Filter for selected streams if set, otherwise use all available streams:
349        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
350        return self.get_configured_catalog(streams=streams_filter)
351
352    def get_configured_catalog(
353        self,
354        streams: Literal["*"] | list[str] | None = None,
355    ) -> ConfiguredAirbyteCatalog:
356        """Get a configured catalog for the given streams.
357
358        If no streams are provided, the selected streams will be used. If no streams are selected,
359        all available streams will be used.
360
361        If '*' is provided, all available streams will be used.
362        """
363        selected_streams: list[str] = []
364        if streams is None:
365            selected_streams = self._selected_stream_names or self.get_available_streams()
366        elif streams == "*":
367            selected_streams = self.get_available_streams()
368        elif isinstance(streams, list):
369            selected_streams = streams
370        else:
371            raise exc.PyAirbyteInputError(
372                message="Invalid streams argument.",
373                input_value=streams,
374            )
375
376        return ConfiguredAirbyteCatalog(
377            streams=[
378                ConfiguredAirbyteStream(
379                    stream=stream,
380                    destination_sync_mode=DestinationSyncMode.overwrite,
381                    primary_key=stream.source_defined_primary_key,
382                    sync_mode=SyncMode.incremental,
383                )
384                for stream in self.discovered_catalog.streams
385                if stream.name in selected_streams
386            ],
387        )
388
389    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
390        """Return the JSON Schema spec for the specified stream name."""
391        catalog: AirbyteCatalog = self.discovered_catalog
392        found: list[AirbyteStream] = [
393            stream for stream in catalog.streams if stream.name == stream_name
394        ]
395
396        if len(found) == 0:
397            raise exc.PyAirbyteInputError(
398                message="Stream name does not exist in catalog.",
399                input_value=stream_name,
400            )
401
402        if len(found) > 1:
403            raise exc.PyAirbyteInternalError(
404                message="Duplicate streams found with the same name.",
405                context={
406                    "found_streams": found,
407                },
408            )
409
410        return found[0].json_schema
411
412    def get_records(
413        self,
414        stream: str,
415        *,
416        normalize_field_names: bool = False,
417        prune_undeclared_fields: bool = True,
418    ) -> LazyDataset:
419        """Read a stream from the connector.
420
421        Args:
422            stream: The name of the stream to read.
423            normalize_field_names: When `True`, field names will be normalized to lower case, with
424                special characters removed. This matches the behavior of PyAirbyte caches and most
425                Airbyte destinations.
426            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
427                which generally matches the behavior of PyAirbyte caches and most Airbyte
428                destinations, specifically when you expect the catalog may be stale. You can disable
429                this to keep all fields in the records.
430
431        This involves the following steps:
432        * Call discover to get the catalog
433        * Generate a configured catalog that syncs the given stream in full_refresh mode
434        * Write the configured catalog and the config to a temporary file
435        * execute the connector with read --config <config_file> --catalog <catalog_file>
436        * Listen to the messages and return the first AirbyteRecordMessages that come along.
437        * Make sure the subprocess is killed when the function returns.
438        """
439        discovered_catalog: AirbyteCatalog = self.discovered_catalog
440        configured_catalog = ConfiguredAirbyteCatalog(
441            streams=[
442                ConfiguredAirbyteStream(
443                    stream=s,
444                    sync_mode=SyncMode.full_refresh,
445                    destination_sync_mode=DestinationSyncMode.overwrite,
446                )
447                for s in discovered_catalog.streams
448                if s.name == stream
449            ],
450        )
451        if len(configured_catalog.streams) == 0:
452            raise exc.PyAirbyteInputError(
453                message="Requested stream does not exist.",
454                context={
455                    "stream": stream,
456                    "available_streams": self.get_available_streams(),
457                    "connector_name": self.name,
458                },
459            ) from KeyError(stream)
460
461        configured_stream = configured_catalog.streams[0]
462
463        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
464            yield from records
465
466        stream_record_handler = StreamRecordHandler(
467            json_schema=self.get_stream_json_schema(stream),
468            prune_extra_fields=prune_undeclared_fields,
469            normalize_keys=normalize_field_names,
470        )
471
472        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
473        progress_tracker = ProgressTracker(
474            ProgressStyle.PLAIN,
475            source=self,
476            cache=None,
477            destination=None,
478            expected_streams=[stream],
479        )
480
481        iterator: Iterator[dict[str, Any]] = (
482            StreamRecord.from_record_message(
483                record_message=record.record,
484                stream_record_handler=stream_record_handler,
485            )
486            for record in self._read_with_catalog(
487                catalog=configured_catalog,
488                progress_tracker=progress_tracker,
489            )
490            if record.record
491        )
492        progress_tracker.log_success()
493        return LazyDataset(
494            iterator,
495            stream_metadata=configured_stream,
496        )
497
498    def get_documents(
499        self,
500        stream: str,
501        title_property: str | None = None,
502        content_properties: list[str] | None = None,
503        metadata_properties: list[str] | None = None,
504        *,
505        render_metadata: bool = False,
506    ) -> Iterable[Document]:
507        """Read a stream from the connector and return the records as documents.
508
509        If metadata_properties is not set, all properties that are not content will be added to
510        the metadata.
511
512        If render_metadata is True, metadata will be rendered in the document, as well as the
513        the main content.
514        """
515        return self.get_records(stream).to_documents(
516            title_property=title_property,
517            content_properties=content_properties,
518            metadata_properties=metadata_properties,
519            render_metadata=render_metadata,
520        )
521
522    def _get_airbyte_message_iterator(
523        self,
524        *,
525        streams: Literal["*"] | list[str] | None = None,
526        state_provider: StateProviderBase | None = None,
527        progress_tracker: ProgressTracker,
528        force_full_refresh: bool = False,
529    ) -> AirbyteMessageIterator:
530        """Get an AirbyteMessageIterator for this source."""
531        return AirbyteMessageIterator(
532            self._read_with_catalog(
533                catalog=self.get_configured_catalog(streams=streams),
534                state=state_provider if not force_full_refresh else None,
535                progress_tracker=progress_tracker,
536            )
537        )
538
539    def _read_with_catalog(
540        self,
541        catalog: ConfiguredAirbyteCatalog,
542        progress_tracker: ProgressTracker,
543        state: StateProviderBase | None = None,
544    ) -> Generator[AirbyteMessage, None, None]:
545        """Call read on the connector.
546
547        This involves the following steps:
548        * Write the config to a temporary file
549        * execute the connector with read --config <config_file> --catalog <catalog_file>
550        * Listen to the messages and return the AirbyteRecordMessages that come along.
551        * Send out telemetry on the performed sync (with information about which source was used and
552          the type of the cache)
553        """
554        with as_temp_files(
555            [
556                self._config,
557                catalog.model_dump_json(),
558                state.to_state_input_file_text() if state else "[]",
559            ]
560        ) as [
561            config_file,
562            catalog_file,
563            state_file,
564        ]:
565            message_generator = self._execute(
566                [
567                    "read",
568                    "--config",
569                    config_file,
570                    "--catalog",
571                    catalog_file,
572                    "--state",
573                    state_file,
574                ],
575                progress_tracker=progress_tracker,
576            )
577            yield from progress_tracker.tally_records_read(message_generator)
578        progress_tracker.log_read_complete()
579
580    def _peek_airbyte_message(
581        self,
582        message: AirbyteMessage,
583        *,
584        raise_on_error: bool = True,
585    ) -> None:
586        """Process an Airbyte message.
587
588        This method handles reading Airbyte messages and taking action, if needed, based on the
589        message type. For instance, log messages are logged, records are tallied, and errors are
590        raised as exceptions if `raise_on_error` is True.
591
592        Raises:
593            AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
594        """
595        super()._peek_airbyte_message(message, raise_on_error=raise_on_error)
596
597    def _log_incremental_streams(
598        self,
599        *,
600        incremental_streams: set[str] | None = None,
601    ) -> None:
602        """Log the streams which are using incremental sync mode."""
603        log_message = (
604            "The following streams are currently using incremental sync:\n"
605            f"{incremental_streams}\n"
606            "To perform a full refresh, set 'force_full_refresh=True' in 'airbyte.read()' method."
607        )
608        print(log_message)
609
610    def read(
611        self,
612        cache: CacheBase | None = None,
613        *,
614        streams: str | list[str] | None = None,
615        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
616        force_full_refresh: bool = False,
617        skip_validation: bool = False,
618    ) -> ReadResult:
619        """Read from the connector and write to the cache.
620
621        Args:
622            cache: The cache to write to. If not set, a default cache will be used.
623            streams: Optional if already set. A list of stream names to select for reading. If set
624                to "*", all streams will be selected.
625            write_strategy: The strategy to use when writing to the cache. If a string, it must be
626                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
627                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
628                WriteStrategy.AUTO.
629            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
630                streams will be read in incremental mode if supported by the connector. This option
631                must be True when using the "replace" strategy.
632            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
633                running the connector. This can be helpful in debugging, when you want to send
634                configurations to the connector that otherwise might be rejected by JSON Schema
635                validation rules.
636        """
637        cache = cache or get_default_cache()
638        progress_tracker = ProgressTracker(
639            source=self,
640            cache=cache,
641            destination=None,
642            expected_streams=None,  # Will be set later
643        )
644
645        # Set up state provider if not in full refresh mode
646        if force_full_refresh:
647            state_provider: StateProviderBase | None = None
648        else:
649            state_provider = cache.get_state_provider(
650                source_name=self._name,
651            )
652        state_writer = cache.get_state_writer(source_name=self._name)
653
654        if streams:
655            self.select_streams(streams)
656
657        if not self._selected_stream_names:
658            raise exc.PyAirbyteNoStreamsSelectedError(
659                connector_name=self.name,
660                available_streams=self.get_available_streams(),
661            )
662
663        try:
664            result = self._read_to_cache(
665                cache=cache,
666                catalog_provider=CatalogProvider(self.configured_catalog),
667                stream_names=self._selected_stream_names,
668                state_provider=state_provider,
669                state_writer=state_writer,
670                write_strategy=write_strategy,
671                force_full_refresh=force_full_refresh,
672                skip_validation=skip_validation,
673                progress_tracker=progress_tracker,
674            )
675        except exc.PyAirbyteInternalError as ex:
676            progress_tracker.log_failure(exception=ex)
677            raise exc.AirbyteConnectorFailedError(
678                connector_name=self.name,
679                log_text=self._last_log_messages,
680            ) from ex
681        except Exception as ex:
682            progress_tracker.log_failure(exception=ex)
683            raise
684
685        progress_tracker.log_success()
686        return result
687
688    def _read_to_cache(  # noqa: PLR0913  # Too many arguments
689        self,
690        cache: CacheBase,
691        *,
692        catalog_provider: CatalogProvider,
693        stream_names: list[str],
694        state_provider: StateProviderBase | None,
695        state_writer: StateWriterBase | None,
696        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
697        force_full_refresh: bool = False,
698        skip_validation: bool = False,
699        progress_tracker: ProgressTracker,
700    ) -> ReadResult:
701        """Internal read method."""
702        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
703            warnings.warn(
704                message=(
705                    "Using `REPLACE` strategy without also setting `force_full_refresh=True` "
706                    "could result in data loss. "
707                    "To silence this warning, use the following: "
708                    'warnings.filterwarnings("ignore", '
709                    'category="airbyte.warnings.PyAirbyteDataLossWarning")'
710                ),
711                category=exc.PyAirbyteDataLossWarning,
712                stacklevel=1,
713            )
714        if isinstance(write_strategy, str):
715            try:
716                write_strategy = WriteStrategy(write_strategy)
717            except ValueError:
718                raise exc.PyAirbyteInputError(
719                    message="Invalid strategy",
720                    context={
721                        "write_strategy": write_strategy,
722                        "available_strategies": [s.value for s in WriteStrategy],
723                    },
724                ) from None
725
726        # Run optional validation step
727        if not skip_validation:
728            self.validate_config()
729
730        # Log incremental stream if incremental streams are known
731        if state_provider and state_provider.known_stream_names:
732            # Retrieve the set of known streams which support incremental sync
733            incremental_streams = (
734                set(self._get_incremental_stream_names())
735                & state_provider.known_stream_names
736                & set(self.get_selected_streams())
737            )
738            if incremental_streams:
739                self._log_incremental_streams(incremental_streams=incremental_streams)
740
741        airbyte_message_iterator = AirbyteMessageIterator(
742            self._read_with_catalog(
743                catalog=catalog_provider.configured_catalog,
744                state=state_provider,
745                progress_tracker=progress_tracker,
746            )
747        )
748        cache._write_airbyte_message_stream(  # noqa: SLF001  # Non-public API
749            stdin=airbyte_message_iterator,
750            catalog_provider=catalog_provider,
751            write_strategy=write_strategy,
752            state_writer=state_writer,
753            progress_tracker=progress_tracker,
754        )
755
756        # Flush the WAL, if applicable
757        cache.processor._do_checkpoint()  # noqa: SLF001  # Non-public API
758
759        return ReadResult(
760            source_name=self.name,
761            progress_tracker=progress_tracker,
762            processed_streams=stream_names,
763            cache=cache,
764        )

A class representing a source that can be called.

Source( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, streams: str | list[str] | None = None, validate: bool = False)
58    def __init__(
59        self,
60        executor: Executor,
61        name: str,
62        config: dict[str, Any] | None = None,
63        *,
64        config_change_callback: ConfigChangeCallback | None = None,
65        streams: str | list[str] | None = None,
66        validate: bool = False,
67    ) -> None:
68        """Initialize the source.
69
70        If config is provided, it will be validated against the spec if validate is True.
71        """
72        self._to_be_selected_streams: list[str] | str = []
73        """Used to hold selection criteria before catalog is known."""
74
75        super().__init__(
76            executor=executor,
77            name=name,
78            config=config,
79            config_change_callback=config_change_callback,
80            validate=validate,
81        )
82        self._config_dict: dict[str, Any] | None = None
83        self._last_log_messages: list[str] = []
84        self._discovered_catalog: AirbyteCatalog | None = None
85        self._selected_stream_names: list[str] = []
86        if config is not None:
87            self.set_config(config, validate=validate)
88        if streams is not None:
89            self.select_streams(streams)
90
91        self._deployed_api_root: str | None = None
92        self._deployed_workspace_id: str | None = None
93        self._deployed_source_id: str | None = None

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

connector_type: Literal['source'] = 'source'
def set_streams(self, streams: list[str]) -> None:
 95    def set_streams(self, streams: list[str]) -> None:
 96        """Deprecated. See select_streams()."""
 97        warnings.warn(
 98            "The 'set_streams' method is deprecated and will be removed in a future version. "
 99            "Please use the 'select_streams' method instead.",
100            DeprecationWarning,
101            stacklevel=2,
102        )
103        self.select_streams(streams)

Deprecated. See select_streams().

def select_all_streams(self) -> None:
117    def select_all_streams(self) -> None:
118        """Select all streams.
119
120        This is a more streamlined equivalent to:
121        > source.select_streams(source.get_available_streams()).
122        """
123        if self._config_dict is None:
124            self._to_be_selected_streams = "*"
125            self._log_warning_preselected_stream(self._to_be_selected_streams)
126            return
127
128        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

  source.select_streams(source.get_available_streams())

def select_streams(self, streams: str | list[str]) -> None:
130    def select_streams(self, streams: str | list[str]) -> None:
131        """Select the stream names that should be read from the connector.
132
133        Args:
134            streams: A list of stream names to select. If set to "*", all streams will be selected.
135
136        Currently, if this is not set, all streams will be read.
137        """
138        if self._config_dict is None:
139            self._to_be_selected_streams = streams
140            self._log_warning_preselected_stream(streams)
141            return
142
143        if streams == "*":
144            self.select_all_streams()
145            return
146
147        if isinstance(streams, str):
148            # If a single stream is provided, convert it to a one-item list
149            streams = [streams]
150
151        available_streams = self.get_available_streams()
152        for stream in streams:
153            if stream not in available_streams:
154                raise exc.AirbyteStreamNotFoundError(
155                    stream_name=stream,
156                    connector_name=self.name,
157                    available_streams=available_streams,
158                )
159        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Arguments:
  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
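
For example (stream names are illustrative; selecting a name not present in the discovered catalog raises AirbyteStreamNotFoundError):

    source.select_streams(["users", "purchases"])
    print(source.get_selected_streams())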

def get_selected_streams(self) -> list[str]:
161    def get_selected_streams(self) -> list[str]:
162        """Get the selected streams.
163
164        If no streams are selected, return an empty list.
165        """
166        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
168    def set_config(
169        self,
170        config: dict[str, Any],
171        *,
172        validate: bool = True,
173    ) -> None:
174        """Set the config for the connector.
175
176        If validate is True, raise an exception if the config fails validation.
177
178        If validate is False, validation will be deferred until check() or validate_config()
179        is called.
180        """
181        if validate:
182            self.validate_config(config)
183
184        self._config_dict = config
185
186        if self._to_be_selected_streams:
187            self.select_streams(self._to_be_selected_streams)
188            self._to_be_selected_streams = []

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
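
A sketch of deferred validation (the config key shown is hypothetical and depends on the connector's spec):

    source.set_config({"api_key": "<redacted>"}, validate=False)
    source.check()  # validation happens here instead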

def get_config(self) -> dict[str, typing.Any]:
190    def get_config(self) -> dict[str, Any]:
191        """Get the config for the connector."""
192        return self._config

Get the config for the connector.

def get_available_streams(self) -> list[str]:
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.
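
The first call triggers discover and caches the catalog; for example:

    streams = source.get_available_streams()  # runs discover on first use
    print(streams)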

config_spec: dict[str, typing.Any]
255    @property
256    def config_spec(self) -> dict[str, Any]:
257        """Generate a configuration spec for this connector, as a JSON Schema definition.
258
259        This function generates a JSON Schema dictionary with configuration specs for the
260        current connector, as a dictionary.
261
262        Returns:
263            dict: The JSON Schema configuration spec as a dictionary.
264        """
265        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This function generates a JSON Schema dictionary with configuration specs for the current connector, as a dictionary.

Returns:
  dict: The JSON Schema configuration spec as a dictionary.
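
As a sketch, the top-level "properties" keys describe the accepted config fields (assuming the spec is a standard JSON Schema object):

    spec = source.config_spec
    print(sorted(spec.get("properties", {})))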

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
267    def print_config_spec(
268        self,
269        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
270        *,
271        output_file: Path | str | None = None,
272    ) -> None:
273        """Print the configuration spec for this connector.
274
275        Args:
276            format: The format to print the spec in. Must be "yaml" or "json".
277            output_file: Optional. If set, the spec will be written to the given file path.
278                Otherwise, it will be printed to the console.
279        """
280        if format not in {"yaml", "json"}:
281            raise exc.PyAirbyteInputError(
282                message="Invalid format. Expected 'yaml' or 'json'",
283                input_value=format,
284            )
285        if isinstance(output_file, str):
286            output_file = Path(output_file)
287
288        if format == "yaml":
289            content = yaml.dump(self.config_spec, indent=2)
290        elif format == "json":
291            content = json.dumps(self.config_spec, indent=2)
292
293        if output_file:
294            output_file.write_text(content)
295            return
296
297        syntax_highlighted = Syntax(content, format)
298        print(syntax_highlighted)

Print the configuration spec for this connector.

Arguments:
  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
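
For example, a sketch that writes the spec to a local file:

    source.print_config_spec(format="json", output_file="spec.json")
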
docs_url: str
315    @property
316    def docs_url(self) -> str:
317        """Get the URL to the connector's documentation."""
318        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
319            "source-", ""
320        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
322    @property
323    def discovered_catalog(self) -> AirbyteCatalog:
324        """Get the raw catalog for the given streams.
325
326        If the catalog is not yet known, we call discover to get it.
327        """
328        if self._discovered_catalog is None:
329            self._discovered_catalog = self._discover()
330
331        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
333    @property
334    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
335        """Get the configured catalog for the given streams.
336
337        If the raw catalog is not yet known, we call discover to get it.
338
339        If no specific streams are selected, we return a catalog that syncs all available streams.
340
341        TODO: We should consider disabling by default the streams that the connector would
342        disable by default. (For instance, streams that require a premium license are sometimes
343        disabled by default within the connector.)
344        """
345        # Ensure discovered catalog is cached before we start
346        _ = self.discovered_catalog
347
348        # Filter for selected streams if set, otherwise use all available streams:
349        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
350        return self.get_configured_catalog(streams=streams_filter)

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)

def get_configured_catalog( self, streams: Union[list[str], Literal['*'], NoneType] = None) -> airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog:
352    def get_configured_catalog(
353        self,
354        streams: Literal["*"] | list[str] | None = None,
355    ) -> ConfiguredAirbyteCatalog:
356        """Get a configured catalog for the given streams.
357
358        If no streams are provided, the selected streams will be used. If no streams are selected,
359        all available streams will be used.
360
361        If '*' is provided, all available streams will be used.
362        """
363        selected_streams: list[str] = []
364        if streams is None:
365            selected_streams = self._selected_stream_names or self.get_available_streams()
366        elif streams == "*":
367            selected_streams = self.get_available_streams()
368        elif isinstance(streams, list):
369            selected_streams = streams
370        else:
371            raise exc.PyAirbyteInputError(
372                message="Invalid streams argument.",
373                input_value=streams,
374            )
375
376        return ConfiguredAirbyteCatalog(
377            streams=[
378                ConfiguredAirbyteStream(
379                    stream=stream,
380                    destination_sync_mode=DestinationSyncMode.overwrite,
381                    primary_key=stream.source_defined_primary_key,
382                    sync_mode=SyncMode.incremental,
383                )
384                for stream in self.discovered_catalog.streams
385                if stream.name in selected_streams
386            ],
387        )

Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected, all available streams will be used.

If '*' is provided, all available streams will be used.
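
For example (a sketch; the stream names are illustrative):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})

    # "*" selects every stream in the discovered catalog:
    catalog_all = source.get_configured_catalog(streams="*")

    # An explicit list selects just those streams:
    catalog_some = source.get_configured_catalog(streams=["users", "products"])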

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
389    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
390        """Return the JSON Schema spec for the specified stream name."""
391        catalog: AirbyteCatalog = self.discovered_catalog
392        found: list[AirbyteStream] = [
393            stream for stream in catalog.streams if stream.name == stream_name
394        ]
395
396        if len(found) == 0:
397            raise exc.PyAirbyteInputError(
398                message="Stream name does not exist in catalog.",
399                input_value=stream_name,
400            )
401
402        if len(found) > 1:
403            raise exc.PyAirbyteInternalError(
404                message="Duplicate streams found with the same name.",
405                context={
406                    "found_streams": found,
407                },
408            )
409
410        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
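
For example (a sketch; the "users" stream is illustrative — an unknown name raises `PyAirbyteInputError`, per the source above):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})

    schema = source.get_stream_json_schema("users")
    print(sorted(schema["properties"].keys()))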

def get_records( self, stream: str, *, normalize_field_names: bool = False, prune_undeclared_fields: bool = True) -> airbyte.datasets.LazyDataset:
412    def get_records(
413        self,
414        stream: str,
415        *,
416        normalize_field_names: bool = False,
417        prune_undeclared_fields: bool = True,
418    ) -> LazyDataset:
419        """Read a stream from the connector.
420
421        Args:
422            stream: The name of the stream to read.
423            normalize_field_names: When `True`, field names will be normalized to lower case, with
424                special characters removed. This matches the behavior of PyAirbyte caches and most
425                Airbyte destinations.
426            prune_undeclared_fields: When `True`, undeclared fields will be pruned from the records,
427                which generally matches the behavior of PyAirbyte caches and most Airbyte
428                destinations, specifically when you expect the catalog may be stale. You can disable
429                this to keep all fields in the records.
430
431        This involves the following steps:
432        * Call discover to get the catalog
433        * Generate a configured catalog that syncs the given stream in full_refresh mode
434        * Write the configured catalog and the config to a temporary file
435        * Execute the connector with read --config <config_file> --catalog <catalog_file>
436        * Listen to the messages and yield the AirbyteRecordMessage records as they arrive.
437        * Make sure the subprocess is killed when the function returns.
438        """
439        discovered_catalog: AirbyteCatalog = self.discovered_catalog
440        configured_catalog = ConfiguredAirbyteCatalog(
441            streams=[
442                ConfiguredAirbyteStream(
443                    stream=s,
444                    sync_mode=SyncMode.full_refresh,
445                    destination_sync_mode=DestinationSyncMode.overwrite,
446                )
447                for s in discovered_catalog.streams
448                if s.name == stream
449            ],
450        )
451        if len(configured_catalog.streams) == 0:
452            raise exc.PyAirbyteInputError(
453                message="Requested stream does not exist.",
454                context={
455                    "stream": stream,
456                    "available_streams": self.get_available_streams(),
457                    "connector_name": self.name,
458                },
459            ) from KeyError(stream)
460
461        configured_stream = configured_catalog.streams[0]
462
463        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
464            yield from records
465
466        stream_record_handler = StreamRecordHandler(
467            json_schema=self.get_stream_json_schema(stream),
468            prune_extra_fields=prune_undeclared_fields,
469            normalize_keys=normalize_field_names,
470        )
471
472        # This method is non-blocking, so we use "PLAIN" to avoid a live progress display
473        progress_tracker = ProgressTracker(
474            ProgressStyle.PLAIN,
475            source=self,
476            cache=None,
477            destination=None,
478            expected_streams=[stream],
479        )
480
481        iterator: Iterator[dict[str, Any]] = (
482            StreamRecord.from_record_message(
483                record_message=record.record,
484                stream_record_handler=stream_record_handler,
485            )
486            for record in self._read_with_catalog(
487                catalog=configured_catalog,
488                progress_tracker=progress_tracker,
489            )
490            if record.record
491        )
492        progress_tracker.log_success()
493        return LazyDataset(
494            iterator,
495            stream_metadata=configured_stream,
496        )

Read a stream from the connector.

Arguments:
  • stream: The name of the stream to read.
  • normalize_field_names: When True, field names will be normalized to lower case, with special characters removed. This matches the behavior of PyAirbyte caches and most Airbyte destinations.
  • prune_undeclared_fields: When True, undeclared fields will be pruned from the records, which generally matches the behavior of PyAirbyte caches and most Airbyte destinations, specifically when you expect the catalog may be stale. You can disable this to keep all fields in the records.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • Execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and yield the AirbyteRecordMessage records as they arrive.
  • Make sure the subprocess is killed when the function returns.
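
For example (a sketch; the stream name is illustrative). Because `LazyDataset` is an iterator, records stream through as the connector emits them:

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})

    for record in source.get_records("users"):
        print(record)  # each record is a dict-like StreamRecord
        break          # stopping early is fine; the subprocess is cleaned up
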
def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
498    def get_documents(
499        self,
500        stream: str,
501        title_property: str | None = None,
502        content_properties: list[str] | None = None,
503        metadata_properties: list[str] | None = None,
504        *,
505        render_metadata: bool = False,
506    ) -> Iterable[Document]:
507        """Read a stream from the connector and return the records as documents.
508
509        If metadata_properties is not set, all properties that are not content will be added to
510        the metadata.
511
512        If render_metadata is True, metadata will be rendered in the document, as well as the
513        main content.
514        """
515        return self.get_records(stream).to_documents(
516            title_property=title_property,
517            content_properties=content_properties,
518            metadata_properties=metadata_properties,
519            render_metadata=render_metadata,
520        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
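
This is a thin wrapper over `get_records(stream).to_documents(...)`, convenient for feeding records to LLM or search workloads. A sketch (the stream and property names are illustrative, not part of any real connector spec):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})

    for doc in source.get_documents(
        stream="products",
        title_property="make",          # hypothetical title field
        content_properties=["model"],   # hypothetical content field
        render_metadata=True,           # include metadata in the rendered text
    ):
        print(doc)
        break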

def read( self, cache: airbyte.caches.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> airbyte.ReadResult:
610    def read(
611        self,
612        cache: CacheBase | None = None,
613        *,
614        streams: str | list[str] | None = None,
615        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
616        force_full_refresh: bool = False,
617        skip_validation: bool = False,
618    ) -> ReadResult:
619        """Read from the connector and write to the cache.
620
621        Args:
622            cache: The cache to write to. If not set, a default cache will be used.
623            streams: Optional if already set. A list of stream names to select for reading. If set
624                to "*", all streams will be selected.
625            write_strategy: The strategy to use when writing to the cache. If a string, it must be
626                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
627                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
628                WriteStrategy.AUTO.
629            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
630                streams will be read in incremental mode if supported by the connector. This option
631                must be True when using the "replace" strategy.
632            skip_validation: If True, PyAirbyte will not pre-validate the input configuration before
633                running the connector. This can be helpful in debugging, when you want to send
634                configurations to the connector that otherwise might be rejected by JSON Schema
635                validation rules.
636        """
637        cache = cache or get_default_cache()
638        progress_tracker = ProgressTracker(
639            source=self,
640            cache=cache,
641            destination=None,
642            expected_streams=None,  # Will be set later
643        )
644
645        # Set up state provider if not in full refresh mode
646        if force_full_refresh:
647            state_provider: StateProviderBase | None = None
648        else:
649            state_provider = cache.get_state_provider(
650                source_name=self._name,
651            )
652        state_writer = cache.get_state_writer(source_name=self._name)
653
654        if streams:
655            self.select_streams(streams)
656
657        if not self._selected_stream_names:
658            raise exc.PyAirbyteNoStreamsSelectedError(
659                connector_name=self.name,
660                available_streams=self.get_available_streams(),
661            )
662
663        try:
664            result = self._read_to_cache(
665                cache=cache,
666                catalog_provider=CatalogProvider(self.configured_catalog),
667                stream_names=self._selected_stream_names,
668                state_provider=state_provider,
669                state_writer=state_writer,
670                write_strategy=write_strategy,
671                force_full_refresh=force_full_refresh,
672                skip_validation=skip_validation,
673                progress_tracker=progress_tracker,
674            )
675        except exc.PyAirbyteInternalError as ex:
676            progress_tracker.log_failure(exception=ex)
677            raise exc.AirbyteConnectorFailedError(
678                connector_name=self.name,
679                log_text=self._last_log_messages,
680            ) from ex
681        except Exception as ex:
682            progress_tracker.log_failure(exception=ex)
683            raise
684
685        progress_tracker.log_success()
686        return result

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If not set, a default cache will be used.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
  • skip_validation: If True, PyAirbyte will not pre-validate the input configuration before running the connector. This can be helpful in debugging, when you want to send configurations to the connector that otherwise might be rejected by JSON Schema validation rules.
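
A typical end-to-end sketch (assumes `source-faker`; `ab.get_default_cache()` returns the default DuckDB-backed cache, the same one used when no cache argument is passed):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 100})
    source.select_all_streams()

    # Incremental where supported; pass force_full_refresh=True to re-read everything:
    result = source.read(
        cache=ab.get_default_cache(),
        write_strategy="auto",
    )
    print(result)
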
Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
config_hash
validate_config
connector_version
check
install
uninstall
@dataclass
class ConnectorMetadata:
147@dataclass
148class ConnectorMetadata:
149    """Metadata for a connector."""
150
151    name: str
152    """Connector name. For example, "source-google-sheets"."""
153
154    latest_available_version: str | None
155    """The latest available version of the connector."""
156
157    pypi_package_name: str | None
158    """The name of the PyPI package for the connector, if it exists."""
159
160    language: Language | None
161    """The language of the connector."""
162
163    install_types: set[InstallType]
164    """The supported install types for the connector."""
165
166    @property
167    def default_install_type(self) -> InstallType:
168        """Return the default install type for the connector."""
169        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
170            return InstallType.YAML
171
172        if InstallType.PYTHON in self.install_types:
173            return InstallType.PYTHON
174
175        # Else: Java or Docker
176        return InstallType.DOCKER

Metadata for a connector.

ConnectorMetadata( name: str, latest_available_version: str | None, pypi_package_name: str | None, language: airbyte.sources.registry.Language | None, install_types: set[airbyte.sources.registry.InstallType])
name: str

Connector name. For example, "source-google-sheets".

latest_available_version: str | None

The latest available version of the connector.

pypi_package_name: str | None

The name of the PyPI package for the connector, if it exists.

language: airbyte.sources.registry.Language | None

The language of the connector.

install_types: set[airbyte.sources.registry.InstallType]

The supported install types for the connector.

default_install_type: airbyte.sources.registry.InstallType
166    @property
167    def default_install_type(self) -> InstallType:
168        """Return the default install type for the connector."""
169        if self.language == Language.MANIFEST_ONLY and InstallType.YAML in self.install_types:
170            return InstallType.YAML
171
172        if InstallType.PYTHON in self.install_types:
173            return InstallType.PYTHON
174
175        # Else: Java or Docker
176        return InstallType.DOCKER

Return the default install type for the connector.
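
For example, a sketch of inspecting registry metadata (uses `get_connector_metadata` from this module; `source-faker` is illustrative, and the None guard is a hedge for environments where the registry lookup is unavailable):

    from airbyte.sources import get_connector_metadata

    meta = get_connector_metadata("source-faker")
    if meta is not None:
        print(meta.name, meta.latest_available_version)
        # Prefers YAML for manifest-only connectors, then PYTHON, else DOCKER,
        # per the property source above:
        print(meta.default_install_type)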