airbyte.destinations

Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this module to create custom destinations, or to interact with existing destinations.

Getting Started

To get started with destinations, you can use the get_destination() method to create a destination object. This method takes a destination name and configuration, and returns a destination object that you can use to write data to the destination.

import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)

Writing Data to a Destination

To write data to a destination, you can use the Destination.write() method. This method takes either an airbyte.Source or airbyte.ReadResult object.

Writing to a Destination from a Source

To write directly from a source, simply pass the source object to the Destination.write() method:

my_source = ab.get_source(...)
my_destination = ab.get_destination(...)
my_destination.write(my_source)

Writing from a Read Result

To write from a read result, you can use the following pattern. First, read data from the source, then write the data to the destination, using the ReadResult object as a buffer between the source and destination:

# First read data from the source:
my_source = ab.get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
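
For example, here is a minimal validation sketch using the read result's datasets. The stream name "users" and the "id" column are illustrative, not part of any real connector:

# Hypothetical check between read and write:
users_df = read_result["users"].to_pandas()
if users_df["id"].isnull().any():
    raise ValueError("Null IDs found in 'users'; aborting write.")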

Using Docker and Python-based Connectors

By default, the get_destination() method will look for a Python-based connector. If you want to use a Docker-based connector, you can set the docker_image parameter to True:

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)

Note: Unlike source connectors, most destination connectors are written in Java, and for this reason are only available as Docker-based connectors. If you need to load to a SQL database and your runtime does not support Docker, you may want to use the airbyte.caches module to load data to a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally to PyAirbyte so they can run anywhere that PyAirbyte can run.
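
For example, this minimal sketch loads data to the default local DuckDB cache instead of a Docker-based destination (the source-faker config shown is illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()

# Load to the default local DuckDB cache; no Docker required:
cache = ab.get_default_cache()
read_result = source.read(cache=cache)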

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Destinations module.
  3
  4This module contains classes and methods for interacting with Airbyte destinations. You can use this
  5module to create custom destinations, or to interact with existing destinations.
  6
  7## Getting Started
  8
  9To get started with destinations, you can use the `get_destination()` method to create a destination
 10object. This method takes a destination name and configuration, and returns a destination object
 11that you can use to write data to the destination.
 12
 13```python
 14import airbyte as ab
 15
 16my_destination = ab.get_destination(
 17    "destination-foo",
 18    config={"api_key": "my_api_key"},
 19    docker_image=True,
 20)
 21```
 22
 23## Writing Data to a Destination
 24
 25To write data to a destination, you can use the `Destination.write()` method. This method
 26takes either an `airbyte.Source` or `airbyte.ReadResult` object.
 27
 28## Writing to a Destination from a Source
 29
 30To write directly from a source, simply pass the source object to the `Destination.write()` method:
 31
 32```python
 33my_source = ab.get_source(...)
 34my_destination = ab.get_destination(...)
 35my_destination.write(my_source)
 36```
 37
 38## Writing from a Read Result
 39
 40To write from a read result, you can use the following pattern. First, read data from the source,
 41then write the data to the destination, using the `ReadResult` object as a buffer between the source
 42and destination:
 43
 44```python
 45# First read data from the source:
 46my_source = ab.get_source(...)
 47read_result = my_source.read(...)
 48
 49# Optionally, you can validate data before writing it:
 50# ...misc validation code here...
 51
 52# Then write the data to the destination:
 53my_destination.write(read_result)
 54```
 55
 56## Using Docker and Python-based Connectors
 57
 58By default, the `get_destination()` method will look for a Python-based connector. If you want to
 59use a Docker-based connector, you can set the `docker_image` parameter to `True`:
 60
 61```python
 62my_destination = ab.get_destination(
 63    "destination-foo",
 64    config={"api_key": "my_api_key"},
 65    docker_image=True,
 66)
 67```
 68
 69**Note:** Unlike source connectors, most destination connectors are written in Java, and for this
 70reason are only available as Docker-based connectors. If you need to load to a SQL database and your
 71runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to
 72a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally
 73to PyAirbyte so they can run anywhere that PyAirbyte can run.
 74"""
 75
 76from __future__ import annotations
 77
 78from typing import TYPE_CHECKING
 79
 80from airbyte.destinations.base import Destination
 81from airbyte.destinations.util import (
 82    get_destination,
 83    get_noop_destination,
 84)
 85
 86
 87# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
 88if TYPE_CHECKING:
 89    # ruff: noqa: TC004  # imports used for more than type checking
 90    from airbyte.destinations import util
 91
 92
 93__all__ = [
 94    # Modules
 95    "util",
 96    # Methods
 97    "get_destination",
 98    "get_noop_destination",
 99    # Classes
100    "Destination",
101]
def get_destination(name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, version: str | None = None, use_python: bool | pathlib.Path | str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: str | bool | None = None, use_host_network: bool = False, install_if_missing: bool = True, install_root: pathlib.Path | None = None, no_executor: bool = False) -> Destination:
22def get_destination(  # noqa: PLR0913 # Too many arguments
23    name: str,
24    config: dict[str, Any] | None = None,
25    *,
26    config_change_callback: ConfigChangeCallback | None = None,
27    version: str | None = None,
28    use_python: bool | Path | str | None = None,
29    pip_url: str | None = None,
30    local_executable: Path | str | None = None,
31    docker_image: str | bool | None = None,
32    use_host_network: bool = False,
33    install_if_missing: bool = True,
34    install_root: Path | None = None,
35    no_executor: bool = False,
36) -> Destination:
37    """Get a connector by name and version.
38
39    Args:
40        name: connector name
41        config: connector config - if not provided, you need to set it later via the set_config
42            method.
43        config_change_callback: callback function to be called when the connector config changes.
47        version: connector version - if not provided, the currently installed version will be used.
48            If no version is installed, the latest available version will be used. The version can
49            also be set to "latest" to force the use of the latest available version.
50        use_python: (Optional.) Python interpreter specification:
51            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
52            - False: Use Docker instead.
53            - Path: Use interpreter at this path.
54            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
55                installed, it will be installed by uv. (This generally adds less than 3 seconds
56                to install times.)
57        pip_url: connector pip URL - if not provided, the pip URL will be inferred from the
58            connector name.
59        local_executable: If set, the connector will be assumed to already be installed and will be
60            executed using this path or executable name. Otherwise, the connector will be installed
61            automatically in a virtual environment.
62        docker_image: If set, the connector will be executed using Docker. You can specify `True`
63            to use the default image for the connector, or you can specify a custom image name.
64            If `version` is specified and your image name does not already contain a tag
65            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
66        use_host_network: If set, along with docker_image, the connector will be executed using
67            the host network. This is useful for connectors that need to access resources on
68            the host machine, such as a local database. This parameter is ignored when
69            `docker_image` is not set.
70        install_if_missing: Whether to install the connector if it is not available locally. This
71            parameter is ignored when local_executable is set.
72        install_root: (Optional.) The root directory where the virtual environment will be
73            created. If not provided, the current working directory will be used.
74        no_executor: If True, use NoOpExecutor which fetches specs from the registry without
75            local installation. This is useful for scenarios where you need to validate
76            configurations but don't need to run the connector locally (e.g., deploying to Cloud).
77    """
78    executor = get_connector_executor(
79        name=name,
80        version=version,
81        use_python=use_python,
82        pip_url=pip_url,
83        local_executable=local_executable,
84        docker_image=docker_image,
85        use_host_network=use_host_network,
86        install_if_missing=install_if_missing,
87        install_root=install_root,
88        no_executor=no_executor,
89    )
90
91    return Destination(
92        name=name,
93        config=config,
94        config_change_callback=config_change_callback,
95        executor=executor,
96    )

Get a destination connector by name and version.

Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • use_python: (Optional.) Python interpreter specification:
    • True: Use current Python interpreter. (Inferred if pip_url is set.)
    • False: Use Docker instead.
    • Path: Use interpreter at this path.
    • str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
  • pip_url: connector pip URL - if not provided, the pip URL will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
  • install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
  • no_executor: If True, use NoOpExecutor which fetches specs from the registry without local installation. This is useful for scenarios where you need to validate configurations but don't need to run the connector locally (e.g., deploying to Cloud).
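
For example, here is a hedged sketch that pins a connector version and runs it via Docker on the host network. The connector name "destination-foo" and its config are illustrative:

import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",                 # illustrative connector name
    config={"api_key": "my_api_key"},
    version="0.1.0",                   # appended as the Docker image tag
    docker_image=True,                 # use the connector's default image
    use_host_network=True,             # e.g. to reach a database on the host
)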
def get_noop_destination(*, install_if_missing: bool = True) -> Destination:
 99def get_noop_destination(
100    *,
101    install_if_missing: bool = True,
102) -> Destination:
103    """Get a devnull (no-op) destination.
104
105    This is useful for performance benchmarking of sources, without
106    adding the overhead of writing data to a real destination.
107    """
108    return get_destination(
109        "destination-dev-null",
110        config={
111            "test_destination": {
112                "test_destination_type": "SILENT",
113            }
114        },
115        docker_image=True,
116        install_if_missing=install_if_missing,
117    )

Get a devnull (no-op) destination.

This is useful for performance benchmarking of sources, without adding the overhead of writing data to a real destination.
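
For example, a minimal benchmarking sketch (the source-faker config is illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100_000})
source.select_all_streams()

# Measure raw source throughput without writing to a real destination:
noop_destination = ab.get_noop_destination()
write_result = noop_destination.write(
    source,
    cache=False,        # skip caching to benchmark the source directly
    state_cache=False,  # skip state tracking as well
)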

class Destination(airbyte._connector_base.ConnectorBase, airbyte._writers.base.AirbyteWriterInterface):
 39class Destination(ConnectorBase, AirbyteWriterInterface):
 40    """A class representing a destination that can be called."""
 41
 42    connector_type = "destination"
 43
 44    def __init__(
 45        self,
 46        executor: Executor,
 47        name: str,
 48        config: dict[str, Any] | None = None,
 49        *,
 50        config_change_callback: ConfigChangeCallback | None = None,
 51        validate: bool = False,
 52    ) -> None:
 53        """Initialize the source.
 54
 55        If config is provided, it will be validated against the spec if validate is True.
 56        """
 57        super().__init__(
 58            executor=executor,
 59            name=name,
 60            config=config,
 61            config_change_callback=config_change_callback,
 62            validate=validate,
 63        )
 64
 65    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
 66        self,
 67        source_data: Source | ReadResult,
 68        *,
 69        streams: list[str] | Literal["*"] | None = None,
 70        cache: CacheBase | Literal[False] | None = None,
 71        state_cache: CacheBase | Literal[False] | None = None,
 72        write_strategy: WriteStrategy = WriteStrategy.AUTO,
 73        force_full_refresh: bool = False,
 74    ) -> WriteResult:
 75        """Write data from source connector or already cached source data.
 76
 77        Caching is enabled by default, unless explicitly disabled.
 78
 79        Args:
 80            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
 81            streams: The streams to write to the destination. If omitted or if "*" is provided,
 82                all streams will be written. If `source_data` is a source, then streams must be
 83                selected here or on the source. If both are specified, this setting will override
 84                the stream selection on the source.
 85            cache: The cache to use for reading source_data. If `None`, the default cache will
 86                be used. If `False`, caching will be disabled. If `source_data` is already a
 87                `ReadResult` object, its own cache will be used.
 88            state_cache: A cache to use for storing incremental state. You do not need to set
 89                this if `cache` is specified or if `source_data` is a `ReadResult` object. Set to
 90                `False` to disable state management.
 91            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
 92                will decide the best strategy to use.
 93            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
 94                existing state will be ignored and all source data will be reloaded.
 95
 96        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
 97        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
 98        a known destination state, the destination-specific state will be used. If neither are
 99        available, a full refresh will be performed.
100        """
101        if not isinstance(source_data, ReadResult | Source):
102            raise exc.PyAirbyteInputError(
103                message="Invalid source_data type for `source_data` arg.",
104                context={
105                    "source_data_type_provided": type(source_data).__name__,
106                },
107            )
108
109        # Resolve `source`, `read_result`, and `source_name`
110        source: Source | None = source_data if isinstance(source_data, Source) else None
111        read_result: ReadResult | None = (
112            source_data if isinstance(source_data, ReadResult) else None
113        )
114        source_name: str = source.name if source else cast("ReadResult", read_result).source_name
115
116        # State providers and writers default to no-op, unless overridden below.
117        cache_state_provider: StateProviderBase = StaticInputState([])
118        """Provides the state of the cache's data."""
119        cache_state_writer: StateWriterBase = NoOpStateWriter()
120        """Writes updates for the state of the cache's data."""
121        destination_state_provider: StateProviderBase = StaticInputState([])
122        """Provides the state of the destination's data, from `cache` or `state_cache`."""
123        destination_state_writer: StateWriterBase = NoOpStateWriter()
124        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""
125
126        # If caching not explicitly disabled
127        if cache is not False:
128            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
129            if isinstance(source_data, ReadResult):
130                cache = source_data.cache
131
132            cache = cache or get_default_cache()
133            cache_state_provider = cache.get_state_provider(
134                source_name=source_name,
135                destination_name=None,  # This will just track the cache state
136            )
137            cache_state_writer = cache.get_state_writer(
138                source_name=source_name,
139                destination_name=None,  # This will just track the cache state
140            )
141
142        # Resolve `state_cache`
143        if state_cache is None:
144            state_cache = cache or get_default_cache()
145
146        # Resolve `destination_state_writer` and `destination_state_provider`
147        if state_cache:
148            destination_state_writer = state_cache.get_state_writer(
149                source_name=source_name,
150                destination_name=self.name,
151            )
152            if not force_full_refresh:
153                destination_state_provider = state_cache.get_state_provider(
154                    source_name=source_name,
155                    destination_name=self.name,
156                )
157        elif state_cache is not False:
158            warnings.warn(
159                "No state backend or cache provided. State will not be tracked."
160                "To track state, provide a cache or state backend."
161                "To silence this warning, set `state_cache=False` explicitly.",
162                category=exc.PyAirbyteWarning,
163                stacklevel=2,
164            )
165
166        # Resolve `catalog_provider`
167        if source:
168            catalog_provider = CatalogProvider(
169                configured_catalog=source.get_configured_catalog(
170                    streams=streams,
171                    force_full_refresh=force_full_refresh,
172                )
173            )
174        elif read_result:
175            catalog_provider = CatalogProvider.from_read_result(read_result)
176        else:
177            raise exc.PyAirbyteInternalError(
178                message="`source_data` must be a `Source` or `ReadResult` object.",
179            )
180
181        progress_tracker = ProgressTracker(
182            source=source if isinstance(source_data, Source) else None,
183            cache=cache or None,
184            destination=self,
185            expected_streams=catalog_provider.stream_names,
186        )
187
188        source_state_provider: StateProviderBase
189        source_state_provider = JoinedStateProvider(
190            primary=cache_state_provider,
191            secondary=destination_state_provider,
192        )
193
194        if source:
195            if cache is False:
196                # Get message iterator for source (caching disabled)
197                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
198                    streams=streams,
199                    state_provider=source_state_provider,
200                    progress_tracker=progress_tracker,
201                    force_full_refresh=force_full_refresh,
202                )
203            else:
204                # Caching enabled and we are reading from a source.
205                # Read the data to cache if caching is enabled.
206                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
207                    cache=cache,
208                    state_provider=source_state_provider,
209                    state_writer=cache_state_writer,
210                    catalog_provider=catalog_provider,
211                    stream_names=catalog_provider.stream_names,
212                    write_strategy=write_strategy,
213                    force_full_refresh=force_full_refresh,
214                    skip_validation=False,
215                    progress_tracker=progress_tracker,
216                )
217                message_iterator = AirbyteMessageIterator.from_read_result(
218                    read_result=read_result,
219                )
220        else:  # Else we are reading from a read result
221            assert read_result is not None
222            message_iterator = AirbyteMessageIterator.from_read_result(
223                read_result=read_result,
224            )
225
226        # Write the data to the destination
227        try:
228            self._write_airbyte_message_stream(
229                stdin=message_iterator,
230                catalog_provider=catalog_provider,
231                write_strategy=write_strategy,
232                state_writer=destination_state_writer,
233                progress_tracker=progress_tracker,
234            )
235        except Exception as ex:
236            progress_tracker.log_failure(exception=ex)
237            raise
238        else:
239            # No exceptions were raised, so log success
240            progress_tracker.log_success()
241
242        return WriteResult(
243            destination=self,
244            source_data=source_data,
245            catalog_provider=catalog_provider,
246            state_writer=destination_state_writer,
247            progress_tracker=progress_tracker,
248        )
249
250    def _write_airbyte_message_stream(
251        self,
252        stdin: IO[str] | AirbyteMessageIterator,
253        *,
254        catalog_provider: CatalogProvider,
255        write_strategy: WriteStrategy,
256        state_writer: StateWriterBase | None = None,
257        progress_tracker: ProgressTracker,
258    ) -> None:
259        """Read from the connector and write to the cache."""
260        # Run optional validation step
261        if state_writer is None:
262            state_writer = StdOutStateWriter()
263
264        # Apply the write strategy to the catalog provider before sending to the destination
265        catalog_provider = catalog_provider.with_write_strategy(write_strategy)
266
267        with as_temp_files(
268            files_contents=[
269                self._hydrated_config,
270                catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
271            ]
272        ) as [
273            config_file,
274            catalog_file,
275        ]:
276            try:
277                # We call the connector to write the data, tallying the inputs and outputs
278                for destination_message in progress_tracker.tally_confirmed_writes(
279                    messages=self._execute(
280                        args=[
281                            "write",
282                            "--config",
283                            config_file,
284                            "--catalog",
285                            catalog_file,
286                        ],
287                        stdin=AirbyteMessageIterator(
288                            progress_tracker.tally_pending_writes(
289                                stdin,
290                            )
291                        ),
292                    )
293                ):
294                    if destination_message.state:
295                        state_writer.write_state(state_message=destination_message.state)
296
297            except exc.AirbyteConnectorFailedError as ex:
298                raise exc.AirbyteConnectorWriteError(
299                    connector_name=self.name,
300                    log_text=self._last_log_messages,
301                    original_exception=ex,
302                ) from None

A class representing a destination connector that can be executed.

Destination(executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, validate: bool = False)

Initialize the destination.

If config is provided, it will be validated against the spec if validate is True.

connector_type = 'destination'
def write(self, source_data: airbyte.Source | airbyte.ReadResult, *, streams: Union[list[str], Literal['*'], NoneType] = None, cache: Union[airbyte.caches.CacheBase, Literal[False], NoneType] = None, state_cache: Union[airbyte.caches.CacheBase, Literal[False], NoneType] = None, write_strategy: airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False) -> airbyte.WriteResult:

Write data from a source connector or from already cached source data.

Caching is enabled by default, unless explicitly disabled.

Arguments:
  • source_data: The source data to write. Can be a Source or a ReadResult object.
  • streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If source_data is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
  • cache: The cache to use for reading source_data. If None, the default cache will be used. If False, caching will be disabled. If source_data is already a ReadResult object, its own cache will be used.
  • state_cache: A cache to use for storing incremental state. You do not need to set this if cache is specified or if source_data is a ReadResult object. Set to False to disable state management.
  • write_strategy: The strategy to use for writing source_data. If AUTO, the connector will decide the best strategy to use.
  • force_full_refresh: Whether to force a full refresh of the source_data. If True, any existing state will be ignored and all source data will be reloaded.

For incremental syncs, cache or state_cache will be checked for matching state values. If the cache has tracked state, this will be used for the sync. Otherwise, if there is a known destination state, the destination-specific state will be used. If neither are available, a full refresh will be performed.
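
For example, here is a hedged sketch of an incremental sync that buffers records and tracks state in the default cache. The connector names and configs are illustrative:

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()
destination = ab.get_destination("destination-foo", config={"api_key": "my_api_key"})

# Buffer records and track destination state in the default DuckDB cache:
cache = ab.get_default_cache()
write_result = destination.write(
    source,
    cache=cache,        # records are staged here before writing
    state_cache=cache,  # incremental state is tracked here across runs
)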