airbyte_cdk.sources.file_based.stream.concurrent.adapters
```python
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import copy
import logging
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger

if TYPE_CHECKING:
    from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
        AbstractConcurrentFileBasedCursor,
    )

"""
This module contains adapters that help enable concurrency on file-based Stream objects without needing to migrate to AbstractStream.
"""


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
    @classmethod
    def create_from_stream(
        cls,
        stream: AbstractFileBasedStream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ) -> "FileBasedStreamFacade":
        """
        Create a ConcurrentStream from a FileBasedStream object.
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)
        stream._cursor = cursor

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return FileBasedStreamFacade(
            DefaultStream(
                partition_generator=FileBasedStreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FileBasedFinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                    cursor,
                ),
                name=stream.name,
                json_schema=stream.get_json_schema(),
                primary_key=pk,
                cursor_field=cursor_field,
                logger=logger,
                namespace=stream.namespace,
                cursor=cursor,
            ),
            stream,
            cursor,
            logger=logger,
            slice_logger=source._slice_logger,
        )

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: AbstractFileBasedStream,
        cursor: AbstractFileBasedCursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger
        self.catalog_schema = legacy_stream.catalog_schema
        self.config = legacy_stream.config
        self.validation_policy = legacy_stream.validation_policy

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    @property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._legacy_stream.availability_strategy

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def primary_key(self) -> PrimaryKeyType:
        return (
            self._legacy_stream.config.primary_key
            or self.get_parser().get_parser_defined_primary_key(self._legacy_stream.config)
        )

    def get_parser(self) -> FileTypeParser:
        return self._legacy_stream.get_parser()

    def get_files(self) -> Iterable[RemoteFile]:
        return self._legacy_stream.get_files()

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc]  # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage

    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        return self._legacy_stream.compute_slices()

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return self._legacy_stream.infer_schema(files)

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data


class FileBasedStreamPartition(Partition):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def read(self) -> Iterable[Record]:
        try:
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(data=data_to_return, stream_name=self.stream_name())
                elif (
                    isinstance(record_data, AirbyteMessage)
                    and record_data.type == Type.RECORD
                    and record_data.record is not None
                ):
                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
                    record_message_data = record_data.record.data
                    if not record_message_data:
                        raise ExceptionWithDisplayMessage("A record without data was found")
                    else:
                        yield Record(
                            data=record_message_data,
                            stream_name=self.stream_name(),
                            file_reference=record_data.record.file_reference,
                        )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        if self._slice is None:
            return None
        assert len(self._slice["files"]) == 1, (
            f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
        )
        file = self._slice["files"][0]
        return {"files": [file]}

    def __hash__(self) -> int:
        if self._slice:
            # Convert the slice to a string so that it can be hashed
            if len(self._slice["files"]) != 1:
                raise ValueError(
                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
                )
            else:
                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
            return hash((self._stream.name, s))
        else:
            return hash(self._stream.name)

    def stream_name(self) -> str:
        return self._stream.name

    def __repr__(self) -> str:
        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"


class FileBasedStreamPartitionGenerator(PartitionGenerator):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        pending_partitions = []
        for _slice in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            if _slice is not None:
                for file in _slice.get("files", []):
                    pending_partitions.append(
                        FileBasedStreamPartition(
                            self._stream,
                            {"files": [copy.deepcopy(file)]},
                            self._message_repository,
                            self._sync_mode,
                            self._cursor_field,
                            self._state,
                        )
                    )
        self._cursor.set_pending_partitions(pending_partitions)
        yield from pending_partitions
```
```python
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream)
```
```python
@classmethod
def create_from_stream(
    cls,
    stream: AbstractFileBasedStream,
    source: AbstractSource,
    logger: logging.Logger,
    state: Optional[MutableMapping[str, Any]],
    cursor: "AbstractConcurrentFileBasedCursor",
) -> "FileBasedStreamFacade"
```
Create a ConcurrentStream from a FileBasedStream object.
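As a quick orientation, here is a minimal sketch of wrapping a legacy stream. `my_source` and `my_stream` are hypothetical placeholders, and the exact `FileBasedFinalStateCursor` constructor arguments may vary across CDK versions:

```python
import logging

from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor

logger = logging.getLogger("airbyte")

# `my_source` (an AbstractSource with a message_repository configured) and
# `my_stream` (an AbstractFileBasedStream) are hypothetical placeholders.
cursor = FileBasedFinalStateCursor(
    stream_config=my_stream.config,
    stream_namespace=None,
    message_repository=my_source.message_repository,
)

# The facade's underlying DefaultStream reads each matched file as its own
# concurrent partition; full refresh is selected here because the cursor is
# a FileBasedFinalStateCursor.
facade = FileBasedStreamFacade.create_from_stream(
    stream=my_stream,
    source=my_source,
    logger=logger,
    state=None,  # or a previously persisted state mapping
    cursor=cursor,
)
```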
```python
cursor_field: Union[str, List[str]]
```
Override to return the default cursor field used by this stream; e.g., an API entity might always use created_at as the cursor field.
Returns
The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
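A hedged illustration of the fallback behavior, assuming the `facade` built in the sketch above:

```python
# DefaultStream.cursor_field is None for a full-refresh facade, so the
# property normalizes it to an empty list instead of returning None.
if not facade.cursor_field:
    print("no cursor field: full-refresh sync")
```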
```python
supports_incremental: bool
```
Returns
True if this stream supports incrementally reading data
```python
def get_underlying_stream(self) -> DefaultStream
```

Return the underlying stream object.
```python
def read(
    self,
    configured_stream: ConfiguredAirbyteStream,
    logger: logging.Logger,
    slice_logger: SliceLogger,
    stream_state: MutableMapping[str, Any],
    state_manager: ConnectorStateManager,
    internal_config: InternalConfig,
) -> Iterable[StreamData]
```
Inherited Members

From airbyte_cdk.sources.file_based.stream.abstract_file_based_stream.AbstractFileBasedStream:
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- primary_key
- list_files
- get_files
- read_records
- read_records_from_slice
- stream_slices
- compute_slices
- get_json_schema
- infer_schema
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
```python
class FileBasedStreamPartition(Partition)
```
A partition is responsible for reading a specific set of data from a source.
```python
FileBasedStreamPartition(
    stream: AbstractFileBasedStream,
    _slice: Optional[Mapping[str, Any]],
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
)
```
```python
def read(self) -> Iterable[Record]
```
Reads the data from the partition.
Returns
An iterable of records.
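A short consumption sketch, assuming `partition` is a FileBasedStreamPartition obtained from the generator documented below; `Record` exposes the transformed payload via `.data`:

```python
# Each yielded Record carries the schema-transformed mapping and the name
# of the stream the partition belongs to.
for record in partition.read():
    print(record.stream_name, record.data)
```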
```python
def to_slice(self) -> Optional[Mapping[str, Any]]
```
Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values (example).

Returns
A mapping representing a slice
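Since file-based partitions hold exactly one file, the returned mapping always has a single-element `files` list; a hedged sketch, assuming the `partition` from the example above:

```python
slice_ = partition.to_slice()
if slice_ is not None:
    # Invariant enforced by to_slice(): exactly one file per partition.
    remote_file = slice_["files"][0]
    print(remote_file.uri, remote_file.last_modified)
```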
```python
class FileBasedStreamPartitionGenerator(PartitionGenerator)
```
```python
FileBasedStreamPartitionGenerator(
    stream: AbstractFileBasedStream,
    message_repository: MessageRepository,
    sync_mode: SyncMode,
    cursor_field: Optional[List[str]],
    state: Optional[MutableMapping[str, Any]],
    cursor: "AbstractConcurrentFileBasedCursor",
)
```
```python
def generate(self) -> Iterable[FileBasedStreamPartition]
```
Generates partitions for a given sync mode.
Returns
An iterable of partitions
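A hedged sketch of the fan-out, assuming `generator` is a FileBasedStreamPartitionGenerator built with the arguments shown above: a slice grouping several files is split into one partition per file, and the full pending list is registered on the cursor before any partition is read.

```python
partitions = list(generator.generate())

# Every partition wraps exactly one file, even when the originating slice
# grouped several files together; the cursor already knows about all of
# them via set_pending_partitions().
assert all(len(p.to_slice()["files"]) == 1 for p in partitions)
```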