airbyte.destinations

Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this module to create custom destinations, or to interact with existing destinations.

Getting Started

To get started with destinations, you can use the get_destination() method to create a destination object. This method takes a destination name and configuration, and returns a destination object that you can use to write data to the destination.

import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)

Writing Data to a Destination

To write data to a destination, you can use the Destination.write() method. This method takes either an airbyte.Source or an airbyte.ReadResult object.

Writing to a destination from a source

To write directly from a source, simply pass the source object to the Destination.write() method:

my_source = ab.get_source(...)
my_destination = ab.get_destination(...)
my_destination.write(my_source)

Writing from a read result

To write from a read result, you can use the following pattern. First, read data from the source, then write the data to the destination, using the ReadResult object as a buffer between the source and destination:

# First read data from the source:
my_source = ab.get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
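
For example, a minimal validation step might inspect one stream as a pandas DataFrame before writing. This is a sketch only; the "users" stream name is an illustrative assumption:

# Hypothetical check: "users" is an assumed stream name for illustration.
users_df = read_result["users"].to_pandas()
assert not users_df.empty, "expected at least one user record"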

Using Docker and Python-based Connectors

By default, the get_destination() method will look for a Python-based connector. If you want to use a Docker-based connector, you can set the docker_image parameter to True:

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
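
The docker_image parameter also accepts a custom image name as a string (see the get_destination() reference below). If version is set and the image name has no tag, the version is appended as the tag. A brief sketch, with an illustrative image name:

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image="airbyte/destination-foo",  # illustrative image name
    version="0.1.0",  # appended as the image tag: airbyte/destination-foo:0.1.0
)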

Note: Unlike source connectors, most destination connectors are written in Java, and for this reason are only available as Docker-based connectors. If you need to load to a SQL database and your runtime does not support Docker, you may want to use the airbyte.caches module to load data to a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally to PyAirbyte so they can run anywhere that PyAirbyte can run.
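
For example, a minimal sketch of loading to the default local cache instead of a destination (this assumes the source-faker connector and its count option):

import airbyte as ab

cache = ab.get_default_cache()  # a local, SQL-based cache
my_source = ab.get_source("source-faker", config={"count": 100})
my_source.select_all_streams()
my_source.read(cache=cache)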

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this
module to create custom destinations, or to interact with existing destinations.

## Getting Started

To get started with destinations, you can use the `get_destination()` method to create a destination
object. This method takes a destination name and configuration, and returns a destination object
that you can use to write data to the destination.

```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

## Writing Data to a Destination

To write data to a destination, you can use the `Destination.write()` method. This method
takes either an `airbyte.Source` or an `airbyte.ReadResult` object.

## Writing to a destination from a source

To write directly from a source, simply pass the source object to the `Destination.write()` method:

```python
my_source = ab.get_source(...)
my_destination = ab.get_destination(...)
my_destination.write(my_source)
```

## Writing from a read result

To write from a read result, you can use the following pattern. First, read data from the source,
then write the data to the destination, using the `ReadResult` object as a buffer between the source
and destination:

```python
# First read data from the source:
my_source = ab.get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```

## Using Docker and Python-based Connectors

By default, the `get_destination()` method will look for a Python-based connector. If you want to
use a Docker-based connector, you can set the `docker_image` parameter to `True`:

```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

**Note:** Unlike source connectors, most destination connectors are written in Java, and for this
reason are only available as Docker-based connectors. If you need to load to a SQL database and your
runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to
a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally
to PyAirbyte so they can run anywhere that PyAirbyte can run.
"""

from __future__ import annotations

from airbyte.destinations import util
from airbyte.destinations.base import Destination
from airbyte.destinations.util import (
    get_destination,
    get_noop_destination,
)


__all__ = [
    # Modules
    "util",
    # Methods
    "get_destination",
    "get_noop_destination",
    # Classes
    "Destination",
]
def get_destination( name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, docker_image: str | bool | None = None, use_host_network: bool = False, install_if_missing: bool = True) -> Destination:
def get_destination(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: str | bool | None = None,
    use_host_network: bool = False,
    install_if_missing: bool = True,
) -> Destination:
    """Get a destination connector by name and version.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when local_executable is set.
    """
    return Destination(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            install_if_missing=install_if_missing,
        ),
    )

Get a destination connector by name and version.

Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • config_change_callback: callback function to be called when the connector config changes.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • docker_image: If set, the connector will be executed using Docker. You can specify True to use the default image for the connector, or you can specify a custom image name. If version is specified and your image name does not already contain a tag (e.g. my-image:latest), the version will be appended as a tag (e.g. my-image:0.1.0).
  • use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when docker_image is not set.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
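
For example, a sketch pointing at a pre-installed executable instead of letting PyAirbyte install the connector (the connector name and path are illustrative placeholders):

from airbyte.destinations import get_destination

my_destination = get_destination(
    "destination-foo",  # illustrative connector name
    config={"api_key": "my_api_key"},
    local_executable="/usr/local/bin/destination-foo",  # illustrative path; installation is skipped
)
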
def get_noop_destination() -> Destination:
def get_noop_destination() -> Destination:
    """Get a devnull (no-op) destination.

    This is useful for performance benchmarking of sources, without
    adding the overhead of writing data to a real destination.
    """
    return get_destination(
        "destination-dev-null",
        config={
            "test_destination": {
                "test_destination_type": "LOGGING",
                "logging_config": {
                    "logging_type": "FirstN",
                    "max_entry_count": 100,
                },
            }
        },
        docker_image=True,
    )

Get a devnull (no-op) destination.

This is useful for performance benchmarking of sources, without adding the overhead of writing data to a real destination.
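
For example, a minimal benchmarking sketch (this assumes the source-faker connector and its count option):

import airbyte as ab
from airbyte.destinations import get_noop_destination

source = ab.get_source("source-faker", config={"count": 10_000})
source.select_all_streams()

get_noop_destination().write(source, cache=False)  # cache disabled to measure raw source throughput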

class Destination(airbyte._connector_base.ConnectorBase, airbyte._writers.base.AirbyteWriterInterface):
class Destination(ConnectorBase, AirbyteWriterInterface):
    """A class representing a destination that can be called."""

    connector_type: Literal["destination"] = "destination"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the destination.

        If config is provided, it will be validated against the spec if validate is True.
        """
        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )

    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
        self,
        source_data: Source | ReadResult,
        *,
        streams: list[str] | Literal["*"] | None = None,
        cache: CacheBase | None | Literal[False] = None,
        state_cache: CacheBase | None | Literal[False] = None,
        write_strategy: WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
    ) -> WriteResult:
        """Write data from source connector or already cached source data.

        Caching is enabled by default, unless explicitly disabled.

        Args:
            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
            streams: The streams to write to the destination. If omitted or if "*" is provided,
                all streams will be written. If `source_data` is a source, then streams must be
                selected here or on the source. If both are specified, this setting will override
                the stream selection on the source.
            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
                False, the cache will be disabled. This must be `None` if `source_data` is already
                a `Cache` object.
            state_cache: A cache to use for storing incremental state. You do not need to set this
                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
                disable state management.
            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
                will decide the best strategy to use.
            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
                existing state will be ignored and all source data will be reloaded.

        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
        a known destination state, the destination-specific state will be used. If neither are
        available, a full refresh will be performed.
        """
        if not isinstance(source_data, ReadResult | Source):
            raise exc.PyAirbyteInputError(
                message="Invalid source_data type for `source_data` arg.",
                context={
                    "source_data_type_provided": type(source_data).__name__,
                },
            )

        # Resolve `source`, `read_result`, and `source_name`
        source: Source | None = source_data if isinstance(source_data, Source) else None
        read_result: ReadResult | None = (
            source_data if isinstance(source_data, ReadResult) else None
        )
        source_name: str = source.name if source else cast(ReadResult, read_result).source_name

        # State providers and writers default to no-op, unless overridden below.
        cache_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the cache's data."""
        cache_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the cache's data."""
        destination_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the destination's data, from `cache` or `state_cache`."""
        destination_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""

        # If caching is not explicitly disabled
        if cache is not False:
            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
            if isinstance(source_data, ReadResult):
                cache = source_data.cache

            cache = cache or get_default_cache()
            cache_state_provider = cache.get_state_provider(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )
            cache_state_writer = cache.get_state_writer(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )

        # Resolve `state_cache`
        if state_cache is None:
            state_cache = cache or get_default_cache()

        # Resolve `destination_state_writer` and `destination_state_provider`
        if state_cache:
            destination_state_writer = state_cache.get_state_writer(
                source_name=source_name,
                destination_name=self.name,
            )
            if not force_full_refresh:
                destination_state_provider = state_cache.get_state_provider(
                    source_name=source_name,
                    destination_name=self.name,
                )
        elif state_cache is not False:
            warnings.warn(
                "No state backend or cache provided. State will not be tracked. "
                "To track state, provide a cache or state backend. "
                "To silence this warning, set `state_cache=False` explicitly.",
                category=exc.PyAirbyteWarning,
                stacklevel=2,
            )

        # Resolve `catalog_provider`
        if source:
            catalog_provider = CatalogProvider(
                configured_catalog=source.get_configured_catalog(
                    streams=streams,
                )
            )
        elif read_result:
            catalog_provider = CatalogProvider.from_read_result(read_result)
        else:
            raise exc.PyAirbyteInternalError(
                message="`source_data` must be a `Source` or `ReadResult` object.",
            )

        progress_tracker = ProgressTracker(
            source=source if isinstance(source_data, Source) else None,
            cache=cache or None,
            destination=self,
            expected_streams=catalog_provider.stream_names,
        )

        source_state_provider: StateProviderBase
        source_state_provider = JoinedStateProvider(
            primary=cache_state_provider,
            secondary=destination_state_provider,
        )

        if source:
            if cache is False:
                # Get message iterator for source (caching disabled)
                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
                    streams=streams,
                    state_provider=source_state_provider,
                    progress_tracker=progress_tracker,
                    force_full_refresh=force_full_refresh,
                )
            else:
                # Caching enabled and we are reading from a source.
                # Read the data to cache if caching is enabled.
                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
                    cache=cache,
                    state_provider=source_state_provider,
                    state_writer=cache_state_writer,
                    catalog_provider=catalog_provider,
                    stream_names=catalog_provider.stream_names,
                    write_strategy=write_strategy,
                    force_full_refresh=force_full_refresh,
                    skip_validation=False,
                    progress_tracker=progress_tracker,
                )
                message_iterator = AirbyteMessageIterator.from_read_result(
                    read_result=read_result,
                )
        else:  # Else we are reading from a read result
            assert read_result is not None
            message_iterator = AirbyteMessageIterator.from_read_result(
                read_result=read_result,
            )

        # Write the data to the destination
        try:
            self._write_airbyte_message_stream(
                stdin=message_iterator,
                catalog_provider=catalog_provider,
                write_strategy=write_strategy,
                state_writer=destination_state_writer,
                progress_tracker=progress_tracker,
            )
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise
        else:
            # No exceptions were raised, so log success
            progress_tracker.log_success()

        return WriteResult(
            destination=self,
            source_data=source_data,
            catalog_provider=catalog_provider,
            state_writer=destination_state_writer,
            progress_tracker=progress_tracker,
        )

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Write the given stream of Airbyte messages to the destination."""
        # Default to a stdout state writer if none is provided
        if state_writer is None:
            state_writer = StdOutStateWriter()

        # Apply the write strategy to the catalog provider before sending to the destination
        catalog_provider = catalog_provider.with_write_strategy(write_strategy)

        with as_temp_files(
            files_contents=[
                self._config,
                catalog_provider.configured_catalog.model_dump_json(),
            ]
        ) as [
            config_file,
            catalog_file,
        ]:
            try:
                # We call the connector to write the data, tallying the inputs and outputs
                for destination_message in progress_tracker.tally_confirmed_writes(
                    messages=self._execute(
                        args=[
                            "write",
                            "--config",
                            config_file,
                            "--catalog",
                            catalog_file,
                        ],
                        stdin=AirbyteMessageIterator(
                            progress_tracker.tally_pending_writes(
                                stdin,
                            )
                        ),
                    )
                ):
                    if destination_message.type is Type.STATE:
                        state_writer.write_state(state_message=destination_message.state)

            except exc.AirbyteConnectorFailedError as ex:
                raise exc.AirbyteConnectorWriteError(
                    connector_name=self.name,
                    log_text=self._last_log_messages,
                    original_exception=ex,
                ) from None

A class representing a destination that can be called.

Destination( executor: airbyte._executors.base.Executor, name: str, config: dict[str, typing.Any] | None = None, *, config_change_callback: Callable[[dict[str, typing.Any]], None] | None = None, validate: bool = False)

Initialize the destination.

If config is provided, it will be validated against the spec if validate is True.

connector_type: Literal['destination'] = 'destination'
def write( self, source_data: airbyte.Source | airbyte.ReadResult, *, streams: Union[list[str], Literal['*'], NoneType] = None, cache: Union[airbyte.caches.CacheBase, NoneType, Literal[False]] = None, state_cache: Union[airbyte.caches.CacheBase, NoneType, Literal[False]] = None, write_strategy: airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False) -> airbyte.WriteResult:

Write data from source connector or already cached source data.

Caching is enabled by default, unless explicitly disabled.

Arguments:
  • source_data: The source data to write. Can be a Source or a ReadResult object.
  • streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If source_data is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
  • cache: The cache to use for reading source_data. If None, no cache will be used. If False, the cache will be disabled. This must be None if source_data is already a Cache object.
  • state_cache: A cache to use for storing incremental state. You do not need to set this if cache is specified or if source_data is a Cache object. Set to False to disable state management.
  • write_strategy: The strategy to use for writing source_data. If AUTO, the connector will decide the best strategy to use.
  • force_full_refresh: Whether to force a full refresh of the source_data. If True, any existing state will be ignored and all source data will be reloaded.

For incremental syncs, cache or state_cache will be checked for matching state values. If the cache has tracked state, this will be used for the sync. Otherwise, if there is a known destination state, the destination-specific state will be used. If neither are available, a full refresh will be performed.
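
For example, a sketch of a direct source-to-destination sync with caching and state tracking disabled (my_source and my_destination as created earlier):

from airbyte.strategies import WriteStrategy

write_result = my_destination.write(
    my_source,
    streams="*",                        # write all selected streams
    cache=False,                        # stream directly, with no intermediate cache
    state_cache=False,                  # explicitly opt out of state tracking (silences the warning)
    write_strategy=WriteStrategy.AUTO,  # let the connector choose the best strategy
)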

Inherited Members
airbyte._connector_base.ConnectorBase
config_change_callback
executor
name
set_config
get_config
config_hash
validate_config
config_spec
print_config_spec
docs_url
connector_version
check
install
uninstall
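
Because check is inherited from ConnectorBase, you can verify the connector and its config before writing. A brief sketch:

my_destination.check()  # expected to raise an exception if the connector check fails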