airbyte.datasets
PyAirbyte dataset classes.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte dataset classes.""" 3 4from __future__ import annotations 5 6from airbyte.datasets._base import DatasetBase 7from airbyte.datasets._lazy import LazyDataset 8from airbyte.datasets._map import DatasetMap 9from airbyte.datasets._sql import CachedDataset, SQLDataset 10 11 12__all__ = [ 13 "CachedDataset", 14 "DatasetBase", 15 "DatasetMap", 16 "LazyDataset", 17 "SQLDataset", 18]
class CachedDataset(SQLDataset):
    """A dataset backed by a SQL table cache.

    Because this dataset includes all records from the underlying table, we also expose the
    underlying table as a SQLAlchemy Table object.
    """

    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
    ) -> None:
        """We construct the query statement by selecting all columns from the table.

        This prevents the need to scan the table schema to construct the query statement.

        If stream_configuration is None, we attempt to retrieve the stream configuration from the
        cache processor. This is useful when constructing a dataset from a CachedDataset object,
        which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        table_name = cache.processor.get_sql_table_name(stream_name)
        schema_name = cache.schema_name
        # A textual "schema.table" reference is enough for SELECT *, so no schema scan is needed.
        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
        super().__init__(
            cache=cache,
            stream_name=stream_name,
            query_statement=query,
            stream_configuration=stream_configuration,
        )

    @overrides
    def to_pandas(self) -> DataFrame:
        """Return the underlying dataset data as a pandas DataFrame."""
        return self._cache.get_pandas_dataframe(self._stream_name)

    @overrides
    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset containing the data from this dataset's stream.

        Args:
            max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.

        Returns:
            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
        """
        return self._cache.get_arrow_dataset(
            stream_name=self._stream_name,
            max_chunk_size=max_chunk_size,
        )

    def to_sql_table(self) -> Table:
        """Return the underlying SQL table as a SQLAlchemy Table object."""
        return self._cache.processor.get_sql_table(self.stream_name)

    def __eq__(self, value: object) -> bool:
        """Return True if the value is a CachedDataset with the same cache and stream name.

        In the case of CachedDataset objects, we can simply compare the cache and stream name.

        Note that this equality check is only supported on CachedDataset objects and not for
        the base SQLDataset implementation. This is because of the complexity and computational
        cost of comparing two arbitrary SQL queries that could be bound to different variables,
        as well as the chance that two queries can be syntactically equivalent without being
        text-wise equivalent.
        """
        # Only another CachedDataset is guaranteed to select the full table; a plain
        # SQLDataset (e.g. the result of with_filter) may carry an arbitrary query.
        if not isinstance(value, CachedDataset):
            return False

        # Caches are compared by identity, not by value.
        if self._cache is not value._cache:
            return False

        return self._stream_name == value._stream_name

    def __hash__(self) -> int:
        """Hash on the stream name, which keeps equal datasets hashing equally."""
        return hash(self._stream_name)
A dataset backed by a SQL table cache.
Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
def __init__(
    self,
    cache: CacheBase,
    stream_name: str,
    stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
) -> None:
    """Create a dataset whose query selects every column of the stream's table.

    Using `SELECT *` against the schema-qualified table name means the table schema
    does not have to be scanned in order to build the query statement.

    `stream_configuration` is forwarded unchanged to the parent initializer.
    """
    # Resolve the table name first, then qualify it with the cache's schema.
    sql_table_name = cache.processor.get_sql_table_name(stream_name)
    qualified_name = f"{cache.schema_name}.{sql_table_name}"
    select_all = select("*").select_from(text(qualified_name))

    super().__init__(
        cache=cache,
        stream_name=stream_name,
        query_statement=select_all,
        stream_configuration=stream_configuration,
    )
We construct the query statement by selecting all columns from the table.
This prevents the need to scan the table schema to construct the query statement.
If stream_configuration is None, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.
If stream_configuration is set to False, we skip the stream configuration retrieval.
@overrides
def to_pandas(self) -> DataFrame:
    """Return this stream's data as a pandas DataFrame, as provided by the cache."""
    stream_frame = self._cache.get_pandas_dataframe(self._stream_name)
    return stream_frame
Return the underlying dataset data as a pandas DataFrame.
@overrides
def to_arrow(
    self,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
) -> Dataset:
    """Return an Arrow Dataset containing the data from this dataset's stream.

    Args:
        max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.

    Returns:
        pa.dataset.Dataset: Arrow Dataset containing the stream's data.
    """
    arrow_dataset = self._cache.get_arrow_dataset(
        stream_name=self._stream_name,
        max_chunk_size=max_chunk_size,
    )
    return arrow_dataset
