# airbyte.destinations

Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this module to create custom destinations, or to interact with existing destinations.
## Getting Started

To get started with destinations, you can use the `get_destination()` method to create a destination object. This method takes a destination name and configuration, and returns a destination object that you can use to write data to the destination.

```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```
## Writing Data to a Destination

To write data to a destination, you can use the `Destination.write()` method. This method takes either an `airbyte.Source` or an `airbyte.ReadResult` object.

## Writing to a destination from a source

To write directly from a source, simply pass the source object to the `Destination.write()` method:

```python
my_source = get_source(...)
my_destination = get_destination(...)
my_destination.write(my_source)
```
## Writing from a read result

To write from a read result, you can use the following pattern. First, read data from the source, then write the data to the destination, using the `ReadResult` object as a buffer between the source and destination:

```python
# First read data from the source:
my_source = get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```
## Using Docker and Python-based Connectors

By default, the `get_destination()` method will look for a Python-based connector. If you want to use a Docker-based connector, you can set the `docker_image` parameter to `True`:

```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

**Note:** Unlike source connectors, most destination connectors are written in Java, and for this reason are only available as Docker-based connectors. If you need to load to a SQL database and your runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally to PyAirbyte so they can run anywhere that PyAirbyte can run.
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this
module to create custom destinations, or to interact with existing destinations.

## Getting Started

To get started with destinations, you can use the `get_destination()` method to create a destination
object. This method takes a destination name and configuration, and returns a destination object
that you can use to write data to the destination.

```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

## Writing Data to a Destination

To write data to a destination, you can use the `Destination.write()` method. This method
takes either an `airbyte.Source` or an `airbyte.ReadResult` object.

## Writing to a destination from a source

To write directly from a source, simply pass the source object to the `Destination.write()` method:

```python
my_source = get_source(...)
my_destination = get_destination(...)
my_destination.write(my_source)
```

## Writing from a read result

To write from a read result, you can use the following pattern. First, read data from the source,
then write the data to the destination, using the `ReadResult` object as a buffer between the source
and destination:

```python
# First read data from the source:
my_source = get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```

## Using Docker and Python-based Connectors

By default, the `get_destination()` method will look for a Python-based connector. If you want to
use a Docker-based connector, you can set the `docker_image` parameter to `True`:

```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

**Note:** Unlike source connectors, most destination connectors are written in Java, and for this
reason are only available as Docker-based connectors. If you need to load to a SQL database and your
runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to
a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally
to PyAirbyte so they can run anywhere that PyAirbyte can run.
"""

from __future__ import annotations

from airbyte.destinations import util
from airbyte.destinations.base import Destination
from airbyte.destinations.util import (
    get_destination,
    get_noop_destination,
)


__all__ = [
    # Modules
    "util",
    # Methods
    "get_destination",
    "get_noop_destination",
    # Classes
    "Destination",
]
````
```python
def get_destination(  # noqa: PLR0913  # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    version: str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: str | bool | None = None,
    use_host_network: bool = False,
    install_if_missing: bool = True,
) -> Destination:
    """Get a connector by name and version.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        pip_url: connector pip URL - if not provided, the pip URL will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when local_executable is set.
    """
    return Destination(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        executor=get_connector_executor(
            name=name,
            version=version,
            pip_url=pip_url,
            local_executable=local_executable,
            docker_image=docker_image,
            use_host_network=use_host_network,
            install_if_missing=install_if_missing,
        ),
    )
```
Get a connector by name and version.

Arguments:
- `name`: connector name.
- `config`: connector config. If not provided, you need to set it later via the `set_config()` method.
- `config_change_callback`: callback function to be called when the connector config changes.
- `version`: connector version. If not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- `pip_url`: connector pip URL. If not provided, the pip URL will be inferred from the connector name.
- `local_executable`: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- `docker_image`: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- `use_host_network`: If set, along with `docker_image`, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- `install_if_missing`: Whether to install the connector if it is not available locally. This parameter is ignored when `local_executable` is set.
```python
def get_noop_destination() -> Destination:
    """Get a devnull (no-op) destination.

    This is useful for performance benchmarking of sources, without
    adding the overhead of writing data to a real destination.
    """
    return get_destination(
        "destination-dev-null",
        config={
            "test_destination": {
                "test_destination_type": "LOGGING",
                "logging_config": {
                    "logging_type": "FirstN",
                    "max_entry_count": 100,
                },
            }
        },
        docker_image=True,
    )
```
Get a devnull (no-op) destination.
This is useful for performance benchmarking of sources, without adding the overhead of writing data to a real destination.
```python
class Destination(ConnectorBase, AirbyteWriterInterface):
    """A class representing a destination that can be called."""

    connector_type: Literal["destination"] = "destination"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the destination.

        If config is provided, it will be validated against the spec if validate is True.
        """
        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )

    def write(  # noqa: PLR0912, PLR0915  # Too many branches/statements
        self,
        source_data: Source | ReadResult,
        *,
        streams: list[str] | Literal["*"] | None = None,
        cache: CacheBase | None | Literal[False] = None,
        state_cache: CacheBase | None | Literal[False] = None,
        write_strategy: WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
    ) -> WriteResult:
        """Write data from a source connector or from already cached source data.

        Caching is enabled by default, unless explicitly disabled.

        Args:
            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
            streams: The streams to write to the destination. If omitted or if "*" is provided,
                all streams will be written. If `source_data` is a source, then streams must be
                selected here or on the source. If both are specified, this setting will override
                the stream selection on the source.
            cache: The cache to use for reading source_data. If `None`, the default cache will
                be used. If `False`, caching will be disabled. This must be `None` if
                `source_data` is already a `ReadResult` object.
            state_cache: A cache to use for storing incremental state. You do not need to set this
                if `cache` is specified or if `source_data` is a `ReadResult` object. Set to
                `False` to disable state management.
            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
                will decide the best strategy to use.
            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
                existing state will be ignored and all source data will be reloaded.

        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
        a known destination state, the destination-specific state will be used. If neither are
        available, a full refresh will be performed.
        """
        if not isinstance(source_data, ReadResult | Source):
            raise exc.PyAirbyteInputError(
                message="Invalid source_data type for `source_data` arg.",
                context={
                    "source_data_type_provided": type(source_data).__name__,
                },
            )

        # Resolve `source`, `read_result`, and `source_name`
        source: Source | None = source_data if isinstance(source_data, Source) else None
        read_result: ReadResult | None = (
            source_data if isinstance(source_data, ReadResult) else None
        )
        source_name: str = source.name if source else cast(ReadResult, read_result).source_name

        # State providers and writers default to no-op, unless overridden below.
        cache_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the cache's data."""
        cache_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the cache's data."""
        destination_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the destination's data, from `cache` or `state_cache`."""
        destination_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""

        # If caching is not explicitly disabled
        if cache is not False:
            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
            if isinstance(source_data, ReadResult):
                cache = source_data.cache

            cache = cache or get_default_cache()
            cache_state_provider = cache.get_state_provider(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )
            cache_state_writer = cache.get_state_writer(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )

        # Resolve `state_cache`
        if state_cache is None:
            state_cache = cache or get_default_cache()

        # Resolve `destination_state_writer` and `destination_state_provider`
        if state_cache:
            destination_state_writer = state_cache.get_state_writer(
                source_name=source_name,
                destination_name=self.name,
            )
            if not force_full_refresh:
                destination_state_provider = state_cache.get_state_provider(
                    source_name=source_name,
                    destination_name=self.name,
                )
        elif state_cache is not False:
            warnings.warn(
                "No state backend or cache provided. State will not be tracked. "
                "To track state, provide a cache or state backend. "
                "To silence this warning, set `state_cache=False` explicitly.",
                category=exc.PyAirbyteWarning,
                stacklevel=2,
            )

        # Resolve `catalog_provider`
        if source:
            catalog_provider = CatalogProvider(
                configured_catalog=source.get_configured_catalog(
                    streams=streams,
                )
            )
        elif read_result:
            catalog_provider = CatalogProvider.from_read_result(read_result)
        else:
            raise exc.PyAirbyteInternalError(
                message="`source_data` must be a `Source` or `ReadResult` object.",
            )

        progress_tracker = ProgressTracker(
            source=source if isinstance(source_data, Source) else None,
            cache=cache or None,
            destination=self,
            expected_streams=catalog_provider.stream_names,
        )

        source_state_provider: StateProviderBase
        source_state_provider = JoinedStateProvider(
            primary=cache_state_provider,
            secondary=destination_state_provider,
        )

        if source:
            if cache is False:
                # Get message iterator for source (caching disabled)
                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001  # Non-public API
                    streams=streams,
                    state_provider=source_state_provider,
                    progress_tracker=progress_tracker,
                    force_full_refresh=force_full_refresh,
                )
            else:
                # Caching enabled and we are reading from a source.
                # Read the data to cache if caching is enabled.
                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
                    cache=cache,
                    state_provider=source_state_provider,
                    state_writer=cache_state_writer,
                    catalog_provider=catalog_provider,
                    stream_names=catalog_provider.stream_names,
                    write_strategy=write_strategy,
                    force_full_refresh=force_full_refresh,
                    skip_validation=False,
                    progress_tracker=progress_tracker,
                )
                message_iterator = AirbyteMessageIterator.from_read_result(
                    read_result=read_result,
                )
        else:  # Else we are reading from a read result
            assert read_result is not None
            message_iterator = AirbyteMessageIterator.from_read_result(
                read_result=read_result,
            )

        # Write the data to the destination
        try:
            self._write_airbyte_message_stream(
                stdin=message_iterator,
                catalog_provider=catalog_provider,
                write_strategy=write_strategy,
                state_writer=destination_state_writer,
                progress_tracker=progress_tracker,
            )
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise
        else:
            # No exceptions were raised, so log success
            progress_tracker.log_success()

        return WriteResult(
            destination=self,
            source_data=source_data,
            catalog_provider=catalog_provider,
            state_writer=destination_state_writer,
            progress_tracker=progress_tracker,
        )

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        # Run optional validation step
        if state_writer is None:
            state_writer = StdOutStateWriter()

        # Apply the write strategy to the catalog provider before sending to the destination
        catalog_provider = catalog_provider.with_write_strategy(write_strategy)

        with as_temp_files(
            files_contents=[
                self._config,
                catalog_provider.configured_catalog.model_dump_json(),
            ]
        ) as [
            config_file,
            catalog_file,
        ]:
            try:
                # We call the connector to write the data, tallying the inputs and outputs
                for destination_message in progress_tracker.tally_confirmed_writes(
                    messages=self._execute(
                        args=[
                            "write",
                            "--config",
                            config_file,
                            "--catalog",
                            catalog_file,
                        ],
                        stdin=AirbyteMessageIterator(
                            progress_tracker.tally_pending_writes(
                                stdin,
                            )
                        ),
                    )
                ):
                    if destination_message.type is Type.STATE:
                        state_writer.write_state(state_message=destination_message.state)

            except exc.AirbyteConnectorFailedError as ex:
                raise exc.AirbyteConnectorWriteError(
                    connector_name=self.name,
                    log_text=self._last_log_messages,
                    original_exception=ex,
                ) from None
```
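As the listing above shows, `write()` streams messages to the destination connector and persists only the `STATE` messages the destination echoes back, treating them as confirmation of the preceding records. A minimal sketch of that filtering step, using a stand-in message class rather than the real Airbyte protocol models:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Message:
    """Stand-in for an Airbyte protocol message (illustrative only)."""

    type: str          # e.g. "RECORD" or "STATE"
    state: Any = None  # State payload, present only on STATE messages


def collect_confirmed_state(messages: list[Message]) -> list[Any]:
    """Keep only the state payloads the destination has emitted back."""
    return [m.state for m in messages if m.type == "STATE"]
```

In the real implementation this happens inside `_write_airbyte_message_stream()`, where each confirmed state is handed to a `StateWriterBase`.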
A class representing a destination that can be called.
Initialize the destination.
If config is provided, it will be validated against the spec if validate is True.
Write data from a source connector or from already cached source data.

Caching is enabled by default, unless explicitly disabled.

Arguments:
- `source_data`: The source data to write. Can be a `Source` or a `ReadResult` object.
- `streams`: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If `source_data` is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
- `cache`: The cache to use for reading `source_data`. If `None`, the default cache will be used. If `False`, caching will be disabled. This must be `None` if `source_data` is already a `ReadResult` object.
- `state_cache`: A cache to use for storing incremental state. You do not need to set this if `cache` is specified or if `source_data` is a `ReadResult` object. Set to `False` to disable state management.
- `write_strategy`: The strategy to use for writing `source_data`. If `AUTO`, the connector will decide the best strategy to use.
- `force_full_refresh`: Whether to force a full refresh of the `source_data`. If `True`, any existing state will be ignored and all source data will be reloaded.

For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
If the cache has tracked state, this will be used for the sync. Otherwise, if there is
a known destination state, the destination-specific state will be used. If neither are
available, a full refresh will be performed.
Inherited Members
- airbyte._connector_base.ConnectorBase
- config_change_callback
- executor
- name
- set_config
- get_config
- config_hash
- validate_config
- config_spec
- print_config_spec
- docs_url
- connector_version
- check
- install
- uninstall