airbyte_cdk.sources.file_based.stream.concurrent.adapters

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import copy
import logging
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger

if TYPE_CHECKING:
    from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
        AbstractConcurrentFileBasedCursor,
    )

 54"""
 55This module contains adapters to help enabling concurrency on File-based Stream objects without needing to migrate to AbstractStream
 56"""


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
    @classmethod
    def create_from_stream(
        cls,
        stream: AbstractFileBasedStream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ) -> "FileBasedStreamFacade":
        """
        Create a ConcurrentStream from a FileBasedStream object.
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)
        stream._cursor = cursor

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return FileBasedStreamFacade(
            DefaultStream(
                partition_generator=FileBasedStreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FileBasedFinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                    cursor,
                ),
                name=stream.name,
                json_schema=stream.get_json_schema(),
                primary_key=pk,
                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
                logger=logger,
                namespace=stream.namespace,
                cursor=cursor,
            ),
            stream,
            cursor,
            logger=logger,
            slice_logger=source._slice_logger,
        )

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: AbstractFileBasedStream,
        cursor: AbstractFileBasedCursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger
        self.catalog_schema = legacy_stream.catalog_schema
        self.config = legacy_stream.config
        self.validation_policy = legacy_stream.validation_policy

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    @property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._legacy_stream.availability_strategy

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def primary_key(self) -> PrimaryKeyType:
        return (
            self._legacy_stream.config.primary_key
            or self.get_parser().get_parser_defined_primary_key(self._legacy_stream.config)
        )

    def get_parser(self) -> FileTypeParser:
        return self._legacy_stream.get_parser()

    def get_files(self) -> Iterable[RemoteFile]:
        return self._legacy_stream.get_files()

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc] # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage

    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        return self._legacy_stream.compute_slices()

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return self._legacy_stream.infer_schema(files)

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data


class FileBasedStreamPartition(Partition):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def read(self) -> Iterable[Record]:
        try:
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(data=data_to_return, stream_name=self.stream_name())
                elif (
                    isinstance(record_data, AirbyteMessage)
                    and record_data.type == Type.RECORD
                    and record_data.record is not None
                ):
                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
                    record_message_data = record_data.record.data
                    if not record_message_data:
                        raise ExceptionWithDisplayMessage("A record without data was found")
                    else:
                        yield Record(
                            data=record_message_data,
                            stream_name=self.stream_name(),
                            file_reference=record_data.record.file_reference,
                        )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        if self._slice is None:
            return None
        assert len(self._slice["files"]) == 1, (
            f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
        )
        file = self._slice["files"][0]
        return {"files": [file]}

    def __hash__(self) -> int:
        if self._slice:
            # Convert the slice to a string so that it can be hashed
            if len(self._slice["files"]) != 1:
                raise ValueError(
                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
                )
            else:
                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
            return hash((self._stream.name, s))
        else:
            return hash(self._stream.name)

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"


class FileBasedStreamPartitionGenerator(PartitionGenerator):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        pending_partitions = []
        for _slice in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            if _slice is not None:
                for file in _slice.get("files", []):
                    pending_partitions.append(
                        FileBasedStreamPartition(
                            self._stream,
                            {"files": [copy.deepcopy(file)]},
                            self._message_repository,
                            self._sync_mode,
                            self._cursor_field,
                            self._state,
                        )
                    )
        self._cursor.set_pending_partitions(pending_partitions)
        yield from pending_partitions

class FileBasedStreamFacade(airbyte_cdk.sources.streams.concurrent.abstract_stream_facade.AbstractStreamFacade[airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream], airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream):

This class is experimental. Use at your own risk.

@classmethod
def create_from_stream(cls, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, source: airbyte_cdk.AbstractSource, logger: logging.Logger, state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor) -> FileBasedStreamFacade:

Create a ConcurrentStream from a FileBasedStream object.
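
A minimal usage sketch, assuming you already have a file-based stream, a source with a message repository configured, and a concurrent cursor (the wrap_for_concurrency helper is hypothetical, not part of the CDK; the import paths are taken from this module's own imports):

import logging
from typing import Any, MutableMapping, Optional

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import AbstractConcurrentFileBasedCursor


def wrap_for_concurrency(
    stream: AbstractFileBasedStream,
    source: AbstractSource,
    cursor: AbstractConcurrentFileBasedCursor,
    state: Optional[MutableMapping[str, Any]] = None,
) -> FileBasedStreamFacade:
    # create_from_stream raises ValueError if the source has no message repository.
    return FileBasedStreamFacade.create_from_stream(
        stream=stream,
        source=source,
        logger=logging.getLogger("airbyte"),
        state=state,
        cursor=cursor,  # a FileBasedFinalStateCursor makes the facade partition as full refresh
    )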

cursor_field: Union[str, List[str]]

Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

supports_incremental: bool
Returns

True if this stream supports incrementally reading data

def get_underlying_stream(self) -> airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream:

Return the underlying stream facade object.

def read(self, configured_stream: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream, logger: logging.Logger, slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger, stream_state: MutableMapping[str, Any], state_manager: airbyte_cdk.ConnectorStateManager, internal_config: airbyte_cdk.InternalConfig) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
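Both read and read_records delegate to the internal _read_records generator, which walks the concurrent partitions; read_records additionally logs the cursor state on failure before re-raising. A hedged consumption sketch (the facade argument is assumed to be an already-constructed FileBasedStreamFacade):

from airbyte_cdk.models import SyncMode


def consume(facade) -> None:
    # Slice log messages are interleaved with record data; on error, a LOG
    # message with the cursor state is yielded before the exception propagates.
    for item in facade.read_records(sync_mode=SyncMode.full_refresh):
        print(item)
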
class FileBasedStreamPartition(airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition):

A partition is responsible for reading a specific set of data from a source.

FileBasedStreamPartition(stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, _slice: Optional[Mapping[str, Any]], message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
def read(self) -> Iterable[airbyte_cdk.Record]:

Reads the data from the partition.

Returns

An iterable of records.
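
read applies a three-way routing rule to whatever the legacy stream yields: plain mappings are schema-transformed and wrapped in Record, RECORD-type AirbyteMessages are unwrapped into Record (preserving the file reference), and all other messages (logs, traces, and so on) are forwarded to the message repository. A hedged miniature of that rule, minus the schema transformation and error-display handling (route is illustrative, not a CDK function):

from collections.abc import Mapping
from typing import Any, Optional, Union

from airbyte_cdk.models import AirbyteMessage, Type
from airbyte_cdk.sources.types import Record


def route(
    record_data: Union[Mapping[str, Any], AirbyteMessage],
    stream_name: str,
    message_repository: Any,
) -> Optional[Record]:
    # Plain mapping -> Record
    if isinstance(record_data, Mapping):
        return Record(data=dict(record_data), stream_name=stream_name)
    # AirbyteMessage of type RECORD -> unwrap into a Record
    if (
        isinstance(record_data, AirbyteMessage)
        and record_data.type == Type.RECORD
        and record_data.record is not None
    ):
        return Record(data=record_data.record.data, stream_name=stream_name)
    # Everything else (logs, traces, ...) goes to the message repository
    message_repository.emit_message(record_data)
    return None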

def to_slice(self) -> Optional[Mapping[str, Any]]:

Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values.

Returns

A mapping representing a slice
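
Each file-based partition holds exactly one file: to_slice asserts that invariant, and __hash__ builds its key from the file's last_modified timestamp and URI. A self-contained sketch of that key computation (StubFile is a stand-in for the CDK's RemoteFile, keeping only the two attributes the hash uses):

from dataclasses import dataclass
from datetime import datetime


@dataclass
class StubFile:
    uri: str
    last_modified: datetime


f = StubFile(uri="s3://bucket/2024/data.csv", last_modified=datetime(2024, 1, 2, 3, 4, 5))
# Mirrors FileBasedStreamPartition.__hash__ for a single-file slice:
key = f"{f.last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{f.uri}"
partition_hash = hash(("my_stream", key))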

def stream_name(self) -> str:

Returns the name of the stream that this partition is reading from.

Returns

The name of the stream.

class FileBasedStreamPartitionGenerator(airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator):

FileBasedStreamPartitionGenerator(stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, message_repository: airbyte_cdk.MessageRepository, sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]], cursor: airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor)
def generate(self) -> Iterable[FileBasedStreamPartition]:

Generates partitions for a given sync mode.

Returns

An iterable of partitions
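
generate flattens the stream's slices so that every file becomes its own single-file partition, registers all of them as pending on the cursor, and only then yields them. A minimal, self-contained sketch of that flattening (plain dicts stand in for FileBasedStreamPartition):

import copy

slices = [{"files": ["a.csv", "b.csv"]}, {"files": ["c.csv"]}, None]

pending = [
    {"files": [copy.deepcopy(file)]}
    for _slice in slices
    if _slice is not None
    for file in _slice.get("files", [])
]
assert pending == [{"files": ["a.csv"]}, {"files": ["b.csv"]}, {"files": ["c.csv"]}]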