airbyte_cdk.sources.file_based.stream.concurrent.cursor
1from .abstract_concurrent_file_based_cursor import AbstractConcurrentFileBasedCursor 2from .file_based_concurrent_cursor import FileBasedConcurrentCursor 3from .file_based_final_state_cursor import FileBasedFinalStateCursor 4 5__all__ = [ 6 "AbstractConcurrentFileBasedCursor", 7 "FileBasedConcurrentCursor", 8 "FileBasedFinalStateCursor", 9]
class AbstractConcurrentFileBasedCursor(Cursor, AbstractFileBasedCursor, ABC):
    """Interface for cursors used by concurrent file-based streams.

    Combines the concurrent ``Cursor`` contract with the file-based cursor
    contract and adds hooks for tracking pending partitions/files.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        pass

    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    @abstractmethod
    def observe(self, record: Record) -> None: ...

    @abstractmethod
    def close_partition(self, partition: Partition) -> None:
        """Indicate to the cursor that the partition has been successfully processed."""
        ...

    @abstractmethod
    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None:
        """Register the partitions (and the files they contain) that remain to be synced."""
        ...

    @abstractmethod
    def add_file(self, file: RemoteFile) -> None:
        """
        Add a file to the cursor. This method is called when a file is processed by the stream.

        :param file: The file to add
        """
        ...

    @abstractmethod
    def get_files_to_sync(
        self, all_files: Iterable[RemoteFile], logger: logging.Logger
    ) -> Iterable[RemoteFile]:
        """
        Given the list of files in the source, return the files that should be synced.

        :param all_files: All files in the source
        :param logger: The logger to use
        :return: The files that should be synced
        """
        ...

    @abstractmethod
    def get_state(self) -> MutableMapping[str, Any]:
        """Get the state of the cursor."""
        ...

    @abstractmethod
    def set_initial_state(self, value: StreamState) -> None:
        """
        Set the initial state of the cursor. The cursor cannot be initialized at
        construction time because the stream doesn't know its state yet.

        :param value: The stream state
        """
        ...

    @abstractmethod
    def get_start_time(self) -> datetime:
        """Returns the start time of the current sync."""
        ...

    @abstractmethod
    def emit_state_message(self) -> None: ...

    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None:
        """
        State messages are emitted when a partition is closed. However, the platform
        expects at least one state to be emitted per sync per stream. Hence, if no
        partitions are generated, this method needs to be called.
        """
        ...
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
Indicate to the cursor that the partition has been successfully processed
Add a file to the cursor. This method is called when a file is processed by the stream.
Parameters
- file: The file to add
42 @abstractmethod 43 def get_files_to_sync( 44 self, all_files: Iterable[RemoteFile], logger: logging.Logger 45 ) -> Iterable[RemoteFile]: ...
Given the list of files in the source, return the files that should be synced.
Parameters
- all_files: All files in the source
- logger: The logger used to report warnings (e.g. when the state history is full)
Returns
The files that should be synced
Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
Parameters
- value: The stream state
Returns the start time of the current sync.
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
Inherited Members
class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor):
    """Incremental cursor for concurrent file-based streams.

    State consists of a bounded per-file ``history`` mapping (uri -> last-modified
    timestamp string) plus a cursor value of the form ``"<last_modified>_<uri>"``.
    Because partitions are processed concurrently, the state and the pending-file
    bookkeeping are each guarded by a reentrant lock.
    """

    CURSOR_FIELD = "_ab_source_file_last_modified"
    DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = (
        DefaultFileBasedCursor.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
    )
    DEFAULT_MAX_HISTORY_SIZE = 10_000
    DATE_TIME_FORMAT = DefaultFileBasedCursor.DATE_TIME_FORMAT
    zero_value = datetime.min
    # Sentinel cursor value used before anything has ever been synced.
    zero_cursor_value = f"0001-01-01T00:00:00.000000Z_{_NULL_FILE}"

    def __init__(
        self,
        stream_config: FileBasedStreamConfig,
        stream_name: str,
        stream_namespace: Optional[str],
        stream_state: MutableMapping[str, Any],
        message_repository: MessageRepository,
        connector_state_manager: ConnectorStateManager,
        cursor_field: CursorField,
    ) -> None:
        super().__init__()
        self._stream_name = stream_name
        self._stream_namespace = stream_namespace
        self._state = stream_state
        self._message_repository = message_repository
        self._connector_state_manager = connector_state_manager
        self._cursor_field = cursor_field
        # Fallback lookback window used once the history can no longer hold every file.
        self._time_window_if_history_is_full = timedelta(
            days=stream_config.days_to_sync_if_history_is_full
            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
        )
        self._state_lock = RLock()
        self._pending_files_lock = RLock()
        # uri -> RemoteFile for files scheduled but not yet synced.
        # Stays None until set_pending_partitions() is called.
        self._pending_files: Optional[Dict[str, RemoteFile]] = None
        self._file_to_datetime_history = stream_state.get("history", {}) if stream_state else {}
        self._prev_cursor_value = self._compute_prev_sync_cursor(stream_state)
        self._sync_start = self._compute_start_time()

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    def observe(self, record: Record) -> None:
        # Progress is tracked per-file via add_file(), not per-record.
        pass

    def close_partition(self, partition: Partition) -> None:
        """Indicate to the cursor that the partition has been successfully processed."""
        with self._pending_files_lock:
            if self._pending_files is None:
                raise RuntimeError(
                    "Expected pending partitions to be set but it was not. This is unexpected. Please contact Support."
                )

    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None:
        """Record every file contained in the given partitions as pending."""
        with self._pending_files_lock:
            self._pending_files = {}
            for partition in partitions:
                _slice = partition.to_slice()
                if _slice is None:
                    continue
                for file in _slice["files"]:
                    # A duplicate uri would mean the same file was scheduled twice.
                    if file.uri in self._pending_files:
                        raise RuntimeError(
                            f"Already found file {_slice} in pending files. This is unexpected. Please contact Support."
                        )
                    self._pending_files[file.uri] = file

    def _compute_prev_sync_cursor(self, value: Optional[StreamState]) -> Tuple[datetime, str]:
        """Derive the (datetime, uri) cursor pair carried over from the previous sync."""
        if not value:
            return self.zero_value, ""
        prev_cursor_str = value.get(self._cursor_field.cursor_field_key) or self.zero_cursor_value
        # So if we see a cursor greater than the earliest file, it means that we have likely synced all files.
        # However, we take the earliest file as the cursor value for the purpose of checking which files to
        # sync, in case new files have been uploaded in the meantime.
        # This should be very rare, as it would indicate a race condition where a file with an earlier
        # last_modified time was uploaded after a file with a later last_modified time. Since last_modified
        # represents the start time that the file was uploaded, we can usually expect that all previous
        # files have already been uploaded. If that's the case, they'll be in history and we'll skip
        # re-uploading them.
        earliest_file_cursor_value = self._get_cursor_key_from_file(
            self._compute_earliest_file_in_history()
        )
        cursor_str = min(prev_cursor_str, earliest_file_cursor_value)
        cursor_dt, cursor_uri = cursor_str.split("_", 1)
        return datetime.strptime(cursor_dt, self.DATE_TIME_FORMAT), cursor_uri

    def _get_cursor_key_from_file(self, file: Optional[RemoteFile]) -> str:
        """Serialize a file into the "<last_modified>_<uri>" cursor key format."""
        if file:
            return f"{datetime.strftime(file.last_modified, self.DATE_TIME_FORMAT)}_{file.uri}"
        return self.zero_cursor_value

    def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
        """Return the history entry with the smallest (last_modified, uri), or None if empty."""
        with self._state_lock:
            if not self._file_to_datetime_history:
                return None
            filename, last_modified = min(
                self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
            )
            return RemoteFile(
                uri=filename,
                last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT),
            )

    def add_file(self, file: RemoteFile) -> None:
        """
        Add a file to the cursor. This method is called when a file is processed by the stream.

        :param file: The file to add
        """
        # Check under the lock so this cannot race with set_pending_partitions(),
        # which assigns _pending_files while holding the same lock.
        with self._pending_files_lock:
            if self._pending_files is None:
                raise RuntimeError(
                    "Expected pending partitions to be set but it was not. This is unexpected. Please contact Support."
                )
            with self._state_lock:
                if file.uri not in self._pending_files:
                    self._message_repository.emit_message(
                        AirbyteMessage(
                            type=Type.LOG,
                            log=AirbyteLogMessage(
                                level=Level.WARN,
                                message=f"The file {file.uri} was not found in the list of pending files. This is unexpected. Please contact Support",
                            ),
                        )
                    )
                else:
                    self._pending_files.pop(file.uri)
                self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
                    self.DATE_TIME_FORMAT
                )
                if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
                    # Get the earliest file based on its last modified date and its uri
                    oldest_file = self._compute_earliest_file_in_history()
                    if oldest_file:
                        del self._file_to_datetime_history[oldest_file.uri]
                    else:
                        raise Exception(
                            "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
                        )
                self.emit_state_message()

    def emit_state_message(self) -> None:
        """Push the current cursor state to the platform via the message repository."""
        with self._state_lock:
            new_state = self.get_state()
            self._connector_state_manager.update_state_for_stream(
                self._stream_name,
                self._stream_namespace,
                new_state,
            )
            state_message = self._connector_state_manager.create_state_message(
                self._stream_name, self._stream_namespace
            )
            self._message_repository.emit_message(state_message)

    def _get_new_cursor_value(self) -> str:
        """Compute the cursor key to persist, based on pending files and history."""
        with self._pending_files_lock:
            with self._state_lock:
                if self._pending_files:
                    # If there are partitions that haven't been synced, we don't know whether the files that have been synced
                    # represent a contiguous region.
                    # To avoid missing files, we only increment the cursor up to the oldest pending file, because we know
                    # that all older files have been synced.
                    return self._get_cursor_key_from_file(self._compute_earliest_pending_file())
                elif self._file_to_datetime_history:
                    # If all partitions have been synced, we know that the sync is up-to-date and so can advance
                    # the cursor to the newest file in history.
                    return self._get_cursor_key_from_file(self._compute_latest_file_in_history())
                else:
                    return f"{self.zero_value.strftime(self.DATE_TIME_FORMAT)}_"

    def _compute_earliest_pending_file(self) -> Optional[RemoteFile]:
        # Caller (_get_new_cursor_value) already holds _pending_files_lock.
        if self._pending_files:
            return min(self._pending_files.values(), key=lambda x: x.last_modified)
        else:
            return None

    def _compute_latest_file_in_history(self) -> Optional[RemoteFile]:
        """Return the history entry with the largest (last_modified, uri), or None if empty."""
        with self._state_lock:
            if not self._file_to_datetime_history:
                return None
            filename, last_modified = max(
                self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
            )
            return RemoteFile(
                uri=filename,
                last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT),
            )

    def get_files_to_sync(
        self, all_files: Iterable[RemoteFile], logger: logging.Logger
    ) -> Iterable[RemoteFile]:
        """
        Given the list of files in the source, return the files that should be synced.

        :param all_files: All files in the source
        :param logger: The logger to use
        :return: The files that should be synced
        """
        with self._state_lock:
            if self._is_history_full():
                logger.warning(
                    f"The state history is full. "
                    f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
                    f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
                )
            for f in all_files:
                if self._should_sync_file(f, logger):
                    yield f

    def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
        """Decide whether a single file needs to be (re-)synced on this run."""
        with self._state_lock:
            if file.uri in self._file_to_datetime_history:
                # If the file's uri is in the history, we should sync the file if it has been modified since it was synced
                updated_at_from_history = datetime.strptime(
                    self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT
                )
                if file.last_modified < updated_at_from_history:
                    # A last-modified older than what we recorded is inconsistent; warn and skip.
                    self._message_repository.emit_message(
                        AirbyteMessage(
                            type=Type.LOG,
                            log=AirbyteLogMessage(
                                level=Level.WARN,
                                message=f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file.",
                            ),
                        )
                    )
                    return False
                else:
                    return file.last_modified > updated_at_from_history

            prev_cursor_timestamp, prev_cursor_uri = self._prev_cursor_value
            if self._is_history_full():
                if file.last_modified > prev_cursor_timestamp:
                    # If the history is partial and the file's datetime is strictly greater than the cursor, we should sync it
                    return True
                elif file.last_modified == prev_cursor_timestamp:
                    # If the history is partial and the file's datetime is equal to the earliest file in the history,
                    # we should sync it if its uri is greater than or equal to the cursor value.
                    return file.uri > prev_cursor_uri
                else:
                    return file.last_modified >= self._sync_start
            else:
                # The file is not in the history and the history is complete. We know we need to sync the file
                return True

    def _is_history_full(self) -> bool:
        """
        Returns true if the state's history is full, meaning new entries will start to replace old entries.
        """
        with self._state_lock:
            if self._file_to_datetime_history is None:
                raise RuntimeError(
                    "The history object has not been set. This is unexpected. Please contact Support."
                )
            return len(self._file_to_datetime_history) >= self.DEFAULT_MAX_HISTORY_SIZE

    def _compute_start_time(self) -> datetime:
        """Compute the sync start boundary from the earliest history entry (capped by the lookback window)."""
        if not self._file_to_datetime_history:
            return datetime.min
        else:
            earliest = min(self._file_to_datetime_history.values())
            earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT)
            if self._is_history_full():
                time_window = datetime.now() - self._time_window_if_history_is_full
                earliest_dt = min(earliest_dt, time_window)
            return earliest_dt

    def get_start_time(self) -> datetime:
        """Returns the start time of the current sync."""
        return self._sync_start

    def set_initial_state(self, value: StreamState) -> None:
        # State is already consumed in the constructor; nothing to do here.
        pass

    def get_state(self) -> MutableMapping[str, Any]:
        """
        Get the state of the cursor.
        """
        with self._state_lock:
            return {
                "history": self._file_to_datetime_history,
                self._cursor_field.cursor_field_key: self._get_new_cursor_value(),
            }

    def ensure_at_least_one_state_emitted(self) -> None:
        """Emit a state message even if no partition was ever closed, as the platform expects one per sync."""
        self.emit_state_message()
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
41 def __init__( 42 self, 43 stream_config: FileBasedStreamConfig, 44 stream_name: str, 45 stream_namespace: Optional[str], 46 stream_state: MutableMapping[str, Any], 47 message_repository: MessageRepository, 48 connector_state_manager: ConnectorStateManager, 49 cursor_field: CursorField, 50 ) -> None: 51 super().__init__() 52 self._stream_name = stream_name 53 self._stream_namespace = stream_namespace 54 self._state = stream_state 55 self._message_repository = message_repository 56 self._connector_state_manager = connector_state_manager 57 self._cursor_field = cursor_field 58 self._time_window_if_history_is_full = timedelta( 59 days=stream_config.days_to_sync_if_history_is_full 60 or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL 61 ) 62 self._state_lock = RLock() 63 self._pending_files_lock = RLock() 64 self._pending_files: Optional[Dict[str, RemoteFile]] = None 65 self._file_to_datetime_history = stream_state.get("history", {}) if stream_state else {} 66 self._prev_cursor_value = self._compute_prev_sync_cursor(stream_state) 67 self._sync_start = self._compute_start_time()
Common interface for all cursors.
76 def close_partition(self, partition: Partition) -> None: 77 with self._pending_files_lock: 78 if self._pending_files is None: 79 raise RuntimeError( 80 "Expected pending partitions to be set but it was not. This is unexpected. Please contact Support." 81 )
Indicate to the cursor that the partition has been successfully processed
83 def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None: 84 with self._pending_files_lock: 85 self._pending_files = {} 86 for partition in partitions: 87 _slice = partition.to_slice() 88 if _slice is None: 89 continue 90 for file in _slice["files"]: 91 if file.uri in self._pending_files.keys(): 92 raise RuntimeError( 93 f"Already found file {_slice} in pending files. This is unexpected. Please contact Support." 94 ) 95 self._pending_files.update({file.uri: file})
134 def add_file(self, file: RemoteFile) -> None: 135 """ 136 Add a file to the cursor. This method is called when a file is processed by the stream. 137 :param file: The file to add 138 """ 139 if self._pending_files is None: 140 raise RuntimeError( 141 "Expected pending partitions to be set but it was not. This is unexpected. Please contact Support." 142 ) 143 with self._pending_files_lock: 144 with self._state_lock: 145 if file.uri not in self._pending_files: 146 self._message_repository.emit_message( 147 AirbyteMessage( 148 type=Type.LOG, 149 log=AirbyteLogMessage( 150 level=Level.WARN, 151 message=f"The file {file.uri} was not found in the list of pending files. This is unexpected. Please contact Support", 152 ), 153 ) 154 ) 155 else: 156 self._pending_files.pop(file.uri) 157 self._file_to_datetime_history[file.uri] = file.last_modified.strftime( 158 self.DATE_TIME_FORMAT 159 ) 160 if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE: 161 # Get the earliest file based on its last modified date and its uri 162 oldest_file = self._compute_earliest_file_in_history() 163 if oldest_file: 164 del self._file_to_datetime_history[oldest_file.uri] 165 else: 166 raise Exception( 167 "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK." 168 ) 169 self.emit_state_message()
Add a file to the cursor. This method is called when a file is processed by the stream.
Parameters
- file: The file to add
171 def emit_state_message(self) -> None: 172 with self._state_lock: 173 new_state = self.get_state() 174 self._connector_state_manager.update_state_for_stream( 175 self._stream_name, 176 self._stream_namespace, 177 new_state, 178 ) 179 state_message = self._connector_state_manager.create_state_message( 180 self._stream_name, self._stream_namespace 181 ) 182 self._message_repository.emit_message(state_message)
219 def get_files_to_sync( 220 self, all_files: Iterable[RemoteFile], logger: logging.Logger 221 ) -> Iterable[RemoteFile]: 222 """ 223 Given the list of files in the source, return the files that should be synced. 224 :param all_files: All files in the source 225 :param logger: 226 :return: The files that should be synced 227 """ 228 with self._state_lock: 229 if self._is_history_full(): 230 logger.warning( 231 f"The state history is full. " 232 f"This sync and future syncs won't be able to use the history to filter out duplicate files. " 233 f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files." 234 ) 235 for f in all_files: 236 if self._should_sync_file(f, logger): 237 yield f
Given the list of files in the source, return the files that should be synced.
Parameters
- all_files: All files in the source
- logger: The logger used to report warnings (e.g. when the state history is full)
Returns
The files that should be synced
300 def get_state(self) -> MutableMapping[str, Any]: 301 """ 302 Get the state of the cursor. 303 """ 304 with self._state_lock: 305 return { 306 "history": self._file_to_datetime_history, 307 self._cursor_field.cursor_field_key: self._get_new_cursor_value(), 308 }
Get the state of the cursor.
Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
Parameters
- value: The stream state
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.
Inherited Members
class FileBasedFinalStateCursor(AbstractConcurrentFileBasedCursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent file-based stream."""

    def __init__(
        self,
        stream_config: FileBasedStreamConfig,
        message_repository: MessageRepository,
        stream_namespace: Optional[str],
        **kwargs: Any,
    ):
        self._stream_name = stream_config.name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()

    @property
    def state(self) -> MutableMapping[str, Any]:
        # Sentinel state marking the stream as having no real cursor.
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        # No cursor to advance; records are not tracked.
        pass

    def close_partition(self, partition: Partition) -> None:
        # No per-partition bookkeeping is needed for a final-state cursor.
        pass

    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None:
        # Pending partitions are not tracked; state is only emitted at end of sync.
        pass

    def add_file(self, file: RemoteFile) -> None:
        # No history is kept, so processed files are not recorded.
        pass

    def get_files_to_sync(
        self, all_files: Iterable[RemoteFile], logger: logging.Logger
    ) -> Iterable[RemoteFile]:
        # Without history, every file in the source is synced on every run.
        return all_files

    def get_state(self) -> MutableMapping[str, Any]:
        return {}

    def set_initial_state(self, value: StreamState) -> None:
        # There is no incremental state to restore.
        return None

    def get_start_time(self) -> datetime:
        # Always sync from the beginning of time.
        return datetime.min

    def emit_state_message(self) -> None:
        # Intermediate state messages are suppressed; only the final sentinel is emitted.
        pass

    def ensure_at_least_one_state_emitted(self) -> None:
        """Emit the sentinel state message so the platform receives at least one state per sync per stream."""
        self._connector_state_manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        state_message = self._connector_state_manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(state_message)
