airbyte.datasets

PyAirbyte dataset classes.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""PyAirbyte dataset classes."""
 3
 4from __future__ import annotations
 5
 6from airbyte.datasets._base import DatasetBase
 7from airbyte.datasets._lazy import LazyDataset
 8from airbyte.datasets._map import DatasetMap
 9from airbyte.datasets._sql import CachedDataset, SQLDataset
10
11
12__all__ = [
13    "CachedDataset",
14    "DatasetBase",
15    "DatasetMap",
16    "LazyDataset",
17    "SQLDataset",
18]
class CachedDataset(airbyte.datasets.SQLDataset):
class CachedDataset(SQLDataset):
    """A dataset backed by a SQL table cache.

    Because this dataset includes all records from the underlying table, we also expose the
    underlying table as a SQLAlchemy Table object.
    """

    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
    ) -> None:
        """We construct the query statement by selecting all columns from the table.

        This prevents the need to scan the table schema to construct the query statement.

        If stream_configuration is None, we attempt to retrieve the stream configuration from the
        cache processor. This is useful when constructing a dataset from a CachedDataset object,
        which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        table_name = cache.processor.get_sql_table_name(stream_name)
        schema_name = cache.schema_name
        # NOTE(review): schema and table names are interpolated unquoted into raw SQL text;
        # this assumes the processor yields identifiers that need no quoting - confirm.
        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
        super().__init__(
            cache=cache,
            stream_name=stream_name,
            query_statement=query,
            stream_configuration=stream_configuration,
        )

    @overrides
    def to_pandas(self) -> DataFrame:
        """Return the underlying dataset data as a pandas DataFrame."""
        # This dataset always spans the whole table, so the cache's table-level reader applies.
        return self._cache.get_pandas_dataframe(self._stream_name)

    @overrides
    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset containing the data from this dataset's stream.

        The stream read is the one this dataset was created for; it is not an argument.

        Args:
            max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.

        Returns:
            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
        """
        return self._cache.get_arrow_dataset(
            stream_name=self._stream_name,
            max_chunk_size=max_chunk_size,
        )

    def to_sql_table(self) -> Table:
        """Return the underlying SQL table as a SQLAlchemy Table object."""
        return self._cache.processor.get_sql_table(self.stream_name)

    def __eq__(self, value: object) -> bool:
        """Return True if the value is a CachedDataset with the same cache and stream name.

        In the case of CachedDataset objects, we can simply compare the cache and stream name.

        Note that this equality check is only supported on CachedDataset objects and not for
        the base SQLDataset implementation. This is because of the complexity and computational
        cost of comparing two arbitrary SQL queries that could be bound to different variables,
        as well as the chance that two queries can be syntactically equivalent without being
        text-wise equivalent.

        The cache is compared by identity. A plain SQLDataset (for example the result of
        ``with_filter``) never compares equal, because its query may select only a subset of
        the table's rows.
        """
        if not isinstance(value, CachedDataset):
            return False

        return self._cache is value._cache and self._stream_name == value._stream_name

    def __hash__(self) -> int:
        """Hash on the stream name, which is consistent with ``__eq__``."""
        return hash(self._stream_name)

A dataset backed by a SQL table cache.

Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.

CachedDataset( cache: airbyte.caches.CacheBase, stream_name: str, stream_configuration: Union[airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream, Literal[False], NoneType] = None)
def __init__(
    self,
    cache: CacheBase,
    stream_name: str,
    stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
) -> None:
    """Build a dataset that selects every column of the stream's cache table.

    Selecting ``*`` means the table schema never has to be inspected to build the query.

    ``stream_configuration`` is forwarded to the SQLDataset initializer:
      - ``None``: the configuration is looked up through the cache processor.
      - ``False``: the lookup is skipped entirely.
      - any other value: used as the stream configuration directly.
    """
    resolved_table = cache.processor.get_sql_table_name(stream_name)
    full_table_select = select("*").select_from(
        text(f"{cache.schema_name}.{resolved_table}")
    )
    super().__init__(
        cache=cache,
        stream_name=stream_name,
        query_statement=full_table_select,
        stream_configuration=stream_configuration,
    )

We construct the query statement by selecting all columns from the table.

This prevents the need to scan the table schema to construct the query statement.

If stream_configuration is None, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.

If stream_configuration is set to False, we skip the stream configuration retrieval.

@overrides
def to_pandas(self) -> pandas.core.frame.DataFrame:
@overrides
def to_pandas(self) -> DataFrame:
    """Return the underlying dataset data as a pandas DataFrame."""
    # The dataset covers the whole table, so delegate to the cache's table reader.
    backing_cache = self._cache
    return backing_cache.get_pandas_dataframe(self._stream_name)

Return the underlying dataset data as a pandas DataFrame.

@overrides
def to_arrow(self, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
@overrides
def to_arrow(
    self,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
) -> Dataset:
    """Return an Arrow Dataset containing the data from this dataset's stream.

    The stream read is the one this dataset was created for; it is not an argument.

    Args:
        max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.

    Returns:
        pa.dataset.Dataset: Arrow Dataset containing the stream's data.
    """
    return self._cache.get_arrow_dataset(
        stream_name=self._stream_name,
        max_chunk_size=max_chunk_size,
    )

Return an Arrow Dataset containing the data from the specified stream.

Arguments:
  • max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.
(The stream read is this dataset's own stream; there is no stream_name argument.)
Returns:

pa.dataset.Dataset: Arrow Dataset containing the stream's data.

def to_sql_table(self) -> sqlalchemy.sql.schema.Table:
def to_sql_table(self) -> Table:
    """Expose the stream's backing table as a SQLAlchemy ``Table`` object."""
    processor = self._cache.processor
    return processor.get_sql_table(self.stream_name)

