airbyte.datasets

PyAirbyte dataset classes.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""PyAirbyte dataset classes."""
 3
 4from __future__ import annotations
 5
 6from airbyte.datasets._base import DatasetBase
 7from airbyte.datasets._lazy import LazyDataset
 8from airbyte.datasets._map import DatasetMap
 9from airbyte.datasets._sql import CachedDataset, SQLDataset
10
11
12__all__ = [
13    "CachedDataset",
14    "DatasetBase",
15    "DatasetMap",
16    "LazyDataset",
17    "SQLDataset",
18]
class CachedDataset(SQLDataset):
    """A dataset backed by a SQL table cache.

    Because this dataset includes all records from the underlying table, we also expose the
    underlying table as a SQLAlchemy Table object.
    """

    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
    ) -> None:
        """We construct the query statement by selecting all columns from the table.

        This prevents the need to scan the table schema to construct the query statement.
        """
        table_name = cache.processor.get_sql_table_name(stream_name)
        schema_name = cache.schema_name
        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
        super().__init__(
            cache=cache,
            stream_name=stream_name,
            query_statement=query,
        )

    @overrides
    def to_pandas(self) -> DataFrame:
        """Return the underlying dataset data as a pandas DataFrame."""
        return self._cache.get_pandas_dataframe(self._stream_name)

    def to_sql_table(self) -> Table:
        """Return the underlying SQL table as a SQLAlchemy Table object."""
        return self._cache.processor.get_sql_table(self.stream_name)

    def __eq__(self, value: object) -> bool:
        """Return True if the value is a CachedDataset with the same cache and stream name.

        In the case of CachedDataset objects, we can simply compare the cache and stream name.

        Note that this equality check is only supported on CachedDataset objects and not for
        the base SQLDataset implementation. This is because of the complexity and computational
        cost of comparing two arbitrary SQL queries that could be bound to different variables,
        as well as the chance that two queries can be syntactically equivalent without being
        text-wise equivalent.
        """
        if not isinstance(value, SQLDataset):
            return False

        if self._cache is not value._cache:
            return False

        # Direct equality; previously written as the double negative
        # `not self._stream_name != value._stream_name`, which is equivalent
        # but needlessly hard to read.
        return self._stream_name == value._stream_name

    def __hash__(self) -> int:
        """Hash on the stream name, consistent with `__eq__`."""
        return hash(self._stream_name)

A dataset backed by a SQL table cache.

Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.

CachedDataset(cache: airbyte.caches.base.CacheBase, stream_name: str)
139    def __init__(
140        self,
141        cache: CacheBase,
142        stream_name: str,
143    ) -> None:
144        """We construct the query statement by selecting all columns from the table.
145
146        This prevents the need to scan the table schema to construct the query statement.
147        """
148        table_name = cache.processor.get_sql_table_name(stream_name)
149        schema_name = cache.schema_name
150        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
151        super().__init__(
152            cache=cache,
153            stream_name=stream_name,
154            query_statement=query,
155        )

We construct the query statement by selecting all columns from the table.

This prevents the need to scan the table schema to construct the query statement.

@overrides
def to_pandas(self) -> pandas.core.frame.DataFrame:
157    @overrides
158    def to_pandas(self) -> DataFrame:
159        """Return the underlying dataset data as a pandas DataFrame."""
160        return self._cache.get_pandas_dataframe(self._stream_name)

Return the underlying dataset data as a pandas DataFrame.

def to_sql_table(self) -> sqlalchemy.sql.schema.Table:
162    def to_sql_table(self) -> Table:
163        """Return the underlying SQL table as a SQLAlchemy Table object."""
164        return self._cache.processor.get_sql_table(self.stream_name)

Return the underlying SQL table as a SQLAlchemy Table object.

class DatasetBase(ABC):
    """Base implementation for all datasets."""

    def __init__(self, stream_metadata: ConfiguredAirbyteStream) -> None:
        self._stream_metadata = stream_metadata

    @abstractmethod
    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Return the iterator of records."""
        raise NotImplementedError

    def to_pandas(self) -> DataFrame:
        """Return a pandas DataFrame representation of the dataset.

        The base implementation simply hands the record iterator to the pandas
        DataFrame constructor.
        """
        # We technically iterate Mapping objects, while pandas expects an
        # iterator of dict objects. The cast is safe: the records duck-type
        # as dicts for this use case.
        record_iter = cast(Iterator[dict[str, Any]], self)
        return DataFrame(record_iter)

    def to_documents(
        self,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Return the iterator of documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as
        the main content. Otherwise, metadata will be attached to the document but not rendered.
        """
        doc_renderer = DocumentRenderer(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )
        yield from doc_renderer.render_documents(self)

Base implementation for all datasets.

def to_pandas(self) -> pandas.core.frame.DataFrame:
31    def to_pandas(self) -> DataFrame:
32        """Return a pandas DataFrame representation of the dataset.
33
34        The base implementation simply passes the record iterator to Panda's DataFrame constructor.
35        """
36        # Technically, we return an iterator of Mapping objects. However, pandas
37        # expects an iterator of dict objects. This cast is safe because we know
38        # duck typing is correct for this use case.
39        return DataFrame(cast(Iterator[dict[str, Any]], self))

Return a pandas DataFrame representation of the dataset.

The base implementation simply passes the record iterator to pandas' DataFrame constructor.

def to_documents( self, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> collections.abc.Iterable[airbyte.documents.Document]:
41    def to_documents(
42        self,
43        title_property: str | None = None,
44        content_properties: list[str] | None = None,
45        metadata_properties: list[str] | None = None,
46        *,
47        render_metadata: bool = False,
48    ) -> Iterable[Document]:
49        """Return the iterator of documents.
50
51        If metadata_properties is not set, all properties that are not content will be added to
52        the metadata.
53
 54        If render_metadata is True, metadata will be rendered in the document, as well as
 55        the main content. Otherwise, metadata will be attached to the document but not rendered.
56        """
57        renderer = DocumentRenderer(
58            title_property=title_property,
59            content_properties=content_properties,
60            metadata_properties=metadata_properties,
61            render_metadata=render_metadata,
62        )
63        yield from renderer.render_documents(self)

Return the iterator of documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content. Otherwise, metadata will be attached to the document but not rendered.

class DatasetMap(Mapping):
    """A generic interface for a set of streams or datasets."""

    def __init__(self) -> None:
        # Internal registry mapping stream names to their datasets.
        self._datasets: dict[str, DatasetBase] = {}

    def __getitem__(self, key: str) -> DatasetBase:
        return self._datasets[key]

    def __iter__(self) -> Iterator[str]:
        # Iterating the dict yields its keys (the stream names).
        yield from self._datasets

    def __len__(self) -> int:
        return len(self._datasets)

A generic interface for a set of streams or datasets.

Inherited Members
collections.abc.Mapping
get
keys
items
values
class LazyDataset(DatasetBase):
    """A dataset that is loaded incrementally from a source or a SQL query."""

    def __init__(
        self,
        iterator: Iterator[dict[str, Any]],
        stream_metadata: ConfiguredAirbyteStream,
    ) -> None:
        super().__init__(
            stream_metadata=stream_metadata,
        )
        # The single underlying record iterator; both __iter__ and __next__
        # delegate to it, so the dataset is consumed exactly once.
        self._iterator: Iterator[dict[str, Any]] = iterator

    @overrides
    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Expose the wrapped iterator directly."""
        return self._iterator

    def __next__(self) -> Mapping[str, Any]:
        """Advance the wrapped iterator by one record."""
        return next(self._iterator)

A dataset that is loaded incrementally from a source or a SQL query.

LazyDataset( iterator: collections.abc.Iterator[dict[str, typing.Any]], stream_metadata: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream)
21    def __init__(
22        self,
23        iterator: Iterator[dict[str, Any]],
24        stream_metadata: ConfiguredAirbyteStream,
25    ) -> None:
26        self._iterator: Iterator[dict[str, Any]] = iterator
27        super().__init__(
28            stream_metadata=stream_metadata,
29        )
Inherited Members
DatasetBase
to_pandas
to_documents
class SQLDataset(DatasetBase):
    """A dataset that is loaded incrementally from a SQL query.

    The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full
    table as the query statement.
    """

    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        query_statement: Selectable,
        stream_configuration: ConfiguredAirbyteStream | None | Literal[False] = None,
    ) -> None:
        """Initialize the dataset with a cache, stream name, and query statement.

        This class is not intended to be created directly. Instead, you can retrieve
        datasets from caches or Cloud connection objects, etc.

        The query statement should be a SQLAlchemy Selectable object that can be executed to
        retrieve records from the dataset.

        If stream_configuration is not provided, we attempt to retrieve the stream configuration
        from the cache processor. This is useful when constructing a dataset from a CachedDataset
        object, which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        self._length: int | None = None
        self._cache: CacheBase = cache
        self._stream_name: str = stream_name
        self._query_statement: Selectable = query_statement
        if stream_configuration is None:
            try:
                stream_configuration = cache.processor.catalog_provider.get_configured_stream_info(
                    stream_name=stream_name
                )
            except Exception as ex:
                # Best-effort lookup: warn and fall through with no metadata.
                warnings.warn(
                    f"Failed to get stream configuration for {stream_name}: {ex}",
                    stacklevel=2,
                )

        # Coalesce False to None. This must run unconditionally (not only inside the
        # branch above, which is skipped when stream_configuration is False); otherwise
        # an explicit `False` would be stored as the stream metadata.
        stream_configuration = stream_configuration or None

        super().__init__(stream_metadata=stream_configuration)

    @property
    def stream_name(self) -> str:
        """Return the name of the stream this dataset reads from."""
        return self._stream_name

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Yield each record of the query result as a dict."""
        with self._cache.processor.get_sql_connection() as conn:
            for row in conn.execute(self._query_statement):
                # Access to private member required because SQLAlchemy doesn't expose a public API.
                # https://pydoc.dev/sqlalchemy/latest/sqlalchemy.engine.row.RowMapping.html
                yield cast(dict[str, Any], row._mapping)  # noqa: SLF001

    def __len__(self) -> int:
        """Return the number of records in the dataset.

        This method caches the length of the dataset after the first call.
        """
        if self._length is None:
            # SQLAlchemy 2.0 removed the legacy `select([...])` list form; column
            # arguments are passed positionally (matching `select("*")` usage above).
            count_query = select(func.count()).select_from(self._query_statement.alias())
            with self._cache.processor.get_sql_connection() as conn:
                self._length = conn.execute(count_query).scalar()

        return self._length

    def to_pandas(self) -> DataFrame:
        """Return the dataset as a pandas DataFrame, delegating to the cache."""
        return self._cache.get_pandas_dataframe(self._stream_name)

    def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
        """Filter the dataset by a set of column values.

        Filters can be specified as either a string or a SQLAlchemy expression.

        Filters are lazily applied to the dataset, so they can be chained together. For example:

                dataset.with_filter("id > 5").with_filter("id < 10")

        is equivalent to:

                dataset.with_filter("id > 5", "id < 10")
        """
        # Convert all strings to TextClause objects.
        filters: list[ClauseElement] = [
            text(expression) if isinstance(expression, str) else expression
            for expression in filter_expressions
        ]
        filtered_select = self._query_statement.where(and_(*filters))
        return SQLDataset(
            cache=self._cache,
            stream_name=self._stream_name,
            query_statement=filtered_select,
        )

