airbyte.callbacks
Callbacks for working with PyAirbyte.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Callbacks for working with PyAirbyte.""" 3 4from __future__ import annotations 5 6from collections.abc import Callable 7from typing import Any 8 9 10ConfigChangeCallback = Callable[[dict[str, Any]], None] 11"""Callback for when the configuration changes while the connector is running. 12 13This callback can be passed to supporting functions like `airbyte.get_source()` and 14`airbyte.get_destination()` to take action whenever configuration changes. 15The callback will be called with the new configuration as the only argument. 16 17The most common use case for this callback is for connectors with OAuth APIs to pass updated 18refresh tokens when the previous token is about to expire. 19 20Note that the dictionary passed will contain the entire configuration, not just the changed fields. 21 22Example Usage: 23 24```python 25import airbyte as ab 26import yaml 27from pathlib import Path 28 29config_file = Path("path/to/my/config.yaml") 30config_dict = yaml.safe_load(config_file.read_text()) 31 32# Define the callback function: 33def config_callback(new_config: dict[str, Any]) -> None: 34 # Write new config back to config file 35 config_file.write_text(yaml.safe_dump(new_config)) 36 37# Pass in the callback function when creating the source: 38source = get_source( 39 "source-faker", 40 config=config_dict, 41 config_change_callback=config_callback, 42) 43# Now read as usual. If config changes during sync, the callback will be called. 44source.read() 45``` 46 47For more information on the underlying Airbyte protocol, please see documentation on the 48[`CONNECTOR_CONFIG`](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytecontrolconnectorconfigmessage) 49control messages. 50"""
ConfigChangeCallback =
collections.abc.Callable[[dict[str, typing.Any]], None]
Callback for when the configuration changes while the connector is running.
This callback can be passed to supporting functions like airbyte.get_source()
and
airbyte.get_destination()
to take action whenever configuration changes.
The callback will be called with the new configuration as the only argument.
The most common use case for this callback is for connectors with OAuth APIs to pass updated refresh tokens when the previous token is about to expire.
Note that the dictionary passed will contain the entire configuration, not just the changed fields.
Example Usage:
import airbyte as ab
import yaml
from pathlib import Path
config_file = Path("path/to/my/config.yaml")
config_dict = yaml.safe_load(config_file.read_text())
# Define the callback function:
def config_callback(new_config: dict[str, Any]) -> None:
# Write new config back to config file
config_file.write_text(yaml.safe_dump(new_config))
# Pass in the callback function when creating the source:
source = get_source(
"source-faker",
config=config_dict,
config_change_callback=config_callback,
)
# Now read as usual. If config changes during sync, the callback will be called.
source.read()
For more information on the underlying Airbyte protocol, please see documentation on the
CONNECTOR_CONFIG
control messages.