airbyte.datasets
PyAirbyte dataset classes.
```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte dataset classes."""

from __future__ import annotations

from airbyte.datasets._base import DatasetBase
from airbyte.datasets._lazy import LazyDataset
from airbyte.datasets._map import DatasetMap
from airbyte.datasets._sql import CachedDataset, SQLDataset


__all__ = [
    "CachedDataset",
    "DatasetBase",
    "DatasetMap",
    "LazyDataset",
    "SQLDataset",
]
```
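The module itself only re-exports the dataset classes listed in `__all__`, so they can be imported directly from `airbyte.datasets`:

```python
# The public dataset classes can be imported directly from the package:
from airbyte.datasets import (
    CachedDataset,
    DatasetBase,
    DatasetMap,
    LazyDataset,
    SQLDataset,
)
```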
```python
class CachedDataset(SQLDataset):
    """A dataset backed by a SQL table cache.

    Because this dataset includes all records from the underlying table, we also expose the
    underlying table as a SQLAlchemy Table object.
    """

    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
    ) -> None:
        """We construct the query statement by selecting all columns from the table.

        This prevents the need to scan the table schema to construct the query statement.

        If stream_configuration is None, we attempt to retrieve the stream configuration from the
        cache processor. This is useful when constructing a dataset from a CachedDataset object,
        which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        table_name = cache.processor.get_sql_table_name(stream_name)
        schema_name = cache.schema_name
        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
        super().__init__(
            cache=cache,
            stream_name=stream_name,
            query_statement=query,
            stream_configuration=stream_configuration,
        )

    @overrides
    def to_pandas(self) -> DataFrame:
        """Return the underlying dataset data as a pandas DataFrame."""
        return self._cache.get_pandas_dataframe(self._stream_name)

    @overrides
    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset containing the data from the specified stream.

        Args:
            max_chunk_size (int): Maximum number of records to include in each batch of the
                PyArrow dataset.

        Returns:
            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
        """
        return self._cache.get_arrow_dataset(
            stream_name=self._stream_name,
            max_chunk_size=max_chunk_size,
        )

    def to_sql_table(self) -> Table:
        """Return the underlying SQL table as a SQLAlchemy Table object."""
        return self._cache.processor.get_sql_table(self.stream_name)

    def __eq__(self, value: object) -> bool:
        """Return True if the value is a CachedDataset with the same cache and stream name.

        In the case of CachedDataset objects, we can simply compare the cache and stream name.

        Note that this equality check is only supported on CachedDataset objects and not for
        the base SQLDataset implementation. This is because of the complexity and computational
        cost of comparing two arbitrary SQL queries that could be bound to different variables,
        as well as the chance that two queries can be syntactically equivalent without being
        text-wise equivalent.
        """
        if not isinstance(value, SQLDataset):
            return False

        if self._cache is not value._cache:
            return False

        return self._stream_name == value._stream_name

    def __hash__(self) -> int:
        return hash(self._stream_name)
```
A dataset backed by a SQL table cache.
Because this dataset includes all records from the underlying table, we also expose the underlying table as a SQLAlchemy Table object.
```python
    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
    ) -> None:
        """We construct the query statement by selecting all columns from the table.

        This prevents the need to scan the table schema to construct the query statement.

        If stream_configuration is None, we attempt to retrieve the stream configuration from the
        cache processor. This is useful when constructing a dataset from a CachedDataset object,
        which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        table_name = cache.processor.get_sql_table_name(stream_name)
        schema_name = cache.schema_name
        query = select("*").select_from(text(f"{schema_name}.{table_name}"))
        super().__init__(
            cache=cache,
            stream_name=stream_name,
            query_statement=query,
            stream_configuration=stream_configuration,
        )
```
We construct the query statement by selecting all columns from the table.
This prevents the need to scan the table schema to construct the query statement.
If stream_configuration is None, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.
If stream_configuration is set to False, we skip the stream configuration retrieval.
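For illustration, and assuming an already-populated cache object (the `cache` variable and the "users" stream below are hypothetical setup, not part of this module), a `CachedDataset` can be built directly from the cache and stream name:

```python
from airbyte.datasets import CachedDataset

# Assumes `cache` is an existing, already-synced cache object (CacheBase) and
# that a "users" stream exists in it; both are hypothetical setup not shown here.
users = CachedDataset(cache=cache, stream_name="users")

df = users.to_pandas()  # full table as a pandas DataFrame
print(len(users), users.column_names)

# Pass stream_configuration=False to skip the stream-configuration lookup:
users_raw = CachedDataset(cache, "users", stream_configuration=False)
```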
```python
    @overrides
    def to_pandas(self) -> DataFrame:
        """Return the underlying dataset data as a pandas DataFrame."""
        return self._cache.get_pandas_dataframe(self._stream_name)
```
Return the underlying dataset data as a pandas DataFrame.
```python
    @overrides
    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset containing the data from the specified stream.

        Args:
            max_chunk_size (int): Maximum number of records to include in each batch of the
                PyArrow dataset.

        Returns:
            pa.dataset.Dataset: Arrow Dataset containing the stream's data.
        """
        return self._cache.get_arrow_dataset(
            stream_name=self._stream_name,
            max_chunk_size=max_chunk_size,
        )
```
Return an Arrow Dataset containing the data from the specified stream.
Arguments:
- max_chunk_size (int): Maximum number of records to include in each batch of the PyArrow dataset.
Returns:
pa.dataset.Dataset: Arrow Dataset containing the stream's data.
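As a usage sketch (reusing the assumed `users` dataset from the constructor example above), the returned PyArrow Dataset can be scanned in bounded-size batches:

```python
# Sketch: scan the cached stream as a PyArrow Dataset in batches of at most
# 10,000 records (`users` is the assumed CachedDataset from the earlier sketch).
arrow_ds = users.to_arrow(max_chunk_size=10_000)

for batch in arrow_ds.to_batches():
    print(batch.num_rows)
```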
```python
    def to_sql_table(self) -> Table:
        """Return the underlying SQL table as a SQLAlchemy Table object."""
        return self._cache.processor.get_sql_table(self.stream_name)
```
Return the underlying SQL table as a SQLAlchemy Table object.
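Because the return value is a plain SQLAlchemy `Table`, it can be combined with the cache's SQL connection for ad-hoc queries. A minimal sketch, reusing the assumed `users` dataset and `cache` from the earlier example:

```python
from sqlalchemy import func, select

# `users` and `cache` are the assumed objects from the earlier sketch.
users_table = users.to_sql_table()

# Build an arbitrary aggregate over the underlying table...
stmt = select(func.count()).select_from(users_table)

# ...and execute it against the cache's SQL connection.
with cache.processor.get_sql_connection() as conn:
    row_count = conn.execute(stmt).scalar()
```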
```python
class DatasetBase(ABC):
    """Base implementation for all datasets."""

    def __init__(self, stream_metadata: ConfiguredAirbyteStream) -> None:
        self._stream_metadata = stream_metadata

    @abstractmethod
    def __iter__(self) -> Iterator[dict[str, Any]]:
        """Return the iterator of records."""
        raise NotImplementedError

    def to_pandas(self) -> DataFrame:
        """Return a pandas DataFrame representation of the dataset.

        The base implementation simply passes the record iterator to pandas' DataFrame constructor.
        """
        # Technically, we return an iterator of Mapping objects. However, pandas
        # expects an iterator of dict objects. This cast is safe because we know
        # duck typing is correct for this use case.
        return DataFrame(cast("Iterator[dict[str, Any]]", self))

    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset representation of the dataset.

        This method should be implemented by subclasses.
        """
        raise NotImplementedError("Not implemented in base class")

    def to_documents(
        self,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Return the iterator of documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content. Otherwise, metadata will be attached to the document but not rendered.
        """
        renderer = DocumentRenderer(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )
        yield from renderer.render_documents(self)

    @property
    def column_names(self) -> list[str]:
        """Return the list of top-level column names."""
        return list(self._stream_metadata.stream.json_schema["properties"].keys())
```
Base implementation for all datasets.
```python
    def to_pandas(self) -> DataFrame:
        """Return a pandas DataFrame representation of the dataset.

        The base implementation simply passes the record iterator to pandas' DataFrame constructor.
        """
        # Technically, we return an iterator of Mapping objects. However, pandas
        # expects an iterator of dict objects. This cast is safe because we know
        # duck typing is correct for this use case.
        return DataFrame(cast("Iterator[dict[str, Any]]", self))
```
Return a pandas DataFrame representation of the dataset.
The base implementation simply passes the record iterator to pandas' DataFrame constructor.
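In other words, any iterator of record dictionaries follows the same pattern; a small self-contained sketch of what the base implementation does:

```python
import pandas as pd

# Any iterator of record dicts can be handed straight to the DataFrame
# constructor, which is all the base implementation does with `self`.
records = iter([{"id": 1, "name": "ada"}, {"id": 2, "name": "grace"}])
df = pd.DataFrame(records)
print(df.shape)  # (2, 2)
```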
```python
    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        """Return an Arrow Dataset representation of the dataset.

        This method should be implemented by subclasses.
        """
        raise NotImplementedError("Not implemented in base class")
```
Return an Arrow Dataset representation of the dataset.
This method should be implemented by subclasses.
```python
    def to_documents(
        self,
        title_property: str | None = None,
        content_properties: list[str] | None = None,
        metadata_properties: list[str] | None = None,
        *,
        render_metadata: bool = False,
    ) -> Iterable[Document]:
        """Return the iterator of documents.

        If metadata_properties is not set, all properties that are not content will be added to
        the metadata.

        If render_metadata is True, metadata will be rendered in the document, as well as the
        main content. Otherwise, metadata will be attached to the document but not rendered.
        """
        renderer = DocumentRenderer(
            title_property=title_property,
            content_properties=content_properties,
            metadata_properties=metadata_properties,
            render_metadata=render_metadata,
        )
        yield from renderer.render_documents(self)
```
Return the iterator of documents.
If metadata_properties is not set, all properties that are not content will be added to the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the main content. Otherwise, metadata will be attached to the document but not rendered.
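A hedged usage sketch; the `dataset` variable is any dataset instance, and the property names below are hypothetical fields from its schema:

```python
# Sketch: turn records into Document objects, e.g. for an LLM or vector-store
# workflow. `dataset` is an assumed dataset instance, and the property names
# ("title", "body", "updated_at") are hypothetical fields from its schema.
docs = dataset.to_documents(
    title_property="title",
    content_properties=["body"],
    metadata_properties=["updated_at"],
    render_metadata=True,
)
for doc in docs:
    print(doc)
```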
```python
class DatasetMap(Mapping):
    """A generic interface for a set of streams or datasets."""

    def __init__(self) -> None:
        self._datasets: dict[str, DatasetBase] = {}

    def __getitem__(self, key: str) -> DatasetBase:
        return self._datasets[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._datasets)

    def __len__(self) -> int:
        return len(self._datasets)
```
A generic interface for a set of streams or datasets.
Inherited Members
- collections.abc.Mapping
- get
- keys
- items
- values
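Because `DatasetMap` implements the read-only `Mapping` protocol, it can be used like a dictionary of stream names to datasets. A sketch, assuming `datasets` is an already-populated `DatasetMap`:

```python
# Sketch: a DatasetMap behaves like a read-only dict of stream name -> dataset.
# `datasets` is an assumed, already-populated DatasetMap instance.
print(list(datasets.keys()))

for stream_name, ds in datasets.items():
    print(stream_name, ds.column_names)

if "users" in datasets:  # "users" is a hypothetical stream name
    users_ds = datasets["users"]
```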
```python
class LazyDataset(DatasetBase):
    """A dataset that is loaded incrementally from a source or a SQL query."""

    def __init__(
        self,
        iterator: Iterator[dict[str, Any]],
        *,
        stream_metadata: ConfiguredAirbyteStream,
        stop_event: threading.Event | None,
        progress_tracker: progress.ProgressTracker,
    ) -> None:
        self._stop_event: threading.Event | None = stop_event or None
        self._progress_tracker = progress_tracker
        self._iterator: Iterator[dict[str, Any]] = iterator
        super().__init__(
            stream_metadata=stream_metadata,
        )

    @overrides
    def __iter__(self) -> Iterator[dict[str, Any]]:
        return self._iterator

    def __next__(self) -> Mapping[str, Any]:
        try:
            return next(self._iterator)
        except StopIteration:
            # The iterator is exhausted, tell the producer they can stop if they are still
            # producing records. (Esp. when an artificial limit is reached.)
            self._progress_tracker.log_success()
            if self._stop_event:
                self._stop_event.set()
            raise

    def fetch_all(self) -> InMemoryDataset:
        """Fetch all records to memory and return an InMemoryDataset."""
        return InMemoryDataset(
            records=list(self._iterator),
            stream_metadata=self._stream_metadata,
        )

    def close(self) -> None:
        """Stop the dataset iterator.

        This method is used to signal the dataset to stop fetching records, for example
        when the dataset is being fetched incrementally and the user wants to stop the
        fetching process.
        """
        if self._stop_event:
            self._stop_event.set()

    def __del__(self) -> None:
        """Close the dataset when the object is deleted."""
        self.close()
```
A dataset that is loaded incrementally from a source or a SQL query.
```python
    def __init__(
        self,
        iterator: Iterator[dict[str, Any]],
        *,
        stream_metadata: ConfiguredAirbyteStream,
        stop_event: threading.Event | None,
        progress_tracker: progress.ProgressTracker,
    ) -> None:
        self._stop_event: threading.Event | None = stop_event or None
        self._progress_tracker = progress_tracker
        self._iterator: Iterator[dict[str, Any]] = iterator
        super().__init__(
            stream_metadata=stream_metadata,
        )
```
```python
    def fetch_all(self) -> InMemoryDataset:
        """Fetch all records to memory and return an InMemoryDataset."""
        return InMemoryDataset(
            records=list(self._iterator),
            stream_metadata=self._stream_metadata,
        )
```
Fetch all records to memory and return an InMemoryDataset.
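A sketch, assuming `lazy_ds` is a `LazyDataset` obtained from an incremental read; note that `fetch_all()` consumes the underlying iterator:

```python
# Sketch: materialize all remaining records into memory. After this call the
# underlying lazy iterator is exhausted, so further reads should use `in_memory`.
# `lazy_ds` is an assumed LazyDataset instance (e.g. from an incremental read).
in_memory = lazy_ds.fetch_all()
records = list(in_memory)
print(len(records))
```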
```python
    def close(self) -> None:
        """Stop the dataset iterator.

        This method is used to signal the dataset to stop fetching records, for example
        when the dataset is being fetched incrementally and the user wants to stop the
        fetching process.
        """
        if self._stop_event:
            self._stop_event.set()
```
Stop the dataset iterator.
This method is used to signal the dataset to stop fetching records, for example when the dataset is being fetched incrementally and the user wants to stop the fetching process.
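For example, a consumer might stop after a fixed number of records and then close the dataset so the producer can stop generating. A sketch, again assuming `lazy_ds` is a `LazyDataset`:

```python
# Sketch: stop a lazy read early once enough records have been consumed.
# `lazy_ds` is an assumed LazyDataset instance.
records = []
for record in lazy_ds:
    records.append(record)
    if len(records) >= 100:
        break

lazy_ds.close()  # sets the stop event so the producer can stop emitting records
```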
```python
class SQLDataset(DatasetBase):
    """A dataset that is loaded incrementally from a SQL query.

    The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full
    table as the query statement.
    """

    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        query_statement: Select,
        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
    ) -> None:
        """Initialize the dataset with a cache, stream name, and query statement.

        This class is not intended to be created directly. Instead, you can retrieve
        datasets from caches or Cloud connection objects, etc.

        The query statement should be a SQLAlchemy Selectable object that can be executed to
        retrieve records from the dataset.

        If stream_configuration is not provided, we attempt to retrieve the stream configuration
        from the cache processor. This is useful when constructing a dataset from a CachedDataset
        object, which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        self._length: int | None = None
        self._cache: CacheBase = cache
        self._stream_name: str = stream_name
        self._query_statement: Select = query_statement
        if stream_configuration is None:
            try:
                stream_configuration = cache.processor.catalog_provider.get_configured_stream_info(
                    stream_name=stream_name
                )
            except Exception as ex:
                warnings.warn(
                    f"Failed to get stream configuration for {stream_name}: {ex}",
                    stacklevel=2,
                )

        # Coalesce False to None
        stream_configuration = stream_configuration or None

        super().__init__(stream_metadata=stream_configuration)

    @property
    def stream_name(self) -> str:
        return self._stream_name

    def __iter__(self) -> Iterator[dict[str, Any]]:
        with self._cache.processor.get_sql_connection() as conn:
            for row in conn.execute(self._query_statement):
                # Access to private member required because SQLAlchemy doesn't expose a public API.
                # https://pydoc.dev/sqlalchemy/latest/sqlalchemy.engine.row.RowMapping.html
                yield cast("dict[str, Any]", row._mapping)  # noqa: SLF001

    def __len__(self) -> int:
        """Return the number of records in the dataset.

        This method caches the length of the dataset after the first call.
        """
        if self._length is None:
            count_query = select(func.count()).select_from(self._query_statement.subquery())
            with self._cache.processor.get_sql_connection() as conn:
                self._length = conn.execute(count_query).scalar()

        return cast("int", self._length)

    def to_pandas(self) -> DataFrame:
        return self._cache.get_pandas_dataframe(self._stream_name)

    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        return self._cache.get_arrow_dataset(self._stream_name, max_chunk_size=max_chunk_size)

    def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
        """Filter the dataset by a set of column values.

        Filters can be specified as either a string or a SQLAlchemy expression.

        Filters are lazily applied to the dataset, so they can be chained together. For example:

            dataset.with_filter("id > 5").with_filter("id < 10")

        is equivalent to:

            dataset.with_filter("id > 5", "id < 10")
        """
        # Convert all strings to TextClause objects.
        filters: list[ClauseElement] = [
            text(expression) if isinstance(expression, str) else expression
            for expression in filter_expressions
        ]
        filtered_select = self._query_statement.where(and_(*filters))
        return SQLDataset(
            cache=self._cache,
            stream_name=self._stream_name,
            query_statement=filtered_select,
        )

    @property
    def column_names(self) -> list[str]:
        """Return the list of top-level column names, including internal Airbyte columns."""
        return [*super().column_names, AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN]
```
A dataset that is loaded incrementally from a SQL query.
The CachedDataset class is a subclass of this class, which simply passes a SELECT over the full table as the query statement.
```python
    def __init__(
        self,
        cache: CacheBase,
        stream_name: str,
        query_statement: Select,
        stream_configuration: ConfiguredAirbyteStream | Literal[False] | None = None,
    ) -> None:
        """Initialize the dataset with a cache, stream name, and query statement.

        This class is not intended to be created directly. Instead, you can retrieve
        datasets from caches or Cloud connection objects, etc.

        The query statement should be a SQLAlchemy Selectable object that can be executed to
        retrieve records from the dataset.

        If stream_configuration is not provided, we attempt to retrieve the stream configuration
        from the cache processor. This is useful when constructing a dataset from a CachedDataset
        object, which already has the stream configuration.

        If stream_configuration is set to False, we skip the stream configuration retrieval.
        """
        self._length: int | None = None
        self._cache: CacheBase = cache
        self._stream_name: str = stream_name
        self._query_statement: Select = query_statement
        if stream_configuration is None:
            try:
                stream_configuration = cache.processor.catalog_provider.get_configured_stream_info(
                    stream_name=stream_name
                )
            except Exception as ex:
                warnings.warn(
                    f"Failed to get stream configuration for {stream_name}: {ex}",
                    stacklevel=2,
                )

        # Coalesce False to None
        stream_configuration = stream_configuration or None

        super().__init__(stream_metadata=stream_configuration)
```
Initialize the dataset with a cache, stream name, and query statement.
This class is not intended to be created directly. Instead, you can retrieve datasets from caches or Cloud connection objects, etc.
The query statement should be a SQLAlchemy Selectable object that can be executed to retrieve records from the dataset.
If stream_configuration is not provided, we attempt to retrieve the stream configuration from the cache processor. This is useful when constructing a dataset from a CachedDataset object, which already has the stream configuration.
If stream_configuration is set to False, we skip the stream configuration retrieval.
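However the dataset is obtained, the core interface is the same: iteration yields one mapping per row, and `len()` issues a COUNT over the underlying query (cached after the first call). A sketch, assuming `dataset` is any `SQLDataset` or `CachedDataset` instance:

```python
# Sketch: basic SQLDataset usage. `dataset` is an assumed SQLDataset (or
# CachedDataset) instance; the "id" column is a hypothetical schema field.
print(len(dataset))   # issues a COUNT over the dataset's query (cached afterwards)

for row in dataset:   # each row is yielded as a dict-like mapping
    print(row.get("id"))
    break
```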
```python
    def to_pandas(self) -> DataFrame:
        return self._cache.get_pandas_dataframe(self._stream_name)
```
Return a pandas DataFrame representation of the dataset.

This override delegates to the cache's get_pandas_dataframe() method for the dataset's stream.
```python
    def to_arrow(
        self,
        *,
        max_chunk_size: int = DEFAULT_ARROW_MAX_CHUNK_SIZE,
    ) -> Dataset:
        return self._cache.get_arrow_dataset(self._stream_name, max_chunk_size=max_chunk_size)
```
Return an Arrow Dataset representation of the dataset.

This override delegates to the cache's get_arrow_dataset() method for the dataset's stream.
```python
    def with_filter(self, *filter_expressions: ClauseElement | str) -> SQLDataset:
        """Filter the dataset by a set of column values.

        Filters can be specified as either a string or a SQLAlchemy expression.

        Filters are lazily applied to the dataset, so they can be chained together. For example:

            dataset.with_filter("id > 5").with_filter("id < 10")

        is equivalent to:

            dataset.with_filter("id > 5", "id < 10")
        """
        # Convert all strings to TextClause objects.
        filters: list[ClauseElement] = [
            text(expression) if isinstance(expression, str) else expression
            for expression in filter_expressions
        ]
        filtered_select = self._query_statement.where(and_(*filters))
        return SQLDataset(
            cache=self._cache,
            stream_name=self._stream_name,
            query_statement=filtered_select,
        )
```
Filter the dataset by a set of column values.
Filters can be specified as either a string or a SQLAlchemy expression.
Filters are lazily applied to the dataset, so they can be chained together. For example:
dataset.with_filter("id > 5").with_filter("id < 10")
is equivalent to:
dataset.with_filter("id > 5", "id < 10")
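A usage sketch, reusing the assumed `users` dataset from earlier; string filters are wrapped in SQLAlchemy `text()` clauses, while SQLAlchemy expressions pass through unchanged (the column names are hypothetical):

```python
from sqlalchemy import text

# `users` is the assumed CachedDataset from the earlier sketch; the "age" and
# "country" columns are hypothetical schema fields.
adults = users.with_filter("age >= 18").with_filter("country = 'US'")
adults = users.with_filter("age >= 18", text("country = 'US'"))  # equivalent

# Filters apply lazily, when the dataset is iterated or counted:
print(len(adults))
for row in adults:
    print(row)
    break
```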
```python
    @property
    def column_names(self) -> list[str]:
        """Return the list of top-level column names, including internal Airbyte columns."""
        return [*super().column_names, AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, AB_META_COLUMN]
```
Return the list of top-level column names, including internal Airbyte columns.
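A sketch, reusing the assumed `users` dataset from earlier; the exact internal column names come from the `AB_*_COLUMN` constants (typically the `_airbyte_*` metadata columns):

```python
# Sketch: schema columns followed by the internal metadata columns defined by
# the AB_RAW_ID_COLUMN, AB_EXTRACTED_AT_COLUMN, and AB_META_COLUMN constants
# (typically the "_airbyte_*" columns). `users` is the assumed CachedDataset
# from the earlier sketch.
print(users.column_names)
# e.g. ["id", "name", ..., "_airbyte_raw_id", "_airbyte_extracted_at", "_airbyte_meta"]
```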