airbyte_cdk.sources.file_based.stream.concurrent.adapters

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import copy
import logging
from functools import cache, lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
    AbstractFileBasedAvailabilityStrategyWrapper,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger

if TYPE_CHECKING:
    from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
        AbstractConcurrentFileBasedCursor,
    )

"""
This module contains adapters that help enable concurrency on file-based Stream objects without needing to migrate to AbstractStream.
"""


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
    @classmethod
    def create_from_stream(
        cls,
        stream: AbstractFileBasedStream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ) -> "FileBasedStreamFacade":
        """
        Create a ConcurrentStream from a FileBasedStream object.
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)
        stream._cursor = cursor

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return FileBasedStreamFacade(
            DefaultStream(
                partition_generator=FileBasedStreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FileBasedFinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                    cursor,
                ),
                name=stream.name,
                json_schema=stream.get_json_schema(),
                availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream),
                primary_key=pk,
                cursor_field=cursor_field,
                logger=logger,
                namespace=stream.namespace,
                cursor=cursor,
            ),
            stream,
            cursor,
            logger=logger,
            slice_logger=source._slice_logger,
        )

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: AbstractFileBasedStream,
        cursor: AbstractFileBasedCursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger
        self.catalog_schema = legacy_stream.catalog_schema
        self.config = legacy_stream.config
        self.validation_policy = legacy_stream.validation_policy

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    @property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._legacy_stream.availability_strategy

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def primary_key(self) -> PrimaryKeyType:
        return (
            self._legacy_stream.config.primary_key
            or self.get_parser().get_parser_defined_primary_key(self._legacy_stream.config)
        )

    def get_parser(self) -> FileTypeParser:
        return self._legacy_stream.get_parser()

    def get_files(self) -> Iterable[RemoteFile]:
        return self._legacy_stream.get_files()

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc] # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage

    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        return self._legacy_stream.compute_slices()

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return self._legacy_stream.infer_schema(files)

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data


class FileBasedStreamPartition(Partition):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def read(self) -> Iterable[Record]:
        try:
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(data=data_to_return, stream_name=self.stream_name())
                elif (
                    isinstance(record_data, AirbyteMessage)
                    and record_data.type == Type.RECORD
                    and record_data.record is not None
                ):
                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
                    # If the stream is flagged for file_transfer, the record data lives under the `file` key
                    record_message_data = (
                        record_data.record.file
                        if self._use_file_transfer()
                        else record_data.record.data
                    )
                    if not record_message_data:
                        raise ExceptionWithDisplayMessage("A record without data was found")
                    else:
                        yield Record(
                            data=record_message_data,
                            stream_name=self.stream_name(),
                            is_file_transfer_message=self._use_file_transfer(),
                        )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        if self._slice is None:
            return None
        assert (
            len(self._slice["files"]) == 1
        ), f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
        file = self._slice["files"][0]
        return {"files": [file]}

    def __hash__(self) -> int:
        if self._slice:
            # Convert the slice to a string so that it can be hashed
            if len(self._slice["files"]) != 1:
                raise ValueError(
                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
                )
            else:
                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
            return hash((self._stream.name, s))
        else:
            return hash(self._stream.name)

    def stream_name(self) -> str:
        return self._stream.name

    @cache
    def _use_file_transfer(self) -> bool:
        return hasattr(self._stream, "use_file_transfer") and self._stream.use_file_transfer

    def __repr__(self) -> str:
        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"


class FileBasedStreamPartitionGenerator(PartitionGenerator):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        pending_partitions = []
        for _slice in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            if _slice is not None:
                for file in _slice.get("files", []):
                    pending_partitions.append(
                        FileBasedStreamPartition(
                            self._stream,
                            {"files": [copy.deepcopy(file)]},
                            self._message_repository,
                            self._sync_mode,
                            self._cursor_field,
                            self._state,
                        )
                    )
        self._cursor.set_pending_partitions(pending_partitions)
        yield from pending_partitions
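Taken together, the classes above let a connector opt a legacy file-based stream into the concurrent framework with a single call. A minimal sketch, assuming a concrete stream and source are already built (`my_stream`, `my_source`, and `my_cursor` are hypothetical stand-ins for an AbstractFileBasedStream, an AbstractSource with a message repository set, and an AbstractConcurrentFileBasedCursor):

import logging

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade

logger = logging.getLogger("airbyte")

# The sync mode is derived from the cursor type: a FileBasedFinalStateCursor
# produces a full_refresh DefaultStream, any other concurrent file-based
# cursor produces an incremental one.
facade = FileBasedStreamFacade.create_from_stream(
    stream=my_stream,  # hypothetical AbstractFileBasedStream
    source=my_source,  # hypothetical AbstractSource; message_repository must be set
    logger=logger,
    state=None,  # no prior state in this sketch
    cursor=my_cursor,  # hypothetical AbstractConcurrentFileBasedCursor
)

# The facade still implements the legacy interface, so callers can
# iterate records exactly as they did before the migration.
for record in facade.read_records(sync_mode=SyncMode.full_refresh):
    print(record)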
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):

Wraps a concurrent DefaultStream behind the legacy AbstractFileBasedStream interface, so that an existing file-based stream can run on the concurrent framework without being rewritten.

@classmethod
def create_from_stream(
    cls,
    stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream,
    source: airbyte_cdk.AbstractSource,
    logger: logging.Logger,
    state: Optional[MutableMapping[str, Any]],
    cursor: airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor,
) -> FileBasedStreamFacade:

Create a ConcurrentStream from a FileBasedStream object.
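One behavior worth calling out from the source: construction fails fast if the source has no message repository, since non-record messages could not be emitted otherwise. A sketch, reusing the hypothetical stand-ins from above:

import logging

try:
    FileBasedStreamFacade.create_from_stream(
        stream=my_stream,
        source=source_without_repository,  # hypothetical source whose message_repository is unset
        logger=logging.getLogger("airbyte"),
        state=None,
        cursor=my_cursor,
    )
except ValueError as error:
    # "A message repository is required to emit non-record messages. ..."
    print(error)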

cursor_field: Union[str, List[str]]

Override to return the default cursor field used by this stream, e.g., an API entity might always use created_at as the cursor field.

Returns

The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.

supports_incremental: bool

Returns

True if this stream supports incrementally reading data

def get_underlying_stream(self) -> airbyte_cdk.sources.streams.concurrent.default_stream.DefaultStream:

Return the underlying concurrent stream (the DefaultStream wrapped by this facade).

def read(
    self,
    configured_stream: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream,
    logger: logging.Logger,
    slice_logger: airbyte_cdk.sources.utils.slice_logger.SliceLogger,
    stream_state: MutableMapping[str, Any],
    state_manager: airbyte_cdk.ConnectorStateManager,
    internal_config: airbyte_cdk.InternalConfig,
) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Reads records by delegating to the underlying concurrent stream; the legacy arguments are accepted for interface compatibility but are not used.

class FileBasedStreamPartition(airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition):

A partition is responsible for reading a specific set of data from a source.

FileBasedStreamPartition(
    stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream,
    _slice: Optional[Mapping[str, Any]],
    message_repository: airbyte_cdk.MessageRepository,
    sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
)

Holds a single-file slice of a file-based stream together with the context (sync mode, cursor field, state, and message repository) needed to read it.

def read(self) -> Iterable[airbyte_cdk.Record]:

Reads the data from the partition.

Returns

An iterable of records.
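The source above distinguishes three cases: plain mappings are normalized through the stream's transformer, AirbyteMessage RECORD payloads are passed through (using the file key when file transfer is enabled), and every other message is routed to the message repository. A sketch of draining one partition, with hypothetical stand-ins for the stream, file, and repository:

from airbyte_cdk.models import SyncMode

partition = FileBasedStreamPartition(
    stream=my_stream,  # hypothetical AbstractFileBasedStream
    _slice={"files": [remote_file]},  # exactly one RemoteFile per partition
    message_repository=my_repository,  # hypothetical MessageRepository
    sync_mode=SyncMode.full_refresh,
    cursor_field=None,
    state=None,
)

for record in partition.read():
    # Each yielded Record carries the (transformed) data and the stream name;
    # non-record messages were emitted to my_repository instead of being yielded.
    print(record.stream_name, record.data)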

def to_slice(self) -> Optional[Mapping[str, Any]]:

Converts the partition to a slice that can be serialized and deserialized.

Note: a type of Mapping[str, Comparable] would have simplified typing, but some slices can have nested values.

Returns

A mapping representing a slice
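Because every partition wraps exactly one file, the serialized slice always has the single-file shape below; a partition built without a slice serializes to None. Reusing the hypothetical partition from the previous sketch:

slice_ = partition.to_slice()
assert slice_ == {"files": [remote_file]}  # always a one-element file list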

def stream_name(self) -> str:

Returns the name of the stream that this partition is reading from.

Returns

The name of the stream.

class FileBasedStreamPartitionGenerator(airbyte_cdk.sources.streams.concurrent.partitions.partition_generator.PartitionGenerator):

Generates one FileBasedStreamPartition per file in each slice produced by the wrapped stream, and registers the pending set on the concurrent cursor.

FileBasedStreamPartitionGenerator(
    stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream,
    message_repository: airbyte_cdk.MessageRepository,
    sync_mode: airbyte_protocol_dataclasses.models.airbyte_protocol.SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
    cursor: airbyte_cdk.sources.file_based.stream.concurrent.cursor.AbstractConcurrentFileBasedCursor,
)

def generate(self) -> Iterable[FileBasedStreamPartition]:

Generates partitions for a given sync mode.

Returns

An iterable of partitions
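The notable step in the source is the flattening: a slice containing N files becomes N single-file partitions, and the complete pending set is handed to the cursor before any partition is yielded, so the cursor can track per-file progress. A sketch with the same hypothetical stand-ins as above:

from airbyte_cdk.models import SyncMode

generator = FileBasedStreamPartitionGenerator(
    stream=my_stream,  # hypothetical; suppose stream_slices() yields {"files": [f1, f2]}
    message_repository=my_repository,
    sync_mode=SyncMode.incremental,
    cursor_field=None,
    state=None,
    cursor=my_cursor,  # hypothetical AbstractConcurrentFileBasedCursor
)

partitions = list(generator.generate())
# Two single-file partitions, e.g.:
#   FileBasedStreamPartition(my_stream, {"files": [f1]})
#   FileBasedStreamPartition(my_stream, {"files": [f2]})
# By this point my_cursor.set_pending_partitions(partitions) has already run.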