airbyte.results
Module which defines the `ReadResult` and `WriteResult` classes.
These classes are used to return information about read and write operations, respectively. They contain information such as the number of records read or written, the cache object, and the state handlers for a sync.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Module which defines the `ReadResult` and `WriteResult` classes.

These classes are used to return information about read and write operations, respectively. They
contain information such as the number of records read or written, the cache object, and the
state handlers for a sync.
"""

from __future__ import annotations

from collections.abc import Mapping
from typing import TYPE_CHECKING

from airbyte.datasets import CachedDataset


if TYPE_CHECKING:
    from collections.abc import Iterator

    from sqlalchemy.engine import Engine

    from airbyte._writers.base import AirbyteWriterInterface
    from airbyte.caches import CacheBase
    from airbyte.destinations.base import Destination
    from airbyte.progress import ProgressTracker
    from airbyte.shared.catalog_providers import CatalogProvider
    from airbyte.shared.state_providers import StateProviderBase
    from airbyte.shared.state_writers import StateWriterBase
    from airbyte.sources.base import Source


class ReadResult(Mapping[str, CachedDataset]):
    """The result of a read operation.

    This class is used to return information about the read operation, such as the number of
    records read. It should not be created directly, but instead returned by the `read` method
    of a source.

    Instances act as a read-only mapping from processed stream names to `CachedDataset`
    objects built on the result's cache.
    """

    def __init__(
        self,
        *,
        source_name: str,
        processed_streams: list[str],
        cache: CacheBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Initialize a read result.

        This class should not be created directly. Instead, it should be returned by the `read`
        method of the `Source` class.

        Args:
            source_name: Stored as the public `source_name` attribute.
            processed_streams: Stream names exposed through the mapping interface.
            cache: Cache object backing the datasets and the SQL engine.
            progress_tracker: Tracker from which `processed_records` is read.
        """
        self.source_name = source_name
        self._progress_tracker = progress_tracker
        self._cache = cache
        self._processed_streams = processed_streams

    def __getitem__(self, stream: str) -> CachedDataset:
        """Return the cached dataset for a given stream name.

        A new `CachedDataset` is constructed on every lookup.

        Raises:
            KeyError: If `stream` is not one of the processed streams.
        """
        if stream not in self._processed_streams:
            raise KeyError(stream)

        return CachedDataset(self._cache, stream)

    def __contains__(self, stream: object) -> bool:
        """Return whether a given stream name was included in processing.

        Non-string keys are never contained.
        """
        if not isinstance(stream, str):
            return False

        return stream in self._processed_streams

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the stream names that were processed."""
        return iter(self._processed_streams)

    def __len__(self) -> int:
        """Return the number of streams that were processed."""
        return len(self._processed_streams)

    def get_sql_engine(self) -> Engine:
        """Return the SQL engine used by the cache."""
        return self._cache.get_sql_engine()

    @property
    def processed_records(self) -> int:
        """The total number of records read from the source."""
        return self._progress_tracker.total_records_read

    @property
    def streams(self) -> Mapping[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        return {
            stream_name: CachedDataset(self._cache, stream_name)
            for stream_name in self._processed_streams
        }

    @property
    def cache(self) -> CacheBase:
        """Return the cache object."""
        return self._cache


class WriteResult:
    """The result of a write operation.

    This class is used to return information about the write operation, such as the number of
    records written. It should not be created directly, but instead returned by the write method
    of a destination.
    """

    def __init__(
        self,
        *,
        destination: AirbyteWriterInterface | Destination,
        source_data: Source | ReadResult,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Initialize a write result.

        This class should not be created directly. Instead, it should be returned by the `write`
        method of the `Destination` class.
        """
        self._destination: AirbyteWriterInterface | Destination = destination
        self._source_data: Source | ReadResult = source_data
        self._catalog_provider: CatalogProvider = catalog_provider
        self._state_writer: StateWriterBase = state_writer
        self._progress_tracker: ProgressTracker = progress_tracker

    @property
    def processed_records(self) -> int:
        """The total number of records written to the destination."""
        return self._progress_tracker.total_destination_records_delivered

    def get_state_provider(self) -> StateProviderBase:
        """Return the state writer as a state provider.

        As a public interface, we only expose the state writer as a state provider. This is because
        the state writer itself is only intended for internal use. As a state provider, the state
        writer can be used to read the state artifacts that were written. This can be useful for
        testing or debugging.
        """
        return self._state_writer