Return the underlying SQL table as a SQLAlchemy Table object.

class DatasetBase(abc.ABC):
class DatasetBase(ABC):
    """Base implementation for all datasets."""

    def __init__(self, stream_metadata: ConfiguredAirbyteStream | None) -> None:
        """Store the stream metadata for this dataset.

        Args:
            stream_metadata: The configured stream this dataset reads, or None when no
                stream configuration is available.
        """
        self._stream_metadata = stream_metadata

    @abstractmethod
    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Return the iterator of records."""
        raise NotImplementedError

    def to_pandas(self) -> DataFrame:
        """Return a pandas DataFrame representation of the dataset.

        The base implementation simply passes the record iterator to the pandas DataFrame
        constructor.
        """
        # Technically, we return an iterator of Mapping objects. However, pandas
        # expects an iterator of dict objects. This cast is safe because we know
        # duck typing is correct for this use case.
        return DataFrame(cast("Iterator[dict[str, Any]]", self))

    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset representation of the dataset.

        This method should be implemented by subclasses.
        """
        raise NotImplementedError("Not implemented in base class")

    def to_documents(
        self,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Return the iterator of documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document alongside the
        main content. Otherwise, metadata will be attached to the document but not rendered.
        """
        renderer = DocumentRenderer(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )
        yield from renderer.render_documents(self)

Base implementation for all datasets.

def to_pandas(self) -> pandas.core.frame.DataFrame:
def to_pandas(self) -> DataFrame:
    """Return a pandas DataFrame representation of the dataset.

    The base implementation hands the record iterator straight to the pandas
    ``DataFrame`` constructor.
    """
    # Records may be Mapping objects rather than dicts; pandas accepts them through duck
    # typing, so this cast only informs the type checker.
    record_stream = cast("Iterator[dict[str, Any]]", self)
    return DataFrame(record_stream)

Return a pandas DataFrame representation of the dataset.

The base implementation simply passes the record iterator to the pandas DataFrame constructor.

def to_arrow(self, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
def to_arrow(
    self,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
) -> Dataset:
    """Return an Arrow Dataset representation of the dataset.

    The base class has no generic Arrow conversion; subclasses must provide one.
    """
    error_message = "Not implemented in base class"
    raise NotImplementedError(error_message)

Return an Arrow Dataset representation of the dataset.

This method should be implemented by subclasses.

def to_documents( self, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
def to_documents(
    self,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Yield this dataset's records rendered as documents.

    When metadata_properties is not given, every property that is not used as content is
    added to the metadata.

    With render_metadata=True the metadata is rendered into the document alongside the main
    content; otherwise it is attached to the document without being rendered.
    """
    document_renderer = DocumentRenderer(
        render_metadata=render_metadata,
        metadata_properties=metadata_properties,
        content_properties=content_properties,
        title_property=title_property,
    )
    yield from document_renderer.render_documents(self)

Return the iterator of documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document alongside the main content. Otherwise, metadata will be attached to the document but not rendered.

class DatasetMap(collections.abc.Mapping):
class DatasetMap(Mapping):
    """A generic interface for a set of streams or datasets.

    Lookup, iteration and length all delegate to an internal dict, which starts out empty.
    """

    def __init__(self) -> None:
        # Stream name -> dataset.
        self._datasets: dict[str, DatasetBase] = dict()

    def __getitem__(self, key: str) -> DatasetBase:
        datasets = self._datasets
        return datasets[key]

    def __iter__(self) -> Iterator[str]:
        return self._datasets.__iter__()

    def __len__(self) -> int:
        return self._datasets.__len__()

A generic interface for a set of streams or datasets.

Inherited Members
collections.abc.Mapping
get
keys
items
values
class LazyDataset(airbyte.datasets.DatasetBase):
class LazyDataset(DatasetBase):
    """A dataset that is loaded incrementally from a source or a SQL query.

    Records come from the single iterator supplied at construction time, so the dataset is
    single-pass: ``__iter__`` hands back that same iterator. Iterating again continues from
    wherever the previous consumer stopped.
    """

    def __init__(
        self,
        iterator: Iterator[dict[str, Any]],
        stream_metadata: ConfiguredAirbyteStream,
    ) -> None:
        """Wrap ``iterator`` as a dataset described by ``stream_metadata``."""
        self._iterator: Iterator[dict[str, Any]] = iterator
        super().__init__(
            stream_metadata=stream_metadata,
        )

    @overrides
    def __iter__(self) -> Iterator[dict[str, Any]]:
        # Deliberately returns the shared underlying iterator, not a fresh one.
        return self._iterator

    def __next__(self) -> dict[str, Any]:
        """Return the next record from the underlying iterator."""
        return next(self._iterator)

A dataset that is loaded incrementally from a source or a SQL query.

LazyDataset( iterator: Iterator[dict[str, typing.Any]], stream_metadata: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream)
def __init__(
    self,
    iterator: Iterator[dict[str, Any]],
    stream_metadata: ConfiguredAirbyteStream,
) -> None:
    """Keep the record iterator and pass the stream metadata on to the base class."""
    self._iterator: Iterator[dict[str, Any]] = iterator
    super().__init__(stream_metadata=stream_metadata)
class SQLDataset(airbyte.datasets.DatasetBase):
 33class SQLDataset(DatasetBase):
 34    """A dataset that is loaded incrementally from a SQL query.
 35
 36    The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full
 37    table as the query statement.
 38    """
 39
 40    def __init__(
 41        self,
 42        cache: CacheBase,
 43        stream_name: str,
 44        query_statement: Select,
 45        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
 46    ) -> None:
 47        """Initialize the dataset with a cache, stream name, and query statement.
 48
 49        This class is not intended to be created directly. Instead, you can retrieve
 50        datasets from caches or Cloud connection objects, etc.
 51
 52        The query statement should be a SQLAlchemy Selectable object that can be executed to
 53        retrieve records from the dataset.
 54
 55        If stream_configuration is not provided, we attempt to retrieve the stream configuration
 56        from the cache processor. This is useful when constructing a dataset from a CachedDataset
 57        object, which already has the stream configuration.
 58
 59        If stream_configuration is set to False, we skip the stream configuration retrieval.
 60        """
 61        self._length: int | None = None
 62        self._cache: CacheBase = cache
 63        self._stream_name: str = stream_name
 64        self._query_statement: Select = query_statement
 65        if stream_configuration is None:
 66            try:
 67                stream_configuration = cache.processor.catalog_provider.get_configured_stream_info(
 68                    stream_name=stream_name
 69                )
 70            except Exception as ex:
 71                warnings.warn(
 72                    f"Failed to get stream configuration for {stream_name}: {ex}",
 73                    stacklevel=2,
 74                )
 75
 76        # Coalesce False to None
 77        stream_configuration = stream_configuration or None
 78
 79        super().__init__(stream_metadata=stream_configuration)
 80
 81    @property
 82    def stream_name(self) -> str:
 83        return self._stream_name
 84
 85    def __iter__(self) -> Iterator[dict[str, Any]]:
 86        with self._cache.processor.get_sql_connection() as conn:
 87            for row in conn.execute(self._query_statement):
 88                # Access to private member required because SQLAlchemy doesn't expose a public API.
 89                # https://pydoc.dev/sqlalchemy/latest/sqlalchemy.engine.row.RowMapping.html
 90                yield cast("dict[str, Any]", row._mapping)  # noqa: SLF001
 91
 92    def __len__(self) -> int:
 93        """Return the number of records in the dataset.
 94
 95        This method caches the length of the dataset after the first call.
 96        """
 97        if self._length is None:
 98            count_query = select(func.count()).select_from(self._query_statement.subquery())
 99            with self._cache.processor.get_sql_connection() as conn:
100                self._length = conn.execute(count_query).scalar()
101
102        return cast("int", self._length)
103
104    def to_pandas(self) -> DataFrame:
105        return self._cache.get_pandas_dataframe(self._stream_name)
106
107    def to_arrow(
108        self,
109        *,
110        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
111    ) -> Dataset:
112        return self._cache.get_arrow_dataset(self._stream_name, max_chunk_size=max_chunk_size)
113
114    def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
115        """Filter the dataset by a set of column values.
116
117        Filters can be specified as either a string or a SQLAlchemy expression.
118
119        Filters are lazily applied to the dataset, so they can be chained together. For example:
120
121                dataset.with_filter("id > 5").with_filter("id < 10")
122
123        is equivalent to:
124
125                dataset.with_filter("id > 5", "id < 10")
126        """
127        # Convert all strings to TextClause objects.
128        filters: list[ClauseElement] = [
129            text(expression) if isinstance(expression, str) else expression
130            for expression in filter_expressions
131        ]
132        filtered_select = self._query_statement.where(and_(*filters))
133        return SQLDataset(
134            cache=self._cache,
135            stream_name=self._stream_name,
136            query_statement=filtered_select,
137        )

A dataset that is loaded incrementally from a SQL query.

The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full table as the query statement.

SQLDataset( cache: airbyte.caches.CacheBase, stream_name: str, query_statement: sqlalchemy.sql.selectable.Select, stream_configuration: Union[airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream, Literal[False], NoneType] = None)
def __init__(
    self,
    cache: CacheBase,
    stream_name: str,
    query_statement: Select,
    stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
) -> None:
    """Set up a dataset from a cache, a stream name and a SQLAlchemy SELECT.

    Datasets are normally obtained from caches or Cloud connection objects rather than
    constructed by hand.

    ``query_statement`` is executed whenever the dataset's records are needed.

    ``stream_configuration`` controls the stream metadata:
      - ``None``: look it up through the cache processor's catalog provider; a failed lookup
        emits a warning and leaves the metadata unset.
      - ``False``: skip the lookup entirely.
      - any other value: use it as given.
    """
    self._length: int | None = None
    self._cache: CacheBase = cache
    self._stream_name: str = stream_name
    self._query_statement: Select = query_statement

    config = stream_configuration
    if config is None:
        try:
            config = cache.processor.catalog_provider.get_configured_stream_info(
                stream_name=stream_name,
            )
        except Exception as ex:
            warnings.warn(
                f"Failed to get stream configuration for {stream_name}: {ex}",
                stacklevel=2,
            )

    # Both False ("skip") and a missing configuration reach the base class as None.
    super().__init__(stream_metadata=config if config else None)

Initialize the dataset with a cache, stream name, and query statement.

This class is not intended to be created directly. Instead, you can retrieve datasets from caches or Cloud connection objects, etc.

The query statement should be a SQLAlchemy Selectable object that can be executed to retrieve records from the dataset.

If stream_configuration is not provided, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.

If stream_configuration is set to False, we skip the stream configuration retrieval.

stream_name: str
@property
def stream_name(self) -> str:
    """Name of the stream this dataset reads from."""
    name: str = self._stream_name
    return name
def to_pandas(self) -> pandas.core.frame.DataFrame:
def to_pandas(self) -> DataFrame:
    """Return the records selected by this dataset's query as a pandas DataFrame.

    The dataset's own query is executed, so filters added with ``with_filter`` are
    respected. Previously the stream's whole table was returned regardless of the query.
    """
    # Local import: pandas is already a dependency of this module (DataFrame).
    from pandas import read_sql_query

    with self._cache.processor.get_sql_connection() as conn:
        return read_sql_query(self._query_statement, con=conn)

Return a pandas DataFrame representation of the dataset.

The base implementation simply passes the record iterator to the pandas DataFrame constructor.

def to_arrow(self, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
def to_arrow(
    self,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
) -> Dataset:
    """Return an Arrow Dataset built by the cache from this dataset's stream.

    NOTE(review): this reads the stream's whole table through the cache and does not apply
    ``self._query_statement``, so filters added via ``with_filter`` are not reflected here.
    """
    backing_cache = self._cache
    return backing_cache.get_arrow_dataset(
        self._stream_name,
        max_chunk_size=max_chunk_size,
    )

Return an Arrow Dataset representation of the dataset.

This method should be implemented by subclasses. SQLDataset implements it by asking the cache for an Arrow dataset of the stream.

def with_filter( self, *filter_expressions: sqlalchemy.sql.elements.ClauseElement | str) -> SQLDataset:
def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
    """Filter the dataset by a set of column values.

    Filters can be specified as either a string or a SQLAlchemy expression.

    Filters are lazily applied to the dataset, so they can be chained together. For example:

            dataset.with_filter("id > 5").with_filter("id < 10")

    is equivalent to:

            dataset.with_filter("id > 5", "id < 10")

    Calling this with no filters returns a new dataset over the same, unfiltered query.
    The new dataset reuses this dataset's stream configuration rather than looking it up again.
    """
    # Convert all strings to TextClause objects.
    filters: list[ClauseElement] = [
        text(expression) if isinstance(expression, str) else expression
        for expression in filter_expressions
    ]
    # Calling ``and_()`` with no arguments is deprecated in SQLAlchemy, so only add a
    # WHERE clause when there is something to filter on.
    filtered_select = (
        self._query_statement.where(and_(*filters)) if filters else self._query_statement
    )
    # Pass False ("skip the lookup") when no configuration is known, so the catalog is
    # not queried again and the lookup warning is not repeated.
    stream_configuration = (
        self._stream_metadata if self._stream_metadata is not None else False
    )
    return SQLDataset(
        cache=self._cache,
        stream_name=self._stream_name,
        query_statement=filtered_select,
        stream_configuration=stream_configuration,
    )

Filter the dataset by a set of column values.

Filters can be specified as either a string or a SQLAlchemy expression.

Filters are lazily applied to the dataset, so they can be chained together. For example:

    dataset.with_filter("id > 5").with_filter("id < 10")

is equivalent to:

    dataset.with_filter("id > 5", "id < 10")
Inherited Members
DatasetBase
to_documents