airbyte_cdk.sources.file_based.stream.cursor
class AbstractFileBasedCursor(ABC):
    """
    Abstract base class for cursors used by file-based streams.

    A cursor is told about every file the stream processes (``add_file``) and is asked which
    files from the source the next sync should read (``get_files_to_sync``).
    """

    @abstractmethod
    def __init__(self, stream_config: FileBasedStreamConfig, **kwargs: Any):
        """
        Common interface for all cursors.

        :param stream_config: Configuration of the stream that owns this cursor.
        :param kwargs: Additional keyword arguments; implementations may ignore them.
        """

    @abstractmethod
    def add_file(self, file: RemoteFile) -> None:
        """
        Record a file in the cursor. The stream calls this once it has processed the file.

        :param file: The processed file.
        """

    @abstractmethod
    def set_initial_state(self, value: StreamState) -> None:
        """
        Seed the cursor with a previously saved state.

        This cannot happen in the constructor because the stream only learns its state after
        the cursor has been created.

        :param value: The stream state.
        """

    @abstractmethod
    def get_state(self) -> MutableMapping[str, Any]:
        """
        Return the cursor's current state.
        """

    @abstractmethod
    def get_start_time(self) -> datetime:
        """
        Return the start time of the current sync.
        """

    @abstractmethod
    def get_files_to_sync(
        self, all_files: Iterable[RemoteFile], logger: logging.Logger
    ) -> Iterable[RemoteFile]:
        """
        Select, from every file present in the source, the ones this sync should read.

        :param all_files: Every file found in the source.
        :param logger: Logger the implementation can use to report warnings.
        :return: The files that should be synced.
        """
Abstract base class for cursors used by file-based streams.
@abstractmethod
def __init__(self, stream_config: FileBasedStreamConfig, **kwargs: Any):
    """
    Common interface for all cursors.

    :param stream_config: Configuration of the stream that owns this cursor.
    :param kwargs: Additional keyword arguments; implementations may ignore them.
    """
Common interface for all cursors.
@abstractmethod
def add_file(self, file: RemoteFile) -> None:
    """
    Record a file in the cursor. The stream calls this once it has processed the file.

    :param file: The processed file.
    """
Add a file to the cursor. This method is called when a file is processed by the stream.
Parameters
- file: The file to add
@abstractmethod
def set_initial_state(self, value: StreamState) -> None:
    """
    Seed the cursor with a previously saved state.

    This cannot happen in the constructor because the stream only learns its state after
    the cursor has been created.

    :param value: The stream state.
    """
Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
Parameters
- value: The stream state
@abstractmethod
def get_state(self) -> MutableMapping[str, Any]:
    """
    Return the cursor's current state.
    """
Get the state of the cursor.
@abstractmethod
def get_start_time(self) -> datetime:
    """
    Return the start time of the current sync.
    """
Returns the start time of the current sync.
@abstractmethod
def get_files_to_sync(
    self, all_files: Iterable[RemoteFile], logger: logging.Logger
) -> Iterable[RemoteFile]:
    """
    Select, from every file present in the source, the ones this sync should read.

    :param all_files: Every file found in the source.
    :param logger: Logger the implementation can use to report warnings.
    :return: The files that should be synced.
    """
Given the list of files in the source, return the files that should be synced.
Parameters
- all_files: All files in the source
- logger: Logger used to report warnings while deciding which files to sync
Returns
The files that should be synced
class DefaultFileBasedCursor(AbstractFileBasedCursor):
    """
    Default cursor for file-based streams.

    The cursor keeps a history that maps each synced file's URI to its last-modified timestamp,
    serialized with ``DATE_TIME_FORMAT``. The history is capped at ``DEFAULT_MAX_HISTORY_SIZE``
    entries. Once it is full, the earliest entries are evicted. Files that cannot be checked
    against the history are then selected using a time window of
    ``days_to_sync_if_history_is_full`` days instead.
    """

    DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
    DEFAULT_MAX_HISTORY_SIZE = 10_000
    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
    CURSOR_FIELD = "_ab_source_file_last_modified"

    def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
        """
        Build an empty cursor. The history is filled later by ``set_initial_state`` / ``add_file``.

        :param stream_config: Stream configuration; ``days_to_sync_if_history_is_full`` is read.
        :raises ValueError: If the resulting time window is not a positive duration.
        """
        super().__init__(stream_config)  # type: ignore [safe-super]
        self._file_to_datetime_history: MutableMapping[str, str] = {}
        # `or` means an unset (None) or zero value falls back to the class default.
        self._time_window_if_history_is_full = timedelta(
            days=stream_config.days_to_sync_if_history_is_full
            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
        )

        if self._time_window_if_history_is_full <= timedelta():
            raise ValueError(
                f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
            )

        self._start_time = self._compute_start_time()
        self._initial_earliest_file_in_history: Optional[RemoteFile] = None

    def set_initial_state(self, value: StreamState) -> None:
        """
        Load the file history from a previously emitted state.

        The history is copied so that later ``add_file`` calls do not mutate the caller's state
        object. A missing or null ``history`` entry is treated as an empty history.

        :param value: The stream state; only its ``history`` entry is read.
        """
        self._file_to_datetime_history = dict(value.get("history") or {})
        # Both values below are derived from the history, so they must be computed after loading it.
        self._start_time = self._compute_start_time()
        self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()

    def add_file(self, file: RemoteFile) -> None:
        """
        Record ``file`` in the history, evicting the earliest entry if the cap is exceeded.

        :param file: The file that was just processed.
        :raises Exception: If the history is over capacity but no earliest file can be found.
        """
        self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
            self.DATE_TIME_FORMAT
        )
        if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
            # Get the earliest file based on its last modified date and its uri
            oldest_file = self._compute_earliest_file_in_history()
            if oldest_file:
                del self._file_to_datetime_history[oldest_file.uri]
            else:
                raise Exception(
                    "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
                )

    def get_state(self) -> StreamState:
        """
        Return the history together with the cursor value stored under ``CURSOR_FIELD``.
        """
        state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
        return state

    def _get_cursor(self) -> Optional[str]:
        """
        Returns the cursor value.

        Files are synced in order of last-modified with secondary sort on filename, so the cursor value is
        a string joining the last-modified timestamp of the last synced file and the name of the file.
        Returns None when the history is empty.
        """
        if self._file_to_datetime_history:
            filename, timestamp = max(
                self._file_to_datetime_history.items(), key=lambda x: (x[1], x[0])
            )
            return f"{timestamp}_{filename}"
        return None

    def _is_history_full(self) -> bool:
        """
        Returns true if the state's history is full, meaning new entries will start to replace old entries.
        """
        return len(self._file_to_datetime_history) >= self.DEFAULT_MAX_HISTORY_SIZE

    def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
        """
        Decide whether ``file`` must be read during this sync.

        :param file: Candidate file from the source.
        :param logger: Used to warn when a file's last-modified date moved backwards.
        :return: True if the file should be synced.
        """
        if file.uri in self._file_to_datetime_history:
            # If the file's uri is in the history, we should sync the file if it has been modified since it was synced
            updated_at_from_history = datetime.strptime(
                self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT
            )
            if file.last_modified < updated_at_from_history:
                logger.warning(
                    f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file."
                )
            # An older file yields False here, i.e. it is skipped.
            return file.last_modified > updated_at_from_history
        if self._is_history_full():
            if self._initial_earliest_file_in_history is None:
                return True
            if file.last_modified > self._initial_earliest_file_in_history.last_modified:
                # If the history is partial and the file's datetime is strictly greater than the earliest file in the history,
                # we should sync it
                return True
            elif file.last_modified == self._initial_earliest_file_in_history.last_modified:
                # If the history is partial and the file's datetime is equal to the earliest file in the history,
                # we should sync it if its uri is strictly greater than the earliest file in the history
                return file.uri > self._initial_earliest_file_in_history.uri
            else:
                # Otherwise, only sync the file if it has been modified since the start of the time window
                return file.last_modified >= self.get_start_time()
        else:
            # The file is not in the history and the history is complete. We know we need to sync the file
            return True

    def get_files_to_sync(
        self, all_files: Iterable[RemoteFile], logger: logging.Logger
    ) -> Iterable[RemoteFile]:
        """
        Lazily yield the files from ``all_files`` that this sync should read.

        This is a generator, so the "history is full" warning is logged only once iteration starts.

        :param all_files: Every file found in the source.
        :param logger: Logger used for the full-history warning and per-file warnings.
        :return: An iterator over the files to sync.
        """
        if self._is_history_full():
            logger.warning(
                f"The state history is full. "
                f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
                f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
            )
        for f in all_files:
            if self._should_sync_file(f, logger):
                yield f

    def get_start_time(self) -> datetime:
        """
        Return the start time computed by the last ``__init__`` / ``set_initial_state`` call.
        """
        return self._start_time

    def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
        """
        Return the history entry with the smallest (timestamp, uri) pair as a RemoteFile.

        Returns None if the history is empty.
        """
        if self._file_to_datetime_history:
            filename, last_modified = min(
                self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
            )
            return RemoteFile(
                uri=filename, last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT)
            )
        else:
            return None

    def _compute_start_time(self) -> datetime:
        """
        Compute the earliest last-modified time this sync may need to look at.

        :return: ``datetime.min`` for an empty history. Otherwise, the earliest timestamp in the
            history. When the history is full, this is moved back to ``now - time window`` if that
            is earlier.
        """
        if not self._file_to_datetime_history:
            return datetime.min
        else:
            # String min is valid because DATE_TIME_FORMAT is fixed-width and most-significant-first.
            earliest = min(self._file_to_datetime_history.values())
            earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT)
            if self._is_history_full():
                # NOTE(review): datetime.now() is naive local time, while the stored timestamps end in a
                # literal "Z" (UTC). These only agree when the process runs in UTC; confirm.
                time_window = datetime.now() - self._time_window_if_history_is_full
                earliest_dt = min(earliest_dt, time_window)
            return earliest_dt
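Default cursor for file-based streams: a concrete implementation of the abstract base class for cursors used by file-based streams. It records each synced file's last-modified time in a bounded history and, once that history is full, falls back to a time window to select files.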
def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
    """
    Build an empty cursor. The history is filled later by set_initial_state / add_file.

    :param stream_config: Stream configuration providing ``days_to_sync_if_history_is_full``.
    :raises ValueError: If the configured window is not a positive duration.
    """
    super().__init__(stream_config)  # type: ignore [safe-super]
    self._file_to_datetime_history: MutableMapping[str, str] = {}
    # `or` means an unset (None) or zero value falls back to the class default.
    configured_days = stream_config.days_to_sync_if_history_is_full
    self._time_window_if_history_is_full = timedelta(
        days=configured_days or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
    )
    if self._time_window_if_history_is_full <= timedelta():
        raise ValueError(
            f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
        )
    self._start_time = self._compute_start_time()
    self._initial_earliest_file_in_history: Optional[RemoteFile] = None
Common interface for all cursors.
def set_initial_state(self, value: StreamState) -> None:
    """
    Load the file history from a previously emitted state.

    The history is copied so that later ``add_file`` calls do not mutate the caller's state
    object. A missing or null ``history`` entry is treated as an empty history.

    :param value: The stream state; only its ``history`` entry is read.
    """
    self._file_to_datetime_history = dict(value.get("history") or {})
    # Both values below are derived from the history, so they must be computed after loading it.
    self._start_time = self._compute_start_time()
    self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()
Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
Parameters
- value: The stream state
def add_file(self, file: RemoteFile) -> None:
    """
    Store ``file``'s last-modified time in the history, keeping the history within its cap.

    :param file: The file that was just processed.
    :raises Exception: If the history is over capacity but no earliest file can be found.
    """
    history = self._file_to_datetime_history
    history[file.uri] = file.last_modified.strftime(self.DATE_TIME_FORMAT)
    if len(history) <= self.DEFAULT_MAX_HISTORY_SIZE:
        return
    # Over capacity: drop the entry with the smallest (last_modified, uri) pair.
    oldest_file = self._compute_earliest_file_in_history()
    if not oldest_file:
        raise Exception(
            "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
        )
    del history[oldest_file.uri]
Add a file to the cursor. This method is called when a file is processed by the stream.
Parameters
- file: The file to add
def get_state(self) -> StreamState:
    """
    Return the history together with the cursor value stored under ``CURSOR_FIELD``.
    """
    return {
        "history": self._file_to_datetime_history,
        self.CURSOR_FIELD: self._get_cursor(),
    }
Get the state of the cursor.
def get_files_to_sync(
    self, all_files: Iterable[RemoteFile], logger: logging.Logger
) -> Iterable[RemoteFile]:
    """
    Lazily yield the files from ``all_files`` that this sync should read.

    This is a generator, so the "history is full" warning is logged only once iteration starts.

    :param all_files: Every file found in the source.
    :param logger: Logger used for the full-history warning and per-file warnings.
    :return: An iterator over the files to sync.
    """
    if self._is_history_full():
        logger.warning(
            f"The state history is full. "
            f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
            f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
        )
    yield from (
        candidate for candidate in all_files if self._should_sync_file(candidate, logger)
    )
Given the list of files in the source, return the files that should be synced.
Parameters
- all_files: All files in the source
- logger: Logger used to report warnings while deciding which files to sync
Returns
The files that should be synced