# airbyte.destinations

Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this module to create custom destinations, or to interact with existing destinations.

## Getting Started

To get started with destinations, you can use the `get_destination()` method to create a destination object. This method takes a destination name and configuration, and returns a destination object that you can use to write data to the destination.

```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

## Writing Data to a Destination

To write data to a destination, use the `Destination.write()` method. This method takes either an `airbyte.Source` or an `airbyte.ReadResult` object.

## Writing to a destination from a source

To write directly from a source, pass the source object to the `Destination.write()` method:

```python
my_source = ab.get_source(...)
my_destination = ab.get_destination(...)
my_destination.write(my_source)
```

## Writing from a read result

To write from a read result, use the following pattern: first read data from the source, then write it to the destination, using the `ReadResult` object as a buffer between the source and the destination:

```python
# First, read data from the source:
my_source = ab.get_source(...)
read_result = my_source.read(...)

# Optionally, validate the data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```
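The optional validation step above can be any code that inspects the read result before writing. As an illustrative sketch only (using a plain dict of stream name to records as a stand-in for the mapping-like `ReadResult` interface; `validate_streams` is a hypothetical helper, not part of PyAirbyte):

```python
from __future__ import annotations


def validate_streams(datasets: dict[str, list[dict]]) -> None:
    """Raise if any stream came back empty (illustrative check only)."""
    empty = [name for name, records in datasets.items() if not records]
    if empty:
        raise ValueError(f"Streams returned no records: {empty}")


# Passes silently when every stream has at least one record:
validate_streams({"users": [{"id": 1}], "orders": [{"id": 7}]})
```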

## Using Docker and Python-based Connectors

By default, the `get_destination()` method will look for a Python-based connector. If you want to use a Docker-based connector, you can set the `docker_image` parameter to `True`:

```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

**Note:** Unlike source connectors, most destination connectors are written in Java and are therefore only available as Docker-based connectors. If you need to load to a SQL database and your runtime does not support Docker, consider the `airbyte.caches` module instead, which loads data to a SQL cache. Caches behave almost identically to destinations and are implemented natively in PyAirbyte, so they can run anywhere PyAirbyte can run.

```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Destinations module."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.destinations.base import Destination
from airbyte.destinations.util import (
    get_destination,
    get_noop_destination,
)


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004  # imports used for more than type checking
    from airbyte.destinations import util


__all__ = [
    # Modules
    "util",
    # Methods
    "get_destination",
    "get_noop_destination",
    # Classes
    "Destination",
]
```
```python
def get_destination(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: str | bool | None = None,
    use_host_network: bool = False,
    install_if_missing: bool = True,
) -> Destination:
    """Get a connector by name and version."""
    return Destination(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            install_if_missing=install_if_missing,
        ),
    )
```

Get a connector by name and version.

Arguments:
  • name: Connector name.
  • config: Connector config. If not provided, you must set it later via the `set_config()` method.
  • config_change_callback: Callback function invoked when the connector config changes.
  • version: Connector version. If not provided, the currently installed version is used; if no version is installed, the latest available version is used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: Connector pip URL. If not provided, the pip URL is inferred from the connector name.
  • local_executable: If set, the connector is assumed to already be installed and is executed using this path or executable name. Otherwise, the connector is installed automatically in a virtual environment.
  • docker_image: If set, the connector is executed using Docker. You can specify `True` to use the default image for the connector, or specify a custom image name. If `version` is specified and the image name does not already contain a tag (e.g. `my-image:latest`), the version is appended as a tag (e.g. `my-image:0.1.0`).
  • use_host_network: If set along with `docker_image`, the connector is executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when `local_executable` is set.
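The interaction between `docker_image` and `version` described above amounts to a simple tagging rule. A minimal sketch of that rule (`resolve_image_tag` is a hypothetical helper written for illustration, not part of the PyAirbyte API):

```python
from __future__ import annotations


def resolve_image_tag(image: str, version: str | None) -> str:
    """Append `version` as a tag only when the image name has no tag already."""
    if version and ":" not in image:
        return f"{image}:{version}"
    return image


resolve_image_tag("my-image", "0.1.0")         # -> "my-image:0.1.0"
resolve_image_tag("my-image:latest", "0.1.0")  # -> "my-image:latest" (tag preserved)
```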
```python
def get_noop_destination(
    *,
    install_if_missing: bool = True,
) -> Destination:
    """Get a devnull (no-op) destination."""
    return get_destination(
        "destination-dev-null",
        config={
            "test_destination": {
                "test_destination_type": "LOGGING",
                "logging_config": {
                    "logging_type": "FirstN",
                    "max_entry_count": 100,
                },
            }
        },
        docker_image=True,
        install_if_missing=install_if_missing,
    )
```

Get a devnull (no-op) destination.

This is useful for performance benchmarking of sources, without adding the overhead of writing data to a real destination.
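To see why a no-op destination helps with benchmarking, consider what it does: it accepts and tallies records without persisting them, so nearly all measured time belongs to the source. The toy class below is a stand-in written for illustration only; the real connector is the Docker-based `destination-dev-null`:

```python
import time


class NoOpWriter:
    """Toy stand-in for a no-op destination: count records, write nothing."""

    def __init__(self) -> None:
        self.record_count = 0

    def write_record(self, record: dict) -> None:
        self.record_count += 1  # no I/O, so throughput reflects the producer alone


writer = NoOpWriter()
start = time.perf_counter()
for i in range(1000):
    writer.write_record({"id": i})
elapsed = time.perf_counter() - start
```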

```python
class Destination(ConnectorBase, AirbyteWriterInterface):
    """A class representing a destination that can be called."""

    connector_type = "destination"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the destination.

        If config is provided, it will be validated against the spec if validate is True.
        """
        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )

    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
        self,
        source_data: Source | ReadResult,
        *,
        streams: list[str] | Literal["*"] | None = None,
        cache: CacheBase | Literal[False] | None = None,
        state_cache: CacheBase | Literal[False] | None = None,
        write_strategy: WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
    ) -> WriteResult:
        """Write data from a source connector or from already cached source data.

        Caching is enabled by default, unless explicitly disabled.

        Args:
            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
            streams: The streams to write to the destination. If omitted or if "*" is provided,
                all streams will be written. If `source_data` is a source, then streams must be
                selected here or on the source. If both are specified, this setting will override
                the stream selection on the source.
            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
                False, the cache will be disabled. This must be `None` if `source_data` is already
                a `Cache` object.
            state_cache: A cache to use for storing incremental state. You do not need to set this
                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
                disable state management.
            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
                will decide the best strategy to use.
            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
                existing state will be ignored and all source data will be reloaded.

        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
        a known destination state, the destination-specific state will be used. If neither is
        available, a full refresh will be performed.
        """
        if not isinstance(source_data, ReadResult | Source):
            raise exc.PyAirbyteInputError(
                message="Invalid source_data type for `source_data` arg.",
                context={
                    "source_data_type_provided": type(source_data).__name__,
                },
            )

        # Resolve `source`, `read_result`, and `source_name`
        source: Source | None = source_data if isinstance(source_data, Source) else None
        read_result: ReadResult | None = (
            source_data if isinstance(source_data, ReadResult) else None
        )
        source_name: str = source.name if source else cast("ReadResult", read_result).source_name

        # State providers and writers default to no-op, unless overridden below.
        cache_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the cache's data."""
        cache_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the cache's data."""
        destination_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the destination's data, from `cache` or `state_cache`."""
        destination_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""

        # If caching is not explicitly disabled
        if cache is not False:
            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
            if isinstance(source_data, ReadResult):
                cache = source_data.cache

            cache = cache or get_default_cache()
            cache_state_provider = cache.get_state_provider(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )
            cache_state_writer = cache.get_state_writer(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )

        # Resolve `state_cache`
        if state_cache is None:
            state_cache = cache or get_default_cache()

        # Resolve `destination_state_writer` and `destination_state_provider`
        if state_cache:
            destination_state_writer = state_cache.get_state_writer(
                source_name=source_name,
                destination_name=self.name,
            )
            if not force_full_refresh:
                destination_state_provider = state_cache.get_state_provider(
                    source_name=source_name,
                    destination_name=self.name,
                )
        elif state_cache is not False:
            warnings.warn(
                "No state backend or cache provided. State will not be tracked. "
                "To track state, provide a cache or state backend. "
                "To silence this warning, set `state_cache=False` explicitly.",
                category=exc.PyAirbyteWarning,
                stacklevel=2,
            )

        # Resolve `catalog_provider`
        if source:
            catalog_provider = CatalogProvider(
                configured_catalog=source.get_configured_catalog(
                    streams=streams,
                )
            )
        elif read_result:
            catalog_provider = CatalogProvider.from_read_result(read_result)
        else:
            raise exc.PyAirbyteInternalError(
                message="`source_data` must be a `Source` or `ReadResult` object.",
            )

        progress_tracker = ProgressTracker(
            source=source if isinstance(source_data, Source) else None,
            cache=cache or None,
            destination=self,
            expected_streams=catalog_provider.stream_names,
        )

        source_state_provider: StateProviderBase
        source_state_provider = JoinedStateProvider(
            primary=cache_state_provider,
            secondary=destination_state_provider,
        )

        if source:
            if cache is False:
                # Get a message iterator for the source (caching disabled)
                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
                    streams=streams,
                    state_provider=source_state_provider,
                    progress_tracker=progress_tracker,
                    force_full_refresh=force_full_refresh,
                )
            else:
                # Caching is enabled and we are reading from a source.
                # Read the data to the cache first.
                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
                    cache=cache,
                    state_provider=source_state_provider,
                    state_writer=cache_state_writer,
                    catalog_provider=catalog_provider,
                    stream_names=catalog_provider.stream_names,
                    write_strategy=write_strategy,
                    force_full_refresh=force_full_refresh,
                    skip_validation=False,
                    progress_tracker=progress_tracker,
                )
                message_iterator = AirbyteMessageIterator.from_read_result(
                    read_result=read_result,
                )
        else:  # Else we are reading from a read result
            assert read_result is not None
            message_iterator = AirbyteMessageIterator.from_read_result(
                read_result=read_result,
            )

        # Write the data to the destination
        try:
            self._write_airbyte_message_stream(
                stdin=message_iterator,
                catalog_provider=catalog_provider,
                write_strategy=write_strategy,
                state_writer=destination_state_writer,
                progress_tracker=progress_tracker,
            )
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise
        else:
            # No exceptions were raised, so log success
            progress_tracker.log_success()

        return WriteResult(
            destination=self,
            source_data=source_data,
            catalog_provider=catalog_provider,
            state_writer=destination_state_writer,
            progress_tracker=progress_tracker,
        )

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        if state_writer is None:
            state_writer = StdOutStateWriter()

        # Apply the write strategy to the catalog provider before sending to the destination
        catalog_provider = catalog_provider.with_write_strategy(write_strategy)

        with as_temp_files(
            files_contents=[
                self._config,
                catalog_provider.configured_catalog.model_dump_json(),
            ]
        ) as [
            config_file,
            catalog_file,
        ]:
            try:
                # Call the connector to write the data, tallying the inputs and outputs
                for destination_message in progress_tracker.tally_confirmed_writes(
                    messages=self._execute(
                        args=[
                            "write",
                            "--config",
                            config_file,
                            "--catalog",
                            catalog_file,
                        ],
                        stdin=AirbyteMessageIterator(
                            progress_tracker.tally_pending_writes(
                                stdin,
                            )
                        ),
                    )
                ):
                    if destination_message.state:
                        state_writer.write_state(state_message=destination_message.state)

            except exc.AirbyteConnectorFailedError as ex:
                raise exc.AirbyteConnectorWriteError(
                    connector_name=self.name,
                    log_text=self._last_log_messages,
                    original_exception=ex,
                ) from None
```
A class representing a destination that can be called.

```python
Destination(
    executor: Executor,
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    validate: bool = False,
)
```

Initialize the destination.

If `config` is provided, it will be validated against the spec if `validate` is True.

connector_type = 'destination'
```python
def write(
    self,
    source_data: Source | ReadResult,
    *,
    streams: list[str] | Literal["*"] | None = None,
    cache: CacheBase | Literal[False] | None = None,
    state_cache: CacheBase | Literal[False] | None = None,
    write_strategy: WriteStrategy = WriteStrategy.AUTO,
    force_full_refresh: bool = False,
) -> WriteResult:
```
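The incremental-state lookup that `write()` documents follows a fixed precedence: state tracked by the cache is used first, then any known destination-specific state, and if neither is available a full refresh is performed. A minimal sketch of that precedence, using plain lists as stand-ins for state artifacts (illustrative only; `resolve_sync_state` is a hypothetical helper, not the library's `JoinedStateProvider` API):

```python
from __future__ import annotations


def resolve_sync_state(
    cache_state: list | None,
    destination_state: list | None,
) -> list | str:
    """Pick the state to start an incremental sync from (illustrative precedence only)."""
    if cache_state:
        return cache_state        # primary: state tracked by the cache
    if destination_state:
        return destination_state  # secondary: destination-specific state
    return "full_refresh"         # neither available: reload everything


resolve_sync_state(None, None)  # -> "full_refresh"
```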
223            )
224
225        # Write the data to the destination
226        try:
227            self._write_airbyte_message_stream(
228                stdin=message_iterator,
229                catalog_provider=catalog_provider,
230                write_strategy=write_strategy,
231                state_writer=destination_state_writer,
232                progress_tracker=progress_tracker,
233            )
234        except Exception as ex:
235            progress_tracker.log_failure(exception=ex)
236            raise
237        else:
238            # No exceptions were raised, so log success
239            progress_tracker.log_success()
240
241        return WriteResult(
242            destination=self,
243            source_data=source_data,
244            catalog_provider=catalog_provider,
245            state_writer=destination_state_writer,
246            progress_tracker=progress_tracker,
247        )

Write data from a source connector or from already cached source data.

Caching is enabled by default unless explicitly disabled.

Arguments:
  • source_data: The source data to write. Can be a Source or a ReadResult object.
  • streams: The streams to write to the destination. If omitted, or if "*" is provided, all streams will be written. If source_data is a source, streams must be selected either here or on the source itself; if both are specified, this setting overrides the stream selection on the source.
  • cache: The cache to use for reading source_data. If None, no cache will be used. If False, the cache will be disabled. This must be None if source_data is already a Cache object.
  • state_cache: A cache to use for storing incremental state. You do not need to set this if cache is specified or if source_data is a Cache object. Set to False to disable state management.
  • write_strategy: The strategy to use for writing source_data. If AUTO, the connector will decide the best strategy to use.
  • force_full_refresh: Whether to force a full refresh of the source_data. If True, any existing state will be ignored and all source data will be reloaded.
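The resolution order for the streams argument can be modeled as a simple fallback chain. The sketch below is illustrative only, not PyAirbyte's actual implementation; the function name and parameters are hypothetical:

```python
def resolve_streams(
    streams_arg: "str | list[str] | None",
    source_selected: "list[str] | None",
    available: "list[str]",
) -> "list[str]":
    """Illustrative model of the `streams` resolution described above.

    An explicit `streams` argument overrides any selection made on the
    source; "*" (or no selection anywhere) means all available streams.
    """
    if streams_arg == "*":
        return available
    if streams_arg:  # Explicit list overrides the source's selection
        return list(streams_arg)
    if source_selected:  # Fall back to streams selected on the source
        return source_selected
    return available  # Nothing selected anywhere: write everything
```

For example, under this model, passing streams=["users"] to write() would win over a source on which ["users", "orders"] had been selected.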

For incremental syncs, the cache or state_cache will be checked for matching state values. If the cache has tracked state, that state will be used for the sync. Otherwise, if a destination-specific state is known, it will be used. If neither is available, a full refresh will be performed.
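The state-resolution order described above can be sketched as a short fallback function. This is a simplified model for illustration, not PyAirbyte's internals; the function name and its dict-based state representation are assumptions:

```python
def resolve_sync_state(
    cache_state: "dict | None",
    destination_state: "dict | None",
    force_full_refresh: bool = False,
) -> "dict | None":
    """Illustrative model of the incremental-state fallback described above.

    Returns the state to resume from, or None to signal a full refresh.
    """
    if force_full_refresh:
        return None  # Existing state is ignored; all data is reloaded
    if cache_state is not None:
        return cache_state  # Cache-tracked state wins when present
    if destination_state is not None:
        return destination_state  # Fall back to destination-specific state
    return None  # No known state anywhere: full refresh
```

Note how force_full_refresh short-circuits the chain, matching the argument description above: any existing state is ignored and all source data is reloaded.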

Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
set_config
get_config
config_hash
validate_config
config_spec
print_config_spec
docs_url
connector_version
check
install
uninstall