airbyte.results

Module which defines the ReadResult and WriteResult classes.

These classes are used to return information about read and write operations, respectively. They contain information such as the number of records read or written, the cache object, and the state handlers for a sync.

  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2"""Module which defines the `ReadResult` and `WriteResult` classes.
  3
  4These classes are used to return information about read and write operations, respectively. They
  5contain information such as the number of records read or written, the cache object, and the
  6state handlers for a sync.
  7"""
  8
  9from __future__ import annotations
 10
 11from collections.abc import Mapping
 12from typing import TYPE_CHECKING
 13
 14from airbyte.datasets import CachedDataset
 15
 16
 17if TYPE_CHECKING:
 18    from collections.abc import Iterator
 19
 20    from sqlalchemy.engine import Engine
 21
 22    from airbyte._writers.base import AirbyteWriterInterface
 23    from airbyte.caches import CacheBase
 24    from airbyte.destinations.base import Destination
 25    from airbyte.progress import ProgressTracker
 26    from airbyte.shared.catalog_providers import CatalogProvider
 27    from airbyte.shared.state_providers import StateProviderBase
 28    from airbyte.shared.state_writers import StateWriterBase
 29    from airbyte.sources.base import Source
 30
 31
 32class ReadResult(Mapping[str, CachedDataset]):
 33    """The result of a read operation.
 34
 35    This class is used to return information about the read operation, such as the number of
 36    records read. It should not be created directly, but instead returned by the write method
 37    of a destination.
 38    """
 39
 40    def __init__(
 41        self,
 42        *,
 43        source_name: str,
 44        processed_streams: list[str],
 45        cache: CacheBase,
 46        progress_tracker: ProgressTracker,
 47    ) -> None:
 48        """Initialize a read result.
 49
 50        This class should not be created directly. Instead, it should be returned by the `read`
 51        method of the `Source` class.
 52        """
 53        self.source_name = source_name
 54        self._progress_tracker = progress_tracker
 55        self._cache = cache
 56        self._processed_streams = processed_streams
 57
 58    def __getitem__(self, stream: str) -> CachedDataset:
 59        """Return the cached dataset for a given stream name."""
 60        if stream not in self._processed_streams:
 61            raise KeyError(stream)
 62
 63        return CachedDataset(self._cache, stream)
 64
 65    def __contains__(self, stream: object) -> bool:
 66        """Return whether a given stream name was included in processing."""
 67        if not isinstance(stream, str):
 68            return False
 69
 70        return stream in self._processed_streams
 71
 72    def __iter__(self) -> Iterator[str]:
 73        """Return an iterator over the stream names that were processed."""
 74        return self._processed_streams.__iter__()
 75
 76    def __len__(self) -> int:
 77        """Return the number of streams that were processed."""
 78        return len(self._processed_streams)
 79
 80    def get_sql_engine(self) -> Engine:
 81        """Return the SQL engine used by the cache."""
 82        return self._cache.get_sql_engine()
 83
 84    @property
 85    def processed_records(self) -> int:
 86        """The total number of records read from the source."""
 87        return self._progress_tracker.total_records_read
 88
 89    @property
 90    def streams(self) -> Mapping[str, CachedDataset]:
 91        """Return a mapping of stream names to cached datasets."""
 92        return {
 93            stream_name: CachedDataset(self._cache, stream_name)
 94            for stream_name in self._processed_streams
 95        }
 96
 97    @property
 98    def cache(self) -> CacheBase:
 99        """Return the cache object."""
