airbyte.destinations
Destinations module.
This module contains classes and methods for interacting with Airbyte destinations. You can use this module to create custom destinations, or to interact with existing destinations.
Getting Started
To get started with destinations, use the `get_destination()` method to create a destination object. This method takes a destination name and configuration, and returns a destination object that you can use to write data to the destination.
```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```
Writing Data to a Destination
To write data to a destination, use the `Destination.write()` method. This method takes either an `airbyte.Source` or an `airbyte.ReadResult` object.
Writing to a destination from a source
To write directly from a source, simply pass the source object to the `Destination.write()` method:
```python
my_source = get_source(...)
my_destination = get_destination(...)
my_destination.write(my_source)
```
Writing from a read result
To write from a read result, you can use the following pattern. First, read data from the source, then write the data to the destination, using the `ReadResult` object as a buffer between the source and destination:
```python
# First read data from the source:
my_source = get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```
Using Docker and Python-based Connectors
By default, the `get_destination()` method will look for a Python-based connector. If you want to use a Docker-based connector, you can set the `docker_image` parameter to `True`:
```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```
Note: Unlike source connectors, most destination connectors are written in Java, and for this reason are only available as Docker-based connectors. If you need to load to a SQL database and your runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally to PyAirbyte so they can run anywhere that PyAirbyte can run.
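For example, here is a minimal sketch of loading to the default local cache instead of a destination. This is a hedged example: `source-faker` and its config are illustrative stand-ins for whatever source you actually use.

```python
import airbyte as ab

# Read from any configured source into the default local cache
# (source-faker is a stand-in connector used here for illustration):
my_source = ab.get_source(
    "source-faker",
    config={"count": 1_000},
    install_if_missing=True,
)
my_source.select_all_streams()

# The default cache is a local SQL cache, so no Docker is required:
cache = ab.get_default_cache()
read_result = my_source.read(cache=cache)

# Data is now queryable from the cache, much like a destination table:
for name, dataset in read_result.streams.items():
    print(name, len(dataset.to_pandas()))
```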
```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Destinations module.

(Full module docstring rendered above.)
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.destinations.base import Destination
from airbyte.destinations.util import (
    get_destination,
    get_noop_destination,
)


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004  # imports used for more than type checking
    from airbyte.destinations import util


__all__ = [
    # Modules
    "util",
    # Methods
    "get_destination",
    "get_noop_destination",
    # Classes
    "Destination",
]
```
```python
def get_destination(  # noqa: PLR0913 # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: str | bool | None = None,
    use_host_network: bool = False,
    install_if_missing: bool = True,
) -> Destination:
    """Get a connector by name and version.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the
            `set_config` method.
        config_change_callback: callback function to be called when the connector config changes.
        version: connector version - if not provided, the currently installed version will be
            used. If no version is installed, the latest available version will be used. The
            version can also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will
            be executed using this path or executable name. Otherwise, the connector will be
            installed automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when local_executable is set.
    """
    return Destination(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            install_if_missing=install_if_missing,
        ),
    )
```
Get a connector by name and version.
Arguments:
- name: connector name
- config: connector config - if not provided, you need to set it later via the `set_config` method.
- config_change_callback: callback function to be called when the connector config changes.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with `docker_image`, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when `local_executable` is set.
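To illustrate the options above, a few hedged invocation patterns follow. `destination-foo`, its config, and the executable path are placeholders rather than a real connector:

```python
import airbyte as ab

# Pin a specific connector version and run via Docker
# (the version is appended to the image tag, e.g. `...:0.1.0`):
dest_pinned = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    version="0.1.0",
    docker_image=True,
)

# Use an already-installed executable instead of installing into a venv:
dest_local = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    local_executable="/usr/local/bin/destination-foo",
)

# Docker with host networking, e.g. to reach a database on the host machine:
dest_host_net = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
    use_host_network=True,
)
```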
```python
def get_noop_destination(
    *,
    install_if_missing: bool = True,
) -> Destination:
    """Get a devnull (no-op) destination.

    This is useful for performance benchmarking of sources, without
    adding the overhead of writing data to a real destination.
    """
    return get_destination(
        "destination-dev-null",
        config={
            "test_destination": {
                "test_destination_type": "LOGGING",
                "logging_config": {
                    "logging_type": "FirstN",
                    "max_entry_count": 100,
                },
            }
        },
        docker_image=True,
        install_if_missing=install_if_missing,
    )
```
Get a devnull (no-op) destination.
This is useful for performance benchmarking of sources, without adding the overhead of writing data to a real destination.
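For example, a minimal benchmarking sketch (hedged; `source-faker` and its config are illustrative):

```python
import airbyte as ab

# Generate synthetic records to benchmark read throughput:
my_source = ab.get_source(
    "source-faker",
    config={"count": 100_000},
    install_if_missing=True,
)
my_source.select_all_streams()

# Discard everything on the destination side so only the source is measured:
noop_destination = ab.get_noop_destination()
noop_destination.write(my_source, cache=False)
```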
```python
class Destination(ConnectorBase, AirbyteWriterInterface):
    """A class representing a destination that can be called."""

    connector_type = "destination"

    # `__init__` and `write()` are documented individually below.

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        # Run optional validation step
        if state_writer is None:
            state_writer = StdOutStateWriter()

        # Apply the write strategy to the catalog provider before sending to the destination
        catalog_provider = catalog_provider.with_write_strategy(write_strategy)

        with as_temp_files(
            files_contents=[
                self._config,
                catalog_provider.configured_catalog.model_dump_json(),
            ]
        ) as [
            config_file,
            catalog_file,
        ]:
            try:
                # We call the connector to write the data, tallying the inputs and outputs
                for destination_message in progress_tracker.tally_confirmed_writes(
                    messages=self._execute(
                        args=[
                            "write",
                            "--config",
                            config_file,
                            "--catalog",
                            catalog_file,
                        ],
                        stdin=AirbyteMessageIterator(
                            progress_tracker.tally_pending_writes(
                                stdin,
                            )
                        ),
                    )
                ):
                    if destination_message.state:
                        state_writer.write_state(state_message=destination_message.state)

            except exc.AirbyteConnectorFailedError as ex:
                raise exc.AirbyteConnectorWriteError(
                    connector_name=self.name,
                    log_text=self._last_log_messages,
                    original_exception=ex,
                ) from None
```
A class representing a destination that can be called.
```python
    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the destination.

        If config is provided, it will be validated against the spec if validate is True.
        """
        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )
```
Initialize the destination.
If config is provided, it will be validated against the spec if `validate` is True.
```python
    def write(  # noqa: PLR0912, PLR0915 # Too many arguments/statements
        self,
        source_data: Source | ReadResult,
        *,
        streams: list[str] | Literal["*"] | None = None,
        cache: CacheBase | Literal[False] | None = None,
        state_cache: CacheBase | Literal[False] | None = None,
        write_strategy: WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
    ) -> WriteResult:
        """Write data from source connector or already cached source data.

        Caching is enabled by default, unless explicitly disabled.

        Args:
            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
            streams: The streams to write to the destination. If omitted or if "*" is provided,
                all streams will be written. If `source_data` is a source, then streams must be
                selected here or on the source. If both are specified, this setting will override
                the stream selection on the source.
            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
                False, the cache will be disabled. This must be `None` if `source_data` is already
                a `Cache` object.
            state_cache: A cache to use for storing incremental state. You do not need to set this
                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
                disable state management.
            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
                will decide the best strategy to use.
            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
                existing state will be ignored and all source data will be reloaded.

        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
        a known destination state, the destination-specific state will be used. If neither are
        available, a full refresh will be performed.
        """
        if not isinstance(source_data, ReadResult | Source):
            raise exc.PyAirbyteInputError(
                message="Invalid source_data type for `source_data` arg.",
                context={
                    "source_data_type_provided": type(source_data).__name__,
                },
            )

        # Resolve `source`, `read_result`, and `source_name`
        source: Source | None = source_data if isinstance(source_data, Source) else None
        read_result: ReadResult | None = (
            source_data if isinstance(source_data, ReadResult) else None
        )
        source_name: str = source.name if source else cast("ReadResult", read_result).source_name

        # State providers and writers default to no-op, unless overridden below.
        cache_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the cache's data."""
        cache_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the cache's data."""
        destination_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the destination's data, from `cache` or `state_cache`."""
        destination_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""

        # If caching not explicitly disabled
        if cache is not False:
            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
            if isinstance(source_data, ReadResult):
                cache = source_data.cache

            cache = cache or get_default_cache()
            cache_state_provider = cache.get_state_provider(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )
            cache_state_writer = cache.get_state_writer(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )

        # Resolve `state_cache`
        if state_cache is None:
            state_cache = cache or get_default_cache()

        # Resolve `destination_state_writer` and `destination_state_provider`
        if state_cache:
            destination_state_writer = state_cache.get_state_writer(
                source_name=source_name,
                destination_name=self.name,
            )
            if not force_full_refresh:
                destination_state_provider = state_cache.get_state_provider(
                    source_name=source_name,
                    destination_name=self.name,
                )
        elif state_cache is not False:
            warnings.warn(
                "No state backend or cache provided. State will not be tracked."
                "To track state, provide a cache or state backend."
                "To silence this warning, set `state_cache=False` explicitly.",
                category=exc.PyAirbyteWarning,
                stacklevel=2,
            )

        # Resolve `catalog_provider`
        if source:
            catalog_provider = CatalogProvider(
                configured_catalog=source.get_configured_catalog(
                    streams=streams,
                )
            )
        elif read_result:
            catalog_provider = CatalogProvider.from_read_result(read_result)
        else:
            raise exc.PyAirbyteInternalError(
                message="`source_data` must be a `Source` or `ReadResult` object.",
            )

        progress_tracker = ProgressTracker(
            source=source if isinstance(source_data, Source) else None,
            cache=cache or None,
            destination=self,
            expected_streams=catalog_provider.stream_names,
        )

        source_state_provider: StateProviderBase
        source_state_provider = JoinedStateProvider(
            primary=cache_state_provider,
            secondary=destination_state_provider,
        )

        if source:
            if cache is False:
                # Get message iterator for source (caching disabled)
                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001 # Non-public API
                    streams=streams,
                    state_provider=source_state_provider,
                    progress_tracker=progress_tracker,
                    force_full_refresh=force_full_refresh,
                )
            else:
                # Caching enabled and we are reading from a source.
                # Read the data to cache if caching is enabled.
                read_result = source._read_to_cache(  # noqa: SLF001 # Non-public API
                    cache=cache,
                    state_provider=source_state_provider,
                    state_writer=cache_state_writer,
                    catalog_provider=catalog_provider,
                    stream_names=catalog_provider.stream_names,
                    write_strategy=write_strategy,
                    force_full_refresh=force_full_refresh,
                    skip_validation=False,
                    progress_tracker=progress_tracker,
                )
                message_iterator = AirbyteMessageIterator.from_read_result(
                    read_result=read_result,
                )
        else:  # Else we are reading from a read result
            assert read_result is not None
            message_iterator = AirbyteMessageIterator.from_read_result(
                read_result=read_result,
            )

        # Write the data to the destination
        try:
            self._write_airbyte_message_stream(
                stdin=message_iterator,
                catalog_provider=catalog_provider,
                write_strategy=write_strategy,
                state_writer=destination_state_writer,
                progress_tracker=progress_tracker,
            )
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise
        else:
            # No exceptions were raised, so log success
            progress_tracker.log_success()

        return WriteResult(
            destination=self,
            source_data=source_data,
            catalog_provider=catalog_provider,
            state_writer=destination_state_writer,
            progress_tracker=progress_tracker,
        )
```
Write data from source connector or already cached source data.
Caching is enabled by default, unless explicitly disabled.
Arguments:
- source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
- streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If `source_data` is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
- cache: The cache to use for reading source_data. If `None`, no cache will be used. If `False`, the cache will be disabled. This must be `None` if `source_data` is already a `Cache` object.
- state_cache: A cache to use for storing incremental state. You do not need to set this if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to disable state management.
- write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector will decide the best strategy to use.
- force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any existing state will be ignored and all source data will be reloaded.

For incremental syncs, `cache` or `state_cache` will be checked for matching state values. If the cache has tracked state, this will be used for the sync. Otherwise, if there is a known destination state, the destination-specific state will be used. If neither are available, a full refresh will be performed.
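To illustrate, a hedged sketch combining these options. It assumes `WriteStrategy` is importable from `airbyte.strategies`, and the `destination-duckdb` name and config are illustrative:

```python
import airbyte as ab
from airbyte.strategies import WriteStrategy

my_source = ab.get_source("source-faker", config={"count": 1_000})
my_source.select_all_streams()
my_destination = ab.get_destination(
    "destination-duckdb",
    config={"destination_path": "/tmp/my_db.duckdb"},
    docker_image=True,
)

# Stream straight through without a cache, replacing existing data:
write_result = my_destination.write(
    my_source,
    cache=False,
    state_cache=False,  # silences the "state will not be tracked" warning
    write_strategy=WriteStrategy.REPLACE,
)
```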
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- set_config
- get_config
- config_hash
- validate_config
- config_spec
- print_config_spec
- docs_url
- connector_version
- check
- install
- uninstall
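As a final sketch, the inherited `ConnectorBase` helpers can be combined to configure and validate a destination before writing (hedged; `destination-foo` and its config are placeholders):

```python
import airbyte as ab

my_destination = ab.get_destination("destination-foo", docker_image=True)

# Inspect the connector's expected configuration:
my_destination.print_config_spec()

# Provide config after construction, then verify connectivity:
my_destination.set_config({"api_key": "my_api_key"})
my_destination.check()
```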