airbyte_cdk.sources.file_based.stream.concurrent.adapters

  1#
  2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  3#
  4
  5import copy
  6import logging
  7from functools import lru_cache
  8from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union
  9
 10from typing_extensions import deprecated
 11
 12from airbyte_cdk.models import (
 13    AirbyteLogMessage,
 14    AirbyteMessage,
 15    ConfiguredAirbyteStream,
 16    Level,
 17    SyncMode,
 18    Type,
 19)
 20from airbyte_cdk.sources import AbstractSource
 21from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 22from airbyte_cdk.sources.file_based.availability_strategy import (
 23    AbstractFileBasedAvailabilityStrategy,
 24)
 25from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
 26from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 27from airbyte_cdk.sources.file_based.remote_file import RemoteFile
 28from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
 29from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
 30from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
 31from airbyte_cdk.sources.file_based.types import StreamSlice
 32from airbyte_cdk.sources.message import MessageRepository
 33from airbyte_cdk.sources.source import ExperimentalClassWarning
 34from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
 35from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
 36from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 37from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
 38from airbyte_cdk.sources.streams.concurrent.helpers import (
 39    get_cursor_field_from_stream,
 40    get_primary_key_from_stream,
 41)
 42from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
 43from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
 44from airbyte_cdk.sources.streams.core import StreamData
 45from airbyte_cdk.sources.types import Record
 46from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
 47from airbyte_cdk.sources.utils.slice_logger import SliceLogger
 48
 49if TYPE_CHECKING:
 50    from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
 51        AbstractConcurrentFileBasedCursor,
 52    )
 53
 54"""
 55This module contains adapters to help enabling concurrency on File-based Stream objects without needing to migrate to AbstractStream
 56"""
 57
 58
 59@deprecated(
 60    "This class is experimental. Use at your own risk.",
 61    category=ExperimentalClassWarning,
 62)
 63class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
 64    @classmethod
 65    def create_from_stream(
 66        cls,
 67        stream: AbstractFileBasedStream,
 68        source: AbstractSource,
 69        logger: logging.Logger,
 70        state: Optional[MutableMapping[str, Any]],
 71        cursor: "AbstractConcurrentFileBasedCursor",
 72    ) -> "FileBasedStreamFacade":
 73        """
 74        Create a ConcurrentStream from a FileBasedStream object.
 75        """
 76        pk = get_primary_key_from_stream(stream.primary_key)
 77        cursor_field = get_cursor_field_from_stream(stream)
 78        stream._cursor = cursor
 79
 80        if not source.message_repository:
 81            raise ValueError(
 82                "A message repository is required to emit non-record messages. Please set the message repository on the source."
 83            )
 84
 85        message_repository = source.message_repository
 86        return FileBasedStreamFacade(
 87            DefaultStream(
 88                partition_generator=FileBasedStreamPartitionGenerator(
 89                    stream,
 90                    message_repository,
 91                    SyncMode.full_refresh
 92                    if isinstance(cursor, FileBasedFinalStateCursor)
 93                    else SyncMode.incremental,
 94                    [cursor_field] if cursor_field is not None else None,
 95                    state,
 96                    cursor,
 97                ),
 98                name=stream.name,
 99                json_schema=stream.get_json_schema(),
100                primary_key=pk,
101                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
102                logger=logger,
103                namespace=stream.namespace,
104                cursor=cursor,
105            ),
106            stream,
107            cursor,
108            logger=logger,
109            slice_logger=source._slice_logger,
110        )
111
112    def __init__(
113        self,
114        stream: DefaultStream,
115        legacy_stream: AbstractFileBasedStream,
116        cursor: AbstractFileBasedCursor,
117        slice_logger: SliceLogger,
118        logger: logging.Logger,
119    ):
120        """
121        :param stream: The underlying AbstractStream
122        """
123        self._abstract_stream = stream
124        self._legacy_stream = legacy_stream
125        self._cursor = cursor
126        self._slice_logger = slice_logger
127        self._logger = logger
128        self.catalog_schema = legacy_stream.catalog_schema
129        self.config = legacy_stream.config
130        self.validation_policy = legacy_stream.validation_policy
131
132    @property
133    def cursor_field(self) -> Union[str, List[str]]:
134        if self._abstract_stream.cursor_field is None:
135            return []
136        else:
137            return self._abstract_stream.cursor_field
138
139    @property
140    def name(self) -> str:
141        return self._abstract_stream.name
142
143    @property
144    def supports_incremental(self) -> bool:
145        return self._legacy_stream.supports_incremental
146
147    @property
148    @deprecated("Deprecated as of CDK version 3.7.0.")
149    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
150        return self._legacy_stream.availability_strategy
151
152    @lru_cache(maxsize=None)
153    def get_json_schema(self) -> Mapping[str, Any]:
154        return self._abstract_stream.get_json_schema()
155
156    @property
157    def primary_key(self) -> PrimaryKeyType:
158        return self._legacy_stream.primary_key
159
160    def get_parser(self) -> FileTypeParser:
161        return self._legacy_stream.get_parser()
162
163    def get_files(self) -> Iterable[RemoteFile]:
164        return self._legacy_stream.get_files()
165
166    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
167        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc] # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage
168
169    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
170        return self._legacy_stream.compute_slices()
171
172    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
173        return self._legacy_stream.infer_schema(files)
174
175    def get_underlying_stream(self) -> DefaultStream:
176        return self._abstract_stream
177
178    def read(
179        self,
180        configured_stream: ConfiguredAirbyteStream,
181        logger: logging.Logger,
182        slice_logger: SliceLogger,
183        stream_state: MutableMapping[str, Any],
184        state_manager: ConnectorStateManager,
185        internal_config: InternalConfig,
186    ) -> Iterable[StreamData]:
187        yield from self._read_records()
188
189    def read_records(
190        self,
191        sync_mode: SyncMode,
192        cursor_field: Optional[List[str]] = None,
193        stream_slice: Optional[Mapping[str, Any]] = None,
194        stream_state: Optional[Mapping[str, Any]] = None,
195    ) -> Iterable[StreamData]:
196        try:
197            yield from self._read_records()
198        except Exception as exc:
199            if hasattr(self._cursor, "state"):
200                state = str(self._cursor.state)
201            else:
202                # This shouldn't happen if the ConcurrentCursor was used
203                state = "unknown; no state attribute was available on the cursor"
204            yield AirbyteMessage(
205                type=Type.LOG,
206                log=AirbyteLogMessage(
207                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
208                ),
209            )
210            raise exc
211
212    def _read_records(self) -> Iterable[StreamData]:
213        for partition in self._abstract_stream.generate_partitions():
214            if self._slice_logger.should_log_slice_message(self._logger):
215                yield self._slice_logger.create_slice_log_message(partition.to_slice())
216            for record in partition.read():
217                yield record.data
218
219
class FileBasedStreamPartition(Partition):
    """A concurrent partition wrapping a single-file slice of a legacy
    file-based stream.

    Reading delegates to the legacy stream's ``read_records``; mapping-shaped
    records are transformed and yielded, AirbyteMessage RECORDs are converted
    to ``Record`` objects, and any other message is forwarded to the message
    repository.
    """

    def __init__(
        self,
        stream: AbstractFileBasedStream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def read(self) -> Iterable[Record]:
        """Read the records of this partition's slice.

        :raises ExceptionWithDisplayMessage: when the underlying stream
            provides a display message for an error, or when a RECORD message
            carries no data.
        """
        try:
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                # Partitions always read their single file fully; incremental
                # bookkeeping is handled by the cursor, not here.
                sync_mode=SyncMode.full_refresh,
                # Deep-copy so the stream cannot mutate the partition's slice.
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(data=data_to_return, stream_name=self.stream_name())
                elif (
                    isinstance(record_data, AirbyteMessage)
                    and record_data.type == Type.RECORD
                    and record_data.record is not None
                ):
                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
                    record_message_data = record_data.record.data
                    if not record_message_data:
                        raise ExceptionWithDisplayMessage("A record without data was found")
                    else:
                        yield Record(
                            data=record_message_data,
                            stream_name=self.stream_name(),
                            file_reference=record_data.record.file_reference,
                        )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        """Return a serializable single-file slice, or None when sliceless.

        :raises ValueError: if the slice does not contain exactly one file.
        """
        if self._slice is None:
            return None
        # Raise explicitly instead of `assert`: asserts are stripped under
        # `python -O`, and __hash__ enforces the same invariant with ValueError.
        if len(self._slice["files"]) != 1:
            raise ValueError(
                f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
            )
        file = self._slice["files"][0]
        return {"files": [file]}

    def __hash__(self) -> int:
        # Hash on (stream name, file identity) so equal partitions collide;
        # the file's last-modified timestamp plus URI identifies the file.
        if self._slice:
            # Convert the slice to a string so that it can be hashed
            if len(self._slice["files"]) != 1:
                raise ValueError(
                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
                )
            else:
                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
            return hash((self._stream.name, s))
        else:
            return hash(self._stream.name)

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"
302
303
class FileBasedStreamPartitionGenerator(PartitionGenerator):
    """Builds one FileBasedStreamPartition per file found in the legacy
    stream's slices and registers the full set with the cursor before
    yielding them."""

    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        """Yield single-file partitions, after telling the cursor which
        partitions are pending."""
        stream_slices = self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        )
        # One partition per file; each gets its own deep-copied slice so
        # partitions cannot share mutable file objects.
        pending_partitions = [
            FileBasedStreamPartition(
                self._stream,
                {"files": [copy.deepcopy(file)]},
                self._message_repository,
                self._sync_mode,
                self._cursor_field,
                self._state,
            )
            for stream_slice in stream_slices
            if stream_slice is not None
            for file in stream_slice.get("files", [])
        ]
        self._cursor.set_pending_partitions(pending_partitions)
        yield from pending_partitions
 60@deprecated(
 61    "This class is experimental. Use at your own risk.",
 62    category=ExperimentalClassWarning,
 63)
 64class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
 65    @classmethod
 66    def create_from_stream(
 67        cls,
 68        stream: AbstractFileBasedStream,
 69        source: AbstractSource,
 70        logger: logging.Logger,
 71        state: Optional[MutableMapping[str, Any]],
 72        cursor: "AbstractConcurrentFileBasedCursor",
 73    ) -> "FileBasedStreamFacade":
 74        """
 75        Create a ConcurrentStream from a FileBasedStream object.
 76        """
 77        pk = get_primary_key_from_stream(stream.primary_key)
 78        cursor_field = get_cursor_field_from_stream(stream)
 79        stream._cursor = cursor
 80
 81        if not source.message_repository:
 82            raise ValueError(
 83                "A message repository is required to emit non-record messages. Please set the message repository on the source."
 84            )
 85
 86        message_repository = source.message_repository
 87        return FileBasedStreamFacade(
 88            DefaultStream(
 89                partition_generator=FileBasedStreamPartitionGenerator(
 90                    stream,
 91                    message_repository,
 92                    SyncMode.full_refresh
 93                    if isinstance(cursor, FileBasedFinalStateCursor)
 94                    else SyncMode.incremental,
 95                    [cursor_field] if cursor_field is not None else None,
 96                    state,
 97                    cursor,
 98                ),
 99                name=stream.name,
100                json_schema=stream.get_json_schema(),
101                primary_key=pk,
102                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
103                logger=logger,
104                namespace=stream.namespace,
105                cursor=cursor,
106            ),
107            stream,
108            cursor,
109            logger=logger,
110            slice_logger=source._slice_logger,
111        )
112
113    def __init__(
114        self,
115        stream: DefaultStream,
116        legacy_stream: AbstractFileBasedStream,
117        cursor: AbstractFileBasedCursor,
118        slice_logger: SliceLogger,
119        logger: logging.Logger,
120    ):
121        """
122        :param stream: The underlying AbstractStream
123        """
124        self._abstract_stream = stream
125        self._legacy_stream = legacy_stream
126        self._cursor = cursor
127        self._slice_logger = slice_logger
128        self._logger = logger
129        self.catalog_schema = legacy_stream.catalog_schema
130        self.config = legacy_stream.config
131        self.validation_policy = legacy_stream.validation_policy
132
133    @property
134    def cursor_field(self) -> Union[str, List[str]]:
135        if self._abstract_stream.cursor_field is None:
136            return []
137        else:
138            return self._abstract_stream.cursor_field
139
140    @property
141    def name(self) -> str:
142        return self._abstract_stream.name
143
144    @property
145    def supports_incremental(self) -> bool:
146        return self._legacy_stream.supports_incremental
147
148    @property
149    @deprecated("Deprecated as of CDK version 3.7.0.")
150    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
151        return self._legacy_stream.availability_strategy
152
153    @lru_cache(maxsize=None)
154    def get_json_schema(self) -> Mapping[str, Any]:
155        return self._abstract_stream.get_json_schema()
156
157    @property
158    def primary_key(self) -> PrimaryKeyType:
159        return self._legacy_stream.primary_key
160
161    def get_parser(self) -> FileTypeParser:
162        return self._legacy_stream.get_parser()
163
164    def get_files(self) -> Iterable[RemoteFile]:
165        return self._legacy_stream.get_files()
166
167    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
168        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc] # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage
169
170    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
171        return self._legacy_stream.compute_slices()
172
173    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
174        return self._legacy_stream.infer_schema(files)
175
176    def get_underlying_stream(self) -> DefaultStream:
177        return self._abstract_stream
178
179    def read(
180        self,
181        configured_stream: ConfiguredAirbyteStream,
182        logger: logging.Logger,
183        slice_logger: SliceLogger,
184        stream_state: MutableMapping[str, Any],
185        state_manager: ConnectorStateManager,
186        internal_config: InternalConfig,
187    ) -> Iterable[StreamData]:
188        yield from self._read_records()
189
190    def read_records(
191        self,
192        sync_mode: SyncMode,
193        cursor_field: Optional[List[str]] = None,
194        stream_slice: Optional[Mapping[str, Any]] = None,
195        stream_state: Optional[Mapping[str, Any]] = None,
196    ) -> Iterable[StreamData]:
197        try:
198            yield from self._read_records()
199        except Exception as exc:
200            if hasattr(self._cursor, "state"):
201                state = str(self._cursor.state)
202            else:
203                # This shouldn't happen if the ConcurrentCursor was used
204                state = "unknown; no state attribute was available on the cursor"
205            yield AirbyteMessage(
206                type=Type.LOG,
207                log=AirbyteLogMessage(
208                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
209                ),
210            )
211            raise exc
212
213    def _read_records(self) -> Iterable[StreamData]:
214        for partition in self._abstract_stream.generate_partitions():
215            if self._slice_logger.should_log_slice_message(self._logger):
216                yield self._slice_logger.create_slice_log_message(partition.to_slice())
217            for record in partition.read():
218                yield record.data

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
@classmethod
def create_from_stream( cls, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, source: airbyte_cdk.AbstractSource, logger: logging.Logger, state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor) -> FileBasedStreamFacade:
 65    @classmethod
 66    def create_from_stream(
 67        cls,
 68        stream: AbstractFileBasedStream,
 69        source: AbstractSource,
 70        logger: logging.Logger,
 71        state: Optional[MutableMapping[str, Any]],
 72        cursor: "AbstractConcurrentFileBasedCursor",
 73    ) -> "FileBasedStreamFacade":
 74        """
 75        Create a ConcurrentStream from a FileBasedStream object.
 76        """
 77        pk = get_primary_key_from_stream(stream.primary_key)
 78        cursor_field = get_cursor_field_from_stream(stream)
 79        stream._cursor = cursor
 80
 81        if not source.message_repository:
 82            raise ValueError(
 83                "A message repository is required to emit non-record messages. Please set the message repository on the source."
 84            )
 85
 86        message_repository = source.message_repository
 87        return FileBasedStreamFacade(
 88            DefaultStream(
 89                partition_generator=FileBasedStreamPartitionGenerator(
 90                    stream,
 91                    message_repository,
 92                    SyncMode.full_refresh
 93                    if isinstance(cursor, FileBasedFinalStateCursor)
 94                    else SyncMode.incremental,
 95                    [cursor_field] if cursor_field is not None else None,
 96                    state,
 97                    cursor,
 98                ),
 99                name=stream.name,
100                json_schema=stream.get_json_schema(),
101                primary_key=pk,
102                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
103                logger=logger,
104                namespace=stream.namespace,
105                cursor=cursor,
106            ),
107            stream,
108            cursor,
109            logger=logger,
110            slice_logger=source._slice_logger,
111        )

