# airbyte.destinations

Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this module to create custom destinations, or to interact with existing destinations.

## Getting Started

To get started with destinations, you can use the `get_destination()` method to create a destination object. This method takes a destination name and configuration, and returns a destination object that you can use to write data to the destination.

```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

## Writing Data to a Destination

To write data to a destination, you can use the `Destination.write()` method. This method takes either an `airbyte.Source` or an `airbyte.ReadResult` object.

## Writing to a destination from a source

To write directly from a source, pass the source object to the `Destination.write()` method:

```python
my_source = get_source(...)
my_destination = get_destination(...)
my_destination.write(my_source)
```

## Writing from a read result

To write from a read result, you can use the following pattern. First, read data from the source, then write the data to the destination, using the `ReadResult` object as a buffer between the source and destination:

```python
# First read data from the source:
my_source = get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```

## Using Docker and Python-based Connectors

By default, the `get_destination()` method will look for a Python-based connector. If you want to use a Docker-based connector, you can set the `docker_image` parameter to `True`:

```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

**Note:** Unlike source connectors, most destination connectors are written in Java, and for this reason are only available as Docker-based connectors. If you need to load to a SQL database and your runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally to PyAirbyte so they can run anywhere that PyAirbyte can run.

````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this
module to create custom destinations, or to interact with existing destinations.

## Getting Started

To get started with destinations, you can use the `get_destination()` method to create a destination
object. This method takes a destination name and configuration, and returns a destination object
that you can use to write data to the destination.

```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

## Writing Data to a Destination

To write data to a destination, you can use the `Destination.write()` method. This method
takes either an `airbyte.Source` or an `airbyte.ReadResult` object.

## Writing to a destination from a source

To write directly from a source, pass the source object to the `Destination.write()` method:

```python
my_source = get_source(...)
my_destination = get_destination(...)
my_destination.write(my_source)
```

## Writing from a read result

To write from a read result, you can use the following pattern. First, read data from the source,
then write the data to the destination, using the `ReadResult` object as a buffer between the source
and destination:

```python
# First read data from the source:
my_source = get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```

## Using Docker and Python-based Connectors

By default, the `get_destination()` method will look for a Python-based connector. If you want to
use a Docker-based connector, you can set the `docker_image` parameter to `True`:

```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

**Note:** Unlike source connectors, most destination connectors are written in Java, and for this
reason are only available as Docker-based connectors. If you need to load to a SQL database and your
runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to
a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally
to PyAirbyte so they can run anywhere that PyAirbyte can run.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.destinations.base import Destination
from airbyte.destinations.util import (
    get_destination,
    get_noop_destination,
)


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004  # imports used for more than type checking
    from airbyte.destinations import util


__all__ = [
    # Modules
    "util",
    # Methods
    "get_destination",
    "get_noop_destination",
    # Classes
    "Destination",
]
````
```python
def get_destination(
    name: str,
    config: dict[str, typing.Any] | None = None,
    *,
    config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None,
    version: str | None = None,
    use_python: bool | pathlib.Path | str | None = None,
    pip_url: str | None = None,
    local_executable: pathlib.Path | str | None = None,
    docker_image: str | bool | None = None,
    use_host_network: bool = False,
    install_if_missing: bool = True,
    install_root: pathlib.Path | None = None,
    no_executor: bool = False,
) -> Destination:
```
```python
def get_destination(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    version: str | None = None,
    use_python: bool | Path | str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: str | bool | None = None,
    use_host_network: bool = False,
    install_if_missing: bool = True,
    install_root: Path | None = None,
    no_executor: bool = False,
) -> Destination:
    """Get a connector by name and version.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        use_python: (Optional.) Python interpreter specification:
            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
            - False: Use Docker instead.
            - Path: Use interpreter at this path.
            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
                installed, it will be installed by uv. (This generally adds less than 3 seconds
                to install times.)
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when local_executable is set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
        no_executor: If True, use NoOpExecutor which fetches specs from the registry without
            local installation. This is useful for scenarios where you need to validate
            configurations but don't need to run the connector locally (e.g., deploying to Cloud).
    """
    executor = get_connector_executor(
        name=name,
        version=version,
        use_python=use_python,
        pip_url=pip_url,
        local_executable=local_executable,
        docker_image=docker_image,
        use_host_network=use_host_network,
        install_if_missing=install_if_missing,
        install_root=install_root,
        no_executor=no_executor,
    )

    return Destination(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        executor=executor,
    )
```

Get a connector by name and version.

Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • use_python: (Optional.) Python interpreter specification:
    • True: Use current Python interpreter. (Inferred if pip_url is set.)
    • False: Use Docker instead.
    • Path: Use interpreter at this path.
    • str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
  • no_executor: If True, use NoOpExecutor which fetches specs from the registry without local installation. This is useful for scenarios where you need to validate configurations but don't need to run the connector locally (e.g., deploying to Cloud).
```python
def get_noop_destination(*, install_if_missing: bool = True) -> Destination:
```
```python
def get_noop_destination(
    *,
    install_if_missing: bool = True,
) -> Destination:
    """Get a devnull (no-op) destination.

    This is useful for performance benchmarking of sources, without
    adding the overhead of writing data to a real destination.
    """
    return get_destination(
        "destination-dev-null",
        config={
            "test_destination": {
                "test_destination_type": "SILENT",
            }
        },
        docker_image=True,
        install_if_missing=install_if_missing,
    )
```

Get a devnull (no-op) destination.

This is useful for performance benchmarking of sources, without adding the overhead of writing data to a real destination.

```python
class Destination(airbyte._connector_base.ConnectorBase, airbyte._writers.base.AirbyteWriterInterface):
```
```python
class Destination(ConnectorBase, AirbyteWriterInterface):
    """A class representing a destination that can be called."""

    connector_type = "destination"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the destination.

        If config is provided, it will be validated against the spec if validate is True.
        """
        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )

    @staticmethod
    def _normalize_destination_name(name: str) -> str:
        """Normalize a destination name to canonical form (`destination-<type>`).

        Accepts either the short form (e.g. `snowflake`) or the canonical
        form (e.g. `destination-snowflake`).
        """
        if not name.startswith(_CANONICAL_PREFIX):
            return f"{_CANONICAL_PREFIX}{name}"
        return name

    @property
    def is_cache_supported(self) -> bool:
        """Whether this destination has a compatible cache implementation.

        Returns `True` when `get_sql_cache()` is expected to succeed for
        the destination's connector type.
        """
        dest_type = self._normalize_destination_name(
            self.name,
        ).replace(_CANONICAL_PREFIX, "")
        return dest_type in get_supported_destination_types()

    def get_sql_cache(
        self,
        *,
        schema_name: str | None = None,
    ) -> CacheBase:
        """Return a SQL Cache for querying data written by this destination.

        This follows the same pattern as
        `SyncResult.get_sql_cache()` in `airbyte.cloud.sync_results`:
        it builds a cache from the destination's configuration using
        `destination_to_cache()`.

        Args:
            schema_name: Override the schema/namespace on the returned cache.
                When `None` the cache uses the default schema from the
                destination config.

        Raises:
            ValueError: If the destination type is not supported.
        """
        resolved_name = self._normalize_destination_name(self.name)
        config = dict(self._hydrated_config)

        # Ensure the config carries a destinationType key so that
        # destination_to_cache() can dispatch correctly.
        if "destinationType" not in config and "DESTINATION_TYPE" not in config:
            dest_type = resolved_name.replace(_CANONICAL_PREFIX, "")
            config["destinationType"] = dest_type

        return destination_to_cache(config, schema_name=schema_name)

    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
        self,
        source_data: Source | ReadResult,
        *,
        streams: list[str] | Literal["*"] | None = None,
        cache: CacheBase | Literal[False] | None = None,
        state_cache: CacheBase | Literal[False] | None = None,
        write_strategy: WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
    ) -> WriteResult:
        """Write data from source connector or already cached source data.

        Caching is enabled by default, unless explicitly disabled.

        Args:
            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
            streams: The streams to write to the destination. If omitted or if "*" is provided,
                all streams will be written. If `source_data` is a source, then streams must be
                selected here or on the source. If both are specified, this setting will override
                the stream selection on the source.
            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
                False, the cache will be disabled. This must be `None` if `source_data` is already
                a `Cache` object.
            state_cache: A cache to use for storing incremental state. You do not need to set this
                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
                disable state management.
            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
                will decide the best strategy to use.
            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
                existing state will be ignored and all source data will be reloaded.

        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
        a known destination state, the destination-specific state will be used. If neither are
        available, a full refresh will be performed.
        """
        if not isinstance(source_data, ReadResult | Source):
            raise exc.PyAirbyteInputError(
                message="Invalid source_data type for `source_data` arg.",
                context={
                    "source_data_type_provided": type(source_data).__name__,
                },
            )

        # Resolve `source`, `read_result`, and `source_name`
        source: Source | None = source_data if isinstance(source_data, Source) else None
        read_result: ReadResult | None = (
            source_data if isinstance(source_data, ReadResult) else None
        )
        source_name: str = source.name if source else cast("ReadResult", read_result).source_name

        # State providers and writers default to no-op, unless overridden below.
        cache_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the cache's data."""
        cache_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the cache's data."""
        destination_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the destination's data, from `cache` or `state_cache`."""
        destination_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""

        # If caching not explicitly disabled
        if cache is not False:
            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
            if isinstance(source_data, ReadResult):
                cache = source_data.cache

            cache = cache or get_default_cache()
            cache_state_provider = cache.get_state_provider(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )
            cache_state_writer = cache.get_state_writer(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )

        # Resolve `state_cache`
        if state_cache is None:
            state_cache = cache or get_default_cache()

        # Resolve `destination_state_writer` and `destination_state_provider`
        if state_cache:
            destination_state_writer = state_cache.get_state_writer(
                source_name=source_name,
                destination_name=self.name,
            )
            if not force_full_refresh:
                destination_state_provider = state_cache.get_state_provider(
                    source_name=source_name,
                    destination_name=self.name,
                )
        elif state_cache is not False:
            warnings.warn(
                "No state backend or cache provided. State will not be tracked. "
                "To track state, provide a cache or state backend. "
                "To silence this warning, set `state_cache=False` explicitly.",
                category=exc.PyAirbyteWarning,
                stacklevel=2,
            )

        # Resolve `catalog_provider`
        if source:
            catalog_provider = CatalogProvider(
                configured_catalog=source.get_configured_catalog(
                    streams=streams,
                    force_full_refresh=force_full_refresh,
                )
            )
        elif read_result:
            catalog_provider = CatalogProvider.from_read_result(read_result)
        else:
            raise exc.PyAirbyteInternalError(
                message="`source_data` must be a `Source` or `ReadResult` object.",
            )

        progress_tracker = ProgressTracker(
            source=source if isinstance(source_data, Source) else None,
            cache=cache or None,
            destination=self,
            expected_streams=catalog_provider.stream_names,
        )

        source_state_provider: StateProviderBase
        source_state_provider = JoinedStateProvider(
            primary=cache_state_provider,
            secondary=destination_state_provider,
        )

        if source:
            if cache is False:
                # Get message iterator for source (caching disabled)
                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
                    streams=streams,
                    state_provider=source_state_provider,
                    progress_tracker=progress_tracker,
                    force_full_refresh=force_full_refresh,
                )
            else:
                # Caching enabled and we are reading from a source.
                # Read the data to cache if caching is enabled.
                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
                    cache=cache,
                    state_provider=source_state_provider,
                    state_writer=cache_state_writer,
                    catalog_provider=catalog_provider,
                    stream_names=catalog_provider.stream_names,
                    write_strategy=write_strategy,
                    force_full_refresh=force_full_refresh,
                    skip_validation=False,
                    progress_tracker=progress_tracker,
                )
                message_iterator = AirbyteMessageIterator.from_read_result(
                    read_result=read_result,
                )
        else:  # Else we are reading from a read result
            assert read_result is not None
            message_iterator = AirbyteMessageIterator.from_read_result(
                read_result=read_result,
            )

        # Write the data to the destination
        try:
            self._write_airbyte_message_stream(
                stdin=message_iterator,
                catalog_provider=catalog_provider,
                write_strategy=write_strategy,
                state_writer=destination_state_writer,
                progress_tracker=progress_tracker,
            )
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise
        else:
            # No exceptions were raised, so log success
            progress_tracker.log_success()

        return WriteResult(
            destination=self,
            source_data=source_data,
            catalog_provider=catalog_provider,
            state_writer=destination_state_writer,
            progress_tracker=progress_tracker,
        )

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        # Run optional validation step
        if state_writer is None:
            state_writer = StdOutStateWriter()

        # Apply the write strategy to the catalog provider before sending to the destination
        catalog_provider = catalog_provider.with_write_strategy(write_strategy)

        with as_temp_files(
            files_contents=[
                self._hydrated_config,
                catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
            ]
        ) as [
            config_file,
            catalog_file,
        ]:
            try:
                # We call the connector to write the data, tallying the inputs and outputs
                for destination_message in progress_tracker.tally_confirmed_writes(
                    messages=self._execute(
                        args=[
                            "write",
                            "--config",
                            config_file,
                            "--catalog",
                            catalog_file,
                        ],
                        stdin=AirbyteMessageIterator(
                            progress_tracker.tally_pending_writes(
                                stdin,
                            )
                        ),
                    )
                ):
                    if destination_message.state:
                        state_writer.write_state(state_message=destination_message.state)

            except exc.AirbyteConnectorFailedError as ex:
                raise exc.AirbyteConnectorWriteError(
                    connector_name=self.name,
                    log_text=self._last_log_messages,
                    original_exception=ex,
                ) from None
```

A class representing a destination that can be called.

```python
Destination(
    executor: airbyte._executors.base.Executor,
    name: str,
    config: dict[str, typing.Any] | None = None,
    *,
    config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None,
    validate: bool = False,
)
```

```python
    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the destination.

        If config is provided, it will be validated against the spec if validate is True.
        """
        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
```

Initialize the destination.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'destination'
is_cache_supported: bool
```python
    @property
    def is_cache_supported(self) -> bool:
        """Whether this destination has a compatible cache implementation.

        Returns `True` when `get_sql_cache()` is expected to succeed for
        the destination's connector type.
        """
        dest_type = self._normalize_destination_name(
            self.name,
        ).replace(_CANONICAL_PREFIX, "")
        return dest_type in get_supported_destination_types()
```

Whether this destination has a compatible cache implementation.

Returns True when get_sql_cache() is expected to succeed for the destination's connector type.
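The name handling behind this check can be sketched as a standalone function, mirroring `_normalize_destination_name` in the source above (the prefix constant is assumed to be `"destination-"`):

```python
_CANONICAL_PREFIX = "destination-"

def normalize_destination_name(name: str) -> str:
    """Return the canonical `destination-<type>` form for a short or canonical name."""
    if not name.startswith(_CANONICAL_PREFIX):
        return f"{_CANONICAL_PREFIX}{name}"
    return name

print(normalize_destination_name("snowflake"))              # destination-snowflake
print(normalize_destination_name("destination-snowflake"))  # destination-snowflake
```

The property then strips the prefix back off and checks the bare type against the set of cache-supported destination types.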

```python
def get_sql_cache(self, *, schema_name: str | None = None) -> airbyte.caches.CacheBase:
```

```python
    def get_sql_cache(
        self,
        *,
        schema_name: str | None = None,
    ) -> CacheBase:
        """Return a SQL Cache for querying data written by this destination.

        This follows the same pattern as
        `SyncResult.get_sql_cache()` in `airbyte.cloud.sync_results`:
        it builds a cache from the destination's configuration using
        `destination_to_cache()`.

        Args:
            schema_name: Override the schema/namespace on the returned cache.
                When `None` the cache uses the default schema from the
                destination config.

        Raises:
            ValueError: If the destination type is not supported.
        """
        resolved_name = self._normalize_destination_name(self.name)
        config = dict(self._hydrated_config)

        # Ensure the config carries a destinationType key so that
        # destination_to_cache() can dispatch correctly.
        if "destinationType" not in config and "DESTINATION_TYPE" not in config:
            dest_type = resolved_name.replace(_CANONICAL_PREFIX, "")
            config["destinationType"] = dest_type

        return destination_to_cache(config, schema_name=schema_name)
```

Return a SQL Cache for querying data written by this destination.

This follows the same pattern as SyncResult.get_sql_cache() in airbyte.cloud.sync_results: it builds a cache from the destination's configuration using destination_to_cache().

Arguments:
  • schema_name: Override the schema/namespace on the returned cache. When None the cache uses the default schema from the destination config.
Raises:
  • ValueError: If the destination type is not supported.
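The `destinationType` normalization that `get_sql_cache()` performs before dispatching to `destination_to_cache()` can be sketched in isolation. The `destination-` prefix matches the code above; the helper name is hypothetical:

```python
_CANONICAL_PREFIX = "destination-"

def with_destination_type(config: dict, resolved_name: str) -> dict:
    """Return a copy of config guaranteed to carry a destinationType key.

    Hypothetical helper mirroring the key-injection step in
    get_sql_cache(); it never mutates the caller's config.
    """
    config = dict(config)
    if "destinationType" not in config and "DESTINATION_TYPE" not in config:
        config["destinationType"] = resolved_name.replace(_CANONICAL_PREFIX, "")
    return config

config = with_destination_type({"host": "localhost"}, "destination-postgres")
print(config["destinationType"])  # postgres
```

An explicit `destinationType` (or `DESTINATION_TYPE`) already present in the config is left untouched, so callers can override the inferred type.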
def write(self, source_data: airbyte.Source | airbyte.ReadResult, *, streams: Union[list[str], Literal['*'], NoneType] = None, cache: Union[airbyte.caches.CacheBase, Literal[False], NoneType] = None, state_cache: Union[airbyte.caches.CacheBase, Literal[False], NoneType] = None, write_strategy: airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False) -> airbyte.WriteResult:
126    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
127        self,
128        source_data: Source | ReadResult,
129        *,
130        streams: list[str] | Literal["*"] | None = None,
131        cache: CacheBase | Literal[False] | None = None,
132        state_cache: CacheBase | Literal[False] | None = None,
133        write_strategy: WriteStrategy = WriteStrategy.AUTO,
134        force_full_refresh: bool = False,
135    ) -> WriteResult:
136        """Write data from source connector or already cached source data.
137
138        Caching is enabled by default, unless explicitly disabled.
139
140        Args:
141            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
142            streams: The streams to write to the destination. If omitted or if "*" is provided,
143                all streams will be written. If `source_data` is a source, then streams must be
144                selected here or on the source. If both are specified, this setting will override
145                the stream selection on the source.
146            cache: The cache to use for reading source_data. If `None`, the default cache
147                will be used. If `False`, caching will be disabled. Leave this as `None` when
148                `source_data` is a `ReadResult`; its own cache will be used.
149            state_cache: A cache to use for storing incremental state. You do not need to set this
150                if `cache` is specified or if `source_data` is a `ReadResult` object. Set to
151                `False` to disable state management.
152            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
153                will decide the best strategy to use.
154            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
155                existing state will be ignored and all source data will be reloaded.
156
157        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
158        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
159        a known destination state, the destination-specific state will be used. If neither are
160        available, a full refresh will be performed.
161        """
162        if not isinstance(source_data, ReadResult | Source):
163            raise exc.PyAirbyteInputError(
164                message="Invalid source_data type for `source_data` arg.",
165                context={
166                    "source_data_type_provided": type(source_data).__name__,
167                },
168            )
169
170        # Resolve `source`, `read_result`, and `source_name`
171        source: Source | None = source_data if isinstance(source_data, Source) else None
172        read_result: ReadResult | None = (
173            source_data if isinstance(source_data, ReadResult) else None
174        )
175        source_name: str = source.name if source else cast("ReadResult", read_result).source_name
176
177        # State providers and writers default to no-op, unless overridden below.
178        cache_state_provider: StateProviderBase = StaticInputState([])
179        """Provides the state of the cache's data."""
180        cache_state_writer: StateWriterBase = NoOpStateWriter()
181        """Writes updates for the state of the cache's data."""
182        destination_state_provider: StateProviderBase = StaticInputState([])
183        """Provides the state of the destination's data, from `cache` or `state_cache`."""
184        destination_state_writer: StateWriterBase = NoOpStateWriter()
185        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""
186
187        # If caching not explicitly disabled
188        if cache is not False:
189            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
190            if isinstance(source_data, ReadResult):
191                cache = source_data.cache
192
193            cache = cache or get_default_cache()
194            cache_state_provider = cache.get_state_provider(
195                source_name=source_name,
196                destination_name=None,  # This will just track the cache state
197            )
198            cache_state_writer = cache.get_state_writer(
199                source_name=source_name,
200                destination_name=None,  # This will just track the cache state
201            )
202
203        # Resolve `state_cache`
204        if state_cache is None:
205            state_cache = cache or get_default_cache()
206
207        # Resolve `destination_state_writer` and `destination_state_provider`
208        if state_cache:
209            destination_state_writer = state_cache.get_state_writer(
210                source_name=source_name,
211                destination_name=self.name,
212            )
213            if not force_full_refresh:
214                destination_state_provider = state_cache.get_state_provider(
215                    source_name=source_name,
216                    destination_name=self.name,
217                )
218        elif state_cache is not False:
219            warnings.warn(
220                "No state backend or cache provided. State will not be tracked."
221                "To track state, provide a cache or state backend."
222                "To silence this warning, set `state_cache=False` explicitly.",
223                category=exc.PyAirbyteWarning,
224                stacklevel=2,
225            )
226
227        # Resolve `catalog_provider`
228        if source:
229            catalog_provider = CatalogProvider(
230                configured_catalog=source.get_configured_catalog(
231                    streams=streams,
232                    force_full_refresh=force_full_refresh,
233                )
234            )
235        elif read_result:
236            catalog_provider = CatalogProvider.from_read_result(read_result)
237        else:
238            raise exc.PyAirbyteInternalError(
239                message="`source_data` must be a `Source` or `ReadResult` object.",
240            )
241
242        progress_tracker = ProgressTracker(
243            source=source if isinstance(source_data, Source) else None,
244            cache=cache or None,
245            destination=self,
246            expected_streams=catalog_provider.stream_names,
247        )
248
249        source_state_provider: StateProviderBase
250        source_state_provider = JoinedStateProvider(
251            primary=cache_state_provider,
252            secondary=destination_state_provider,
253        )
254
255        if source:
256            if cache is False:
257                # Get message iterator for source (caching disabled)
258                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
259                    streams=streams,
260                    state_provider=source_state_provider,
261                    progress_tracker=progress_tracker,
262                    force_full_refresh=force_full_refresh,
263                )
264            else:
265                # Caching enabled and we are reading from a source.
266                # Read the data to cache if caching is enabled.
267                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
268                    cache=cache,
269                    state_provider=source_state_provider,
270                    state_writer=cache_state_writer,
271                    catalog_provider=catalog_provider,
272                    stream_names=catalog_provider.stream_names,
273                    write_strategy=write_strategy,
274                    force_full_refresh=force_full_refresh,
275                    skip_validation=False,
276                    progress_tracker=progress_tracker,
277                )
278                message_iterator = AirbyteMessageIterator.from_read_result(
279                    read_result=read_result,
280                )
281        else:  # Else we are reading from a read result
282            assert read_result is not None
283            message_iterator = AirbyteMessageIterator.from_read_result(
284                read_result=read_result,
285            )
286
287        # Write the data to the destination
288        try:
289            self._write_airbyte_message_stream(
290                stdin=message_iterator,
291                catalog_provider=catalog_provider,
292                write_strategy=write_strategy,
293                state_writer=destination_state_writer,
294                progress_tracker=progress_tracker,
295            )
296        except Exception as ex:
297            progress_tracker.log_failure(exception=ex)
298            raise
299        else:
300            # No exceptions were raised, so log success
301            progress_tracker.log_success()
302
303        return WriteResult(
304            destination=self,
305            source_data=source_data,
306            catalog_provider=catalog_provider,
307            state_writer=destination_state_writer,
308            progress_tracker=progress_tracker,
309        )

Write data from source connector or already cached source data.

Caching is enabled by default, unless explicitly disabled.

Arguments:
  • source_data: The source data to write. Can be a Source or a ReadResult object.
  • streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If source_data is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
  • cache: The cache to use for reading source_data. If None, the default cache will be used. If False, caching will be disabled. Leave this as None when source_data is a ReadResult; its own cache will be used.
  • state_cache: A cache to use for storing incremental state. You do not need to set this if cache is specified or if source_data is a ReadResult object. Set to False to disable state management.
  • write_strategy: The strategy to use for writing source_data. If AUTO, the connector will decide the best strategy to use.
  • force_full_refresh: Whether to force a full refresh of the source_data. If True, any existing state will be ignored and all source data will be reloaded.

For incremental syncs, cache or state_cache will be checked for matching state values. If the cache has tracked state, this will be used for the sync. Otherwise, if there is a known destination state, the destination-specific state will be used. If neither are available, a full refresh will be performed.
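The state-precedence rule above (cache-tracked state first, then destination-specific state, else full refresh) can be modeled with a minimal joined provider. This is a simplified stand-in that echoes `JoinedStateProvider`, not the library's implementation, which merges state per stream:

```python
class StateProvider:
    """Minimal stand-in: holds a list of state messages."""

    def __init__(self, state_messages):
        self.state_messages = list(state_messages)

def resolve_sync_state(cache_state: StateProvider, destination_state: StateProvider):
    """Primary (cache) state wins; otherwise fall back to destination state.

    An empty result means no usable state exists and a full refresh
    is performed.
    """
    return cache_state.state_messages or destination_state.state_messages

# Cache has tracked state: it is used for the sync.
print(resolve_sync_state(StateProvider(["s1"]), StateProvider(["d1"])))  # ['s1']
# No cache state, but a known destination state: destination state is used.
print(resolve_sync_state(StateProvider([]), StateProvider(["d1"])))     # ['d1']
# Neither is available: empty state triggers a full refresh.
print(resolve_sync_state(StateProvider([]), StateProvider([])))         # []
```

Passing `force_full_refresh=True` short-circuits this resolution entirely: the destination-side state provider is never consulted and all source data is reloaded.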