A dataset that is loaded incrementally from a SQL query.

The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full table as the query statement.

SQLDataset( cache: airbyte.caches.base.CacheBase, stream_name: str, query_statement: sqlalchemy.sql.selectable.Selectable, stream_configuration: Union[airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream, NoneType, Literal[False]] = None)
39    def __init__(
40        self,
41        cache: CacheBase,
42        stream_name: str,
43        query_statement: Selectable,
44        stream_configuration: ConfiguredAirbyteStream | None | Literal[False] = None,
45    ) -> None:
46        """Initialize the dataset with a cache, stream name, and query statement.
47
48        This class is not intended to be created directly. Instead, you can retrieve
49        datasets from caches or Cloud connection objects, etc.
50
51        The query statement should be a SQLAlchemy Selectable object that can be executed to
52        retrieve records from the dataset.
53
54        If stream_configuration is not provided, we attempt to retrieve the stream configuration
55        from the cache processor. This is useful when constructing a dataset from a CachedDataset
56        object, which already has the stream configuration.
57
58        If stream_configuration is set to False, we skip the stream configuration retrieval.
59        """
60        self._length: int | None = None
61        self._cache: CacheBase = cache
62        self._stream_name: str = stream_name
63        self._query_statement: Selectable = query_statement
64        if stream_configuration is None:
65            try:
66                stream_configuration = cache.processor.catalog_provider.get_configured_stream_info(
67                    stream_name=stream_name
68                )
69            except Exception as ex:
70                warnings.warn(
71                    f"Failed to get stream configuration for {stream_name}: {ex}",
72                    stacklevel=2,
73                )
74
75            # Coalesce False to None
76            stream_configuration = stream_configuration or None
77
78        super().__init__(stream_metadata=stream_configuration)

