airbyte_cdk.sources.file_based.stream.concurrent.adapters

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import copy
import logging
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger

if TYPE_CHECKING:
    from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
        AbstractConcurrentFileBasedCursor,
    )
 53"""
 54This module contains adapters to help enabling concurrency on File-based Stream objects without needing to migrate to AbstractStream
 55"""
 56
 57
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
    @classmethod
    def create_from_stream(
        cls,
        stream: AbstractFileBasedStream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ) -> "FileBasedStreamFacade":
        """
        Create a ConcurrentStream from a FileBasedStream object.
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)
        stream._cursor = cursor

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return FileBasedStreamFacade(
            DefaultStream(
                partition_generator=FileBasedStreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FileBasedFinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                    cursor,
                ),
                name=stream.name,
                json_schema=stream.get_json_schema(),
                primary_key=pk,
                cursor_field=cursor_field,
                logger=logger,
                namespace=stream.namespace,
                cursor=cursor,
            ),
            stream,
            cursor,
            logger=logger,
            slice_logger=source._slice_logger,
        )

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: AbstractFileBasedStream,
        cursor: AbstractFileBasedCursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger
        self.catalog_schema = legacy_stream.catalog_schema
        self.config = legacy_stream.config
        self.validation_policy = legacy_stream.validation_policy

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    @property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._legacy_stream.availability_strategy

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def primary_key(self) -> PrimaryKeyType:
        return (
            self._legacy_stream.config.primary_key
            or self.get_parser().get_parser_defined_primary_key(self._legacy_stream.config)
        )

    def get_parser(self) -> FileTypeParser:
        return self._legacy_stream.get_parser()

    def get_files(self) -> Iterable[RemoteFile]:
        return self._legacy_stream.get_files()

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc] # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage

    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        return self._legacy_stream.compute_slices()

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return self._legacy_stream.infer_schema(files)

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data


class FileBasedStreamPartition(Partition):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def read(self) -> Iterable[Record]:
        try:
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(data=data_to_return, stream_name=self.stream_name())
                elif (
                    isinstance(record_data, AirbyteMessage)
                    and record_data.type == Type.RECORD
                    and record_data.record is not None
                ):
                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
                    record_message_data = record_data.record.data
                    if not record_message_data:
                        raise ExceptionWithDisplayMessage("A record without data was found")
                    else:
                        yield Record(
                            data=record_message_data,
                            stream_name=self.stream_name(),
                            file_reference=record_data.record.file_reference,
                        )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        if self._slice is None:
            return None
        assert len(self._slice["files"]) == 1, (
            f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
        )
        file = self._slice["files"][0]
        return {"files": [file]}

    def __hash__(self) -> int:
        if self._slice:
            # Convert the slice to a string so that it can be hashed
            if len(self._slice["files"]) != 1:
                raise ValueError(
                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
                )
            else:
                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
            return hash((self._stream.name, s))
        else:
            return hash(self._stream.name)

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"


class FileBasedStreamPartitionGenerator(PartitionGenerator):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        pending_partitions = []
        for _slice in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            if _slice is not None:
                for file in _slice.get("files", []):
                    pending_partitions.append(
                        FileBasedStreamPartition(
                            self._stream,
                            {"files": [copy.deepcopy(file)]},
                            self._message_repository,
                            self._sync_mode,
                            self._cursor_field,
                            self._state,
                        )
                    )
        self._cursor.set_pending_partitions(pending_partitions)
        yield from pending_partitions
class FileBasedStreamFacade(airbyte_cdk.sources.streams.concurrent.abstract_stream_facade.AbstractStreamFacade[airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream], airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream):

This class is experimental. Use at your own risk.


@classmethod
def create_from_stream(cls, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, source: airbyte_cdk.AbstractSource, logger: logging.Logger, state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor) -> FileBasedStreamFacade:

Create a ConcurrentStream from a FileBasedStream object.
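One detail worth calling out: the sync mode used for partition generation is derived from the cursor type. A hedged sketch of that rule, where sync_mode_for is a hypothetical helper mirroring the inline conditional in the source above:

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor


def sync_mode_for(cursor: object) -> SyncMode:
    # A FileBasedFinalStateCursor signals a full refresh; any other concurrent
    # file-based cursor implies incremental partitioning.
    return (
        SyncMode.full_refresh
        if isinstance(cursor, FileBasedFinalStateCursor)
        else SyncMode.incremental
    )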

cursor_field: Union[str, List[str]]

Override to return the default cursor field used by this stream, e.g., an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
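For example, a stream whose cursor lives under a nested field could return the path to it; ExampleStream below is purely illustrative:

from typing import List, Union


class ExampleStream:
    @property
    def cursor_field(self) -> Union[str, List[str]]:
        # A top-level cursor would be returned as "updated_at";
        # a cursor nested under "metadata" is returned as a path.
        return ["metadata", "updated_at"]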

supports_incremental: bool

Returns

True if this stream supports incrementally reading data

def get_underlying_stream(self) -> airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream:

Return the underlying DefaultStream that this facade wraps.

def read(self, configured_stream: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, stream_state: MutableMapping[str, Any], state_manager: airbyte_cdk.ConnectorStateManager, internal_config: airbyte_cdk.InternalConfig) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

class FileBasedStreamPartition(airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition):

A partition is responsible for reading a specific set of data from a source.

FileBasedStreamPartition(stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, _slice: Optional[Mapping[str, Any]], message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])

def read(self) -> Iterable[airbyte_cdk.Record]:

Reads the data from the partition.

Returns

An iterable of records.
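A hedged consumption sketch: read() yields Record objects whose data has already been transformed against the stream's JSON schema, and errors carrying a display message surface as ExceptionWithDisplayMessage. collect_records is a hypothetical helper for illustration only:

from typing import List

from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamPartition
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.types import Record


def collect_records(partition: FileBasedStreamPartition) -> List[Record]:
    # Drain one partition (i.e., one file) into a list, logging failures.
    try:
        return list(partition.read())
    except ExceptionWithDisplayMessage as exc:
        print(f"Failed to read partition {partition.to_slice()}: {exc}")
        raise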

def to_slice(self) -> Optional[Mapping[str, Any]]:

Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values (example)

Returns

A mapping representing a slice
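For file-based streams the slice shape is always a single-file mapping. A sketch of constructing one, assuming RemoteFile exposes uri and last_modified fields as in the file-based CDK model (the URI and timestamp are placeholders):

from datetime import datetime

from airbyte_cdk.sources.file_based.remote_file import RemoteFile

file = RemoteFile(
    uri="s3://bucket/data/2024-01-01.csv",  # placeholder URI
    last_modified=datetime(2024, 1, 1, 12, 0, 0),  # placeholder timestamp
)
one_file_slice = {"files": [file]}  # any other length fails the assertion in to_slice()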

def stream_name(self) -> str:

Returns the name of the stream that this partition is reading from.

Returns

The name of the stream.

class FileBasedStreamPartitionGenerator(airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator):

Generates one FileBasedStreamPartition per file to sync and registers the full set of pending partitions on the cursor.

FileBasedStreamPartitionGenerator(stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor)

def generate(self) -> Iterable[FileBasedStreamPartition]:

Generates partitions for a given sync mode.

Returns

An iterable of partitions
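A small sketch of the contract: generate() yields one partition per file across all slices, after first registering the complete pending set on the cursor via set_pending_partitions() so progress can be tracked. count_partitions is a hypothetical helper:

from airbyte_cdk.sources.file_based.stream.concurrent.adapters import (
    FileBasedStreamPartitionGenerator,
)


def count_partitions(generator: FileBasedStreamPartitionGenerator) -> int:
    # Each yielded partition wraps exactly one file; by the time the first
    # partition is yielded, the cursor already knows the full pending set.
    return sum(1 for _ in generator.generate())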