class ReadResult(Mapping[str, CachedDataset]):
    """The result of a read operation.

    This class is used to return information about the read operation, such as the number of
    records read. It should not be created directly, but instead returned by the `read` method
    of a source.

    Instances act as a read-only mapping from processed stream names to `CachedDataset`
    objects built on the result's cache.
    """

    def __init__(
        self,
        *,
        source_name: str,
        processed_streams: list[str],
        cache: CacheBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Initialize a read result.

        This class should not be created directly. Instead, it should be returned by the `read`
        method of the `Source` class.

        Args:
            source_name: Stored as the public `source_name` attribute.
            processed_streams: Stream names exposed through the mapping interface.
            cache: Cache object backing the datasets and the SQL engine.
            progress_tracker: Tracker from which `processed_records` is read.
        """
        self.source_name = source_name
        self._progress_tracker = progress_tracker
        self._cache = cache
        self._processed_streams = processed_streams

    def __getitem__(self, stream: str) -> CachedDataset:
        """Return the cached dataset for a given stream name.

        A new `CachedDataset` is constructed on every lookup.

        Raises:
            KeyError: If `stream` is not one of the processed streams.
        """
        if stream not in self._processed_streams:
            raise KeyError(stream)

        return CachedDataset(self._cache, stream)

    def __contains__(self, stream: object) -> bool:
        """Return whether a given stream name was included in processing.

        Non-string keys are never contained.
        """
        if not isinstance(stream, str):
            return False

        return stream in self._processed_streams

    def __iter__(self) -> Iterator[str]:
        """Return an iterator over the stream names that were processed."""
        return iter(self._processed_streams)

    def __len__(self) -> int:
        """Return the number of streams that were processed."""
        return len(self._processed_streams)

    def get_sql_engine(self) -> Engine:
        """Return the SQL engine used by the cache."""
        return self._cache.get_sql_engine()

    @property
    def processed_records(self) -> int:
        """The total number of records read from the source."""
        return self._progress_tracker.total_records_read

    @property
    def streams(self) -> Mapping[str, CachedDataset]:
        """Return a mapping of stream names to cached datasets."""
        return {
            stream_name: CachedDataset(self._cache, stream_name)
            for stream_name in self._processed_streams
        }

    @property
    def cache(self) -> CacheBase:
        """Return the cache object."""
        return self._cache
The result of a read operation.
This class is used to return information about the read operation, such as the number of records read. It should not be created directly, but instead returned by the `read` method of a source.
def __init__(
    self,
    *,
    source_name: str,
    processed_streams: list[str],
    cache: CacheBase,
    progress_tracker: ProgressTracker,
) -> None:
    """Set up a read result from the given keyword-only arguments.

    Not meant to be instantiated by hand; the `read` method of the `Source` class
    is expected to build and return it.
    """
    # Private state backing the mapping interface and the record counter.
    self._processed_streams = processed_streams
    self._cache = cache
    self._progress_tracker = progress_tracker
    # The only public attribute set here.
    self.source_name = source_name
Initialize a read result.
This class should not be created directly. Instead, it should be returned by the `read` method of the `Source` class.
def get_sql_engine(self) -> Engine:
    """Fetch the SQL engine by delegating to the cache's `get_sql_engine()`."""
    backing_cache = self._cache
    return backing_cache.get_sql_engine()
Return the SQL engine used by the cache.
@property
def processed_records(self) -> int:
    """Number of records read, as reported by the progress tracker's `total_records_read`."""
    tracker = self._progress_tracker
    return tracker.total_records_read
The total number of records read from the source.
@property
def streams(self) -> Mapping[str, CachedDataset]:
    """Build a new dict mapping each processed stream name to a `CachedDataset`."""
    datasets: dict[str, CachedDataset] = {}
    for name in self._processed_streams:
        # One dataset per stream, all built on this result's cache.
        datasets[name] = CachedDataset(self._cache, name)
    return datasets
Return a mapping of stream names to cached datasets.
@property
def cache(self) -> CacheBase:
    """Expose the cache object this result was created with."""
    backing_cache = self._cache
    return backing_cache
Return the cache object.
Inherited Members
- collections.abc.Mapping
- get
- keys
- items
- values
class WriteResult:
    """Outcome of a write operation.

    Carries information about the write, such as how many records were written. Instances
    are not meant to be built by hand; the write method of a destination returns them.
    """

    def __init__(
        self,
        *,
        destination: AirbyteWriterInterface | Destination,
        source_data: Source | ReadResult,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Set up a write result from the given keyword-only arguments.

        Not meant to be instantiated by hand; the `write` method of the `Destination`
        class is expected to build and return it.
        """
        # Every argument is kept on a private attribute of the same name.
        self._progress_tracker: ProgressTracker = progress_tracker
        self._state_writer: StateWriterBase = state_writer
        self._catalog_provider: CatalogProvider = catalog_provider
        self._source_data: Source | ReadResult = source_data
        self._destination: AirbyteWriterInterface | Destination = destination

    @property
    def processed_records(self) -> int:
        """Number of records written, from `total_destination_records_delivered`."""
        tracker = self._progress_tracker
        return tracker.total_destination_records_delivered

    def get_state_provider(self) -> StateProviderBase:
        """Hand back the state writer, typed as a state provider.

        Only the state-provider view is exposed publicly because the state writer itself
        is intended for internal use. Through this view the written state artifacts can be
        read back, which can be useful for testing or debugging.
        """
        writer = self._state_writer
        return writer
The result of a write operation.
This class is used to return information about the write operation, such as the number of records written. It should not be created directly, but instead returned by the write method of a destination.
def __init__(
    self,
    *,
    destination: AirbyteWriterInterface | Destination,
    source_data: Source | ReadResult,
    catalog_provider: CatalogProvider,
    state_writer: StateWriterBase,
    progress_tracker: ProgressTracker,
) -> None:
    """Set up a write result from the given keyword-only arguments.

    Not meant to be instantiated by hand; the `write` method of the `Destination`
    class is expected to build and return it.
    """
    # Every argument is kept on a private attribute of the same name.
    self._progress_tracker: ProgressTracker = progress_tracker
    self._state_writer: StateWriterBase = state_writer
    self._catalog_provider: CatalogProvider = catalog_provider
    self._source_data: Source | ReadResult = source_data
    self._destination: AirbyteWriterInterface | Destination = destination
Initialize a write result.
This class should not be created directly. Instead, it should be returned by the `write` method of the `Destination` class.
@property
def processed_records(self) -> int:
    """Number of records written, from `total_destination_records_delivered`."""
    tracker = self._progress_tracker
    return tracker.total_destination_records_delivered
The total number of records written to the destination.
def get_state_provider(self) -> StateProviderBase:
    """Hand back the state writer, typed as a state provider.

    Only the state-provider view is exposed publicly because the state writer itself
    is intended for internal use. Through this view the written state artifacts can be
    read back, which can be useful for testing or debugging.
    """
    writer = self._state_writer
    return writer
Return the state writer as a state provider.
As a public interface, we only expose the state writer as a state provider. This is because the state writer itself is only intended for internal use. As a state provider, the state writer can be used to read the state artifacts that were written. This can be useful for testing or debugging.