airbyte_cdk.sources.file_based.stream.concurrent.adapters
This module contains adapters that help enable concurrency on file-based stream objects without needing to migrate to AbstractStream.

```python
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import copy
import logging
from functools import cache, lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    ConfiguredAirbyteStream,
    Level,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
    AbstractFileBasedAvailabilityStrategyWrapper,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.concurrent.cursor import FileBasedFinalStateCursor
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
from airbyte_cdk.sources.streams.concurrent.helpers import (
    get_cursor_field_from_stream,
    get_primary_key_from_stream,
)
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
from airbyte_cdk.sources.streams.core import StreamData
from airbyte_cdk.sources.types import Record
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig
from airbyte_cdk.sources.utils.slice_logger import SliceLogger

if TYPE_CHECKING:
    from airbyte_cdk.sources.file_based.stream.concurrent.cursor import (
        AbstractConcurrentFileBasedCursor,
    )

"""
This module contains adapters that help enable concurrency on file-based Stream objects without needing to migrate to AbstractStream.
"""


@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream):
    @classmethod
    def create_from_stream(
        cls,
        stream: AbstractFileBasedStream,
        source: AbstractSource,
        logger: logging.Logger,
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ) -> "FileBasedStreamFacade":
        """
        Create a ConcurrentStream from a FileBasedStream object.
        """
        pk = get_primary_key_from_stream(stream.primary_key)
        cursor_field = get_cursor_field_from_stream(stream)
        stream._cursor = cursor

        if not source.message_repository:
            raise ValueError(
                "A message repository is required to emit non-record messages. Please set the message repository on the source."
            )

        message_repository = source.message_repository
        return FileBasedStreamFacade(
            DefaultStream(
                partition_generator=FileBasedStreamPartitionGenerator(
                    stream,
                    message_repository,
                    SyncMode.full_refresh
                    if isinstance(cursor, FileBasedFinalStateCursor)
                    else SyncMode.incremental,
                    [cursor_field] if cursor_field is not None else None,
                    state,
                    cursor,
                ),
                name=stream.name,
                json_schema=stream.get_json_schema(),
                availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream),
                primary_key=pk,
                cursor_field=cursor_field,
                logger=logger,
                namespace=stream.namespace,
                cursor=cursor,
            ),
            stream,
            cursor,
            logger=logger,
            slice_logger=source._slice_logger,
        )

    def __init__(
        self,
        stream: DefaultStream,
        legacy_stream: AbstractFileBasedStream,
        cursor: AbstractFileBasedCursor,
        slice_logger: SliceLogger,
        logger: logging.Logger,
    ):
        """
        :param stream: The underlying AbstractStream
        """
        self._abstract_stream = stream
        self._legacy_stream = legacy_stream
        self._cursor = cursor
        self._slice_logger = slice_logger
        self._logger = logger
        self.catalog_schema = legacy_stream.catalog_schema
        self.config = legacy_stream.config
        self.validation_policy = legacy_stream.validation_policy

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        if self._abstract_stream.cursor_field is None:
            return []
        else:
            return self._abstract_stream.cursor_field

    @property
    def name(self) -> str:
        return self._abstract_stream.name

    @property
    def supports_incremental(self) -> bool:
        return self._legacy_stream.supports_incremental

    @property
    @deprecated("Deprecated as of CDK version 3.7.0.")
    def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
        return self._legacy_stream.availability_strategy

    @lru_cache(maxsize=None)
    def get_json_schema(self) -> Mapping[str, Any]:
        return self._abstract_stream.get_json_schema()

    @property
    def primary_key(self) -> PrimaryKeyType:
        return (
            self._legacy_stream.config.primary_key
            or self.get_parser().get_parser_defined_primary_key(self._legacy_stream.config)
        )

    def get_parser(self) -> FileTypeParser:
        return self._legacy_stream.get_parser()

    def get_files(self) -> Iterable[RemoteFile]:
        return self._legacy_stream.get_files()

    def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping[str, Any]]:
        yield from self._legacy_stream.read_records_from_slice(stream_slice)  # type: ignore[misc] # Only Mapping[str, Any] is expected for legacy streams, not AirbyteMessage

    def compute_slices(self) -> Iterable[Optional[StreamSlice]]:
        return self._legacy_stream.compute_slices()

    def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
        return self._legacy_stream.infer_schema(files)

    def get_underlying_stream(self) -> DefaultStream:
        return self._abstract_stream

    def read(
        self,
        configured_stream: ConfiguredAirbyteStream,
        logger: logging.Logger,
        slice_logger: SliceLogger,
        stream_state: MutableMapping[str, Any],
        state_manager: ConnectorStateManager,
        internal_config: InternalConfig,
    ) -> Iterable[StreamData]:
        yield from self._read_records()

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[StreamData]:
        try:
            yield from self._read_records()
        except Exception as exc:
            if hasattr(self._cursor, "state"):
                state = str(self._cursor.state)
            else:
                # This shouldn't happen if the ConcurrentCursor was used
                state = "unknown; no state attribute was available on the cursor"
            yield AirbyteMessage(
                type=Type.LOG,
                log=AirbyteLogMessage(
                    level=Level.ERROR, message=f"Cursor State at time of exception: {state}"
                ),
            )
            raise exc

    def _read_records(self) -> Iterable[StreamData]:
        for partition in self._abstract_stream.generate_partitions():
            if self._slice_logger.should_log_slice_message(self._logger):
                yield self._slice_logger.create_slice_log_message(partition.to_slice())
            for record in partition.read():
                yield record.data


class FileBasedStreamPartition(Partition):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        _slice: Optional[Mapping[str, Any]],
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
    ):
        self._stream = stream
        self._slice = _slice
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state

    def read(self) -> Iterable[Record]:
        try:
            for record_data in self._stream.read_records(
                cursor_field=self._cursor_field,
                sync_mode=SyncMode.full_refresh,
                stream_slice=copy.deepcopy(self._slice),
                stream_state=self._state,
            ):
                if isinstance(record_data, Mapping):
                    data_to_return = dict(record_data)
                    self._stream.transformer.transform(
                        data_to_return, self._stream.get_json_schema()
                    )
                    yield Record(data=data_to_return, stream_name=self.stream_name())
                elif (
                    isinstance(record_data, AirbyteMessage)
                    and record_data.type == Type.RECORD
                    and record_data.record is not None
                ):
                    # `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
                    # If the stream is flagged for file_transfer, the record data should be in the file key
                    record_message_data = (
                        record_data.record.file
                        if self._use_file_transfer()
                        else record_data.record.data
                    )
                    if not record_message_data:
                        raise ExceptionWithDisplayMessage("A record without data was found")
                    else:
                        yield Record(
                            data=record_message_data,
                            stream_name=self.stream_name(),
                            is_file_transfer_message=self._use_file_transfer(),
                        )
                else:
                    self._message_repository.emit_message(record_data)
        except Exception as e:
            display_message = self._stream.get_error_display_message(e)
            if display_message:
                raise ExceptionWithDisplayMessage(display_message) from e
            else:
                raise e

    def to_slice(self) -> Optional[Mapping[str, Any]]:
        if self._slice is None:
            return None
        assert (
            len(self._slice["files"]) == 1
        ), f"Expected 1 file per partition but got {len(self._slice['files'])} for stream {self.stream_name()}"
        file = self._slice["files"][0]
        return {"files": [file]}

    def __hash__(self) -> int:
        if self._slice:
            # Convert the slice to a string so that it can be hashed
            if len(self._slice["files"]) != 1:
                raise ValueError(
                    f"Slices for file-based streams should be of length 1, but got {len(self._slice['files'])}. This is unexpected. Please contact Support."
                )
            else:
                s = f"{self._slice['files'][0].last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{self._slice['files'][0].uri}"
            return hash((self._stream.name, s))
        else:
            return hash(self._stream.name)

    def stream_name(self) -> str:
        return self._stream.name

    @cache
    def _use_file_transfer(self) -> bool:
        return hasattr(self._stream, "use_file_transfer") and self._stream.use_file_transfer

    def __repr__(self) -> str:
        return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"


class FileBasedStreamPartitionGenerator(PartitionGenerator):
    def __init__(
        self,
        stream: AbstractFileBasedStream,
        message_repository: MessageRepository,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]],
        state: Optional[MutableMapping[str, Any]],
        cursor: "AbstractConcurrentFileBasedCursor",
    ):
        self._stream = stream
        self._message_repository = message_repository
        self._sync_mode = sync_mode
        self._cursor_field = cursor_field
        self._state = state
        self._cursor = cursor

    def generate(self) -> Iterable[FileBasedStreamPartition]:
        pending_partitions = []
        for _slice in self._stream.stream_slices(
            sync_mode=self._sync_mode, cursor_field=self._cursor_field, stream_state=self._state
        ):
            if _slice is not None:
                for file in _slice.get("files", []):
                    pending_partitions.append(
                        FileBasedStreamPartition(
                            self._stream,
                            {"files": [copy.deepcopy(file)]},
                            self._message_repository,
                            self._sync_mode,
                            self._cursor_field,
                            self._state,
                        )
                    )
        self._cursor.set_pending_partitions(pending_partitions)
        yield from pending_partitions
```
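To make the flow above concrete, here is a minimal sketch of wiring a facade together and draining its partitions. `my_stream`, `my_source`, `my_state`, and `my_cursor` are hypothetical stand-ins for a concrete `AbstractFileBasedStream`, its `AbstractSource`, saved state, and an `AbstractConcurrentFileBasedCursor` implementation; only `create_from_stream`, `get_underlying_stream`, `generate_partitions`, and `Partition.read` come from this module.

```python
import logging

from airbyte_cdk.sources.file_based.stream.concurrent.adapters import FileBasedStreamFacade

logger = logging.getLogger("airbyte")

# Hypothetical inputs; note that `my_source` must have a message_repository set,
# or create_from_stream raises a ValueError.
facade = FileBasedStreamFacade.create_from_stream(
    stream=my_stream,
    source=my_source,
    logger=logger,
    state=my_state,
    cursor=my_cursor,
)

# The wrapped DefaultStream fans the stream out into one partition per file;
# reading a partition yields Record objects.
for partition in facade.get_underlying_stream().generate_partitions():
    for record in partition.read():
        print(record.data)
```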
class FileBasedStreamFacade(AbstractStreamFacade[DefaultStream], AbstractFileBasedStream)

This class is experimental. Use at your own risk.
classmethod create_from_stream(stream: AbstractFileBasedStream, source: AbstractSource, logger: logging.Logger, state: Optional[MutableMapping[str, Any]], cursor: "AbstractConcurrentFileBasedCursor") -> "FileBasedStreamFacade"
Create a ConcurrentStream from a FileBasedStream object.
property cursor_field: Union[str, List[str]]
Override to return the default cursor field used by this stream, e.g. an API entity might always use created_at as the cursor field.

Returns: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
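As a small illustration, assuming `facade` from the earlier sketch: the property normalizes a missing cursor to an empty list, so callers can branch without None checks. For file-based streams the cursor field is typically the file's last-modified timestamp (e.g. `_ab_source_file_last_modified`).

```python
# Assuming `facade` was built as in the earlier sketch (hypothetical objects).
if facade.cursor_field:
    print(f"incremental sync keyed on {facade.cursor_field}")
else:
    # The property returned [] because the underlying DefaultStream has no cursor.
    print("no cursor field; stream syncs as full refresh")
```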
property supports_incremental: bool
Returns: True if this stream supports incrementally reading data.
get_underlying_stream() -> DefaultStream

Return the underlying DefaultStream that the concurrent framework operates on.
read(configured_stream: ConfiguredAirbyteStream, logger: logging.Logger, slice_logger: SliceLogger, stream_state: MutableMapping[str, Any], state_manager: ConnectorStateManager, internal_config: InternalConfig) -> Iterable[StreamData]
Inherited Members

Inherited from airbyte_cdk.sources.file_based.stream.abstract_file_based_stream.AbstractFileBasedStream:
- AbstractFileBasedStream
- config
- catalog_schema
- validation_policy
- stream_reader
- errors_collector
- primary_key
- list_files
- get_files
- read_records
- read_records_from_slice
- stream_slices
- compute_slices
- get_json_schema
- infer_schema
- get_parser
- record_passes_validation_policy
- availability_strategy
- name
- get_cursor
class FileBasedStreamPartition(Partition)
A partition is responsible for reading a specific set of data from a source.
FileBasedStreamPartition(stream: AbstractFileBasedStream, _slice: Optional[Mapping[str, Any]], message_repository: MessageRepository, sync_mode: SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]])
read() -> Iterable[Record]
Reads the data from the partition.

Returns: An iterable of records.
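A short consumption sketch, assuming `partition` is a `FileBasedStreamPartition` obtained from the generator documented below; `handle` is a hypothetical downstream callback. Errors for which the stream can build a user-facing message surface as `ExceptionWithDisplayMessage`; everything else re-raises unchanged.

```python
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage

try:
    for record in partition.read():
        handle(record.data)  # hypothetical downstream handler
except ExceptionWithDisplayMessage as exc:
    # read() wraps exceptions that carry a display message from the stream.
    print(f"Partition {partition.to_slice()} failed: {exc}")
```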
to_slice() -> Optional[Mapping[str, Any]]
Converts the partition to a slice that can be serialized and deserialized.

Note: it would have been interesting to have a type of Mapping[str, Comparable] to simplify typing, but some slices can have nested values (example).

Returns: A mapping representing a slice.
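A sketch of the single-file invariant, using a hypothetical `RemoteFile`: `to_slice` asserts that each partition holds exactly one file, and `__hash__` keys the partition on the stream name plus `<last_modified>_<uri>`, so two partitions over the same file hash identically.

```python
from datetime import datetime

from airbyte_cdk.sources.file_based.remote_file import RemoteFile

# Hypothetical file; a partition's slice always has the shape {"files": [file]}.
file = RemoteFile(uri="path/to/data.csv", last_modified=datetime(2024, 1, 1))

# Mirrors FileBasedStreamPartition.__hash__ ("my_stream" is a hypothetical name):
key = f"{file.last_modified.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}_{file.uri}"
print(hash(("my_stream", key)))
```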
class FileBasedStreamPartitionGenerator(PartitionGenerator)
FileBasedStreamPartitionGenerator(stream: AbstractFileBasedStream, message_repository: MessageRepository, sync_mode: SyncMode, cursor_field: Optional[List[str]], state: Optional[MutableMapping[str, Any]], cursor: "AbstractConcurrentFileBasedCursor")
generate() -> Iterable[FileBasedStreamPartition]
Generates the partitions for a given sync mode: each slice produced by the wrapped stream is split into single-file partitions, and the full pending set is registered with the cursor (via set_pending_partitions) before any partition is yielded.

Returns: An iterable of partitions.
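To illustrate the fan-out, assuming `generator` is a `FileBasedStreamPartitionGenerator` built with the hypothetical dependencies from the first sketch: a slice containing N files yields N single-file partitions, and the cursor learns the complete pending set up front.

```python
# generate() registers the pending set with the cursor before yielding anything.
partitions = list(generator.generate())
for p in partitions:
    assert len(p.to_slice()["files"]) == 1  # exactly one file per partition
```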