airbyte_cdk.sources.file_based.stream.concurrent.adapters
This module contains adapters that help enable concurrency on file-based stream objects without needing to migrate to AbstractStream.

```python
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import copy
import logging
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger

if TYPE_CHECKING:
    from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
        AbstractConcurrentFileBasedCursor,
    )

"""
This module contains adapters to help enabling concurrency on File-based Stream objects without needing to migrate to AbstractStream
"""


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
    @classmethod
    def create_from_stream(
        cls,
        stream: AbstractFileBasedStream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ) -> "FileBasedStreamFacade":
        """
        Create a ConcurrentStream from a FileBasedStream object.
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)
        stream._cursor = cursor

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return FileBasedStreamFacade(
            DefaultStream(
                partition_generator=FileBasedStreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FileBasedFinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                    cursor,
                ),
                name=stream.name,
                json_schema=stream.get_json_schema(),
                primary_key=pk,
                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
                logger=logger,
                namespace=stream.namespace,
                cursor=cursor,
            ),
            stream,
            cursor,
            logger=logger,
            slice_logger=source._slice_logger,
        )

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: AbstractFileBasedStream,
        cursor: AbstractFileBasedCursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger
        self.catalog_schema = legacy_stream.catalog_schema
        self.config = legacy_stream.config
        self.validation_policy = legacy_stream.validation_policy

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    @property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._legacy_stream.availability_strategy

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def primary_key(self) -> PrimaryKeyType:
        return (
            self._legacy_stream.config.primary_key
            or self.get_parser().get_parser_defined_primary_key(self._legacy_stream.config)
        )

    def get_parser(self) -> FileTypeParser:
        return self._legacy_stream.get_parser()

    def get_files(self) -> Iterable[RemoteFile]:
        return self._legacy_stream.get_files()

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc]  # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage

    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        return self._legacy_stream.compute_slices()

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return self._legacy_stream.infer_schema(files)

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data


class FileBasedStreamPartition(Partition):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def read(self) -> Iterable[Record]:
        try:
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(data=data_to_return, stream_name=self.stream_name())
                elif (
                    isinstance(record_data, AirbyteMessage)
                    and record_data.type == Type.RECORD
                    and record_data.record is not None
                ):
                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
                    record_message_data = record_data.record.data
                    if not record_message_data:
                        raise ExceptionWithDisplayMessage("A record without data was found")
                    else:
                        yield Record(
                            data=record_message_data,
                            stream_name=self.stream_name(),
                            file_reference=record_data.record.file_reference,
                        )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        if self._slice is None:
            return None
        assert len(self._slice["files"]) == 1, (
            f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
        )
        file = self._slice["files"][0]
        return {"files": [file]}

    def __hash__(self) -> int:
        if self._slice:
            # Convert the slice to a string so that it can be hashed
            if len(self._slice["files"]) != 1:
                raise ValueError(
                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
                )
            else:
                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
            return hash((self._stream.name, s))
        else:
            return hash(self._stream.name)

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"


class FileBasedStreamPartitionGenerator(PartitionGenerator):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        pending_partitions = []
        for _slice in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            if _slice is not None:
                for file in _slice.get("files", []):
                    pending_partitions.append(
                        FileBasedStreamPartition(
                            self._stream,
                            {"files": [copy.deepcopy(file)]},
                            self._message_repository,
                            self._sync_mode,
                            self._cursor_field,
                            self._state,
                        )
                    )
        self._cursor.set_pending_partitions(pending_partitions)
        yield from pending_partitions
```
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream)

This class is experimental. Use at your own risk.
@classmethod
def create_from_stream(cls, stream: AbstractFileBasedStream, source: AbstractSource, logger: logging.Logger, state: Optional[MutableMapping[str, Any]], cursor: AbstractConcurrentFileBasedCursor) -> FileBasedStreamFacade
Create a ConcurrentStream from a FileBasedStream object.
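The method raises a ValueError if `source.message_repository` is unset, since non-record messages need a repository to be emitted through. Note also that the sync mode of the generated partitions is derived from the concrete cursor class rather than passed in; a small illustration of that branch, lifted from the method body (`cursor` here is a placeholder variable):

```python
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor

# FileBasedFinalStateCursor tracks no per-file state, so it maps to a full
# refresh; any other concurrent file-based cursor implies incremental.
sync_mode = (
    SyncMode.full_refresh
    if isinstance(cursor, FileBasedFinalStateCursor)
    else SyncMode.incremental
)
```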
cursor_field: Union[str, List[str]] (property)
Override to return the default cursor field used by this stream, e.g. an API entity might always use `created_at` as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
supports_incremental: bool (property)
Returns
True if this stream supports incrementally reading data.
get_underlying_stream() -> DefaultStream

Return the underlying concurrent stream (the DefaultStream wrapped by this facade).
read(configured_stream: ConfiguredAirbyteStream, logger: logging.Logger, slice_logger: SliceLogger, stream_state: MutableMapping[str, Any], state_manager: ConnectorStateManager, internal_config: InternalConfig) -> Iterable[StreamData]

Reads records by delegating to the concurrent partition read path; the legacy per-slice arguments are accepted for interface compatibility but are not used.
Inherited Members
- airbyte_cdk.sources.file_based.stream.abstract_file_based_stream.AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- primary_key
- list_files
- get_files
- read_records
- read_records_from_slice
- stream_slices
- compute_slices
- get_json_schema
- infer_schema
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
class FileBasedStreamPartition(Partition)
A partition is responsible for reading a specific set of data from a source.
__init__(stream: AbstractFileBasedStream, _slice: Optional[Mapping[str, Any]], message_repository: MessageRepository, sync_mode: SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
read() -> Iterable[Record]
Reads the data from the partition.
Returns
An iterable of records.
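A hedged sketch of consuming a partition directly; in practice the concurrent framework drives this loop, and `partition` and `handle` are hypothetical placeholders:

```python
# `partition` stands in for a FileBasedStreamPartition produced by the
# generator documented below; `handle` is a stand-in for your own logic.
for record in partition.read():
    # Mapping-shaped records are transformed against the stream's JSON
    # schema before being wrapped in airbyte_cdk.sources.types.Record.
    handle(record.data)
```

Non-record AirbyteMessages encountered during the read are routed to the message repository instead of being yielded.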
to_slice() -> Optional[Mapping[str, Any]]
Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values.

Returns
A mapping representing a slice.
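Because each partition wraps exactly one file, the returned slice always carries a single-element `files` list (an assertion enforces this). A hedged illustration, with `partition` as a placeholder:

```python
slice_ = partition.to_slice()
if slice_ is not None:
    # One file per partition, by construction of the partition generator.
    assert len(slice_["files"]) == 1
    remote_file = slice_["files"][0]  # a RemoteFile with `uri` and `last_modified`
```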
class FileBasedStreamPartitionGenerator(PartitionGenerator)
Generates one FileBasedStreamPartition per file across the slices produced by the wrapped stream, registering every pending partition with the cursor before any is yielded.
__init__(stream: AbstractFileBasedStream, message_repository: MessageRepository, sync_mode: SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]], cursor: AbstractConcurrentFileBasedCursor)
generate() -> Iterable[FileBasedStreamPartition]
Generates partitions for a given sync mode.
Returns
An iterable of partitions.
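A minimal sketch of wiring the generator by hand, assuming `stream`, `repo`, and `cursor` are already constructed (hypothetical placeholders, not CDK-provided fixtures):

```python
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import (
    FileBasedStreamPartitionGenerator,
)

generator = FileBasedStreamPartitionGenerator(
    stream=stream,            # an AbstractFileBasedStream
    message_repository=repo,  # a MessageRepository implementation
    sync_mode=SyncMode.full_refresh,
    cursor_field=None,
    state=None,
    cursor=cursor,            # an AbstractConcurrentFileBasedCursor
)

# One single-file partition is produced per file, and the cursor is told
# about every pending partition before any of them is yielded.
for partition in generator.generate():
    print(partition.to_slice())
```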