airbyte.callbacks

Callbacks for working with PyAirbyte.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Callbacks for working with PyAirbyte."""
 3
 4from __future__ import annotations
 5
 6from collections.abc import Callable
 7from typing import Any
 8
 9
10ConfigChangeCallback = Callable[[dict[str, Any]], None]
11"""Callback for when the configuration changes while the connector is running.
12
13This callback can be passed to supporting functions like `airbyte.get_source()` and
14`airbyte.get_destination()` to take action whenever configuration changes.
15The callback will be called with the new configuration as the only argument.
16
17The most common use case for this callback is for connectors with OAuth APIs to pass updated
18refresh tokens when the previous token is about to expire.
19
20Note that the dictionary passed will contain the entire configuration, not just the changed fields.
21
22Example Usage:
23
24```python
25import airbyte as ab
26import yaml
27from pathlib import Path
28
29config_file = Path("path/to/my/config.yaml")
30config_dict = yaml.safe_load(config_file.read_text())
31
32# Define the callback function:
33def config_callback(new_config: dict[str, Any]) -> None:
34    # Write new config back to config file
35    config_file.write_text(yaml.safe_dump(new_config))
36
37# Pass in the callback function when creating the source:
38source = ab.get_source(
39    "source-faker",
40    config=config_dict,
41    config_change_callback=config_callback,
42)
43# Now read as usual. If config changes during sync, the callback will be called.
44source.read()
45```
46
47For more information on the underlying Airbyte protocol, please see documentation on the
48[`CONNECTOR_CONFIG`](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytecontrolconnectorconfigmessage)
49control messages.
50"""
ConfigChangeCallback = collections.abc.Callable[[dict[str, typing.Any]], None]

Callback for when the configuration changes while the connector is running.

This callback can be passed to supporting functions like airbyte.get_source() and airbyte.get_destination() to take action whenever configuration changes. The callback will be called with the new configuration as the only argument.

The most common use case for this callback is for connectors with OAuth APIs to pass updated refresh tokens when the previous token is about to expire.

Note that the dictionary passed will contain the entire configuration, not just the changed fields.

Example Usage:

import airbyte as ab
import yaml
from pathlib import Path
from typing import Any

config_file = Path("path/to/my/config.yaml")
config_dict = yaml.safe_load(config_file.read_text())

# Define the callback function:
def config_callback(new_config: dict[str, Any]) -> None:
    # Write new config back to config file
    config_file.write_text(yaml.safe_dump(new_config))

# Pass in the callback function when creating the source:
source = ab.get_source(
    "source-faker",
    config=config_dict,
    config_change_callback=config_callback,
)
# Now read as usual. If config changes during sync, the callback will be called.
source.read()

For more information on the underlying Airbyte protocol, please see documentation on the CONNECTOR_CONFIG control messages.