Create a ConcurrentStream from a FileBasedStream object.

cursor_field: Union[str, List[str]]
133    @property
134    def cursor_field(self) -> Union[str, List[str]]:
135        if self._abstract_stream.cursor_field is None:
136            return []
137        else:
138            return self._abstract_stream.cursor_field

Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

supports_incremental: bool
144    @property
145    def supports_incremental(self) -> bool:
146        return self._legacy_stream.supports_incremental
Returns

True if this stream supports incrementally reading data

def get_underlying_stream( self) -> airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream:
176    def get_underlying_stream(self) -> DefaultStream:
177        return self._abstract_stream

Return the underlying stream facade object.

def read( self, configured_stream: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, stream_state: MutableMapping[str, Any], state_manager: airbyte_cdk.ConnectorStateManager, internal_config: airbyte_cdk.InternalConfig) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
179    def read(
180        self,
181        configured_stream: ConfiguredAirbyteStream,
182        logger: logging.Logger,
183        slice_logger: SliceLogger,
184        stream_state: MutableMapping[str, Any],
185        state_manager: ConnectorStateManager,
186        internal_config: InternalConfig,
187    ) -> Iterable[StreamData]:
188        yield from self._read_records()
class FileBasedStreamPartition(airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition):
221class FileBasedStreamPartition(Partition):
222    def __init__(
223        self,
224        stream: AbstractFileBasedStream,
225        _slice: Optional[Mapping[str, Any]],
226        message_repository: MessageRepository,
227        sync_mode: SyncMode,
228        cursor_field: Optional[List[str]],
229        state: Optional[MutableMapping[str, Any]],
230    ):
231        self._stream = stream
232        self._slice = _slice
233        self._message_repository = message_repository
234        self._sync_mode = sync_mode
235        self._cursor_field = cursor_field
236        self._state = state
237
238    def read(self) -> Iterable[Record]:
239        try:
240            for record_data in self._stream.read_records(
241                cursor_field=self._cursor_field,
242                sync_mode=SyncMode.full_refresh,
243                stream_slice=copy.deepcopy(self._slice),
244                stream_state=self._state,
245            ):
246                if isinstance(record_data, Mapping):
247                    data_to_return = dict(record_data)
248                    self._stream.transformer.transform(
249                        data_to_return, self._stream.get_json_schema()
250                    )
251                    yield Record(data=data_to_return, stream_name=self.stream_name())
252                elif (
253                    isinstance(record_data, AirbyteMessage)
254                    and record_data.type == Type.RECORD
255                    and record_data.record is not None
256                ):
257                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
258                    record_message_data = record_data.record.data
259                    if not record_message_data:
260                        raise ExceptionWithDisplayMessage("A record without data was found")
261                    else:
262                        yield Record(
263                            data=record_message_data,
264                            stream_name=self.stream_name(),
265                            file_reference=record_data.record.file_reference,
266                        )
267                else:
268                    self._message_repository.emit_message(record_data)
269        except Exception as e:
270            display_message = self._stream.get_error_display_message(e)
271            if display_message:
272                raise ExceptionWithDisplayMessage(display_message) from e
273            else:
274                raise e
275
276    def to_slice(self) -> Optional[Mapping[str, Any]]:
277        if self._slice is None:
278            return None
279        assert len(self._slice["files"]) == 1, (
280            f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
281        )
282        file = self._slice["files"][0]
283        return {"files": [file]}
284
285    def __hash__(self) -> int:
286        if self._slice:
287            # Convert the slice to a string so that it can be hashed
288            if len(self._slice["files"]) != 1:
289                raise ValueError(
290                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
291                )
292            else:
293                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
294            return hash((self._stream.name, s))
295        else:
296            return hash(self._stream.name)
297
298    def stream_name(self) -> str:
299        return self._stream.name
300
301    def __repr__(self) -> str:
302        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"