100        return self._cache
101
102
class WriteResult:
    """Outcome of writing records to a destination.

    Exposes details about a completed write, such as how many records were delivered. Instances
    are meant to be produced by the write method of a destination rather than built by hand.
    """

    def __init__(
        self,
        *,
        destination: AirbyteWriterInterface | Destination,
        source_data: Source | ReadResult,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Store the objects involved in a write.

        Not intended for direct use: the `write` method of the `Destination` class builds and
        returns this object.

        Args:
            destination: The destination or writer that received the records (stored only).
            source_data: The source, or a prior read result, that supplied the records
                (stored only).
            catalog_provider: Catalog provider associated with the write (stored only).
            state_writer: State writer handed out by `get_state_provider()`.
            progress_tracker: Tracker queried by `processed_records`.
        """
        self._progress_tracker: ProgressTracker = progress_tracker
        self._state_writer: StateWriterBase = state_writer
        self._catalog_provider: CatalogProvider = catalog_provider
        self._source_data: Source | ReadResult = source_data
        self._destination: AirbyteWriterInterface | Destination = destination

    @property
    def processed_records(self) -> int:
        """Count of records the progress tracker reports as delivered to the destination."""
        tracker = self._progress_tracker
        return tracker.total_destination_records_delivered

    def get_state_provider(self) -> StateProviderBase:
        """Expose the state writer through the state-provider interface.

        The state writer itself is internal, so the public API only hands it out typed as a
        `StateProviderBase`. Through that interface callers can inspect the state artifacts that
        were written, which is handy for testing or debugging.
        """
        provider: StateProviderBase = self._state_writer
        return provider
class ReadResult(collections.abc.Mapping[str, airbyte.datasets._sql.CachedDataset]):
 33class ReadResult(Mapping[str, CachedDataset]):
 34    """The result of a read operation.
 35
 36    This class is used to return information about the read operation, such as the number of
 37    records read. It should not be created directly, but instead returned by the write method
 38    of a destination.
 39    """
 40
 41    def __init__(
 42        self,
 43        *,
 44        source_name: str,
 45        processed_streams: list[str],
 46        cache: CacheBase,
 47        progress_tracker: ProgressTracker,
 48    ) -> None:
 49        """Initialize a read result.
 50
 51        This class should not be created directly. Instead, it should be returned by the `read`
 52        method of the `Source` class.
 53        """
 54        self.source_name = source_name
 55        self._progress_tracker = progress_tracker
 56        self._cache = cache
 57        self._processed_streams = processed_streams
 58
 59    def __getitem__(self, stream: str) -> CachedDataset:
 60        """Return the cached dataset for a given stream name."""
 61        if stream not in self._processed_streams:
 62            raise KeyError(stream)
 63
 64        return CachedDataset(self._cache, stream)
 65
 66    def __contains__(self, stream: object) -> bool:
 67        """Return whether a given stream name was included in processing."""
 68        if not isinstance(stream, str):
 69            return False
 70
 71        return stream in self._processed_streams
 72
 73    def __iter__(self) -> Iterator[str]:
 74        """Return an iterator over the stream names that were processed."""
 75        return self._processed_streams.__iter__()
 76
 77    def __len__(self) -> int:
 78        """Return the number of streams that were processed."""
 79        return len(self._processed_streams)
 80
 81    def get_sql_engine(self) -> Engine:
 82        """Return the SQL engine used by the cache."""
 83        return self._cache.get_sql_engine()
 84
 85    @property
 86    def processed_records(self) -> int:
 87        """The total number of records read from the source."""
 88        return self._progress_tracker.total_records_read
 89
 90    @property
 91    def streams(self) -> Mapping[str, CachedDataset]:
 92        """Return a mapping of stream names to cached datasets."""
 93        return {
 94            stream_name: CachedDataset(self._cache, stream_name)
 95            for stream_name in self._processed_streams
 96        }
 97
 98    @property
 99    def cache(self) -> CacheBase:
100        """Return the cache object."""
101        return self._cache

The result of a read operation.

This class is used to return information about the read operation, such as the number of records read. It should not be created directly, but instead returned by the read method of a source.

ReadResult( *, source_name: str, processed_streams: list[str], cache: airbyte.caches.CacheBase, progress_tracker: airbyte.progress.ProgressTracker)
41    def __init__(
42        self,
43        *,
44        source_name: str,
45        processed_streams: list[str],
46        cache: CacheBase,
47        progress_tracker: ProgressTracker,
48    ) -> None:
49        """Initialize a read result.
50
51        This class should not be created directly. Instead, it should be returned by the `read`
52        method of the `Source` class.
53        """
54        self.source_name = source_name
55        self._progress_tracker = progress_tracker
56        self._cache = cache
57        self._processed_streams = processed_streams

Initialize a read result.

This class should not be created directly. Instead, it should be returned by the read method of the Source class.

source_name
def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
81    def get_sql_engine(self) -> Engine:
82        """Return the SQL engine used by the cache."""
83        return self._cache.get_sql_engine()

Return the SQL engine used by the cache.

processed_records: int
85    @property
86    def processed_records(self) -> int:
87        """The total number of records read from the source."""
88        return self._progress_tracker.total_records_read

The total number of records read from the source.

streams: Mapping[str, airbyte.CachedDataset]
90    @property
91    def streams(self) -> Mapping[str, CachedDataset]:
92        """Return a mapping of stream names to cached datasets."""
93        return {
94            stream_name: CachedDataset(self._cache, stream_name)
95            for stream_name in self._processed_streams
96        }

Return a mapping of stream names to cached datasets.

cache: airbyte.caches.CacheBase
 98    @property
 99    def cache(self) -> CacheBase:
100        """Return the cache object."""
101        return self._cache

Return the cache object.

Inherited Members
collections.abc.Mapping
get
keys
items
values
class WriteResult:
class WriteResult:
    """Outcome of writing records to a destination.

    Exposes details about a completed write, such as how many records were delivered. Instances
    are meant to be produced by the write method of a destination rather than built by hand.
    """

    def __init__(
        self,
        *,
        destination: AirbyteWriterInterface | Destination,
        source_data: Source | ReadResult,
        catalog_provider: CatalogProvider,
        state_writer: StateWriterBase,
        progress_tracker: ProgressTracker,
    ) -> None:
        """Store the objects involved in a write.

        Not intended for direct use: the `write` method of the `Destination` class builds and
        returns this object.

        Args:
            destination: The destination or writer that received the records (stored only).
            source_data: The source, or a prior read result, that supplied the records
                (stored only).
            catalog_provider: Catalog provider associated with the write (stored only).
            state_writer: State writer handed out by `get_state_provider()`.
            progress_tracker: Tracker queried by `processed_records`.
        """
        self._progress_tracker: ProgressTracker = progress_tracker
        self._state_writer: StateWriterBase = state_writer
        self._catalog_provider: CatalogProvider = catalog_provider
        self._source_data: Source | ReadResult = source_data
        self._destination: AirbyteWriterInterface | Destination = destination

    @property
    def processed_records(self) -> int:
        """Count of records the progress tracker reports as delivered to the destination."""
        tracker = self._progress_tracker
        return tracker.total_destination_records_delivered

    def get_state_provider(self) -> StateProviderBase:
        """Expose the state writer through the state-provider interface.

        The state writer itself is internal, so the public API only hands it out typed as a
        `StateProviderBase`. Through that interface callers can inspect the state artifacts that
        were written, which is handy for testing or debugging.
        """
        provider: StateProviderBase = self._state_writer
        return provider

The result of a write operation.

This class is used to return information about the write operation, such as the number of records written. It should not be created directly, but instead returned by the write method of a destination.

WriteResult( *, destination: airbyte._writers.base.AirbyteWriterInterface | airbyte.Destination, source_data: airbyte.Source | ReadResult, catalog_provider: airbyte.shared.catalog_providers.CatalogProvider, state_writer: airbyte.shared.state_writers.StateWriterBase, progress_tracker: airbyte.progress.ProgressTracker)
112    def __init__(
113        self,
114        *,
115        destination: AirbyteWriterInterface | Destination,
116        source_data: Source | ReadResult,
117        catalog_provider: CatalogProvider,
118        state_writer: StateWriterBase,
119        progress_tracker: ProgressTracker,
120    ) -> None:
121        """Initialize a write result.
122
123        This class should not be created directly. Instead, it should be returned by the `write`
124        method of the `Destination` class.
125        """
126        self._destination: AirbyteWriterInterface | Destination = destination
127        self._source_data: Source | ReadResult = source_data
128        self._catalog_provider: CatalogProvider = catalog_provider
129        self._state_writer: StateWriterBase = state_writer
130        self._progress_tracker: ProgressTracker = progress_tracker

Initialize a write result.

This class should not be created directly. Instead, it should be returned by the write method of the Destination class.

processed_records: int
132    @property
133    def processed_records(self) -> int:
134        """The total number of records written to the destination."""
135        return self._progress_tracker.total_destination_records_delivered

The total number of records written to the destination.

def get_state_provider(self) -> airbyte.shared.state_providers.StateProviderBase:
137    def get_state_provider(self) -> StateProviderBase:
138        """Return the state writer as a state provider.
139
140        As a public interface, we only expose the state writer as a state provider. This is because
141        the state writer itself is only intended for internal use. As a state provider, the state
142        writer can be used to read the state artifacts that were written. This can be useful for
143        testing or debugging.
144        """
145        return self._state_writer

Return the state writer as a state provider.

As a public interface, we only expose the state writer as a state provider. This is because the state writer itself is only intended for internal use. As a state provider, the state writer can be used to read the state artifacts that were written. This can be useful for testing or debugging.