# airbyte.destinations

Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this module to create custom destinations, or to interact with existing destinations.

## Getting Started

To get started with destinations, use the `get_destination()` method to create a destination object. This method takes a destination name and configuration, and returns a destination object that you can use to write data to the destination.

```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```
## Writing Data to a Destination

To write data to a destination, use the `Destination.write()` method. This method takes either an `airbyte.Source` or an `airbyte.ReadResult` object.

## Writing to a destination from a source

To write directly from a source, simply pass the source object to the `Destination.write()` method:

```python
my_source = get_source(...)
my_destination = get_destination(...)
my_destination.write(my_source)
```
## Writing from a read result

To write from a read result, you can use the following pattern. First, read data from the source, then write the data to the destination, using the `ReadResult` object as a buffer between the source and destination:

```python
# First, read data from the source:
my_source = get_source(...)
read_result = my_source.read(...)

# Optionally, validate the data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```
## Using Docker and Python-based Connectors

By default, the `get_destination()` method will look for a Python-based connector. If you want to use a Docker-based connector, set the `docker_image` parameter to `True`:

```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```
**Note:** Unlike source connectors, most destination connectors are written in Java, and for this reason are only available as Docker-based connectors. If you need to load to a SQL database and your runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to a SQL cache instead. Caches are mostly identical to destinations in behavior, and are implemented internally to PyAirbyte so they can run anywhere that PyAirbyte can run.
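When choosing between the two load paths at runtime, a simple check for the Docker CLI is often enough. The sketch below is purely illustrative: `choose_loader` is a hypothetical helper written for this document, not part of the PyAirbyte API.

```python
import shutil


def choose_loader(connector_name: str) -> str:
    """Pick a load path for this runtime (hypothetical helper, not PyAirbyte API).

    Returns a label for the chosen path: a Docker-based destination when the
    `docker` CLI is on PATH, otherwise a DuckDB-backed SQL cache.
    """
    if shutil.which("docker") is not None:
        return f"docker-destination:{connector_name}"
    return "sql-cache:duckdb"


print(choose_loader("destination-snowflake"))
```

In a real script, the first branch would correspond to `ab.get_destination(..., docker_image=True)` and the second to loading into a cache from `airbyte.caches`.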
```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Destinations module.

This module contains classes and methods for interacting with Airbyte destinations. You can use this
module to create custom destinations, or to interact with existing destinations.

## Getting Started

To get started with destinations, you can use the `get_destination()` method to create a destination
object. This method takes a destination name and configuration, and returns a destination object
that you can use to write data to the destination.

```python
import airbyte as ab

my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

## Writing Data to a Destination

To write data to a destination, you can use the `Destination.write()` method. This method
takes either an `airbyte.Source` or an `airbyte.ReadResult` object.

## Writing to a destination from a source

To write directly from a source, simply pass the source object to the `Destination.write()` method:

```python
my_source = get_source(...)
my_destination = get_destination(...)
my_destination.write(my_source)
```

## Writing from a read result

To write from a read result, you can use the following pattern. First, read data from the source,
then write the data to the destination, using the `ReadResult` object as a buffer between the source
and destination:

```python
# First read data from the source:
my_source = get_source(...)
read_result = my_source.read(...)

# Optionally, you can validate data before writing it:
# ...misc validation code here...

# Then write the data to the destination:
my_destination.write(read_result)
```

## Using Docker and Python-based Connectors

By default, the `get_destination()` method will look for a Python-based connector. If you want to
use a Docker-based connector, you can set the `docker_image` parameter to `True`:

```python
my_destination = ab.get_destination(
    "destination-foo",
    config={"api_key": "my_api_key"},
    docker_image=True,
)
```

**Note:** Unlike source connectors, most destination connectors are written in Java, and for this
reason are only available as Docker-based connectors. If you need to load to a SQL database and your
runtime does not support Docker, you may want to use the `airbyte.caches` module to load data to
a SQL cache. Caches are mostly identical to destinations in behavior, and are implemented internally
to PyAirbyte so they can run anywhere that PyAirbyte can run.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.destinations.base import Destination
from airbyte.destinations.util import (
    get_destination,
    get_noop_destination,
)


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004  # imports used for more than type checking
    from airbyte.destinations import util


__all__ = [
    # Modules
    "util",
    # Methods
    "get_destination",
    "get_noop_destination",
    # Classes
    "Destination",
]
```
```python
def get_destination(  # noqa: PLR0913  # Too many arguments
    name: str,
    config: dict[str, Any] | None = None,
    *,
    config_change_callback: ConfigChangeCallback | None = None,
    version: str | None = None,
    use_python: bool | Path | str | None = None,
    pip_url: str | None = None,
    local_executable: Path | str | None = None,
    docker_image: str | bool | None = None,
    use_host_network: bool = False,
    install_if_missing: bool = True,
    install_root: Path | None = None,
    no_executor: bool = False,
) -> Destination:
    """Get a connector by name and version.

    Args:
        name: connector name
        config: connector config - if not provided, you need to set it later via the set_config
            method.
        config_change_callback: callback function to be called when the connector config changes.
        version: connector version - if not provided, the currently installed version will be used.
            If no version is installed, the latest available version will be used. The version can
            also be set to "latest" to force the use of the latest available version.
        use_python: (Optional.) Python interpreter specification:
            - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
            - False: Use Docker instead.
            - Path: Use interpreter at this path.
            - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet
              installed, it will be installed by uv. (This generally adds less than 3 seconds
              to install times.)
        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
            connector name.
        local_executable: If set, the connector will be assumed to already be installed and will be
            executed using this path or executable name. Otherwise, the connector will be installed
            automatically in a virtual environment.
        docker_image: If set, the connector will be executed using Docker. You can specify `True`
            to use the default image for the connector, or you can specify a custom image name.
            If `version` is specified and your image name does not already contain a tag
            (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
        use_host_network: If set, along with docker_image, the connector will be executed using
            the host network. This is useful for connectors that need to access resources on
            the host machine, such as a local database. This parameter is ignored when
            `docker_image` is not set.
        install_if_missing: Whether to install the connector if it is not available locally. This
            parameter is ignored when local_executable is set.
        install_root: (Optional.) The root directory where the virtual environment will be
            created. If not provided, the current working directory will be used.
        no_executor: If True, use NoOpExecutor which fetches specs from the registry without
            local installation. This is useful for scenarios where you need to validate
            configurations but don't need to run the connector locally (e.g., deploying to Cloud).
    """
    executor = get_connector_executor(
        name=name,
        version=version,
        use_python=use_python,
        pip_url=pip_url,
        local_executable=local_executable,
        docker_image=docker_image,
        use_host_network=use_host_network,
        install_if_missing=install_if_missing,
        install_root=install_root,
        no_executor=no_executor,
    )

    return Destination(
        name=name,
        config=config,
        config_change_callback=config_change_callback,
        executor=executor,
    )
```
Get a connector by name and version.
Arguments:

- name: connector name
- config: connector config - if not provided, you need to set it later via the `set_config` method.
- config_change_callback: callback function to be called when the connector config changes.
- version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
- use_python: (Optional.) Python interpreter specification:
  - True: Use current Python interpreter. (Inferred if `pip_url` is set.)
  - False: Use Docker instead.
  - Path: Use interpreter at this path.
  - str: Use specific Python version. E.g. "3.11" or "3.11.10". If the version is not yet installed, it will be installed by uv. (This generally adds less than 3 seconds to install times.)
- pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
- local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
- docker_image: If set, the connector will be executed using Docker. You can specify `True` to use the default image for the connector, or you can specify a custom image name. If `version` is specified and your image name does not already contain a tag (e.g. `my-image:latest`), the version will be appended as a tag (e.g. `my-image:0.1.0`).
- use_host_network: If set, along with docker_image, the connector will be executed using the host network. This is useful for connectors that need to access resources on the host machine, such as a local database. This parameter is ignored when `docker_image` is not set.
- install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
- install_root: (Optional.) The root directory where the virtual environment will be created. If not provided, the current working directory will be used.
- no_executor: If True, use NoOpExecutor which fetches specs from the registry without local installation. This is useful for scenarios where you need to validate configurations but don't need to run the connector locally (e.g., deploying to Cloud).
```python
def get_noop_destination(
    *,
    install_if_missing: bool = True,
) -> Destination:
    """Get a devnull (no-op) destination.

    This is useful for performance benchmarking of sources, without
    adding the overhead of writing data to a real destination.
    """
    return get_destination(
        "destination-dev-null",
        config={
            "test_destination": {
                "test_destination_type": "SILENT",
            }
        },
        docker_image=True,
        install_if_missing=install_if_missing,
    )
```
Get a devnull (no-op) destination.
This is useful for performance benchmarking of sources, without adding the overhead of writing data to a real destination.
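Because a no-op destination discards all writes, wall-clock time for a sync against it approximates the read-side throughput of the source. A generic timing harness might look like the sketch below; `benchmark_write` and its `write_fn` callable are illustrative stand-ins for a real call such as `my_destination.write(my_source)`, not PyAirbyte APIs:

```python
import time
from collections.abc import Callable


def benchmark_write(write_fn: Callable[[], None]) -> float:
    """Return wall-clock seconds for one write call.

    With a no-op destination, the measured time reflects source read
    throughput, since the destination discards every record.
    """
    start = time.perf_counter()
    write_fn()
    return time.perf_counter() - start


# A dummy CPU-bound workload stands in for a real sync here:
elapsed = benchmark_write(lambda: sum(range(1_000_000)) and None)
print(f"sync took {elapsed:.3f}s")
```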
```python
class Destination(ConnectorBase, AirbyteWriterInterface):
    """A class representing a destination that can be called."""

    connector_type = "destination"

    def __init__(
        self,
        executor: Executor,
        name: str,
        config: dict[str, Any] | None = None,
        *,
        config_change_callback: ConfigChangeCallback | None = None,
        validate: bool = False,
    ) -> None:
        """Initialize the destination.

        If config is provided, it will be validated against the spec if validate is True.
        """
        super().__init__(
            executor=executor,
            name=name,
            config=config,
            config_change_callback=config_change_callback,
            validate=validate,
        )

    @staticmethod
    def _normalize_destination_name(name: str) -> str:
        """Normalize a destination name to canonical form (`destination-<type>`).

        Accepts either the short form (e.g. `snowflake`) or the canonical
        form (e.g. `destination-snowflake`).
        """
        if not name.startswith(_CANONICAL_PREFIX):
            return f"{_CANONICAL_PREFIX}{name}"
        return name

    @property
    def is_cache_supported(self) -> bool:
        """Whether this destination has a compatible cache implementation.

        Returns `True` when `get_sql_cache()` is expected to succeed for
        the destination's connector type.
        """
        dest_type = self._normalize_destination_name(
            self.name,
        ).replace(_CANONICAL_PREFIX, "")
        return dest_type in get_supported_destination_types()

    def get_sql_cache(
        self,
        *,
        schema_name: str | None = None,
    ) -> CacheBase:
        """Return a SQL Cache for querying data written by this destination.

        This follows the same pattern as
        `SyncResult.get_sql_cache()` in `airbyte.cloud.sync_results`:
        it builds a cache from the destination's configuration using
        `destination_to_cache()`.

        Args:
            schema_name: Override the schema/namespace on the returned cache.
                When `None` the cache uses the default schema from the
                destination config.

        Raises:
            ValueError: If the destination type is not supported.
        """
        resolved_name = self._normalize_destination_name(self.name)
        config = dict(self._hydrated_config)

        # Ensure the config carries a destinationType key so that
        # destination_to_cache() can dispatch correctly.
        if "destinationType" not in config and "DESTINATION_TYPE" not in config:
            dest_type = resolved_name.replace(_CANONICAL_PREFIX, "")
            config["destinationType"] = dest_type

        return destination_to_cache(config, schema_name=schema_name)

    def write(  # noqa: PLR0912, PLR0915  # Too many branches/statements
        self,
        source_data: Source | ReadResult,
        *,
        streams: list[str] | Literal["*"] | None = None,
        cache: CacheBase | Literal[False] | None = None,
        state_cache: CacheBase | Literal[False] | None = None,
        write_strategy: WriteStrategy = WriteStrategy.AUTO,
        force_full_refresh: bool = False,
    ) -> WriteResult:
        """Write data from a source connector or from already cached source data.

        Caching is enabled by default, unless explicitly disabled.

        Args:
            source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
            streams: The streams to write to the destination. If omitted or if "*" is provided,
                all streams will be written. If `source_data` is a source, then streams must be
                selected here or on the source. If both are specified, this setting will override
                the stream selection on the source.
            cache: The cache to use for reading source_data. If `None`, no cache will be used. If
                `False`, the cache will be disabled. This must be `None` if `source_data` is
                already a `Cache` object.
            state_cache: A cache to use for storing incremental state. You do not need to set this
                if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to
                disable state management.
            write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector
                will decide the best strategy to use.
            force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any
                existing state will be ignored and all source data will be reloaded.

        For incremental syncs, `cache` or `state_cache` will be checked for matching state values.
        If the cache has tracked state, this will be used for the sync. Otherwise, if there is
        a known destination state, the destination-specific state will be used. If neither are
        available, a full refresh will be performed.
        """
        if not isinstance(source_data, ReadResult | Source):
            raise exc.PyAirbyteInputError(
                message="Invalid source_data type for `source_data` arg.",
                context={
                    "source_data_type_provided": type(source_data).__name__,
                },
            )

        # Resolve `source`, `read_result`, and `source_name`
        source: Source | None = source_data if isinstance(source_data, Source) else None
        read_result: ReadResult | None = (
            source_data if isinstance(source_data, ReadResult) else None
        )
        source_name: str = source.name if source else cast("ReadResult", read_result).source_name

        # State providers and writers default to no-op, unless overridden below.
        cache_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the cache's data."""
        cache_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the cache's data."""
        destination_state_provider: StateProviderBase = StaticInputState([])
        """Provides the state of the destination's data, from `cache` or `state_cache`."""
        destination_state_writer: StateWriterBase = NoOpStateWriter()
        """Writes updates for the state of the destination's data, to `cache` or `state_cache`."""

        # If caching is not explicitly disabled
        if cache is not False:
            # Resolve `cache`, `cache_state_provider`, and `cache_state_writer`
            if isinstance(source_data, ReadResult):
                cache = source_data.cache

            cache = cache or get_default_cache()
            cache_state_provider = cache.get_state_provider(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )
            cache_state_writer = cache.get_state_writer(
                source_name=source_name,
                destination_name=None,  # This will just track the cache state
            )

        # Resolve `state_cache`
        if state_cache is None:
            state_cache = cache or get_default_cache()

        # Resolve `destination_state_writer` and `destination_state_provider`
        if state_cache:
            destination_state_writer = state_cache.get_state_writer(
                source_name=source_name,
                destination_name=self.name,
            )
            if not force_full_refresh:
                destination_state_provider = state_cache.get_state_provider(
                    source_name=source_name,
                    destination_name=self.name,
                )
        elif state_cache is not False:
            warnings.warn(
                "No state backend or cache provided. State will not be tracked. "
                "To track state, provide a cache or state backend. "
                "To silence this warning, set `state_cache=False` explicitly.",
                category=exc.PyAirbyteWarning,
                stacklevel=2,
            )

        # Resolve `catalog_provider`
        if source:
            catalog_provider = CatalogProvider(
                configured_catalog=source.get_configured_catalog(
                    streams=streams,
                    force_full_refresh=force_full_refresh,
                )
            )
        elif read_result:
            catalog_provider = CatalogProvider.from_read_result(read_result)
        else:
            raise exc.PyAirbyteInternalError(
                message="`source_data` must be a `Source` or `ReadResult` object.",
            )

        progress_tracker = ProgressTracker(
            source=source if isinstance(source_data, Source) else None,
            cache=cache or None,
            destination=self,
            expected_streams=catalog_provider.stream_names,
        )

        source_state_provider: StateProviderBase
        source_state_provider = JoinedStateProvider(
            primary=cache_state_provider,
            secondary=destination_state_provider,
        )

        if source:
            if cache is False:
                # Get message iterator for source (caching disabled)
                message_iterator: AirbyteMessageIterator = source._get_airbyte_message_iterator(  # noqa: SLF001  # Non-public API
                    streams=streams,
                    state_provider=source_state_provider,
                    progress_tracker=progress_tracker,
                    force_full_refresh=force_full_refresh,
                )
            else:
                # Caching is enabled and we are reading from a source.
                # Read the data to the cache.
                read_result = source._read_to_cache(  # noqa: SLF001  # Non-public API
                    cache=cache,
                    state_provider=source_state_provider,
                    state_writer=cache_state_writer,
                    catalog_provider=catalog_provider,
                    stream_names=catalog_provider.stream_names,
                    write_strategy=write_strategy,
                    force_full_refresh=force_full_refresh,
                    skip_validation=False,
                    progress_tracker=progress_tracker,
                )
                message_iterator = AirbyteMessageIterator.from_read_result(
                    read_result=read_result,
                )
        else:  # Else we are reading from a read result
            assert read_result is not None
            message_iterator = AirbyteMessageIterator.from_read_result(
                read_result=read_result,
            )

        # Write the data to the destination
        try:
            self._write_airbyte_message_stream(
                stdin=message_iterator,
                catalog_provider=catalog_provider,
                write_strategy=write_strategy,
                state_writer=destination_state_writer,
                progress_tracker=progress_tracker,
            )
        except Exception as ex:
            progress_tracker.log_failure(exception=ex)
            raise
        else:
            # No exceptions were raised, so log success
            progress_tracker.log_success()

        return WriteResult(
            destination=self,
            source_data=source_data,
            catalog_provider=catalog_provider,
            state_writer=destination_state_writer,
            progress_tracker=progress_tracker,
        )

    def _write_airbyte_message_stream(
        self,
        stdin: IO[str] | AirbyteMessageIterator,
        *,
        catalog_provider: CatalogProvider,
        write_strategy: WriteStrategy,
        state_writer: StateWriterBase | None = None,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Read from the connector and write to the cache."""
        # Run optional validation step
        if state_writer is None:
            state_writer = StdOutStateWriter()

        # Apply the write strategy to the catalog provider before sending to the destination
        catalog_provider = catalog_provider.with_write_strategy(write_strategy)

        with as_temp_files(
            files_contents=[
                self._hydrated_config,
                catalog_provider.configured_catalog.model_dump_json(exclude_none=True),
            ]
        ) as [
            config_file,
            catalog_file,
        ]:
            try:
                # We call the connector to write the data, tallying the inputs and outputs
                for destination_message in progress_tracker.tally_confirmed_writes(
                    messages=self._execute(
                        args=[
                            "write",
                            "--config",
                            config_file,
                            "--catalog",
                            catalog_file,
                        ],
                        stdin=AirbyteMessageIterator(
                            progress_tracker.tally_pending_writes(
                                stdin,
                            )
                        ),
                    )
                ):
                    if destination_message.state:
                        state_writer.write_state(state_message=destination_message.state)

            except exc.AirbyteConnectorFailedError as ex:
                raise exc.AirbyteConnectorWriteError(
                    connector_name=self.name,
                    log_text=self._last_log_messages,
                    original_exception=ex,
                ) from None
```
A class representing a destination that can be called.
Initialize the destination.
If config is provided, it will be validated against the spec if validate is True.
Whether this destination has a compatible cache implementation.

Returns `True` when `get_sql_cache()` is expected to succeed for the destination's connector type.
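The check above relies on canonical name normalization. A standalone sketch of that logic, mirroring the `_normalize_destination_name` helper shown in the class source (the free functions here are illustrative, not the class's actual API):

```python
_CANONICAL_PREFIX = "destination-"


def normalize_destination_name(name: str) -> str:
    """Accept `snowflake` or `destination-snowflake`; return the canonical form."""
    if not name.startswith(_CANONICAL_PREFIX):
        return f"{_CANONICAL_PREFIX}{name}"
    return name


def destination_type(name: str) -> str:
    """Strip the canonical prefix to get the bare destination type."""
    return normalize_destination_name(name).replace(_CANONICAL_PREFIX, "")
```

`is_cache_supported` then amounts to checking whether `destination_type(self.name)` appears in the supported-types list.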
Return a SQL Cache for querying data written by this destination.
This follows the same pattern as `SyncResult.get_sql_cache()` in `airbyte.cloud.sync_results`: it builds a cache from the destination's configuration using `destination_to_cache()`.

Arguments:

- schema_name: Override the schema/namespace on the returned cache. When `None`, the cache uses the default schema from the destination config.

Raises:

- ValueError: If the destination type is not supported.
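Before dispatching, the method ensures the config carries a `destinationType` key. That key-injection step can be sketched as a pure function (illustrative restatement of the logic in `get_sql_cache`, not a PyAirbyte helper):

```python
CANONICAL_PREFIX = "destination-"


def with_destination_type(config: dict, resolved_name: str) -> dict:
    """Ensure the config carries a `destinationType` key so the cache
    factory can dispatch on it (sketch of the logic in `get_sql_cache`)."""
    config = dict(config)  # copy; never mutate the caller's config
    if "destinationType" not in config and "DESTINATION_TYPE" not in config:
        config["destinationType"] = resolved_name.replace(CANONICAL_PREFIX, "")
    return config
```

An existing `destinationType` (or `DESTINATION_TYPE`) key is left untouched; only configs missing both get the type derived from the canonical connector name.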
Write data from a source connector or from already cached source data.

Caching is enabled by default, unless explicitly disabled.

Arguments:

- source_data: The source data to write. Can be a `Source` or a `ReadResult` object.
- streams: The streams to write to the destination. If omitted or if "*" is provided, all streams will be written. If `source_data` is a source, then streams must be selected here or on the source. If both are specified, this setting will override the stream selection on the source.
- cache: The cache to use for reading source_data. If `None`, no cache will be used. If `False`, the cache will be disabled. This must be `None` if `source_data` is already a `Cache` object.
- state_cache: A cache to use for storing incremental state. You do not need to set this if `cache` is specified or if `source_data` is a `Cache` object. Set to `False` to disable state management.
- write_strategy: The strategy to use for writing source_data. If `AUTO`, the connector will decide the best strategy to use.
- force_full_refresh: Whether to force a full refresh of the source_data. If `True`, any existing state will be ignored and all source data will be reloaded.

For incremental syncs, `cache` or `state_cache` will be checked for matching state values. If the cache has tracked state, this will be used for the sync. Otherwise, if there is a known destination state, the destination-specific state will be used. If neither are available, a full refresh will be performed.