Return an Arrow Dataset containing the data from the specified stream.
Arguments:
- stream_name (str): Name of the stream to retrieve data from.
- max_chunk_size (int): max number of records to include in each batch of pyarrow dataset.
Returns:
pa.dataset.Dataset: Arrow Dataset containing the stream's data.
def to_sql_table(self) -> Table:
    """Return the SQLAlchemy Table object for this dataset's stream.

    The table is looked up by stream name through the cache's SQL processor.
    """
    sql_processor = self._cache.processor
    return sql_processor.get_sql_table(self.stream_name)
Return the underlying SQL table as a SQLAlchemy Table object.
Inherited Members
class DatasetBase(ABC):
    """Base implementation for all datasets."""

    def __init__(self, stream_metadata: ConfiguredAirbyteStream) -> None:
        """Store the stream metadata associated with this dataset."""
        self._stream_metadata = stream_metadata

    @abstractmethod
    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Return the iterator of records."""
        raise NotImplementedError

    def to_pandas(self) -> DataFrame:
        """Return a pandas DataFrame representation of the dataset.

        The base implementation simply passes the record iterator to pandas' DataFrame
        constructor.
        """
        # The dataset is iterated as Mapping objects while pandas is typed for dict
        # objects; the cast only informs the type checker and has no runtime effect.
        records = cast("Iterator[dict[str, Any]]", self)
        return DataFrame(records)

    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset representation of the dataset.

        This method should be implemented by subclasses; the base class always raises
        NotImplementedError.
        """
        raise NotImplementedError("Not implemented in base class")

    def to_documents(
        self,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Return the iterator of documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as
        the main content. Otherwise, metadata will be attached to the document but not rendered.
        """
        document_renderer = DocumentRenderer(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )
        # This method is itself a generator: documents are produced lazily as consumed.
        yield from document_renderer.render_documents(self)
Base implementation for all datasets.
def to_pandas(self) -> DataFrame:
    """Return a pandas DataFrame representation of the dataset.

    The base implementation simply passes the record iterator to pandas' DataFrame
    constructor.
    """
    # The dataset is iterated as Mapping objects while pandas is typed for dict
    # objects; the cast only informs the type checker and has no runtime effect.
    records = cast("Iterator[dict[str, Any]]", self)
    return DataFrame(records)
Return a pandas DataFrame representation of the dataset.
The base implementation simply passes the record iterator to pandas' DataFrame constructor.
def to_arrow(
    self,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
) -> Dataset:
    """Return an Arrow Dataset representation of the dataset.

    This method should be implemented by subclasses; the base class always raises
    NotImplementedError.
    """
    message = "Not implemented in base class"
    raise NotImplementedError(message)
Return an Arrow Dataset representation of the dataset.
This method should be implemented by subclasses.
def to_documents(
    self,
    title_property: str | None = None,
    content_properties: list[str] | None = None,
    metadata_properties: list[str] | None = None,
    *,
    render_metadata: bool = False,
) -> Iterable[Document]:
    """Return the iterator of documents.

    If metadata_properties is not set, all properties that are not content will be added to
    the metadata.

    If render_metadata is True, metadata will be rendered in the document, as well as
    the main content. Otherwise, metadata will be attached to the document but not rendered.
    """
    document_renderer = DocumentRenderer(
        title_property=title_property,
        content_properties=content_properties,
        metadata_properties=metadata_properties,
        render_metadata=render_metadata,
    )
    # This method is itself a generator: documents are produced lazily as consumed.
    yield from document_renderer.render_documents(self)
Return the iterator of documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content. Otherwise, metadata will be attached to the document but not rendered.
class DatasetMap(Mapping):
    """A generic interface for a set of streams or datasets.

    This is a read-only mapping view over an internal dict keyed by name.
    """

    def __init__(self) -> None:
        """Start with an empty name-to-dataset dictionary."""
        self._datasets: dict[str, DatasetBase] = {}

    def __getitem__(self, key: str) -> DatasetBase:
        """Return the dataset stored under `key`; raises KeyError if it is absent."""
        dataset = self._datasets[key]
        return dataset

    def __iter__(self) -> Iterator[str]:
        """Iterate over the stored names."""
        return iter(self._datasets.keys())

    def __len__(self) -> int:
        """Return how many datasets are stored."""
        dataset_count = len(self._datasets)
        return dataset_count
A generic interface for a set of streams or datasets.
Inherited Members
- collections.abc.Mapping
- get
- keys
- items
- values
class LazyDataset(DatasetBase):
    """A dataset that is loaded incrementally from a source or a SQL query."""

    def __init__(
        self,
        iterator: Iterator[dict[str, Any]],
        stream_metadata: ConfiguredAirbyteStream,
    ) -> None:
        """Wrap a record iterator together with its stream metadata."""
        super().__init__(stream_metadata=stream_metadata)
        self._iterator: Iterator[dict[str, Any]] = iterator

    @overrides
    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Return the wrapped iterator itself (the same object on every call)."""
        return self._iterator

    def __next__(self) -> Mapping[str, Any]:
        """Advance the wrapped iterator and return its next record."""
        record = next(self._iterator)
        return record
A dataset that is loaded incrementally from a source or a SQL query.
Inherited Members
class SQLDataset(DatasetBase):
    """A dataset that is loaded incrementally from a SQL query.

    The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full
    table as the query statement.
    """

    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        query_statement: Select,
        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
    ) -> None:
        """Initialize the dataset with a cache, stream name, and query statement.

        This class is not intended to be created directly. Instead, you can retrieve
        datasets from caches or Cloud connection objects, etc.

        The query statement should be a SQLAlchemy Selectable object that can be executed to
        retrieve records from the dataset.

        If stream_configuration is not provided, we attempt to retrieve the stream configuration
        from the cache processor. This is useful when constructing a dataset from a CachedDataset
        object, which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        self._length: int | None = None
        self._cache: CacheBase = cache
        self._stream_name: str = stream_name
        self._query_statement: Select = query_statement
        if stream_configuration is None:
            try:
                stream_configuration = cache.processor.catalog_provider.get_configured_stream_info(
                    stream_name=stream_name
                )
            except Exception as ex:
                # Best effort: a missing stream configuration only produces a warning.
                warnings.warn(
                    f"Failed to get stream configuration for {stream_name}: {ex}",
                    stacklevel=2,
                )

        # Coalesce False to None
        stream_configuration = stream_configuration or None

        super().__init__(stream_metadata=stream_configuration)

    @property
    def stream_name(self) -> str:
        """Return the name of the stream this dataset reads from."""
        return self._stream_name

    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Execute the query statement and yield each result row as a mapping."""
        with self._cache.processor.get_sql_connection() as conn:
            for row in conn.execute(self._query_statement):
                # Access to private member required because SQLAlchemy doesn't expose a public API.
                # https://pydoc.dev/sqlalchemy/latest/sqlalchemy.engine.row.RowMapping.html
                yield cast("dict[str, Any]", row._mapping)  # noqa: SLF001

    def __len__(self) -> int:
        """Return the number of records in the dataset.

        This method caches the length of the dataset after the first call.
        """
        if self._length is None:
            count_query = select(func.count()).select_from(self._query_statement.subquery())
            with self._cache.processor.get_sql_connection() as conn:
                self._length = conn.execute(count_query).scalar()

        return cast("int", self._length)

    def to_pandas(self) -> DataFrame:
        """Return the records selected by this dataset's query as a pandas DataFrame.

        This delegates to the base implementation, which builds the DataFrame by iterating
        the dataset. Iteration executes the dataset's query statement, so filters added via
        `with_filter()` are respected.
        """
        # Reading the whole stream through the cache by name would bypass the query
        # statement and return unfiltered data for filtered datasets.
        return super().to_pandas()

    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset for this dataset's stream, read through the cache."""
        # NOTE(review): the stream is fetched by name and the query statement is not used
        # here, so filters added via with_filter() do not appear to be applied - confirm.
        return self._cache.get_arrow_dataset(self._stream_name, max_chunk_size=max_chunk_size)

    def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
        """Filter the dataset by a set of column values.

        Filters can be specified as either a string or a SQLAlchemy expression.

        Filters are lazily applied to the dataset, so they can be chained together. For example:

                dataset.with_filter("id > 5").with_filter("id < 10")

        is equivalent to:

                dataset.with_filter("id > 5", "id < 10")

        Calling this method without any filter expressions returns a new dataset over the
        same, unmodified query.
        """
        # Convert all strings to TextClause objects.
        filters: list[ClauseElement] = [
            text(expression) if isinstance(expression, str) else expression
            for expression in filter_expressions
        ]
        # Calling and_() with no arguments is deprecated in SQLAlchemy, so only add a
        # WHERE clause when there is at least one filter.
        filtered_select = (
            self._query_statement.where(and_(*filters)) if filters else self._query_statement
        )
        return SQLDataset(
            cache=self._cache,
            stream_name=self._stream_name,
            query_statement=filtered_select,
        )