A partition is responsible for reading a specific set of data from a source.

FileBasedStreamPartition( stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, _slice: Optional[Mapping[str, Any]], message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
222    def __init__(
223        self,
224        stream: AbstractFileBasedStream,
225        _slice: Optional[Mapping[str, Any]],
226        message_repository: MessageRepository,
227        sync_mode: SyncMode,
228        cursor_field: Optional[List[str]],
229        state: Optional[MutableMapping[str, Any]],
230    ):
231        self._stream = stream
232        self._slice = _slice
233        self._message_repository = message_repository
234        self._sync_mode = sync_mode
235        self._cursor_field = cursor_field
236        self._state = state
def read(self) -> Iterable[airbyte_cdk.Record]:
238    def read(self) -> Iterable[Record]:
239        try:
240            for record_data in self._stream.read_records(
241                cursor_field=self._cursor_field,
242                sync_mode=SyncMode.full_refresh,
243                stream_slice=copy.deepcopy(self._slice),
244                stream_state=self._state,
245            ):
246                if isinstance(record_data, Mapping):
247                    data_to_return = dict(record_data)
248                    self._stream.transformer.transform(
249                        data_to_return, self._stream.get_json_schema()
250                    )
251                    yield Record(data=data_to_return, stream_name=self.stream_name())
252                elif (
253                    isinstance(record_data, AirbyteMessage)
254                    and record_data.type == Type.RECORD
255                    and record_data.record is not None
256                ):
257                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
258                    record_message_data = record_data.record.data
259                    if not record_message_data:
260                        raise ExceptionWithDisplayMessage("A record without data was found")
261                    else:
262                        yield Record(
263                            data=record_message_data,
264                            stream_name=self.stream_name(),
265                            file_reference=record_data.record.file_reference,
266                        )
267                else:
268                    self._message_repository.emit_message(record_data)
269        except Exception as e:
270            display_message = self._stream.get_error_display_message(e)
271            if display_message:
272                raise ExceptionWithDisplayMessage(display_message) from e
273            else:
274                raise e

Reads the data from the partition.

Returns

An iterable of records.

def to_slice(self) -> Optional[Mapping[str, Any]]:
276    def to_slice(self) -> Optional[Mapping[str, Any]]:
277        if self._slice is None:
278            return None
279        assert len(self._slice["files"]) == 1, (
280            f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
281        )
282        file = self._slice["files"][0]
283        return {"files": [file]}

Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values — for example, the list of files in a file-based slice.

Returns

A mapping representing a slice

def stream_name(self) -> str:
298    def stream_name(self) -> str:
299        return self._stream.name

Returns the name of the stream that this partition is reading from.

Returns

The name of the stream.

class FileBasedStreamPartitionGenerator(PartitionGenerator):
    """
    Produces one ``FileBasedStreamPartition`` per remote file of a file-based stream.

    The complete list of partitions is registered with the concurrent cursor
    before any partition is emitted, so the cursor knows the full set of
    pending work up front.
    """

    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        """
        Yield one partition per file across all slices of the stream.

        :return: An iterable of single-file partitions.
        """
        partitions: List[FileBasedStreamPartition] = []
        slices = self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        )
        for stream_slice in slices:
            if stream_slice is None:
                continue
            for file in stream_slice.get("files", []):
                # Deep-copy each file so partitions can be processed independently
                # without sharing mutable state.
                partitions.append(
                    FileBasedStreamPartition(
                        self._stream,
                        {"files": [copy.deepcopy(file)]},
                        self._message_repository,
                        self._sync_mode,
                        self._cursor_field,
                        self._state,
                    )
                )
        # Register the complete pending set before emitting any partition.
        self._cursor.set_pending_partitions(partitions)
        yield from partitions

