airbyte_cdk.sources.file_based.stream.concurrent.adapters
```python
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import copy
import logging
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger

if TYPE_CHECKING:
    from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
        AbstractConcurrentFileBasedCursor,
    )

"""
This module contains adapters to help enable concurrency on File-based Stream objects without needing to migrate to AbstractStream.
"""


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
    @classmethod
    def create_from_stream(
        cls,
        stream: AbstractFileBasedStream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ) -> "FileBasedStreamFacade":
        """
        Create a ConcurrentStream from a FileBasedStream object.
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)
        stream._cursor = cursor

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return FileBasedStreamFacade(
            DefaultStream(
                partition_generator=FileBasedStreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FileBasedFinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                    cursor,
                ),
                name=stream.name,
                json_schema=stream.get_json_schema(),
                primary_key=pk,
                cursor_field=CursorField(cursor_field_key=cursor_field) if cursor_field else None,
                logger=logger,
                namespace=stream.namespace,
                cursor=cursor,
            ),
            stream,
            cursor,
            logger=logger,
            slice_logger=source._slice_logger,
        )

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: AbstractFileBasedStream,
        cursor: AbstractFileBasedCursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger
        self.catalog_schema = legacy_stream.catalog_schema
        self.config = legacy_stream.config
        self.validation_policy = legacy_stream.validation_policy

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    @property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._legacy_stream.availability_strategy

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def primary_key(self) -> PrimaryKeyType:
        return self._legacy_stream.primary_key

    def get_parser(self) -> FileTypeParser:
        return self._legacy_stream.get_parser()

    def get_files(self) -> Iterable[RemoteFile]:
        return self._legacy_stream.get_files()

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc]  # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage

    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        return self._legacy_stream.compute_slices()

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return self._legacy_stream.infer_schema(files)

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data


class FileBasedStreamPartition(Partition):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def read(self) -> Iterable[Record]:
        try:
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(data=data_to_return, stream_name=self.stream_name())
                elif (
                    isinstance(record_data, AirbyteMessage)
                    and record_data.type == Type.RECORD
                    and record_data.record is not None
                ):
                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
                    record_message_data = record_data.record.data
                    if not record_message_data:
                        raise ExceptionWithDisplayMessage("A record without data was found")
                    else:
                        yield Record(
                            data=record_message_data,
                            stream_name=self.stream_name(),
                            file_reference=record_data.record.file_reference,
                        )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        if self._slice is None:
            return None
        assert len(self._slice["files"]) == 1, (
            f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
        )
        file = self._slice["files"][0]
        return {"files": [file]}

    def __hash__(self) -> int:
        if self._slice:
            # Convert the slice to a string so that it can be hashed
            if len(self._slice["files"]) != 1:
                raise ValueError(
                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
                )
            else:
                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
            return hash((self._stream.name, s))
        else:
            return hash(self._stream.name)

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"


class FileBasedStreamPartitionGenerator(PartitionGenerator):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        pending_partitions = []
        for _slice in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            if _slice is not None:
                for file in _slice.get("files", []):
                    pending_partitions.append(
                        FileBasedStreamPartition(
                            self._stream,
                            {"files": [copy.deepcopy(file)]},
                            self._message_repository,
                            self._sync_mode,
                            self._cursor_field,
                            self._state,
                        )
                    )
        self._cursor.set_pending_partitions(pending_partitions)
        yield from pending_partitions
```
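Taken together, these classes let the concurrent framework drive a legacy file-based stream: the facade wraps the stream, the partition generator emits one partition per remote file, and each partition reads its single file. A minimal sketch of that flow, assuming `facade` was built with `FileBasedStreamFacade.create_from_stream` (a construction sketch follows below) and `process` is a hypothetical consumer:

```python
# Sketch only: `facade` is an already-built FileBasedStreamFacade, `process` a hypothetical sink.
concurrent_stream = facade.get_underlying_stream()  # the wrapped DefaultStream

for partition in concurrent_stream.generate_partitions():
    # Each FileBasedStreamPartition corresponds to exactly one remote file.
    print(partition.to_slice())  # e.g. {"files": [<RemoteFile>]}
    for record in partition.read():
        process(record.data)  # `record` is an airbyte_cdk.sources.types.Record
```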
```python
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
    ...
```
```python
@classmethod
def create_from_stream(
    cls,
    stream: AbstractFileBasedStream,
    source: AbstractSource,
    logger: logging.Logger,
    state: Optional[MutableMapping[str, Any]],
    cursor: "AbstractConcurrentFileBasedCursor",
) -> "FileBasedStreamFacade": ...
```
Create a ConcurrentStream from a FileBasedStream object.
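A hedged construction sketch: `my_source` and `my_stream` stand in for objects a real file-based connector provides, and the `FileBasedFinalStateCursor` constructor arguments are assumptions. Note that `create_from_stream` raises a `ValueError` if the source has no message repository, and selects `SyncMode.full_refresh` exactly when the cursor is a `FileBasedFinalStateCursor`:

```python
import logging

from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor

logger = logging.getLogger("airbyte")

# Assumed constructor arguments; check the cursor module for the exact signature.
cursor = FileBasedFinalStateCursor(
    stream_config=my_stream.config,
    stream_namespace=None,
    message_repository=my_source.message_repository,
)

facade = FileBasedStreamFacade.create_from_stream(
    stream=my_stream,  # an AbstractFileBasedStream
    source=my_source,  # must have message_repository set, or ValueError is raised
    logger=logger,
    state=None,        # or the incoming per-stream state mapping
    cursor=cursor,
)
```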
```python
@property
def cursor_field(self) -> Union[str, List[str]]: ...
```
Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
```python
@property
def supports_incremental(self) -> bool: ...
```
Returns: True if this stream supports incrementally reading data.
get_underlying_stream() -> DefaultStream: Return the underlying stream facade object.
```python
def read(
    self,
    configured_stream: ConfiguredAirbyteStream,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    stream_state: MutableMapping[str, Any],
    state_manager: ConnectorStateManager,
    internal_config: InternalConfig,
) -> Iterable[StreamData]: ...
```
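Both `read()` and `read_records()` delegate to the same partition-driven `_read_records()` generator; `read_records()` additionally yields an ERROR-level `AirbyteMessage` containing the cursor state before re-raising any failure. A consumption sketch, reusing the assumed `facade` from the earlier example:

```python
from airbyte_cdk.models import SyncMode

# On failure, the iterator yields a LOG message with the cursor state, and the
# exception then propagates to the caller on the next iteration step.
for item in facade.read_records(sync_mode=SyncMode.incremental):
    ...  # record data mappings, optional slice log messages, possibly a final error LOG
```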
Inherited Members
- airbyte_cdk.sources.file_based.stream.abstract_file_based_stream.AbstractFileBasedStream
- AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- primary_key
- list_files
- get_files
- read_records
- read_records_from_slice
- stream_slices
- compute_slices
- get_json_schema
- infer_schema
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
```python
class FileBasedStreamPartition(Partition):
    ...
```
A partition is responsible for reading a specific set of data from a source.
```python
def __init__(
    self,
    stream: AbstractFileBasedStream,
    _slice: Optional[Mapping[str, Any]],
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
): ...
```
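For illustration, a hypothetical construction in which `my_stream`, `remote_file`, and `message_repository` are stand-ins for real connector objects:

```python
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamPartition

partition = FileBasedStreamPartition(
    stream=my_stream,                 # AbstractFileBasedStream that can read the file
    _slice={"files": [remote_file]},  # invariant: exactly one RemoteFile per partition
    message_repository=message_repository,
    sync_mode=SyncMode.full_refresh,
    cursor_field=None,
    state=None,
)
```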
```python
def read(self) -> Iterable[Record]: ...
```
Reads the data from the partition.
Returns: An iterable of records.
```python
def to_slice(self) -> Optional[Mapping[str, Any]]: ...
```
Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values (example).

Returns: A mapping representing a slice.
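For file-based partitions the serialized slice always has the one-file shape shown below; `__hash__` relies on the same invariant, keying on the stream name together with the file's `last_modified` timestamp and URI. A sketch using the `partition` from the previous example:

```python
# The slice round-trips as a single-file mapping.
assert partition.to_slice() == {"files": [remote_file]}

# Hashing combines the stream name with "<last_modified %Y-%m-%dT%H:%M:%S.%fZ>_<uri>",
# so two partitions over the same file version hash equally.
key = hash(partition)
```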
```python
class FileBasedStreamPartitionGenerator(PartitionGenerator):
    ...
```
```python
def __init__(
    self,
    stream: AbstractFileBasedStream,
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
    cursor: "AbstractConcurrentFileBasedCursor",
): ...
```
```python
def generate(self) -> Iterable[FileBasedStreamPartition]: ...
```
Generates partitions for a given sync mode.
Returns: An iterable of partitions.
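Note that `generate()` materializes every partition before yielding any of them, so it can first hand the complete pending set to the cursor via `set_pending_partitions()`. A driving sketch, reusing the assumed objects from the earlier examples:

```python
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.stream.concurrent.adapters import (
    FileBasedStreamPartitionGenerator,
)

generator = FileBasedStreamPartitionGenerator(
    my_stream,
    message_repository,
    SyncMode.full_refresh,
    None,    # cursor_field
    None,    # state
    cursor,  # AbstractConcurrentFileBasedCursor; receives set_pending_partitions(...)
)
partitions = list(generator.generate())  # one FileBasedStreamPartition per matched file
```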