airbyte

PyAirbyte brings Airbyte ELT to every Python developer.

PyAirbyte

PyAirbyte brings the power of Airbyte to every Python developer. PyAirbyte provides a set of utilities to use Airbyte connectors in Python.


Getting Started

Watch this Getting Started Loom video or run one of our Quickstart tutorials below to see how you can use PyAirbyte in your Python code.

Secrets Management

PyAirbyte can auto-import secrets from the following sources:

  1. Environment variables.
  2. Variables defined in a local .env ("Dotenv") file.
  3. Google Colab secrets.
  4. Manual entry via getpass.

_Note: You can also build your own secret manager by subclassing the CustomSecretManager implementation. For more information, see the airbyte.secrets.CustomSecretManager class definition._
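For illustration, below is a minimal, hypothetical sketch of such a manager. It assumes that a CustomSecretManager subclass only needs a name attribute and a get_secret() lookup that returns None when a secret is absent; check the class definition for the exact interface.

import airbyte as ab
from airbyte.secrets import CustomSecretManager


class MyVaultSecretManager(CustomSecretManager):
    """Hypothetical manager that resolves secrets from an in-memory mapping."""

    name = "my_vault"  # label used when PyAirbyte lists available secret sources

    _vault = {
        "GITHUB_PERSONAL_ACCESS_TOKEN": "ghp_example_token",
    }

    def get_secret(self, secret_name: str) -> str | None:
        # Return the secret value, or None so that other sources can be checked.
        return self._vault.get(secret_name)


token = ab.get_secret(
    "GITHUB_PERSONAL_ACCESS_TOKEN",
    sources=[MyVaultSecretManager()],  # custom managers can be passed alongside built-in sources
)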

Retrieving Secrets

import airbyte as ab

source = ab.get_source("source-github")
source.set_config(
    {
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        }
    }
)

By default, PyAirbyte will search all available secrets sources. The get_secret() function also accepts an optional sources argument of specific source names (SecretSourceEnum) and/or secret manager objects to check.

By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other secret managers. You can disable this prompt by passing allow_prompt=False to get_secret().
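For example, the following checks only environment variables and the local .env file, in that order, and raises an error instead of prompting if the secret cannot be found:

import airbyte as ab

token = ab.get_secret(
    "GITHUB_PERSONAL_ACCESS_TOKEN",
    sources=[ab.SecretSourceEnum.ENV, ab.SecretSourceEnum.DOTENV],  # checked in this order
    allow_prompt=False,  # raise instead of prompting if the secret is not found
)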

For more information, see the airbyte.secrets module.

Secrets Auto-Discovery

If you have a secret matching an expected name, PyAirbyte will automatically use it. For example, if you have a secret named GITHUB_PERSONAL_ACCESS_TOKEN, PyAirbyte will automatically use it when configuring the GitHub source.

The naming convention for secrets is {CONNECTOR_NAME}_{PROPERTY_NAME}, for instance SNOWFLAKE_PASSWORD and BIGQUERY_CREDENTIALS_PATH.

PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: AIRBYTE_CLOUD_API_URL, AIRBYTE_CLOUD_API_KEY, etc.

Contributing

To learn how you can contribute to PyAirbyte, please see our PyAirbyte Contributors Guide.

Frequently Asked Questions

1. Does PyAirbyte replace Airbyte? No.

2. What is the PyAirbyte cache? Is it a destination? Yes, you can think of it as a built-in destination implementation, but we avoid the word "destination" in our docs to prevent confusion with our certified destinations list.

3. Does PyAirbyte work with data orchestration frameworks like Airflow, Dagster, and Snowpark? Yes, it should. Please give it a try and report any problems you see. Also, drop us a note if it works for you!

4. Can I use PyAirbyte when developing or testing Airbyte sources? Yes, you can, but only for Python-based sources.

5. Can I develop traditional ETL pipelines with PyAirbyte? Yes. Just pick the cache type matching the destination - like SnowflakeCache for landing data in Snowflake (see the sketch after this list).

6. Can PyAirbyte import a connector from a local directory that has Python project files, or does it have to be installed with pip? Yes, PyAirbyte can use any local install that provides a CLI, and it will automatically find connectors by name if they are on PATH.
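To sketch the cache-as-destination pattern from question 5: the cache object passed to read() determines where the data lands. The example below uses the local DuckDB cache; a warehouse cache such as a Snowflake cache from airbyte.caches would be configured the same way, though its exact constructor fields are not shown in this document. The source-faker connector and its count option are used only as a placeholder configuration.

from pathlib import Path

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 1_000})
source.select_all_streams()

cache_dir = Path("./.cache/faker_demo")
cache = ab.DuckDBCache(db_path=cache_dir / "faker_demo.duckdb", cache_dir=cache_dir)

result = source.read(cache=cache)  # records land in the cache's DuckDB database
print(f"Processed {result.processed_records} records")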

Changelog and Release Notes

For a version history and list of all changes, please see our GitHub Releases page.

API Reference

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""PyAirbyte brings Airbyte ELT to every Python developer.
 3
 4.. include:: ../README.md
 5
 6## API Reference
 7
 8"""
 9
10from __future__ import annotations
11
12from airbyte import (
13    caches,
14    cloud,
15    datasets,
16    documents,
17    exceptions,  # noqa: ICN001  # No 'exc' alias for top-level module
18    experimental,
19    results,
20    secrets,
21    sources,
22)
23from airbyte.caches.bigquery import BigQueryCache
24from airbyte.caches.duckdb import DuckDBCache
25from airbyte.caches.util import get_default_cache, new_local_cache
26from airbyte.datasets import CachedDataset
27from airbyte.records import StreamRecord
28from airbyte.results import ReadResult
29from airbyte.secrets import SecretSourceEnum, get_secret
30from airbyte.sources import registry
31from airbyte.sources.base import Source
32from airbyte.sources.registry import get_available_connectors
33from airbyte.sources.util import get_source
34
35
36__all__ = [
37    # Modules
38    "cloud",
39    "caches",
40    "datasets",
41    "documents",
42    "exceptions",
43    "experimental",
44    "records",
45    "registry",
46    "results",
47    "secrets",
48    "sources",
49    # Factories
50    "get_available_connectors",
51    "get_default_cache",
52    "get_secret",
53    "get_source",
54    "new_local_cache",
55    # Classes
56    "BigQueryCache",
57    "CachedDataset",
58    "DuckDBCache",
59    "ReadResult",
60    "SecretSourceEnum",
61    "Source",
62    "StreamRecord",
63]
64
65__docformat__ = "google"
def get_available_connectors() -> list[str]:
118def get_available_connectors() -> list[str]:
119    """Return a list of all available connectors.
120
121    Connectors will be returned in alphabetical order, with the standard prefix "source-".
122    """
123    return sorted(
124        conn.name for conn in _get_registry_cache().values() if conn.pypi_package_name is not None
125    )

Return a list of all available connectors.

Connectors will be returned in alphabetical order, with the standard prefix "source-".
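A short usage sketch: check that a connector is available before creating it.

import airbyte as ab

connectors = ab.get_available_connectors()
print(f"{len(connectors)} connectors available")

if "source-github" in connectors:
    source = ab.get_source("source-github")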

def get_default_cache() -> DuckDBCache:
15def get_default_cache() -> DuckDBCache:
16    """Get a local cache for storing data, using the default database path.
17
18    Cache files are stored in the `.cache` directory, relative to the current
19    working directory.
20    """
21    cache_dir = Path("./.cache/default_cache")
22    return DuckDBCache(
23        db_path=cache_dir / "default_cache.duckdb",
24        cache_dir=cache_dir,
25    )

Get a local cache for storing data, using the default database path.

Cache files are stored in the .cache directory, relative to the current working directory.
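A short usage sketch; passing this cache to Source.read() is equivalent to the default behavior when no cache is given.

import airbyte as ab

cache = ab.get_default_cache()     # DuckDB file under ./.cache/default_cache/
print(cache.get_sql_engine().url)  # inspect the underlying SQLAlchemy engine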

def get_secret( secret_name: str, /, *, sources: list[airbyte.secrets.base.SecretManager | SecretSourceEnum] | None = None, allow_prompt: bool = True, **kwargs: dict[str, typing.Any]) -> airbyte.secrets.base.SecretString:
15def get_secret(
16    secret_name: str,
17    /,
18    *,
19    sources: list[SecretManager | SecretSourceEnum] | None = None,
20    allow_prompt: bool = True,
21    **kwargs: dict[str, Any],
22) -> SecretString:
23    """Get a secret from the environment.
24
25    The optional `sources` argument of enum type `SecretSourceEnum` or list of `SecretSourceEnum`
26    options. If left blank, all available sources will be checked. If a list of `SecretSourceEnum`
27    entries is passed, then the sources will be checked using the provided ordering.
28
29    If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then
30    the user will be prompted to enter the secret if it is not found in any of the other sources.
31    """
32    if "source" in kwargs:
33        warnings.warn(
34            message="The `source` argument is deprecated. Use the `sources` argument instead.",
35            category=DeprecationWarning,
36            stacklevel=2,
37        )
38        sources = kwargs.pop("source")  # type: ignore [assignment]
39
40    available_sources: dict[str, SecretManager] = {}
41    for available_source in _get_secret_sources():
42        # Add available sources to the dict. Order matters.
43        available_sources[available_source.name] = available_source
44
45    if sources is None:
46        # If ANY is in the list, then we don't need to check any other sources.
47        # This is the default behavior.
48        sources = list(available_sources.values())
49
50    elif not isinstance(sources, list):
51        sources = [sources]  # type: ignore [unreachable]  # This is a 'just in case' catch.
52
53    # Replace any SecretSourceEnum strings with the matching SecretManager object
54    for source in list(sources):
55        if isinstance(source, SecretSourceEnum):
56            if source not in available_sources:
57                raise exc.PyAirbyteInputError(
58                    guidance="Invalid secret source name.",
59                    input_value=source,
60                    context={
61                        "Available Sources": list(available_sources.keys()),
62                    },
63                )
64
65            sources[sources.index(source)] = available_sources[source]
66
67    secret_managers = cast(list[SecretManager], sources)
68
69    if SecretSourceEnum.PROMPT in secret_managers:
70        prompt_source = secret_managers.pop(
71            # Mis-typed, but okay here since we have equality logic for the enum comparison:
72            secret_managers.index(SecretSourceEnum.PROMPT),  # type: ignore [arg-type]
73        )
74
75        if allow_prompt:
76            # Always check prompt last. Add it to the end of the list.
77            secret_managers.append(prompt_source)
78
79    for secret_mgr in secret_managers:
80        val = secret_mgr.get_secret(secret_name)
81        if val:
82            return SecretString(val)
83
84    raise exc.PyAirbyteSecretNotFoundError(
85        secret_name=secret_name,
86        sources=[str(s) for s in available_sources],
87    )

Get a secret from the environment.

The optional sources argument of enum type SecretSourceEnum or list of SecretSourceEnum options. If left blank, all available sources will be checked. If a list of SecretSourceEnum entries is passed, then the sources will be checked using the provided ordering.

If allow_prompt is True or if SecretSourceEnum.PROMPT is declared in the source arg, then the user will be prompted to enter the secret if it is not found in any of the other sources.

def get_source( name: str, config: dict[str, typing.Any] | None = None, *, streams: str | list[str] | None = None, version: str | None = None, pip_url: str | None = None, local_executable: pathlib.Path | str | None = None, install_if_missing: bool = True) -> Source:
208def get_source(
209    name: str,
210    config: dict[str, Any] | None = None,
211    *,
212    streams: str | list[str] | None = None,
213    version: str | None = None,
214    pip_url: str | None = None,
215    local_executable: Path | str | None = None,
216    install_if_missing: bool = True,
217) -> Source:
218    """Get a connector by name and version.
219
220    Args:
221        name: connector name
222        config: connector config - if not provided, you need to set it later via the set_config
223            method.
224        streams: list of stream names to select for reading. If set to "*", all streams will be
225            selected. If not provided, you can set it later via the `select_streams()` or
226            `select_all_streams()` method.
227        version: connector version - if not provided, the currently installed version will be used.
228            If no version is installed, the latest available version will be used. The version can
229            also be set to "latest" to force the use of the latest available version.
230        pip_url: connector pip URL - if not provided, the pip url will be inferred from the
231            connector name.
232        local_executable: If set, the connector will be assumed to already be installed and will be
233            executed using this path or executable name. Otherwise, the connector will be installed
234            automatically in a virtual environment.
235        install_if_missing: Whether to install the connector if it is not available locally. This
236            parameter is ignored when local_executable is set.
237    """
238    return _get_source(
239        name=name,
240        config=config,
241        streams=streams,
242        version=version,
243        pip_url=pip_url,
244        local_executable=local_executable,
245        install_if_missing=install_if_missing,
246    )

Get a connector by name and version.

Arguments:
  • name: connector name
  • config: connector config - if not provided, you need to set it later via the set_config method.
  • streams: list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, you can set it later via the select_streams() or select_all_streams() method.
  • version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
  • pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
  • local_executable: If set, the connector will be assumed to already be installed and will be executed using this path or executable name. Otherwise, the connector will be installed automatically in a virtual environment.
  • install_if_missing: Whether to install the connector if it is not available locally. This parameter is ignored when local_executable is set.
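A usage sketch combining several of the arguments above. The configuration values shown are illustrative only, not a complete source-github config.

import airbyte as ab

source = ab.get_source(
    "source-github",
    config={"repositories": ["airbytehq/quickstarts"]},  # illustrative config values
    streams="*",              # select all streams up front
    version="latest",         # force the latest available connector version
    install_if_missing=True,  # install into a managed virtual environment if needed
)
source.check()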
def new_local_cache( cache_name: str | None = None, cache_dir: str | pathlib.Path | None = None, *, cleanup: bool = True) -> DuckDBCache:
28def new_local_cache(
29    cache_name: str | None = None,
30    cache_dir: str | Path | None = None,
31    *,
32    cleanup: bool = True,
33) -> DuckDBCache:
34    """Get a local cache for storing data, using a name string to seed the path.
35
36    Args:
37        cache_name: Name to use for the cache. Defaults to None.
38        cache_dir: Root directory to store the cache in. Defaults to None.
39        cleanup: Whether to clean up temporary files. Defaults to True.
40
41    Cache files are stored in the `.cache` directory, relative to the current
42    working directory.
43    """
44    if cache_name:
45        if " " in cache_name:
46            raise exc.PyAirbyteInputError(
47                message="Cache name cannot contain spaces.",
48                input_value=cache_name,
49            )
50
51        if not cache_name.replace("_", "").isalnum():
52            raise exc.PyAirbyteInputError(
53                message="Cache name can only contain alphanumeric characters and underscores.",
54                input_value=cache_name,
55            )
56
57    cache_name = cache_name or str(ulid.ULID())
58    cache_dir = cache_dir or Path(f"./.cache/{cache_name}")
59    if not isinstance(cache_dir, Path):
60        cache_dir = Path(cache_dir)
61
62    return DuckDBCache(
63        db_path=cache_dir / f"db_{cache_name}.duckdb",
64        cache_dir=cache_dir,
65        cleanup=cleanup,
66    )

Get a local cache for storing data, using a name string to seed the path.

Arguments:
  • cache_name: Name to use for the cache. Defaults to None.
  • cache_dir: Root directory to store the cache in. Defaults to None.
  • cleanup: Whether to clean up temporary files. Defaults to True.

Cache files are stored in the .cache directory, relative to the current working directory.
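For example (cache names may only contain alphanumeric characters and underscores, as enforced above):

import airbyte as ab

cache = ab.new_local_cache("my_pipeline_cache")
# -> ./.cache/my_pipeline_cache/db_my_pipeline_cache.duckdb
result = source.read(cache=cache)  # assumes `source` was created via ab.get_source(...)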

class BigQueryCache(airbyte._processors.sql.bigquery.BigQueryConfig, airbyte.caches.base.CacheBase):
29class BigQueryCache(BigQueryConfig, CacheBase):
30    """The BigQuery cache implementation."""
31
32    _sql_processor_class: type[BigQuerySqlProcessor] = PrivateAttr(default=BigQuerySqlProcessor)

The BigQuery cache implementation.
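A minimal construction sketch, assuming the configurable fields are the ones listed under Inherited Members below (project_name, dataset_name, credentials_path):

import airbyte as ab

cache = ab.BigQueryCache(
    project_name="my-gcp-project",                     # assumed field from BigQueryConfig
    dataset_name="pyairbyte_demo",                     # assumed field from BigQueryConfig
    credentials_path="/path/to/service_account.json",  # assumed field from BigQueryConfig
)
result = source.read(cache=cache)  # assumes `source` was created via ab.get_source(...)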

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
processor
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.bigquery.BigQueryConfig
database_name
schema_name
credentials_path
project_name
dataset_name
get_sql_alchemy_url
get_database_name
get_vendor_client
airbyte._future_cdk.sql_processor.SqlConfig
table_prefix
get_sql_engine
pydantic.main.BaseModel
Config
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
class CachedDataset(airbyte.datasets._sql.SQLDataset):
132class CachedDataset(SQLDataset):
133    """A dataset backed by a SQL table cache.
134
135    Because this dataset includes all records from the underlying table, we also expose the
136    underlying table as a SQLAlchemy Table object.
137    """
138
139    def __init__(
140        self,
141        cache: CacheBase,
142        stream_name: str,
143    ) -> None:
144        """We construct the query statement by selecting all columns from the table.
145
146        This prevents the need to scan the table schema to construct the query statement.
147        """
148        table_name = cache.processor.get_sql_table_name(stream_name)
149        schema_name = cache.schema_name
150        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
151        super().__init__(
152            cache=cache,
153            stream_name=stream_name,
154            query_statement=query,
155        )
156
157    @overrides
158    def to_pandas(self) -> DataFrame:
159        """Return the underlying dataset data as a pandas DataFrame."""
160        return self._cache.get_pandas_dataframe(self._stream_name)
161
162    def to_sql_table(self) -> Table:
163        """Return the underlying SQL table as a SQLAlchemy Table object."""
164        return self._cache.processor.get_sql_table(self.stream_name)
165
166    def __eq__(self, value: object) -> bool:
167        """Return True if the value is a CachedDataset with the same cache and stream name.
168
169        In the case of CachedDataset objects, we can simply compare the cache and stream name.
170
171        Note that this equality check is only supported on CachedDataset objects and not for
172        the base SQLDataset implementation. This is because of the complexity and computational
173        cost of comparing two arbitrary SQL queries that could be bound to different variables,
174        as well as the chance that two queries can be syntactically equivalent without being
175        text-wise equivalent.
176        """
177        if not isinstance(value, SQLDataset):
178            return False
179
180        if self._cache is not value._cache:
181            return False
182
183        return not self._stream_name != value._stream_name
184
185    def __hash__(self) -> int:
186        return hash(self._stream_name)

A dataset backed by a SQL table cache.

Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
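A usage sketch, assuming a prior source.read() call; the stream name is illustrative.

result = source.read()        # ReadResult from a prior sync
users = result["users"]       # CachedDataset for the "users" stream

df = users.to_pandas()        # all cached records as a pandas DataFrame
table = users.to_sql_table()  # the backing table as a SQLAlchemy Table object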

CachedDataset(cache: airbyte.caches.base.CacheBase, stream_name: str)
139    def __init__(
140        self,
141        cache: CacheBase,
142        stream_name: str,
143    ) -> None:
144        """We construct the query statement by selecting all columns from the table.
145
146        This prevents the need to scan the table schema to construct the query statement.
147        """
148        table_name = cache.processor.get_sql_table_name(stream_name)
149        schema_name = cache.schema_name
150        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
151        super().__init__(
152            cache=cache,
153            stream_name=stream_name,
154            query_statement=query,
155        )

We construct the query statement by selecting all columns from the table.

This prevents the need to scan the table schema to construct the query statement.

@overrides
def to_pandas(self) -> pandas.core.frame.DataFrame:
157    @overrides
158    def to_pandas(self) -> DataFrame:
159        """Return the underlying dataset data as a pandas DataFrame."""
160        return self._cache.get_pandas_dataframe(self._stream_name)

Return the underlying dataset data as a pandas DataFrame.

def to_sql_table(self) -> sqlalchemy.sql.schema.Table:
162    def to_sql_table(self) -> Table:
163        """Return the underlying SQL table as a SQLAlchemy Table object."""
164        return self._cache.processor.get_sql_table(self.stream_name)

Return the underlying SQL table as a SQLAlchemy Table object.

Inherited Members
airbyte.datasets._sql.SQLDataset
stream_name
with_filter
airbyte.datasets._base.DatasetBase
to_documents
class DuckDBCache(airbyte._processors.sql.duckdb.DuckDBConfig, airbyte.caches.base.CacheBase):
37class DuckDBCache(DuckDBConfig, CacheBase):
38    """A DuckDB cache."""
39
40    _sql_processor_class: type[DuckDBSqlProcessor] = PrivateAttr(default=DuckDBSqlProcessor)

A DuckDB cache.
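A construction sketch mirroring how get_default_cache() and new_local_cache() build their caches:

from pathlib import Path

import airbyte as ab

cache_dir = Path("./.cache/my_cache")
cache = ab.DuckDBCache(
    db_path=cache_dir / "my_cache.duckdb",  # DuckDB database file
    cache_dir=cache_dir,                    # directory for temporary and cache files
)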

Inherited Members
airbyte.caches.base.CacheBase
CacheBase
cache_dir
cleanup
processor
get_record_processor
get_records
get_pandas_dataframe
streams
get_state_provider
get_state_writer
register_source
airbyte._processors.sql.duckdb.DuckDBConfig
db_path
schema_name
get_sql_alchemy_url
get_database_name
get_sql_engine
airbyte._future_cdk.sql_processor.SqlConfig
table_prefix
get_vendor_client
pydantic.main.BaseModel
Config
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
class ReadResult(collections.abc.Mapping[str, airbyte.datasets._sql.CachedDataset]):
19class ReadResult(Mapping[str, CachedDataset]):
20    def __init__(
21        self,
22        processed_records: int,
23        cache: CacheBase,
24        processed_streams: list[str],
25    ) -> None:
26        self.processed_records = processed_records
27        self._cache = cache
28        self._processed_streams = processed_streams
29
30    def __getitem__(self, stream: str) -> CachedDataset:
31        if stream not in self._processed_streams:
32            raise KeyError(stream)
33
34        return CachedDataset(self._cache, stream)
35
36    def __contains__(self, stream: object) -> bool:
37        if not isinstance(stream, str):
38            return False
39
40        return stream in self._processed_streams
41
42    def __iter__(self) -> Iterator[str]:
43        return self._processed_streams.__iter__()
44
45    def __len__(self) -> int:
46        return len(self._processed_streams)
47
48    def get_sql_engine(self) -> Engine:
49        return self._cache.get_sql_engine()
50
51    @property
52    def streams(self) -> Mapping[str, CachedDataset]:
53        return {
54            stream_name: CachedDataset(self._cache, stream_name)
55            for stream_name in self._processed_streams
56        }
57
58    @property
59    def cache(self) -> CacheBase:
60        return self._cache

A Mapping is a generic container for associating key/value pairs.

This class provides concrete generic implementations of all methods except for __getitem__, __iter__, and __len__.
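Because ReadResult is a Mapping of stream name to CachedDataset, results can be inspected like a dictionary (a usage sketch assuming a prior read; the stream name is illustrative):

result = source.read()

print(result.processed_records)     # total records processed during the sync
print(list(result.streams.keys()))  # stream names written to the cache

if "users" in result:               # membership checks use processed stream names
    df = result["users"].to_pandas()

engine = result.get_sql_engine()    # SQLAlchemy engine for the underlying cache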

ReadResult( processed_records: int, cache: airbyte.caches.base.CacheBase, processed_streams: list[str])
20    def __init__(
21        self,
22        processed_records: int,
23        cache: CacheBase,
24        processed_streams: list[str],
25    ) -> None:
26        self.processed_records = processed_records
27        self._cache = cache
28        self._processed_streams = processed_streams
processed_records
def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
48    def get_sql_engine(self) -> Engine:
49        return self._cache.get_sql_engine()
streams: collections.abc.Mapping[str, CachedDataset]
51    @property
52    def streams(self) -> Mapping[str, CachedDataset]:
53        return {
54            stream_name: CachedDataset(self._cache, stream_name)
55            for stream_name in self._processed_streams
56        }
cache: airbyte.caches.base.CacheBase
58    @property
59    def cache(self) -> CacheBase:
60        return self._cache
Inherited Members
collections.abc.Mapping
get
keys
items
values
class SecretSourceEnum(builtins.str, enum.Enum):
15class SecretSourceEnum(str, Enum):
16    ENV = "env"
17    DOTENV = "dotenv"
18    GOOGLE_COLAB = "google_colab"
19    GOOGLE_GSM = "google_gsm"  # Not enabled by default
20
21    PROMPT = "prompt"

An enumeration.

ENV = <SecretSourceEnum.ENV: 'env'>
DOTENV = <SecretSourceEnum.DOTENV: 'dotenv'>
GOOGLE_COLAB = <SecretSourceEnum.GOOGLE_COLAB: 'google_colab'>
GOOGLE_GSM = <SecretSourceEnum.GOOGLE_GSM: 'google_gsm'>
PROMPT = <SecretSourceEnum.PROMPT: 'prompt'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Source:
 60class Source:  # noqa: PLR0904  # Ignore max publish methods
 61    """A class representing a source that can be called."""
 62
 63    def __init__(
 64        self,
 65        executor: Executor,
 66        name: str,
 67        config: dict[str, Any] | None = None,
 68        streams: str | list[str] | None = None,
 69        *,
 70        validate: bool = False,
 71    ) -> None:
 72        """Initialize the source.
 73
 74        If config is provided, it will be validated against the spec if validate is True.
 75        """
 76        self.executor = executor
 77        self.name = name
 78        self._processed_records = 0
 79        self._config_dict: dict[str, Any] | None = None
 80        self._last_log_messages: list[str] = []
 81        self._discovered_catalog: AirbyteCatalog | None = None
 82        self._spec: ConnectorSpecification | None = None
 83        self._selected_stream_names: list[str] = []
 84        if config is not None:
 85            self.set_config(config, validate=validate)
 86        if streams is not None:
 87            self.select_streams(streams)
 88
 89        self._deployed_api_root: str | None = None
 90        self._deployed_workspace_id: str | None = None
 91        self._deployed_source_id: str | None = None
 92
 93    def set_streams(self, streams: list[str]) -> None:
 94        """Deprecated. See select_streams()."""
 95        warnings.warn(
 96            "The 'set_streams' method is deprecated and will be removed in a future version. "
 97            "Please use the 'select_streams' method instead.",
 98            DeprecationWarning,
 99            stacklevel=2,
100        )
101        self.select_streams(streams)
102
103    def select_all_streams(self) -> None:
104        """Select all streams.
105
106        This is a more streamlined equivalent to:
107        > source.select_streams(source.get_available_streams()).
108        """
109        self._selected_stream_names = self.get_available_streams()
110
111    def select_streams(self, streams: str | list[str]) -> None:
112        """Select the stream names that should be read from the connector.
113
114        Args:
115        - streams: A list of stream names to select. If set to "*", all streams will be selected.
116
117        Currently, if this is not set, all streams will be read.
118        """
119        if streams == "*":
120            self.select_all_streams()
121            return
122
123        if isinstance(streams, str):
124            # If a single stream is provided, convert it to a one-item list
125            streams = [streams]
126
127        available_streams = self.get_available_streams()
128        for stream in streams:
129            if stream not in available_streams:
130                raise exc.AirbyteStreamNotFoundError(
131                    stream_name=stream,
132                    connector_name=self.name,
133                    available_streams=available_streams,
134                )
135        self._selected_stream_names = streams
136
137    def get_selected_streams(self) -> list[str]:
138        """Get the selected streams.
139
140        If no streams are selected, return an empty list.
141        """
142        return self._selected_stream_names
143
144    def set_config(
145        self,
146        config: dict[str, Any],
147        *,
148        validate: bool = True,
149    ) -> None:
150        """Set the config for the connector.
151
152        If validate is True, raise an exception if the config fails validation.
153
154        If validate is False, validation will be deferred until check() or validate_config()
155        is called.
156        """
157        if validate:
158            self.validate_config(config)
159
160        self._config_dict = config
161
162    def get_config(self) -> dict[str, Any]:
163        """Get the config for the connector."""
164        return self._config
165
166    @property
167    def _config(self) -> dict[str, Any]:
168        if self._config_dict is None:
169            raise exc.AirbyteConnectorConfigurationMissingError(
170                guidance="Provide via get_source() or set_config()"
171            )
172        return self._config_dict
173
174    def _discover(self) -> AirbyteCatalog:
175        """Call discover on the connector.
176
177        This involves the following steps:
178        * Write the config to a temporary file
179        * execute the connector with discover --config <config_file>
180        * Listen to the messages and return the first AirbyteCatalog that comes along.
181        * Make sure the subprocess is killed when the function returns.
182        """
183        with as_temp_files([self._config]) as [config_file]:
184            for msg in self._execute(["discover", "--config", config_file]):
185                if msg.type == Type.CATALOG and msg.catalog:
186                    return msg.catalog
187            raise exc.AirbyteConnectorMissingCatalogError(
188                log_text=self._last_log_messages,
189            )
190
191    def validate_config(self, config: dict[str, Any] | None = None) -> None:
192        """Validate the config against the spec.
193
194        If config is not provided, the already-set config will be validated.
195        """
196        spec = self._get_spec(force_refresh=False)
197        config = self._config if config is None else config
198        try:
199            jsonschema.validate(config, spec.connectionSpecification)
200            log_config_validation_result(
201                name=self.name,
202                state=EventState.SUCCEEDED,
203            )
204        except jsonschema.ValidationError as ex:
205            validation_ex = exc.AirbyteConnectorValidationFailedError(
206                message="The provided config is not valid.",
207                context={
208                    "error_message": ex.message,
209                    "error_path": ex.path,
210                    "error_instance": ex.instance,
211                    "error_schema": ex.schema,
212                },
213            )
214            log_config_validation_result(
215                name=self.name,
216                state=EventState.FAILED,
217                exception=validation_ex,
218            )
219            raise validation_ex from ex
220
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]
224
225    def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
226        """Call spec on the connector.
227
228        This involves the following steps:
229        * execute the connector with spec
230        * Listen to the messages and return the first AirbyteCatalog that comes along.
231        * Make sure the subprocess is killed when the function returns.
232        """
233        if force_refresh or self._spec is None:
234            for msg in self._execute(["spec"]):
235                if msg.type == Type.SPEC and msg.spec:
236                    self._spec = msg.spec
237                    break
238
239        if self._spec:
240            return self._spec
241
242        raise exc.AirbyteConnectorMissingSpecError(
243            log_text=self._last_log_messages,
244        )
245
246    @property
247    def config_spec(self) -> dict[str, Any]:
248        """Generate a configuration spec for this connector, as a JSON Schema definition.
249
250        This function generates a JSON Schema dictionary with configuration specs for the
251        current connector, as a dictionary.
252
253        Returns:
254            dict: The JSON Schema configuration spec as a dictionary.
255        """
256        return self._get_spec(force_refresh=True).connectionSpecification
257
258    def print_config_spec(
259        self,
260        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
261        *,
262        output_file: Path | str | None = None,
263    ) -> None:
264        """Print the configuration spec for this connector.
265
266        Args:
267        - format: The format to print the spec in. Must be "yaml" or "json".
268        - output_file: Optional. If set, the spec will be written to the given file path. Otherwise,
269          it will be printed to the console.
270        """
271        if format not in {"yaml", "json"}:
272            raise exc.PyAirbyteInputError(
273                message="Invalid format. Expected 'yaml' or 'json'",
274                input_value=format,
275            )
276        if isinstance(output_file, str):
277            output_file = Path(output_file)
278
279        if format == "yaml":
280            content = yaml.dump(self.config_spec, indent=2)
281        elif format == "json":
282            content = json.dumps(self.config_spec, indent=2)
283
284        if output_file:
285            output_file.write_text(content)
286            return
287
288        syntax_highlighted = Syntax(content, format)
289        print(syntax_highlighted)
290
291    @property
292    def _yaml_spec(self) -> str:
293        """Get the spec as a yaml string.
294
295        For now, the primary use case is for writing and debugging a valid config for a source.
296
297        This is private for now because we probably want better polish before exposing this
298        as a stable interface. This will also get easier when we have docs links with this info
299        for each connector.
300        """
301        spec_obj: ConnectorSpecification = self._get_spec()
302        spec_dict = spec_obj.dict(exclude_unset=True)
303        # convert to a yaml string
304        return yaml.dump(spec_dict)
305
306    @property
307    def docs_url(self) -> str:
308        """Get the URL to the connector's documentation."""
309        # TODO: Replace with docs URL from metadata when available
310        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
311            "source-", ""
312        )
313
314    @property
315    def discovered_catalog(self) -> AirbyteCatalog:
316        """Get the raw catalog for the given streams.
317
318        If the catalog is not yet known, we call discover to get it.
319        """
320        if self._discovered_catalog is None:
321            self._discovered_catalog = self._discover()
322
323        return self._discovered_catalog
324
325    @property
326    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
327        """Get the configured catalog for the given streams.
328
329        If the raw catalog is not yet known, we call discover to get it.
330
331        If no specific streams are selected, we return a catalog that syncs all available streams.
332
333        TODO: We should consider disabling by default the streams that the connector would
334        disable by default. (For instance, streams that require a premium license are sometimes
335        disabled by default within the connector.)
336        """
337        # Ensure discovered catalog is cached before we start
338        _ = self.discovered_catalog
339
340        # Filter for selected streams if set, otherwise use all available streams:
341        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
342
343        return ConfiguredAirbyteCatalog(
344            streams=[
345                ConfiguredAirbyteStream(
346                    stream=stream,
347                    destination_sync_mode=DestinationSyncMode.overwrite,
348                    primary_key=stream.source_defined_primary_key,
349                    # TODO: The below assumes all sources can coalesce from incremental sync to
350                    # full_table as needed. CDK supports this, so it might be safe:
351                    sync_mode=SyncMode.incremental,
352                )
353                for stream in self.discovered_catalog.streams
354                if stream.name in streams_filter
355            ],
356        )
357
358    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
359        """Return the JSON Schema spec for the specified stream name."""
360        catalog: AirbyteCatalog = self.discovered_catalog
361        found: list[AirbyteStream] = [
362            stream for stream in catalog.streams if stream.name == stream_name
363        ]
364
365        if len(found) == 0:
366            raise exc.PyAirbyteInputError(
367                message="Stream name does not exist in catalog.",
368                input_value=stream_name,
369            )
370
371        if len(found) > 1:
372            raise exc.PyAirbyteInternalError(
373                message="Duplicate streams found with the same name.",
374                context={
375                    "found_streams": found,
376                },
377            )
378
379        return found[0].json_schema
380
381    def get_records(self, stream: str) -> LazyDataset:
382        """Read a stream from the connector.
383
384        This involves the following steps:
385        * Call discover to get the catalog
386        * Generate a configured catalog that syncs the given stream in full_refresh mode
387        * Write the configured catalog and the config to a temporary file
388        * execute the connector with read --config <config_file> --catalog <catalog_file>
389        * Listen to the messages and return the first AirbyteRecordMessages that come along.
390        * Make sure the subprocess is killed when the function returns.
391        """
392        discovered_catalog: AirbyteCatalog = self.discovered_catalog
393        configured_catalog = ConfiguredAirbyteCatalog(
394            streams=[
395                ConfiguredAirbyteStream(
396                    stream=s,
397                    sync_mode=SyncMode.full_refresh,
398                    destination_sync_mode=DestinationSyncMode.overwrite,
399                )
400                for s in discovered_catalog.streams
401                if s.name == stream
402            ],
403        )
404        if len(configured_catalog.streams) == 0:
405            raise exc.PyAirbyteInputError(
406                message="Requested stream does not exist.",
407                context={
408                    "stream": stream,
409                    "available_streams": self.get_available_streams(),
410                    "connector_name": self.name,
411                },
412            ) from KeyError(stream)
413
414        configured_stream = configured_catalog.streams[0]
415        all_properties = cast(
416            list[str], list(configured_stream.stream.json_schema["properties"].keys())
417        )
418
419        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
420            self._log_sync_start(cache=None)
421            yield from records
422            self._log_sync_success(cache=None)
423
424        iterator: Iterator[dict[str, Any]] = _with_logging(
425            records=(  # Generator comprehension yields StreamRecord objects for each record
426                StreamRecord.from_record_message(
427                    record_message=record.record,
428                    expected_keys=all_properties,
429                    prune_extra_fields=True,
430                )
431                for record in self._read_with_catalog(configured_catalog)
432                if record.record
433            )
434        )
435        return LazyDataset(
436            iterator,
437            stream_metadata=configured_stream,
438        )
439
440    @property
441    def connector_version(self) -> str | None:
442        """Return the version of the connector as reported by the executor.
443
444        Returns None if the version cannot be determined.
445        """
446        return self.executor.get_installed_version()
447
448    def get_documents(
449        self,
450        stream: str,
451        title_property: str | None = None,
452        content_properties: list[str] | None = None,
453        metadata_properties: list[str] | None = None,
454        *,
455        render_metadata: bool = False,
456    ) -> Iterable[Document]:
457        """Read a stream from the connector and return the records as documents.
458
459        If metadata_properties is not set, all properties that are not content will be added to
460        the metadata.
461
462        If render_metadata is True, metadata will be rendered in the document, as well as the
463        the main content.
464        """
465        return self.get_records(stream).to_documents(
466            title_property=title_property,
467            content_properties=content_properties,
468            metadata_properties=metadata_properties,
469            render_metadata=render_metadata,
470        )
471
472    def check(self) -> None:
473        """Call check on the connector.
474
475        This involves the following steps:
476        * Write the config to a temporary file
477        * execute the connector with check --config <config_file>
478        * Listen to the messages and return the first AirbyteCatalog that comes along.
479        * Make sure the subprocess is killed when the function returns.
480        """
481        with as_temp_files([self._config]) as [config_file]:
482            try:
483                for msg in self._execute(["check", "--config", config_file]):
484                    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
485                        if msg.connectionStatus.status != Status.FAILED:
486                            print(f"Connection check succeeded for `{self.name}`.")
487                            log_source_check_result(
488                                name=self.name,
489                                state=EventState.SUCCEEDED,
490                            )
491                            return
492
493                        log_source_check_result(
494                            name=self.name,
495                            state=EventState.FAILED,
496                        )
497                        raise exc.AirbyteConnectorCheckFailedError(
498                            help_url=self.docs_url,
499                            context={
500                                "failure_reason": msg.connectionStatus.message,
501                            },
502                        )
503                raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
504            except exc.AirbyteConnectorReadError as ex:
505                raise exc.AirbyteConnectorCheckFailedError(
506                    message="The connector failed to check the connection.",
507                    log_text=ex.log_text,
508                ) from ex
509
510    def install(self) -> None:
511        """Install the connector if it is not yet installed."""
512        self.executor.install()
513        print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")
514
515    def uninstall(self) -> None:
516        """Uninstall the connector if it is installed.
517
518        This only works if the use_local_install flag wasn't used and installation is managed by
519        PyAirbyte.
520        """
521        self.executor.uninstall()
522
523    def _read_with_catalog(
524        self,
525        catalog: ConfiguredAirbyteCatalog,
526        state: StateProviderBase | None = None,
527    ) -> Iterator[AirbyteMessage]:
528        """Call read on the connector.
529
530        This involves the following steps:
531        * Write the config to a temporary file
532        * execute the connector with read --config <config_file> --catalog <catalog_file>
533        * Listen to the messages and return the AirbyteRecordMessages that come along.
534        * Send out telemetry on the performed sync (with information about which source was used and
535          the type of the cache)
536        """
537        self._processed_records = 0  # Reset the counter before we start
538        with as_temp_files(
539            [
540                self._config,
541                catalog.json(),
542                state.to_state_input_file_text() if state else "[]",
543            ]
544        ) as [
545            config_file,
546            catalog_file,
547            state_file,
548        ]:
549            yield from self._tally_records(
550                self._execute(
551                    [
552                        "read",
553                        "--config",
554                        config_file,
555                        "--catalog",
556                        catalog_file,
557                        "--state",
558                        state_file,
559                    ],
560                )
561            )
562
563    def _add_to_logs(self, message: str) -> None:
564        self._last_log_messages.append(message)
565        self._last_log_messages = self._last_log_messages[-10:]
566
567    def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:
568        """Execute the connector with the given arguments.
569
570        This involves the following steps:
571        * Locate the right venv. It is called ".venv-<connector_name>"
572        * Spawn a subprocess with .venv-<connector_name>/bin/<connector-name> <args>
573        * Read the output line by line of the subprocess and serialize them AirbyteMessage objects.
574          Drop if not valid.
575        """
576        # Fail early if the connector is not installed.
577        self.executor.ensure_installation(auto_fix=False)
578
579        try:
580            self._last_log_messages = []
581            for line in self.executor.execute(args):
582                try:
583                    message = AirbyteMessage.parse_raw(line)
584                    if message.type is Type.RECORD:
585                        self._processed_records += 1
586                    if message.type == Type.LOG:
587                        self._add_to_logs(message.log.message)
588                    if message.type == Type.TRACE and message.trace.type == TraceType.ERROR:
589                        self._add_to_logs(message.trace.error.message)
590                    yield message
591                except Exception:
592                    self._add_to_logs(line)
593        except Exception as e:
594            raise exc.AirbyteConnectorReadError(
595                log_text=self._last_log_messages,
596            ) from e
597
598    def _tally_records(
599        self,
600        messages: Iterable[AirbyteMessage],
601    ) -> Generator[AirbyteMessage, Any, None]:
602        """This method simply tallies the number of records processed and yields the messages."""
603        self._processed_records = 0  # Reset the counter before we start
604        progress.reset(len(self._selected_stream_names or []))
605
606        for message in messages:
607            yield message
608            progress.log_records_read(new_total_count=self._processed_records)
609
610    def _log_sync_start(
611        self,
612        *,
613        cache: CacheBase | None,
614    ) -> None:
615        """Log the start of a sync operation."""
616        print(f"Started `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}...")
617        send_telemetry(
618            source=self,
619            cache=cache,
620            state=EventState.STARTED,
621            event_type=EventType.SYNC,
622        )
623
624    def _log_sync_success(
625        self,
626        *,
627        cache: CacheBase | None,
628    ) -> None:
629        """Log the success of a sync operation."""
630        print(f"Completed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
631        send_telemetry(
632            source=self,
633            cache=cache,
634            state=EventState.SUCCEEDED,
635            number_of_records=self._processed_records,
636            event_type=EventType.SYNC,
637        )
638
639    def _log_sync_failure(
640        self,
641        *,
642        cache: CacheBase | None,
643        exception: Exception,
644    ) -> None:
645        """Log the failure of a sync operation."""
646        print(f"Failed `{self.name}` read operation at {pendulum.now().format('HH:mm:ss')}.")
647        send_telemetry(
648            state=EventState.FAILED,
649            source=self,
650            cache=cache,
651            number_of_records=self._processed_records,
652            exception=exception,
653            event_type=EventType.SYNC,
654        )
655
656    def read(
657        self,
658        cache: CacheBase | None = None,
659        *,
660        streams: str | list[str] | None = None,
661        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
662        force_full_refresh: bool = False,
663        skip_validation: bool = False,
664    ) -> ReadResult:
665        """Read from the connector and write to the cache.
666
667        Args:
668            cache: The cache to write to. If None, a default cache will be used.
669            write_strategy: The strategy to use when writing to the cache. If a string, it must be
670                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
671                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
672                WriteStrategy.AUTO.
673            streams: Optional if already set. A list of stream names to select for reading. If set
674                to "*", all streams will be selected.
675            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
676                streams will be read in incremental mode if supported by the connector. This option
677                must be True when using the "replace" strategy.
678        """
679        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
680            warnings.warn(
681                message=(
682                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
683                    "could result in data loss. "
684                    "To silence this warning, use the following: "
685                    'warnings.filterwarnings("ignore", '
686                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
687                ),
688                category=PyAirbyteDataLossWarning,
689                stacklevel=1,
690            )
691        if isinstance(write_strategy, str):
692            try:
693                write_strategy = WriteStrategy(write_strategy)
694            except ValueError:
695                raise exc.PyAirbyteInputError(
696                    message="Invalid strategy",
697                    context={
698                        "write_strategy": write_strategy,
699                        "available_strategies": [s.value for s in WriteStrategy],
700                    },
701                ) from None
702
703        if streams:
704            self.select_streams(streams)
705
706        if not self._selected_stream_names:
707            raise exc.PyAirbyteNoStreamsSelectedError(
708                connector_name=self.name,
709                available_streams=self.get_available_streams(),
710            )
711
712        # Run optional validation step
713        if not skip_validation:
714            self.validate_config()
715
716        # Set up cache and related resources
717        if cache is None:
718            cache = get_default_cache()
719
720        # Set up state provider if not in full refresh mode
721        if force_full_refresh:
722            state_provider: StateProviderBase | None = None
723        else:
724            state_provider = cache.get_state_provider(
725                source_name=self.name,
726            )
727
728        self._log_sync_start(cache=cache)
729
730        cache_processor = cache.get_record_processor(
731            source_name=self.name,
732            catalog_provider=CatalogProvider(self.configured_catalog),
733        )
734        try:
735            cache_processor.process_airbyte_messages(
736                self._read_with_catalog(
737                    catalog=self.configured_catalog,
738                    state=state_provider,
739                ),
740                write_strategy=write_strategy,
741            )
742
743        # TODO: We should catch more specific exceptions here
744        except Exception as ex:
745            self._log_sync_failure(cache=cache, exception=ex)
746            raise exc.AirbyteConnectorFailedError(
747                log_text=self._last_log_messages,
748            ) from ex
749
750        self._log_sync_success(cache=cache)
751        return ReadResult(
752            processed_records=self._processed_records,
753            cache=cache,
754            processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
755        )

A class representing a source that can be called.

Source( executor: airbyte._executor.Executor, name: str, config: dict[str, typing.Any] | None = None, streams: str | list[str] | None = None, *, validate: bool = False)
63    def __init__(
64        self,
65        executor: Executor,
66        name: str,
67        config: dict[str, Any] | None = None,
68        streams: str | list[str] | None = None,
69        *,
70        validate: bool = False,
71    ) -> None:
72        """Initialize the source.
73
74        If config is provided, it will be validated against the spec if validate is True.
75        """
76        self.executor = executor
77        self.name = name
78        self._processed_records = 0
79        self._config_dict: dict[str, Any] | None = None
80        self._last_log_messages: list[str] = []
81        self._discovered_catalog: AirbyteCatalog | None = None
82        self._spec: ConnectorSpecification | None = None
83        self._selected_stream_names: list[str] = []
84        if config is not None:
85            self.set_config(config, validate=validate)
86        if streams is not None:
87            self.select_streams(streams)
88
89        self._deployed_api_root: str | None = None
90        self._deployed_workspace_id: str | None = None
91        self._deployed_source_id: str | None = None

Initialize the source.

If config is provided, it will be validated against the spec if validate is True.

executor
name
def set_streams(self, streams: list[str]) -> None:
 93    def set_streams(self, streams: list[str]) -> None:
 94        """Deprecated. See select_streams()."""
 95        warnings.warn(
 96            "The 'set_streams' method is deprecated and will be removed in a future version. "
 97            "Please use the 'select_streams' method instead.",
 98            DeprecationWarning,
 99            stacklevel=2,
100        )
101        self.select_streams(streams)

Deprecated. See select_streams().

def select_all_streams(self) -> None:
103    def select_all_streams(self) -> None:
104        """Select all streams.
105
106        This is a more streamlined equivalent to:
107        > source.select_streams(source.get_available_streams()).
108        """
109        self._selected_stream_names = self.get_available_streams()

Select all streams.

This is a more streamlined equivalent to:

source.select_streams(source.get_available_streams()).

def select_streams(self, streams: str | list[str]) -> None:
111    def select_streams(self, streams: str | list[str]) -> None:
112        """Select the stream names that should be read from the connector.
113
114        Args:
115        - streams: A list of stream names to select. If set to "*", all streams will be selected.
116
117        Currently, if this is not set, all streams will be read.
118        """
119        if streams == "*":
120            self.select_all_streams()
121            return
122
123        if isinstance(streams, str):
124            # If a single stream is provided, convert it to a one-item list
125            streams = [streams]
126
127        available_streams = self.get_available_streams()
128        for stream in streams:
129            if stream not in available_streams:
130                raise exc.AirbyteStreamNotFoundError(
131                    stream_name=stream,
132                    connector_name=self.name,
133                    available_streams=available_streams,
134                )
135        self._selected_stream_names = streams

Select the stream names that should be read from the connector.

Args:

  • streams: A list of stream names to select. If set to "*", all streams will be selected.

Currently, if this is not set, all streams will be read.
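
For example, a minimal sketch of stream selection (the connector name, config, and stream names below are illustrative; any installed source works the same way):

import airbyte as ab

# "source-faker" and its config/stream names are placeholders for this sketch.
source = ab.get_source("source-faker", config={"count": 100})

# Select a single stream by name:
source.select_streams("users")

# Select several streams at once:
source.select_streams(["users", "purchases"])

# Or select everything the connector exposes (equivalent to select_all_streams()):
source.select_streams("*")

print(source.get_selected_streams())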

def get_selected_streams(self) -> list[str]:
137    def get_selected_streams(self) -> list[str]:
138        """Get the selected streams.
139
140        If no streams are selected, return an empty list.
141        """
142        return self._selected_stream_names

Get the selected streams.

If no streams are selected, return an empty list.

def set_config(self, config: dict[str, typing.Any], *, validate: bool = True) -> None:
144    def set_config(
145        self,
146        config: dict[str, Any],
147        *,
148        validate: bool = True,
149    ) -> None:
150        """Set the config for the connector.
151
152        If validate is True, raise an exception if the config fails validation.
153
154        If validate is False, validation will be deferred until check() or validate_config()
155        is called.
156        """
157        if validate:
158            self.validate_config(config)
159
160        self._config_dict = config

Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() or validate_config() is called.
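
A minimal sketch of deferred validation (the connector name and config fields are illustrative):

import airbyte as ab

source = ab.get_source("source-postgres")

# Set the config without validating it yet:
source.set_config(
    {
        "host": "localhost",
        "port": 5432,
        "database": "analytics",
        "username": "reader",
        "password": ab.get_secret("POSTGRES_PASSWORD"),
    },
    validate=False,
)

# Validate explicitly once the config is complete:
source.validate_config()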

def get_config(self) -> dict[str, typing.Any]:
162    def get_config(self) -> dict[str, Any]:
163        """Get the config for the connector."""
164        return self._config

Get the config for the connector.

def validate_config(self, config: dict[str, typing.Any] | None = None) -> None:
191    def validate_config(self, config: dict[str, Any] | None = None) -> None:
192        """Validate the config against the spec.
193
194        If config is not provided, the already-set config will be validated.
195        """
196        spec = self._get_spec(force_refresh=False)
197        config = self._config if config is None else config
198        try:
199            jsonschema.validate(config, spec.connectionSpecification)
200            log_config_validation_result(
201                name=self.name,
202                state=EventState.SUCCEEDED,
203            )
204        except jsonschema.ValidationError as ex:
205            validation_ex = exc.AirbyteConnectorValidationFailedError(
206                message="The provided config is not valid.",
207                context={
208                    "error_message": ex.message,
209                    "error_path": ex.path,
210                    "error_instance": ex.instance,
211                    "error_schema": ex.schema,
212                },
213            )
214            log_config_validation_result(
215                name=self.name,
216                state=EventState.FAILED,
217                exception=validation_ex,
218            )
219            raise validation_ex from ex

Validate the config against the spec.

If config is not provided, the already-set config will be validated.
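
A sketch of handling a validation failure, assuming the exception class above is importable from airbyte.exceptions (the connector name is illustrative):

import airbyte as ab
from airbyte.exceptions import AirbyteConnectorValidationFailedError

source = ab.get_source("source-postgres")

try:
    # An empty config will almost certainly fail the connector's spec:
    source.validate_config({})
except AirbyteConnectorValidationFailedError as ex:
    print(f"Config rejected: {ex}")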

def get_available_streams(self) -> list[str]:
221    def get_available_streams(self) -> list[str]:
222        """Get the available streams from the spec."""
223        return [s.name for s in self.discovered_catalog.streams]

Get the available streams from the spec.

config_spec: dict[str, typing.Any]
246    @property
247    def config_spec(self) -> dict[str, Any]:
248        """Generate a configuration spec for this connector, as a JSON Schema definition.
249
250        This function generates a JSON Schema dictionary with configuration specs for the
251        current connector, as a dictionary.
252
253        Returns:
254            dict: The JSON Schema configuration spec as a dictionary.
255        """
256        return self._get_spec(force_refresh=True).connectionSpecification

Generate a configuration spec for this connector, as a JSON Schema definition.

This property returns the configuration spec for the current connector as a JSON Schema dictionary.

Returns:

dict: The JSON Schema configuration spec as a dictionary.

def print_config_spec( self, format: Literal['yaml', 'json'] = 'yaml', *, output_file: pathlib.Path | str | None = None) -> None:
258    def print_config_spec(
259        self,
260        format: Literal["yaml", "json"] = "yaml",  # noqa: A002
261        *,
262        output_file: Path | str | None = None,
263    ) -> None:
264        """Print the configuration spec for this connector.
265
266        Args:
267        - format: The format to print the spec in. Must be "yaml" or "json".
268        - output_file: Optional. If set, the spec will be written to the given file path. Otherwise,
269          it will be printed to the console.
270        """
271        if format not in {"yaml", "json"}:
272            raise exc.PyAirbyteInputError(
273                message="Invalid format. Expected 'yaml' or 'json'",
274                input_value=format,
275            )
276        if isinstance(output_file, str):
277            output_file = Path(output_file)
278
279        if format == "yaml":
280            content = yaml.dump(self.config_spec, indent=2)
281        elif format == "json":
282            content = json.dumps(self.config_spec, indent=2)
283
284        if output_file:
285            output_file.write_text(content)
286            return
287
288        syntax_highlighted = Syntax(content, format)
289        print(syntax_highlighted)

Print the configuration spec for this connector.

Args:

  • format: The format to print the spec in. Must be "yaml" or "json".
  • output_file: Optional. If set, the spec will be written to the given file path. Otherwise, it will be printed to the console.
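
For example, a short sketch of inspecting and exporting the spec (the connector name and output path are arbitrary):

import airbyte as ab

source = ab.get_source("source-faker")

# Print the spec to the console as YAML (the default format):
source.print_config_spec()

# Write the spec to a file as JSON instead:
source.print_config_spec(format="json", output_file="source-faker-spec.json")

# The raw JSON Schema is also available via the config_spec property:
spec = source.config_spec
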
docs_url: str
306    @property
307    def docs_url(self) -> str:
308        """Get the URL to the connector's documentation."""
309        # TODO: Replace with docs URL from metadata when available
310        return "https://docs.airbyte.com/integrations/sources/" + self.name.lower().replace(
311            "source-", ""
312        )

Get the URL to the connector's documentation.

discovered_catalog: airbyte_protocol.models.airbyte_protocol.AirbyteCatalog
314    @property
315    def discovered_catalog(self) -> AirbyteCatalog:
316        """Get the raw catalog for the given streams.
317
318        If the catalog is not yet known, we call discover to get it.
319        """
320        if self._discovered_catalog is None:
321            self._discovered_catalog = self._discover()
322
323        return self._discovered_catalog

Get the raw catalog for the given streams.

If the catalog is not yet known, we call discover to get it.

configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog
325    @property
326    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
327        """Get the configured catalog for the given streams.
328
329        If the raw catalog is not yet known, we call discover to get it.
330
331        If no specific streams are selected, we return a catalog that syncs all available streams.
332
333        TODO: We should consider disabling by default the streams that the connector would
334        disable by default. (For instance, streams that require a premium license are sometimes
335        disabled by default within the connector.)
336        """
337        # Ensure discovered catalog is cached before we start
338        _ = self.discovered_catalog
339
340        # Filter for selected streams if set, otherwise use all available streams:
341        streams_filter: list[str] = self._selected_stream_names or self.get_available_streams()
342
343        return ConfiguredAirbyteCatalog(
344            streams=[
345                ConfiguredAirbyteStream(
346                    stream=stream,
347                    destination_sync_mode=DestinationSyncMode.overwrite,
348                    primary_key=stream.source_defined_primary_key,
349                    # TODO: The below assumes all sources can coalesce from incremental sync to
350                    # full_table as needed. CDK supports this, so it might be safe:
351                    sync_mode=SyncMode.incremental,
352                )
353                for stream in self.discovered_catalog.streams
354                if stream.name in streams_filter
355            ],
356        )

Get the configured catalog for the given streams.

If the raw catalog is not yet known, we call discover to get it.

If no specific streams are selected, we return a catalog that syncs all available streams.

TODO: We should consider disabling by default the streams that the connector would disable by default. (For instance, streams that require a premium license are sometimes disabled by default within the connector.)
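
A sketch of how stream selection shapes the configured catalog (connector and stream names are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.select_streams(["users"])

# Only the selected streams appear in the configured catalog:
for configured_stream in source.configured_catalog.streams:
    print(configured_stream.stream.name, configured_stream.sync_mode)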

def get_stream_json_schema(self, stream_name: str) -> dict[str, typing.Any]:
358    def get_stream_json_schema(self, stream_name: str) -> dict[str, Any]:
359        """Return the JSON Schema spec for the specified stream name."""
360        catalog: AirbyteCatalog = self.discovered_catalog
361        found: list[AirbyteStream] = [
362            stream for stream in catalog.streams if stream.name == stream_name
363        ]
364
365        if len(found) == 0:
366            raise exc.PyAirbyteInputError(
367                message="Stream name does not exist in catalog.",
368                input_value=stream_name,
369            )
370
371        if len(found) > 1:
372            raise exc.PyAirbyteInternalError(
373                message="Duplicate streams found with the same name.",
374                context={
375                    "found_streams": found,
376                },
377            )
378
379        return found[0].json_schema

Return the JSON Schema spec for the specified stream name.
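
For instance, a minimal sketch of inspecting a stream's schema (connector and stream names are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})

schema = source.get_stream_json_schema("users")
# JSON Schema objects typically list their columns under "properties":
print(list(schema.get("properties", {}).keys()))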

def get_records(self, stream: str) -> airbyte.datasets._lazy.LazyDataset:
381    def get_records(self, stream: str) -> LazyDataset:
382        """Read a stream from the connector.
383
384        This involves the following steps:
385        * Call discover to get the catalog
386        * Generate a configured catalog that syncs the given stream in full_refresh mode
387        * Write the configured catalog and the config to a temporary file
388        * execute the connector with read --config <config_file> --catalog <catalog_file>
389        * Listen to the messages and return the first AirbyteRecordMessages that come along.
390        * Make sure the subprocess is killed when the function returns.
391        """
392        discovered_catalog: AirbyteCatalog = self.discovered_catalog
393        configured_catalog = ConfiguredAirbyteCatalog(
394            streams=[
395                ConfiguredAirbyteStream(
396                    stream=s,
397                    sync_mode=SyncMode.full_refresh,
398                    destination_sync_mode=DestinationSyncMode.overwrite,
399                )
400                for s in discovered_catalog.streams
401                if s.name == stream
402            ],
403        )
404        if len(configured_catalog.streams) == 0:
405            raise exc.PyAirbyteInputError(
406                message="Requested stream does not exist.",
407                context={
408                    "stream": stream,
409                    "available_streams": self.get_available_streams(),
410                    "connector_name": self.name,
411                },
412            ) from KeyError(stream)
413
414        configured_stream = configured_catalog.streams[0]
415        all_properties = cast(
416            list[str], list(configured_stream.stream.json_schema["properties"].keys())
417        )
418
419        def _with_logging(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
420            self._log_sync_start(cache=None)
421            yield from records
422            self._log_sync_success(cache=None)
423
424        iterator: Iterator[dict[str, Any]] = _with_logging(
425            records=(  # Generator comprehension yields StreamRecord objects for each record
426                StreamRecord.from_record_message(
427                    record_message=record.record,
428                    expected_keys=all_properties,
429                    prune_extra_fields=True,
430                )
431                for record in self._read_with_catalog(configured_catalog)
432                if record.record
433            )
434        )
435        return LazyDataset(
436            iterator,
437            stream_metadata=configured_stream,
438        )

Read a stream from the connector.

This involves the following steps:

  • Call discover to get the catalog
  • Generate a configured catalog that syncs the given stream in full_refresh mode
  • Write the configured catalog and the config to a temporary file
  • execute the connector with read --config <config_file> --catalog <catalog_file>
  • Listen to the messages and return the first AirbyteRecordMessages that come along.
  • Make sure the subprocess is killed when the function returns.
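
A minimal sketch of reading a single stream lazily, without a cache (connector and stream names are illustrative):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})

# get_records() returns a LazyDataset; records are pulled as you iterate:
for record in source.get_records("users"):
    print(record)
    break  # stop after the first record for this sketch
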
connector_version: str | None
440    @property
441    def connector_version(self) -> str | None:
442        """Return the version of the connector as reported by the executor.
443
444        Returns None if the version cannot be determined.
445        """
446        return self.executor.get_installed_version()

Return the version of the connector as reported by the executor.

Returns None if the version cannot be determined.

def get_documents( self, stream: str, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> collections.abc.Iterable[airbyte.documents.Document]:
448    def get_documents(
449        self,
450        stream: str,
451        title_property: str | None = None,
452        content_properties: list[str] | None = None,
453        metadata_properties: list[str] | None = None,
454        *,
455        render_metadata: bool = False,
456    ) -> Iterable[Document]:
457        """Read a stream from the connector and return the records as documents.
458
459        If metadata_properties is not set, all properties that are not content will be added to
460        the metadata.
461
462        If render_metadata is True, metadata will be rendered in the document, as well as the
463        main content.
464        """
465        return self.get_records(stream).to_documents(
466            title_property=title_property,
467            content_properties=content_properties,
468            metadata_properties=metadata_properties,
469            render_metadata=render_metadata,
470        )

Read a stream from the connector and return the records as documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content.
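
A sketch of turning records into documents, e.g. for an LLM or vector-store workflow (connector, stream, and property names are illustrative and depend on the stream's schema):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})

docs = source.get_documents(
    stream="products",
    title_property="make",
    content_properties=["model", "year"],
    render_metadata=True,
)
for doc in docs:
    print(doc)
    break  # show just the first document for this sketch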

def check(self) -> None:
472    def check(self) -> None:
473        """Call check on the connector.
474
475        This involves the following steps:
476        * Write the config to a temporary file
477        * execute the connector with check --config <config_file>
478        * Listen to the messages and return the first AirbyteCatalog that comes along.
479        * Make sure the subprocess is killed when the function returns.
480        """
481        with as_temp_files([self._config]) as [config_file]:
482            try:
483                for msg in self._execute(["check", "--config", config_file]):
484                    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
485                        if msg.connectionStatus.status != Status.FAILED:
486                            print(f"Connection check succeeded for `{self.name}`.")
487                            log_source_check_result(
488                                name=self.name,
489                                state=EventState.SUCCEEDED,
490                            )
491                            return
492
493                        log_source_check_result(
494                            name=self.name,
495                            state=EventState.FAILED,
496                        )
497                        raise exc.AirbyteConnectorCheckFailedError(
498                            help_url=self.docs_url,
499                            context={
500                                "failure_reason": msg.connectionStatus.message,
501                            },
502                        )
503                raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
504            except exc.AirbyteConnectorReadError as ex:
505                raise exc.AirbyteConnectorCheckFailedError(
506                    message="The connector failed to check the connection.",
507                    log_text=ex.log_text,
508                ) from ex

Call check on the connector.

This involves the following steps:

  • Write the config to a temporary file
  • execute the connector with check --config <config_file>
  • Listen to the messages and return the first AirbyteCatalog that comes along.
  • Make sure the subprocess is killed when the function returns.
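
A sketch of verifying connectivity before reading, assuming the exception class above is importable from airbyte.exceptions (connector name and config are illustrative):

import airbyte as ab
from airbyte.exceptions import AirbyteConnectorCheckFailedError

source = ab.get_source("source-faker", config={"count": 100})

try:
    source.check()  # prints a success message on success
except AirbyteConnectorCheckFailedError as ex:
    print(f"Connection check failed: {ex}")
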
def install(self) -> None:
510    def install(self) -> None:
511        """Install the connector if it is not yet installed."""
512        self.executor.install()
513        print("For configuration instructions, see: \n" f"{self.docs_url}#reference\n")

Install the connector if it is not yet installed.

def uninstall(self) -> None:
515    def uninstall(self) -> None:
516        """Uninstall the connector if it is installed.
517
518        This only works if the use_local_install flag wasn't used and installation is managed by
519        PyAirbyte.
520        """
521        self.executor.uninstall()

Uninstall the connector if it is installed.

This only works if the use_local_install flag wasn't used and installation is managed by PyAirbyte.

def read( self, cache: airbyte.caches.base.CacheBase | None = None, *, streams: str | list[str] | None = None, write_strategy: str | airbyte.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False, skip_validation: bool = False) -> ReadResult:
656    def read(
657        self,
658        cache: CacheBase | None = None,
659        *,
660        streams: str | list[str] | None = None,
661        write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
662        force_full_refresh: bool = False,
663        skip_validation: bool = False,
664    ) -> ReadResult:
665        """Read from the connector and write to the cache.
666
667        Args:
668            cache: The cache to write to. If None, a default cache will be used.
669            write_strategy: The strategy to use when writing to the cache. If a string, it must be
670                one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one
671                of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or
672                WriteStrategy.AUTO.
673            streams: Optional if already set. A list of stream names to select for reading. If set
674                to "*", all streams will be selected.
675            force_full_refresh: If True, the source will operate in full refresh mode. Otherwise,
676                streams will be read in incremental mode if supported by the connector. This option
677                must be True when using the "replace" strategy.
678        """
679        if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
680            warnings.warn(
681                message=(
682                    "Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
683                    "could result in data loss. "
684                    "To silence this warning, use the following: "
685                    'warnings.filterwarnings("ignore", '
686                    'category="airbyte.warnings.PyAirbyteDataLossWarning")`'
687                ),
688                category=PyAirbyteDataLossWarning,
689                stacklevel=1,
690            )
691        if isinstance(write_strategy, str):
692            try:
693                write_strategy = WriteStrategy(write_strategy)
694            except ValueError:
695                raise exc.PyAirbyteInputError(
696                    message="Invalid strategy",
697                    context={
698                        "write_strategy": write_strategy,
699                        "available_strategies": [s.value for s in WriteStrategy],
700                    },
701                ) from None
702
703        if streams:
704            self.select_streams(streams)
705
706        if not self._selected_stream_names:
707            raise exc.PyAirbyteNoStreamsSelectedError(
708                connector_name=self.name,
709                available_streams=self.get_available_streams(),
710            )
711
712        # Run optional validation step
713        if not skip_validation:
714            self.validate_config()
715
716        # Set up cache and related resources
717        if cache is None:
718            cache = get_default_cache()
719
720        # Set up state provider if not in full refresh mode
721        if force_full_refresh:
722            state_provider: StateProviderBase | None = None
723        else:
724            state_provider = cache.get_state_provider(
725                source_name=self.name,
726            )
727
728        self._log_sync_start(cache=cache)
729
730        cache_processor = cache.get_record_processor(
731            source_name=self.name,
732            catalog_provider=CatalogProvider(self.configured_catalog),
733        )
734        try:
735            cache_processor.process_airbyte_messages(
736                self._read_with_catalog(
737                    catalog=self.configured_catalog,
738                    state=state_provider,
739                ),
740                write_strategy=write_strategy,
741            )
742
743        # TODO: We should catch more specific exceptions here
744        except Exception as ex:
745            self._log_sync_failure(cache=cache, exception=ex)
746            raise exc.AirbyteConnectorFailedError(
747                log_text=self._last_log_messages,
748            ) from ex
749
750        self._log_sync_success(cache=cache)
751        return ReadResult(
752            processed_records=self._processed_records,
753            cache=cache,
754            processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
755        )

Read from the connector and write to the cache.

Arguments:
  • cache: The cache to write to. If None, a default cache will be used.
  • write_strategy: The strategy to use when writing to the cache. If a string, it must be one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO.
  • streams: Optional if already set. A list of stream names to select for reading. If set to "*", all streams will be selected.
  • force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy.
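
An end-to-end sketch of a read into the default local cache (connector name, config, and strategy are illustrative; ReadResult is assumed here to expose the processed_records count passed to its constructor above):

import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
source.check()

# Read all streams into the default cache, letting PyAirbyte pick the strategy:
result = source.read(streams="*", write_strategy="auto")

print(f"Processed {result.processed_records} records")
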
class StreamRecord(dict[str, typing.Any]):
 93class StreamRecord(dict[str, Any]):
 94    """The StreamRecord class is a case-aware, case-insensitive dictionary implementation.
 95
 96    It has these behaviors:
 97    - When a key is retrieved, deleted, or checked for existence, it is always checked in a
 98      case-insensitive manner.
 99    - The original case is stored in a separate dictionary, so that the original case can be
100      retrieved when needed.
101    - Because it is subclassed from `dict`, the `StreamRecord` class can be passed as a normal
102      Python dictionary.
103    - In addition to the properties of the stream's records, the dictionary also stores the Airbyte
104      metadata columns: `_airbyte_raw_id`, `_airbyte_extracted_at`, and `_airbyte_meta`.
105
106    This behavior mirrors how a case-aware, case-insensitive SQL database would handle column
107    references.
108
109    There are two ways this class can store keys internally:
110    - If normalize_keys is True, the keys are normalized using the given normalizer.
111    - If normalize_keys is False, the original case of the keys is stored.
112
113    In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the
114    dictionary will be initialized with the given keys. If a key is not found in the input data, it
115    will be initialized with a value of None. When provided, the 'expected_keys' input will also
116    determine the original case of the keys.
117    """
118
119    def _display_case(self, key: str) -> str:
120        """Return the original case of the key."""
121        return self._pretty_case_keys[self._normalizer.normalize(key)]
122
123    def _index_case(self, key: str) -> str:
124        """Return the internal case of the key.
125
126        If normalize_keys is True, return the normalized key.
127        Otherwise, return the original case of the key.
128        """
129        if self._normalize_keys:
130            return self._normalizer.normalize(key)
131
132        return self._display_case(key)
133
134    @classmethod
135    def from_record_message(
136        cls,
137        record_message: AirbyteRecordMessage,
138        *,
139        prune_extra_fields: bool,
140        normalize_keys: bool = True,
141        normalizer: type[NameNormalizerBase] | None = None,
142        expected_keys: list[str] | None = None,
143    ) -> StreamRecord:
144        """Return a StreamRecord from a RecordMessage."""
145        data_dict: dict[str, Any] = record_message.data.copy()
146        data_dict[AB_RAW_ID_COLUMN] = str(ulid.ULID())
147        data_dict[AB_EXTRACTED_AT_COLUMN] = datetime.fromtimestamp(
148            record_message.emitted_at / 1000, tz=pytz.utc
149        )
150        data_dict[AB_META_COLUMN] = {}
151
152        return cls(
153            from_dict=data_dict,
154            prune_extra_fields=prune_extra_fields,
155            normalize_keys=normalize_keys,
156            normalizer=normalizer,
157            expected_keys=expected_keys,
158        )
159
160    def __init__(
161        self,
162        from_dict: dict,
163        *,
164        prune_extra_fields: bool,
165        normalize_keys: bool = True,
166        normalizer: type[NameNormalizerBase] | None = None,
167        expected_keys: list[str] | None = None,
168    ) -> None:
169        """Initialize the dictionary with the given data.
170
171        Args:
172            from_dict: The dictionary to initialize the StreamRecord with.
173            prune_extra_fields: If `True`, unexpected fields will be removed.
174            normalize_keys: If `True`, the keys will be normalized using the given normalizer.
175            normalizer: The normalizer to use when normalizing keys. If not provided, the
176                LowerCaseNormalizer will be used.
177            expected_keys: If provided and `prune_extra_fields` is True, then unexpected fields
178                will be removed. This option is ignored if `expected_keys` is not provided.
179        """
180        # If no normalizer is provided, use LowerCaseNormalizer.
181        self._normalize_keys = normalize_keys
182        self._normalizer: type[NameNormalizerBase] = normalizer or LowerCaseNormalizer
183
184        # If no expected keys are provided, use all keys from the input dictionary.
185        if not expected_keys:
186            expected_keys = list(from_dict.keys())
187            prune_extra_fields = False  # No expected keys provided.
188        else:
189            expected_keys = list(expected_keys)
190
191        for internal_col in AB_INTERNAL_COLUMNS:
192            if internal_col not in expected_keys:
193                expected_keys.append(internal_col)
194
195        # Store a lookup from normalized keys to pretty cased (originally cased) keys.
196        self._pretty_case_keys: dict[str, str] = {
197            self._normalizer.normalize(pretty_case.lower()): pretty_case
198            for pretty_case in expected_keys
199        }
200
201        if normalize_keys:
202            index_keys = [self._normalizer.normalize(key) for key in expected_keys]
203        else:
204            index_keys = expected_keys
205
206        self.update(dict.fromkeys(index_keys))  # Start by initializing all values to None
207        for k, v in from_dict.items():
208            index_cased_key = self._index_case(k)
209            if prune_extra_fields and index_cased_key not in index_keys:
210                # Dropping undeclared field
211                continue
212
213            self[index_cased_key] = v
214
215    def __getitem__(self, key: str) -> Any:  # noqa: ANN401
216        if super().__contains__(key):
217            return super().__getitem__(key)
218
219        if super().__contains__(self._index_case(key)):
220            return super().__getitem__(self._index_case(key))
221
222        raise KeyError(key)
223
224    def __setitem__(self, key: str, value: Any) -> None:  # noqa: ANN401
225        if super().__contains__(key):
226            super().__setitem__(key, value)
227            return
228
229        if super().__contains__(self._index_case(key)):
230            super().__setitem__(self._index_case(key), value)
231            return
232
233        # Store the pretty cased (originally cased) key:
234        self._pretty_case_keys[self._normalizer.normalize(key)] = key
235
236        # Store the data with the normalized key:
237        super().__setitem__(self._index_case(key), value)
238
239    def __delitem__(self, key: str) -> None:
240        if super().__contains__(key):
241            super().__delitem__(key)
242            return
243
244        if super().__contains__(self._index_case(key)):
245            super().__delitem__(self._index_case(key))
246            return
247
248        raise KeyError(key)
249
250    def __contains__(self, key: object) -> bool:
251        assert isinstance(key, str), "Key must be a string."
252        return super().__contains__(key) or super().__contains__(self._index_case(key))
253
254    def __iter__(self) -> Any:  # noqa: ANN401
255        return iter(super().__iter__())
256
257    def __len__(self) -> int:
258        return super().__len__()
259
260    def __eq__(self, other: object) -> bool:
261        if isinstance(other, StreamRecord):
262            return dict(self) == dict(other)
263
264        if isinstance(other, dict):
265            return {k.lower(): v for k, v in self.items()} == {
266                k.lower(): v for k, v in other.items()
267            }
268        return False
269
270    def __hash__(self) -> int:  # type: ignore [override]  # Doesn't match superclass (dict)
271        """Return the hash of the dictionary with keys sorted."""
272        items = [(k, v) for k, v in self.items() if not isinstance(v, dict)]
273        return hash(tuple(sorted(items)))

The StreamRecord class is a case-aware, case-insensitive dictionary implementation.

It has these behaviors:

  • When a key is retrieved, deleted, or checked for existence, it is always checked in a case-insensitive manner.
  • The original case is stored in a separate dictionary, so that the original case can be retrieved when needed.
  • Because it is subclassed from dict, the StreamRecord class can be passed as a normal Python dictionary.
  • In addition to the properties of the stream's records, the dictionary also stores the Airbyte metadata columns: _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta.

This behavior mirrors how a case-aware, case-insensitive SQL database would handle column references.

There are two ways this class can store keys internally:

  • If normalize_keys is True, the keys are normalized using the given normalizer.
  • If normalize_keys is False, the original case of the keys is stored.

In regards to missing values, the dictionary accepts an 'expected_keys' input. When set, the dictionary will be initialized with the given keys. If a key is not found in the input data, it will be initialized with a value of None. When provided, the 'expected_keys' input will also determine the original case of the keys.
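
A small sketch of the case-insensitive behavior, assuming StreamRecord is importable from airbyte.records (the data values are arbitrary):

from airbyte.records import StreamRecord  # module path assumed for this sketch

record = StreamRecord(
    {"UserId": 1, "Email": "demo@example.com"},
    prune_extra_fields=False,
)

# Lookups are case-insensitive:
print(record["userid"])  # 1
print(record["EMAIL"])   # demo@example.com

# The Airbyte metadata columns are reserved as keys (they are populated when the
# record is built via from_record_message):
print("_airbyte_raw_id" in record)  # True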

@classmethod
def from_record_message( cls, record_message: airbyte_protocol.models.airbyte_protocol.AirbyteRecordMessage, *, prune_extra_fields: bool, normalize_keys: bool = True, normalizer: type[airbyte._util.name_normalizers.NameNormalizerBase] | None = None, expected_keys: list[str] | None = None) -> StreamRecord:
134    @classmethod
135    def from_record_message(
136        cls,
137        record_message: AirbyteRecordMessage,
138        *,
139        prune_extra_fields: bool,
140        normalize_keys: bool = True,
141        normalizer: type[NameNormalizerBase] | None = None,
142        expected_keys: list[str] | None = None,
143    ) -> StreamRecord:
144        """Return a StreamRecord from a RecordMessage."""
145        data_dict: dict[str, Any] = record_message.data.copy()
146        data_dict[AB_RAW_ID_COLUMN] = str(ulid.ULID())
147        data_dict[AB_EXTRACTED_AT_COLUMN] = datetime.fromtimestamp(
148            record_message.emitted_at / 1000, tz=pytz.utc
149        )
150        data_dict[AB_META_COLUMN] = {}
151
152        return cls(
153            from_dict=data_dict,
154            prune_extra_fields=prune_extra_fields,
155            normalize_keys=normalize_keys,
156            normalizer=normalizer,
157            expected_keys=expected_keys,
158        )

Return a StreamRecord from a RecordMessage.

Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy