airbyte.datasets

PyAirbyte dataset classes.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""PyAirbyte dataset classes."""
 3
 4from __future__ import annotations
 5
 6from airbyte.datasets._base import DatasetBase
 7from airbyte.datasets._lazy import LazyDataset
 8from airbyte.datasets._map import DatasetMap
 9from airbyte.datasets._sql import CachedDataset, SQLDataset
10
11
12__all__ = [
13    "CachedDataset",
14    "DatasetBase",
15    "DatasetMap",
16    "LazyDataset",
17    "SQLDataset",
18]
class CachedDataset(airbyte.datasets.SQLDataset):
150class CachedDataset(SQLDataset):
151    """A dataset backed by a SQL table cache.
152
153    Because this dataset includes all records from the underlying table, we also expose the
154    underlying table as a SQLAlchemy Table object.
155    """
156
157    def __init__(
158        self,
159        cache: CacheBase,
160        stream_name: str,
161        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
162    ) -> None:
163        """We construct the query statement by selecting all columns from the table.
164
165        This prevents the need to scan the table schema to construct the query statement.
166
167        If stream_configuration is None, we attempt to retrieve the stream configuration from the
168        cache processor. This is useful when constructing a dataset from a CachedDataset object,
169        which already has the stream configuration.
170
171        If stream_configuration is set to False, we skip the stream configuration retrieval.
172        """
173        table_name = cache.processor.get_sql_table_name(stream_name)
174        schema_name = cache.schema_name
175        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
176        super().__init__(
177            cache=cache,
178            stream_name=stream_name,
179            query_statement=query,
180            stream_configuration=stream_configuration,
181        )
182
183    @overrides
184    def to_pandas(self) -> DataFrame:
185        """Return the underlying dataset data as a pandas DataFrame."""
186        return self._cache.get_pandas_dataframe(self._stream_name)
187
188    @overrides
189    def to_arrow(
190        self,
191        *,
192        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
193    ) -> Dataset:
194        """Return an Arrow Dataset containing the data from the specified stream.
195
196        Args:
197            max_chunk_size (int): Maximum number of records to include in each
198                batch of the PyArrow dataset.
199
200        Returns:
201            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
202        """
203        return self._cache.get_arrow_dataset(
204            stream_name=self._stream_name,
205            max_chunk_size=max_chunk_size,
206        )
207
208    def to_sql_table(self) -> Table:
209        """Return the underlying SQL table as a SQLAlchemy Table object."""
210        return self._cache.processor.get_sql_table(self.stream_name)
211
212    def __eq__(self, value: object) -> bool:
213        """Return True if the value is a CachedDataset with the same cache and stream name.
214
215        In the case of CachedDataset objects, we can simply compare the cache and stream name.
216
217        Note that this equality check is only supported on CachedDataset objects and not for
218        the base SQLDataset implementation. This is because of the complexity and computational
219        cost of comparing two arbitrary SQL queries that could be bound to different variables,
220        as well as the chance that two queries can be syntactically equivalent without being
221        text-wise equivalent.
222        """
223        if not isinstance(value, SQLDataset):
224            return False
225
226        if self._cache is not value._cache:
227            return False
228
229        return self._stream_name == value._stream_name
230
231    def __hash__(self) -> int:
232        return hash(self._stream_name)

A dataset backed by a SQL table cache.

Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
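
A minimal usage sketch, assuming the source-faker connector and its "users" stream purely for illustration (configuration values are placeholders):

    import airbyte as ab

    source = ab.get_source("source-faker", config={"count": 1_000})
    source.select_all_streams()
    result = source.read()             # writes records to the default local cache
    users = result["users"]            # a CachedDataset for the "users" stream
    print(len(users), users.column_names)
    df = users.to_pandas()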

CachedDataset( cache: airbyte.caches.CacheBase, stream_name: str, stream_configuration: Union[airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream, Literal[False], NoneType] = None)
157    def __init__(
158        self,
159        cache: CacheBase,
160        stream_name: str,
161        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
162    ) -> None:
163        """We construct the query statement by selecting all columns from the table.
164
165        This prevents the need to scan the table schema to construct the query statement.
166
167        If stream_configuration is None, we attempt to retrieve the stream configuration from the
168        cache processor. This is useful when constructing a dataset from a CachedDataset object,
169        which already has the stream configuration.
170
171        If stream_configuration is set to False, we skip the stream configuration retrieval.
172        """
173        table_name = cache.processor.get_sql_table_name(stream_name)
174        schema_name = cache.schema_name
175        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
176        super().__init__(
177            cache=cache,
178            stream_name=stream_name,
179            query_statement=query,
180            stream_configuration=stream_configuration,
181        )

We construct the query statement by selecting all columns from the table.

This prevents the need to scan the table schema to construct the query statement.

If stream_configuration is None, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.

If stream_configuration is set to False, we skip the stream configuration retrieval.

@overrides
def to_pandas(self) -> pandas.core.frame.DataFrame:
183    @overrides
184    def to_pandas(self) -> DataFrame:
185        """Return the underlying dataset data as a pandas DataFrame."""
186        return self._cache.get_pandas_dataframe(self._stream_name)

Return the underlying dataset data as a pandas DataFrame.
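
Continuing the earlier illustrative sketch, the conversion is a single call; the DataFrame is built from the cached SQL table rather than by re-iterating records in Python:

    df = users.to_pandas()
    print(df.shape)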

@overrides
def to_arrow(self, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
188    @overrides
189    def to_arrow(
190        self,
191        *,
192        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
193    ) -> Dataset:
194        """Return an Arrow Dataset containing the data from the specified stream.
195
196        Args:
197            max_chunk_size (int): Maximum number of records to include in each
198                batch of the PyArrow dataset.
199
200        Returns:
201            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
202        """
203        return self._cache.get_arrow_dataset(
204            stream_name=self._stream_name,
205            max_chunk_size=max_chunk_size,
206        )

Return an Arrow Dataset containing the data from the specified stream.

Arguments:
  • max_chunk_size (int): Maximum number of records to include in each batch of the PyArrow dataset.

Returns:
  pa.dataset.Dataset: An Arrow Dataset containing the stream's data.
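
A hedged example of consuming the returned Arrow dataset with standard pyarrow calls, assuming `users` is the CachedDataset from the earlier sketch (the chunk size is illustrative):

    arrow_ds = users.to_arrow(max_chunk_size=50_000)
    table = arrow_ds.to_table()            # materialize everything as a pyarrow.Table
    for batch in arrow_ds.to_batches():    # or stream record batches incrementally
        print(batch.num_rows)
        break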

def to_sql_table(self) -> sqlalchemy.sql.schema.Table:
208    def to_sql_table(self) -> Table:
209        """Return the underlying SQL table as a SQLAlchemy Table object."""
210        return self._cache.processor.get_sql_table(self.stream_name)

Return the underlying SQL table as a SQLAlchemy Table object.
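
Because the return value is an ordinary SQLAlchemy Table, it can be composed into custom queries; a sketch, assuming `users` is the CachedDataset from the earlier example (executing the statement against the cache's engine is left to the caller):

    from sqlalchemy import func, select

    users_table = users.to_sql_table()
    stmt = select(func.count()).select_from(users_table)
    print(stmt)    # a regular SQLAlchemy Select, ready to run on the cache's engine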

class DatasetBase(abc.ABC):
26class DatasetBase(ABC):
27    """Base implementation for all datasets."""
28
29    def __init__(self, stream_metadata: ConfiguredAirbyteStream) -> None:
30        self._stream_metadata = stream_metadata
31
32    @abstractmethod
33    def __iter__(self) -> Iterator[dict[str, Any]]:
34        """Return the iterator of records."""
35        raise NotImplementedError
36
37    def to_pandas(self) -> DataFrame:
38        """Return a pandas DataFrame representation of the dataset.
39
40        The base implementation simply passes the record iterator to the pandas DataFrame constructor.
41        """
42        # Technically, we return an iterator of Mapping objects. However, pandas
43        # expects an iterator of dict objects. This cast is safe because we know
44        # duck typing is correct for this use case.
45        return DataFrame(cast("Iterator[dict[str, Any]]", self))
46
47    def to_arrow(
48        self,
49        *,
50        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
51    ) -> Dataset:
52        """Return an Arrow Dataset representation of the dataset.
53
54        This method should be implemented by subclasses.
55        """
56        raise NotImplementedError("Not implemented in base class")
57
58    def to_documents(
59        self,
60        title_property: str | None = None,
61        content_properties: list[str] | None = None,
62        metadata_properties: list[str] | None = None,
63        *,
64        render_metadata: bool = False,
65    ) -> Iterable[Document]:
66        """Return the iterator of documents.
67
68        If metadata_properties is not set, all properties that are not content will be added to
69        the metadata.
70
71        If render_metadata is True, metadata will be rendered in the document, as well as the
72        main content. Otherwise, metadata will be attached to the document but not rendered.
73        """
74        renderer = DocumentRenderer(
75            title_property=title_property,
76            content_properties=content_properties,
77            metadata_properties=metadata_properties,
78            render_metadata=render_metadata,
79        )
80        yield from renderer.render_documents(self)
81
82    @property
83    def column_names(self) -> list[str]:
84        """Return the list of top-level column names."""
85        return list(self._stream_metadata.stream.json_schema["properties"].keys())

Base implementation for all datasets.
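
Only __iter__ is abstract, so a minimal in-memory subclass can be sketched as follows (this ListDataset helper is hypothetical and not part of PyAirbyte; stream_metadata is assumed to be an existing ConfiguredAirbyteStream):

    from collections.abc import Iterator
    from typing import Any

    from airbyte.datasets import DatasetBase


    class ListDataset(DatasetBase):
        """Hypothetical dataset that serves records from a plain Python list."""

        def __init__(self, records: list[dict[str, Any]], stream_metadata) -> None:
            super().__init__(stream_metadata=stream_metadata)
            self._records = records

        def __iter__(self) -> Iterator[dict[str, Any]]:
            return iter(self._records)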

def to_pandas(self) -> pandas.core.frame.DataFrame:
37    def to_pandas(self) -> DataFrame:
38        """Return a pandas DataFrame representation of the dataset.
39
40        The base implementation simply passes the record iterator to the pandas DataFrame constructor.
41        """
42        # Technically, we return an iterator of Mapping objects. However, pandas
43        # expects an iterator of dict objects. This cast is safe because we know
44        # duck typing is correct for this use case.
45        return DataFrame(cast("Iterator[dict[str, Any]]", self))

Return a pandas DataFrame representation of the dataset.

The base implementation simply passes the record iterator to the pandas DataFrame constructor.

def to_arrow(self, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
47    def to_arrow(
48        self,
49        *,
50        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
51    ) -> Dataset:
52        """Return an Arrow Dataset representation of the dataset.
53
54        This method should be implemented by subclasses.
55        """
56        raise NotImplementedError("Not implemented in base class")

Return an Arrow Dataset representation of the dataset.

This method should be implemented by subclasses.

def to_documents( self, title_property: str | None = None, content_properties: list[str] | None = None, metadata_properties: list[str] | None = None, *, render_metadata: bool = False) -> Iterable[airbyte.documents.Document]:
58    def to_documents(
59        self,
60        title_property: str | None = None,
61        content_properties: list[str] | None = None,
62        metadata_properties: list[str] | None = None,
63        *,
64        render_metadata: bool = False,
65    ) -> Iterable[Document]:
66        """Return the iterator of documents.
67
68        If metadata_properties is not set, all properties that are not content will be added to
69        the metadata.
70
71        If render_metadata is True, metadata will be rendered in the document, as well as the
72        main content. Otherwise, metadata will be attached to the document but not rendered.
73        """
74        renderer = DocumentRenderer(
75            title_property=title_property,
76            content_properties=content_properties,
77            metadata_properties=metadata_properties,
78            render_metadata=render_metadata,
79        )
80        yield from renderer.render_documents(self)

Return the iterator of documents.

If metadata_properties is not set, all properties that are not content will be added to the metadata.

If render_metadata is True, metadata will be rendered in the document, as well as the main content. Otherwise, metadata will be attached to the document but not rendered.
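
An illustrative call, assuming `users` is any dataset instance such as the CachedDataset from the earlier sketch; the property names "title", "body", and "author" are placeholders for whatever fields the stream actually exposes:

    for doc in users.to_documents(
        title_property="title",
        content_properties=["body"],
        metadata_properties=["author"],
        render_metadata=True,
    ):
        print(doc)
        break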

column_names: list[str]
82    @property
83    def column_names(self) -> list[str]:
84        """Return the list of top-level column names."""
85        return list(self._stream_metadata.stream.json_schema["properties"].keys())

Return the list of top-level column names.

class DatasetMap(collections.abc.Mapping):
20class DatasetMap(Mapping):
21    """A generic interface for a set of streams or datasets."""
22
23    def __init__(self) -> None:
24        self._datasets: dict[str, DatasetBase] = {}
25
26    def __getitem__(self, key: str) -> DatasetBase:
27        return self._datasets[key]
28
29    def __iter__(self) -> Iterator[str]:
30        return iter(self._datasets)
31
32    def __len__(self) -> int:
33        return len(self._datasets)

A generic interface for a set of streams or datasets.
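
Because it implements the standard Mapping protocol, a DatasetMap can be treated as a read-only dict of stream name to dataset; a sketch, assuming `datasets` is a DatasetMap obtained elsewhere (for example from a read result):

    for stream_name, dataset in datasets.items():
        print(stream_name, dataset.column_names)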

Inherited Members
collections.abc.Mapping: get, keys, items, values
class LazyDataset(airbyte.datasets.DatasetBase):
22class LazyDataset(DatasetBase):
23    """A dataset that is loaded incrementally from a source or a SQL query."""
24
25    def __init__(
26        self,
27        iterator: Iterator[dict[str, Any]],
28        *,
29        stream_metadata: ConfiguredAirbyteStream,
30        stop_event: threading.Event | None,
31        progress_tracker: progress.ProgressTracker,
32    ) -> None:
33        self._stop_event: threading.Event | None = stop_event or None
34        self._progress_tracker = progress_tracker
35        self._iterator: Iterator[dict[str, Any]] = iterator
36        super().__init__(
37            stream_metadata=stream_metadata,
38        )
39
40    @overrides
41    def __iter__(self) -> Iterator[dict[str, Any]]:
42        return self._iterator
43
44    def __next__(self) -> Mapping[str, Any]:
45        try:
46            return next(self._iterator)
47        except StopIteration:
48            # The iterator is exhausted, tell the producer they can stop if they are still
49            # producing records. (Esp. when an artificial limit is reached.)
50            self._progress_tracker.log_success()
51            if self._stop_event:
52                self._stop_event.set()
53            raise
54
55    def fetch_all(self) -> InMemoryDataset:
56        """Fetch all records to memory and return an InMemoryDataset."""
57        return InMemoryDataset(
58            records=list(self._iterator),
59            stream_metadata=self._stream_metadata,
60        )
61
62    def close(self) -> None:
63        """Stop the dataset iterator.
64
65        This method is used to signal the dataset to stop fetching records, for example
66        when the dataset is being fetched incrementally and the user wants to stop the
67        fetching process.
68        """
69        if self._stop_event:
70            self._stop_event.set()
71
72    def __del__(self) -> None:
73        """Close the dataset when the object is deleted."""
74        self.close()

A dataset that is loaded incrementally from a source or a SQL query.
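
A hedged consumption sketch, assuming `source` is the Source configured in the earlier example and that `source.get_records()` returns a LazyDataset for the illustrative "users" stream:

    records = source.get_records("users")   # nothing is fetched until iteration begins
    for record in records:
        print(record)
        break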

LazyDataset( iterator: Iterator[dict[str, typing.Any]], *, stream_metadata: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream, stop_event: threading.Event | None, progress_tracker: airbyte.progress.ProgressTracker)
25    def __init__(
26        self,
27        iterator: Iterator[dict[str, Any]],
28        *,
29        stream_metadata: ConfiguredAirbyteStream,
30        stop_event: threading.Event | None,
31        progress_tracker: progress.ProgressTracker,
32    ) -> None:
33        self._stop_event: threading.Event | None = stop_event or None
34        self._progress_tracker = progress_tracker
35        self._iterator: Iterator[dict[str, Any]] = iterator
36        super().__init__(
37            stream_metadata=stream_metadata,
38        )
def fetch_all(self) -> airbyte.datasets._inmemory.InMemoryDataset:
55    def fetch_all(self) -> InMemoryDataset:
56        """Fetch all records to memory and return an InMemoryDataset."""
57        return InMemoryDataset(
58            records=list(self._iterator),
59            stream_metadata=self._stream_metadata,
60        )

Fetch all records to memory and return an InMemoryDataset.
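
Continuing the sketch above, fetch_all drains the iterator into memory so the records can be re-read:

    dataset = source.get_records("users").fetch_all()
    rows = list(dataset)
    print(len(rows))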

def close(self) -> None:
62    def close(self) -> None:
63        """Stop the dataset iterator.
64
65        This method is used to signal the dataset to stop fetching records, for example
66        when the dataset is being fetched incrementally and the user wants to stop the
67        fetching process.
68        """
69        if self._stop_event:
70            self._stop_event.set()

Stop the dataset iterator.

This method is used to signal the dataset to stop fetching records, for example when the dataset is being fetched incrementally and the user wants to stop the fetching process.
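
A sketch of stopping early after a partial read, again assuming `source.get_records()` yields a LazyDataset (the 100-record cutoff is arbitrary):

    records = source.get_records("users")
    sample = [record for _, record in zip(range(100), records)]
    records.close()    # signal the producer that no more records are needed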

class SQLDataset(airbyte.datasets.DatasetBase):
 38class SQLDataset(DatasetBase):
 39    """A dataset that is loaded incrementally from a SQL query.
 40
 41    The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full
 42    table as the query statement.
 43    """
 44
 45    def __init__(
 46        self,
 47        cache: CacheBase,
 48        stream_name: str,
 49        query_statement: Select,
 50        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
 51    ) -> None:
 52        """Initialize the dataset with a cache, stream name, and query statement.
 53
 54        This class is not intended to be created directly. Instead, you can retrieve
 55        datasets from caches or Cloud connection objects, etc.
 56
 57        The query statement should be a SQLAlchemy Selectable object that can be executed to
 58        retrieve records from the dataset.
 59
 60        If stream_configuration is not provided, we attempt to retrieve the stream configuration
 61        from the cache processor. This is useful when constructing a dataset from a CachedDataset
 62        object, which already has the stream configuration.
 63
 64        If stream_configuration is set to False, we skip the stream configuration retrieval.
 65        """
 66        self._length: int | None = None
 67        self._cache: CacheBase = cache
 68        self._stream_name: str = stream_name
 69        self._query_statement: Select = query_statement
 70        if stream_configuration is None:
 71            try:
 72                stream_configuration = cache.processor.catalog_provider.get_configured_stream_info(
 73                    stream_name=stream_name
 74                )
 75            except Exception as ex:
 76                warnings.warn(
 77                    f"Failed to get stream configuration for {stream_name}: {ex}",
 78                    stacklevel=2,
 79                )
 80
 81        # Coalesce False to None
 82        stream_configuration = stream_configuration or None
 83
 84        super().__init__(stream_metadata=stream_configuration)
 85
 86    @property
 87    def stream_name(self) -> str:
 88        return self._stream_name
 89
 90    def __iter__(self) -> Iterator[dict[str, Any]]:
 91        with self._cache.processor.get_sql_connection() as conn:
 92            for row in conn.execute(self._query_statement):
 93                # Access to private member required because SQLAlchemy doesn't expose a public API.
 94                # https://pydoc.dev/sqlalchemy/latest/sqlalchemy.engine.row.RowMapping.html
 95                yield cast("dict[str, Any]", row._mapping)  # noqa: SLF001
 96
 97    def __len__(self) -> int:
 98        """Return the number of records in the dataset.
 99
100        This method caches the length of the dataset after the first call.
101        """
102        if self._length is None:
103            count_query = select(func.count()).select_from(self._query_statement.subquery())
104            with self._cache.processor.get_sql_connection() as conn:
105                self._length = conn.execute(count_query).scalar()
106
107        return cast("int", self._length)
108
109    def to_pandas(self) -> DataFrame:
110        return self._cache.get_pandas_dataframe(self._stream_name)
111
112    def to_arrow(
113        self,
114        *,
115        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
116    ) -> Dataset:
117        return self._cache.get_arrow_dataset(self._stream_name, max_chunk_size=max_chunk_size)
118
119    def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
120        """Filter the dataset by a set of column values.
121
122        Filters can be specified as either a string or a SQLAlchemy expression.
123
124        Filters are lazily applied to the dataset, so they can be chained together. For example:
125
126                dataset.with_filter("id > 5").with_filter("id < 10")
127
128        is equivalent to:
129
130                dataset.with_filter("id > 5", "id < 10")
131        """
132        # Convert all strings to TextClause objects.
133        filters: list[ClauseElement] = [
134            text(expression) if isinstance(expression, str) else expression
135            for expression in filter_expressions
136        ]
137        filtered_select = self._query_statement.where(and_(*filters))
138        return SQLDataset(
139            cache=self._cache,
140            stream_name=self._stream_name,
141            query_statement=filtered_select,
142        )
143
144    @property
145    def column_names(self) -> list[str]:
146        """Return the list of top-level column names, including internal Airbyte columns."""
147        return [*super().column_names, AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN]

A dataset that is loaded incrementally from a SQL query.

The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full table as the query statement.
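
Since the class implements both __iter__ and __len__, any SQLDataset (including the CachedDataset from the earlier sketch) behaves like a sized iterable of row mappings:

    print(len(users))      # runs a SELECT COUNT(*) over the dataset's query
    for row in users:      # each row is a mapping of column name to value
        print(dict(row))
        break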

SQLDataset( cache: airbyte.caches.CacheBase, stream_name: str, query_statement: sqlalchemy.sql.selectable.Select, stream_configuration: Union[airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteStream, Literal[False], NoneType] = None)
45    def __init__(
46        self,
47        cache: CacheBase,
48        stream_name: str,
49        query_statement: Select,
50        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
51    ) -> None:
52        """Initialize the dataset with a cache, stream name, and query statement.
53
54        This class is not intended to be created directly. Instead, you can retrieve
55        datasets from caches or Cloud connection objects, etc.
56
57        The query statement should be a SQLAlchemy Selectable object that can be executed to
58        retrieve records from the dataset.
59
60        If stream_configuration is not provided, we attempt to retrieve the stream configuration
61        from the cache processor. This is useful when constructing a dataset from a CachedDataset
62        object, which already has the stream configuration.
63
64        If stream_configuration is set to False, we skip the stream configuration retrieval.
65        """
66        self._length: int | None = None
67        self._cache: CacheBase = cache
68        self._stream_name: str = stream_name
69        self._query_statement: Select = query_statement
70        if stream_configuration is None:
71            try:
72                stream_configuration = cache.processor.catalog_provider.get_configured_stream_info(
73                    stream_name=stream_name
74                )
75            except Exception as ex:
76                warnings.warn(
77                    f"Failed to get stream configuration for {stream_name}: {ex}",
78                    stacklevel=2,
79                )
80
81        # Coalesce False to None
82        stream_configuration = stream_configuration or None
83
84        super().__init__(stream_metadata=stream_configuration)

Initialize the dataset with a cache, stream name, and query statement.

This class is not intended to be created directly. Instead, you can retrieve datasets from caches or Cloud connection objects, etc.

The query statement should be a SQLAlchemy Selectable object that can be executed to retrieve records from the dataset.

If stream_configuration is not provided, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.

If stream_configuration is set to False, we skip the stream configuration retrieval.
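
The class is normally obtained from a cache rather than constructed by hand, but a hedged construction sketch looks like this (the schema and table names are illustrative, and `cache` is assumed to be an existing CacheBase instance):

    from sqlalchemy import select, text

    from airbyte.datasets import SQLDataset

    query = select("*").select_from(text("airbyte_raw.users"))
    dataset = SQLDataset(
        cache=cache,
        stream_name="users",
        query_statement=query,
    )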

stream_name: str
86    @property
87    def stream_name(self) -> str:
88        return self._stream_name
def to_pandas(self) -> pandas.core.frame.DataFrame:
109    def to_pandas(self) -> DataFrame:
110        return self._cache.get_pandas_dataframe(self._stream_name)

Return a pandas DataFrame representation of the dataset.

This override reads the stream's records directly from the cache's SQL table rather than iterating records in Python.

def to_arrow(self, *, max_chunk_size: int = 100000) -> pyarrow._dataset.Dataset:
112    def to_arrow(
113        self,
114        *,
115        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
116    ) -> Dataset:
117        return self._cache.get_arrow_dataset(self._stream_name, max_chunk_size=max_chunk_size)

Return an Arrow Dataset representation of the dataset.

This override delegates to the cache, which reads the stream's table in batches of up to max_chunk_size records.

def with_filter( self, *filter_expressions: sqlalchemy.sql.elements.ClauseElement | str) -> SQLDataset:
119    def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
120        """Filter the dataset by a set of column values.
121
122        Filters can be specified as either a string or a SQLAlchemy expression.
123
124        Filters are lazily applied to the dataset, so they can be chained together. For example:
125
126                dataset.with_filter("id > 5").with_filter("id < 10")
127
128        is equivalent to:
129
130                dataset.with_filter("id > 5", "id < 10")
131        """
132        # Convert all strings to TextClause objects.
133        filters: list[ClauseElement] = [
134            text(expression) if isinstance(expression, str) else expression
135            for expression in filter_expressions
136        ]
137        filtered_select = self._query_statement.where(and_(*filters))
138        return SQLDataset(
139            cache=self._cache,
140            stream_name=self._stream_name,
141            query_statement=filtered_select,
142        )

Filter the dataset by a set of column values.

Filters can be specified as either a string or a SQLAlchemy expression.

Filters are lazily applied to the dataset, so they can be chained together. For example:

    dataset.with_filter("id > 5").with_filter("id < 10")

is equivalent to:

    dataset.with_filter("id > 5", "id < 10")
column_names: list[str]
144    @property
145    def column_names(self) -> list[str]:
146        """Return the list of top-level column names, including internal Airbyte columns."""
147        return [*super().column_names, AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN]

Return the list of top-level column names, including internal Airbyte columns.
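
For example, the result is the stream's own properties followed by the Airbyte metadata columns (typically _airbyte_raw_id, _airbyte_extracted_at, and _airbyte_meta):

    print(users.column_names)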

Inherited Members
DatasetBase: to_documents