Initialize the dataset with a cache, stream name, and query statement.

This class is not intended to be created directly. Instead, you can retrieve datasets from caches or Cloud connection objects, etc.

The query statement should be a SQLAlchemy Selectable object that can be executed to retrieve records from the dataset.

If stream_configuration is not provided, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.

If stream_configuration is set to False, we skip the stream configuration retrieval.

stream_name: str
80    @property
81    def stream_name(self) -> str:
82        return self._stream_name
def to_pandas(self) -> pandas.core.frame.DataFrame:
103    def to_pandas(self) -> DataFrame:
104        return self._cache.get_pandas_dataframe(self._stream_name)

Return a pandas DataFrame representation of the dataset.

The base implementation simply passes the record iterator to pandas' DataFrame constructor.

def with_filter( self, *filter_expressions: sqlalchemy.sql.elements.ClauseElement | str) -> SQLDataset:
106    def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
107        """Filter the dataset by a set of column values.
108
109        Filters can be specified as either a string or a SQLAlchemy expression.
110
111        Filters are lazily applied to the dataset, so they can be chained together. For example:
112
113                dataset.with_filter("id > 5").with_filter("id < 10")
114
115        is equivalent to:
116
117                dataset.with_filter("id > 5", "id < 10")
118        """
119        # Convert all strings to TextClause objects.
120        filters: list[ClauseElement] = [
121            text(expression) if isinstance(expression, str) else expression
122            for expression in filter_expressions
123        ]
124        filtered_select = self._query_statement.where(and_(*filters))
125        return SQLDataset(
126            cache=self._cache,
127            stream_name=self._stream_name,
128            query_statement=filtered_select,
129        )

Filter the dataset by a set of column values.

Filters can be specified as either a string or a SQLAlchemy expression.

Filters are lazily applied to the dataset, so they can be chained together. For example:

    dataset.with_filter("id > 5").with_filter("id < 10")

is equivalent to:

    dataset.with_filter("id > 5", "id < 10")
Inherited Members
DatasetBase
to_documents