A dataset that is loaded incrementally from a SQL query.
The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full table as the query statement.
def __init__(
    self,
    cache: CacheBase,
    stream_name: str,
    query_statement: Select,
    stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
) -> None:
    """Initialize the dataset with a cache, stream name, and query statement.

    This class is not intended to be created directly. Instead, you can retrieve
    datasets from caches or Cloud connection objects, etc.

    The query statement should be a SQLAlchemy Selectable object that can be executed to
    retrieve records from the dataset.

    When stream_configuration is None, the configuration is looked up through the cache
    processor's catalog provider; a failed lookup only emits a warning. When it is False,
    the lookup is skipped. In both the failed and the skipped case the dataset ends up
    with no stream metadata.
    """
    self._cache: CacheBase = cache
    self._stream_name: str = stream_name
    self._query_statement: Select = query_statement
    # Filled in lazily by the first length computation.
    self._length: int | None = None

    resolved_configuration = stream_configuration
    if resolved_configuration is None:
        try:
            resolved_configuration = (
                cache.processor.catalog_provider.get_configured_stream_info(
                    stream_name=stream_name
                )
            )
        except Exception as ex:
            warnings.warn(
                f"Failed to get stream configuration for {stream_name}: {ex}",
                stacklevel=2,
            )

    # Any falsy value (notably the explicit False opt-out) is stored as None.
    if not resolved_configuration:
        resolved_configuration = None

    super().__init__(stream_metadata=resolved_configuration)