Cursor that is used to guarantee at least one state message is emitted for a concurrent file-based stream.
29 def __init__( 30 self, 31 stream_config: FileBasedStreamConfig, 32 message_repository: MessageRepository, 33 stream_namespace: Optional[str], 34 **kwargs: Any, 35 ): 36 self._stream_name = stream_config.name 37 self._stream_namespace = stream_namespace 38 self._message_repository = message_repository 39 # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel 40 # state message rather than manage overall source state. This is also only temporary as we move to the resumable 41 # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state. 42 self._connector_state_manager = ConnectorStateManager()
Common interface for all cursors.
Indicate to the cursor that the partition has been successfully processed
Add a file to the cursor. This method is called when a file is processed by the stream.
Parameters
- file: The file to add
60 def get_files_to_sync( 61 self, all_files: Iterable[RemoteFile], logger: logging.Logger 62 ) -> Iterable[RemoteFile]: 63 return all_files
Given the list of files in the source, return the files that should be synced.
Parameters
- all_files: All files in the source
- logger: The logger to use (unused by this cursor, which syncs all files unconditionally)
Returns
The files that should be synced
Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
Parameters
- value: The stream state
77 def ensure_at_least_one_state_emitted(self) -> None: 78 self._connector_state_manager.update_state_for_stream( 79 self._stream_name, self._stream_namespace, self.state 80 ) 81 state_message = self._connector_state_manager.create_state_message( 82 self._stream_name, self._stream_namespace 83 ) 84 self._message_repository.emit_message(state_message)
State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.