Generates one FileBasedStreamPartition per remote file of a stream, registering the full set of pending partitions with the concurrent cursor before yielding them.

FileBasedStreamPartitionGenerator( stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor)
306    def __init__(
307        self,
308        stream: AbstractFileBasedStream,
309        message_repository: MessageRepository,
310        sync_mode: SyncMode,
311        cursor_field: Optional[List[str]],
312        state: Optional[MutableMapping[str, Any]],
313        cursor: "AbstractConcurrentFileBasedCursor",
314    ):
315        self._stream = stream
316        self._message_repository = message_repository
317        self._sync_mode = sync_mode
318        self._cursor_field = cursor_field
319        self._state = state
320        self._cursor = cursor
def generate( self) -> Iterable[FileBasedStreamPartition]:
322    def generate(self) -> Iterable[FileBasedStreamPartition]:
323        pending_partitions = []
324        for _slice in self._stream.stream_slices(
325            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
326        ):
327            if _slice is not None:
328                for file in _slice.get("files", []):
329                    pending_partitions.append(
330                        FileBasedStreamPartition(
331                            self._stream,
332                            {"files": [copy.deepcopy(file)]},
333                            self._message_repository,
334                            self._sync_mode,
335                            self._cursor_field,
336                            self._state,
337                        )
338                    )
339        self._cursor.set_pending_partitions(pending_partitions)
340        yield from pending_partitions

Generates partitions for a given sync mode.

Returns

An iterable of partitions