Initialize the dataset with a cache, stream name, and query statement.
This class is not intended to be created directly. Instead, you can retrieve datasets from caches or Cloud connection objects, etc.
The query statement should be a SQLAlchemy Selectable object that can be executed to retrieve records from the dataset.
If stream_configuration is not provided, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.
If stream_configuration is set to False, we skip the stream configuration retrieval.
def to_pandas(self) -> DataFrame:
    """Return the records selected by this dataset's query as a pandas DataFrame.

    This delegates to the base implementation, which builds the DataFrame by iterating
    the dataset. Iteration executes the dataset's query statement, so filters added via
    `with_filter()` are respected.
    """
    # Reading the whole stream through the cache by name would bypass the query
    # statement and return unfiltered data for filtered datasets.
    return super().to_pandas()
Return a pandas DataFrame representation of the dataset.
The base implementation simply passes the record iterator to pandas' DataFrame constructor.
def to_arrow(
    self,
    *,
    max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
) -> Dataset:
    """Return an Arrow Dataset for this dataset's stream, read through the cache.

    `max_chunk_size` is forwarded unchanged to the cache's `get_arrow_dataset`.
    """
    # NOTE(review): the stream is fetched by name and the query statement is not used
    # here, so filters added via with_filter() do not appear to be applied - confirm.
    arrow_dataset = self._cache.get_arrow_dataset(
        self._stream_name,
        max_chunk_size=max_chunk_size,
    )
    return arrow_dataset
Return an Arrow Dataset representation of the dataset.
This method should be implemented by subclasses.
def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
    """Filter the dataset by a set of column values.

    Filters can be specified as either a string or a SQLAlchemy expression.

    Filters are lazily applied to the dataset, so they can be chained together. For example:

            dataset.with_filter("id > 5").with_filter("id < 10")

    is equivalent to:

            dataset.with_filter("id > 5", "id < 10")

    Calling this method without any filter expressions returns a new dataset over the
    same, unmodified query.
    """
    # Convert all strings to TextClause objects.
    filters: list[ClauseElement] = [
        text(expression) if isinstance(expression, str) else expression
        for expression in filter_expressions
    ]
    # Calling and_() with no arguments is deprecated in SQLAlchemy, so only add a
    # WHERE clause when there is at least one filter.
    filtered_select = (
        self._query_statement.where(and_(*filters)) if filters else self._query_statement
    )
    return SQLDataset(
        cache=self._cache,
        stream_name=self._stream_name,
        query_statement=filtered_select,
    )
Filter the dataset by a set of column values.
Filters can be specified as either a string or a SQLAlchemy expression.
Filters are lazily applied to the dataset, so they can be chained together. For example:
dataset.with_filter("id > 5").with_filter("id < 10")
is equivalent to:
dataset.with_filter("id > 5", "id < 10")