airbyte_cdk.sources.file_based.stream.concurrent.cursor

# Re-export the concurrent file-based cursor implementations so callers can
# import them directly from this package.
from .abstract_concurrent_file_based_cursor import AbstractConcurrentFileBasedCursor
from .file_based_concurrent_cursor import FileBasedConcurrentCursor
from .file_based_final_state_cursor import FileBasedFinalStateCursor

# Explicit public API of this package.
__all__ = [
    "AbstractConcurrentFileBasedCursor",
    "FileBasedConcurrentCursor",
    "FileBasedFinalStateCursor",
]
class AbstractConcurrentFileBasedCursor(Cursor, AbstractFileBasedCursor, ABC):
    """Interface for cursors used by concurrent file-based streams.

    Joins the concurrent ``Cursor`` contract (partition/record lifecycle,
    state emission) with the file-based ``AbstractFileBasedCursor`` contract
    (file history and file selection). Concrete implementations are
    ``FileBasedConcurrentCursor`` and ``FileBasedFinalStateCursor``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Accepts arbitrary arguments so concrete subclasses are free to
        # define their own constructor signatures.
        pass

    # Current cursor state as a mutable mapping.
    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]: ...

    # Indicate to the cursor that a record has been emitted.
    @abstractmethod
    def observe(self, record: Record) -> None: ...

    # Indicate to the cursor that a partition has been successfully processed.
    @abstractmethod
    def close_partition(self, partition: Partition) -> None: ...

    # Register the partitions (and hence files) expected to be synced this run.
    @abstractmethod
    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None: ...

    # Record that a file has been processed by the stream.
    @abstractmethod
    def add_file(self, file: RemoteFile) -> None: ...

    # Given all files in the source, yield the ones that should be synced.
    @abstractmethod
    def get_files_to_sync(
        self, all_files: Iterable[RemoteFile], logger: logging.Logger
    ) -> Iterable[RemoteFile]: ...

    # Return the cursor's state as a mapping suitable for a state message.
    @abstractmethod
    def get_state(self) -> MutableMapping[str, Any]: ...

    # Set the initial state; cannot happen at construction time because the
    # stream does not know its state yet.
    @abstractmethod
    def set_initial_state(self, value: StreamState) -> None: ...

    # Return the start time of the current sync.
    @abstractmethod
    def get_start_time(self) -> datetime: ...

    # Emit a state message reflecting the cursor's current state.
    @abstractmethod
    def emit_state_message(self) -> None: ...

    # The platform expects at least one state message per sync per stream;
    # call this when no partitions were generated.
    @abstractmethod
    def ensure_at_least_one_state_emitted(self) -> None: ...

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

state: MutableMapping[str, Any]
26    @property
27    @abstractmethod
28    def state(self) -> MutableMapping[str, Any]: ...
@abstractmethod
def observe(self, record: airbyte_cdk.Record) -> None:
30    @abstractmethod
31    def observe(self, record: Record) -> None: ...

Indicate to the cursor that the record has been emitted

@abstractmethod
def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
33    @abstractmethod
34    def close_partition(self, partition: Partition) -> None: ...

Indicate to the cursor that the partition has been successfully processed

@abstractmethod
def set_pending_partitions( self, partitions: List[airbyte_cdk.sources.file_based.stream.concurrent.adapters.FileBasedStreamPartition]) -> None:
36    @abstractmethod
37    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None: ...
@abstractmethod
def add_file( self, file: airbyte_cdk.sources.file_based.RemoteFile) -> None:
39    @abstractmethod
40    def add_file(self, file: RemoteFile) -> None: ...

Add a file to the cursor. This method is called when a file is processed by the stream.

Parameters
  • file: The file to add
@abstractmethod
def get_files_to_sync( self, all_files: Iterable[airbyte_cdk.sources.file_based.RemoteFile], logger: logging.Logger) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
42    @abstractmethod
43    def get_files_to_sync(
44        self, all_files: Iterable[RemoteFile], logger: logging.Logger
45    ) -> Iterable[RemoteFile]: ...

Given the list of files in the source, return the files that should be synced.

Parameters
  • all_files: All files in the source
  • logger: The logger used to report warnings while selecting files
Returns

The files that should be synced

@abstractmethod
def get_state(self) -> MutableMapping[str, Any]:
47    @abstractmethod
48    def get_state(self) -> MutableMapping[str, Any]: ...

Get the state of the cursor.

@abstractmethod
def set_initial_state(self, value: MutableMapping[str, Any]) -> None:
50    @abstractmethod
51    def set_initial_state(self, value: StreamState) -> None: ...

Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.

Parameters
  • value: The stream state
@abstractmethod
def get_start_time(self) -> datetime.datetime:
53    @abstractmethod
54    def get_start_time(self) -> datetime: ...

Returns the start time of the current sync.

@abstractmethod
def emit_state_message(self) -> None:
56    @abstractmethod
57    def emit_state_message(self) -> None: ...
@abstractmethod
def ensure_at_least_one_state_emitted(self) -> None:
59    @abstractmethod
60    def ensure_at_least_one_state_emitted(self) -> None: ...

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.

 31class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor):
 32    CURSOR_FIELD = "_ab_source_file_last_modified"
 33    DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = (
 34        DefaultFileBasedCursor.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
 35    )
 36    DEFAULT_MAX_HISTORY_SIZE = 10_000
 37    DATE_TIME_FORMAT = DefaultFileBasedCursor.DATE_TIME_FORMAT
 38    zero_value = datetime.min
 39    zero_cursor_value = f"0001-01-01T00:00:00.000000Z_{_NULL_FILE}"
 40
 41    def __init__(
 42        self,
 43        stream_config: FileBasedStreamConfig,
 44        stream_name: str,
 45        stream_namespace: Optional[str],
 46        stream_state: MutableMapping[str, Any],
 47        message_repository: MessageRepository,
 48        connector_state_manager: ConnectorStateManager,
 49        cursor_field: CursorField,
 50    ) -> None:
 51        super().__init__()
 52        self._stream_name = stream_name
 53        self._stream_namespace = stream_namespace
 54        self._state = stream_state
 55        self._message_repository = message_repository
 56        self._connector_state_manager = connector_state_manager
 57        self._cursor_field = cursor_field
 58        self._time_window_if_history_is_full = timedelta(
 59            days=stream_config.days_to_sync_if_history_is_full
 60            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
 61        )
 62        self._state_lock = RLock()
 63        self._pending_files_lock = RLock()
 64        self._pending_files: Optional[Dict[str, RemoteFile]] = None
 65        self._file_to_datetime_history = stream_state.get("history", {}) if stream_state else {}
 66        self._prev_cursor_value = self._compute_prev_sync_cursor(stream_state)
 67        self._sync_start = self._compute_start_time()
 68
 69    @property
 70    def state(self) -> MutableMapping[str, Any]:
 71        return self._state
 72
 73    def observe(self, record: Record) -> None:
 74        pass
 75
 76    def close_partition(self, partition: Partition) -> None:
 77        with self._pending_files_lock:
 78            if self._pending_files is None:
 79                raise RuntimeError(
 80                    "Expected pending partitions to be set but it was not. This is unexpected. Please contact Support."
 81                )
 82
 83    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None:
 84        with self._pending_files_lock:
 85            self._pending_files = {}
 86            for partition in partitions:
 87                _slice = partition.to_slice()
 88                if _slice is None:
 89                    continue
 90                for file in _slice["files"]:
 91                    if file.uri in self._pending_files.keys():
 92                        raise RuntimeError(
 93                            f"Already found file {_slice} in pending files. This is unexpected. Please contact Support."
 94                        )
 95                self._pending_files.update({file.uri: file})
 96
 97    def _compute_prev_sync_cursor(self, value: Optional[StreamState]) -> Tuple[datetime, str]:
 98        if not value:
 99            return self.zero_value, ""
100        prev_cursor_str = value.get(self._cursor_field.cursor_field_key) or self.zero_cursor_value
101        # So if we see a cursor greater than the earliest file, it means that we have likely synced all files.
102        # However, we take the earliest file as the cursor value for the purpose of checking which files to
103        # sync, in case new files have been uploaded in the meantime.
104        # This should be very rare, as it would indicate a race condition where a file with an earlier
105        # last_modified time was uploaded after a file with a later last_modified time. Since last_modified
106        # represents the start time that the file was uploaded, we can usually expect that all previous
107        # files have already been uploaded. If that's the case, they'll be in history and we'll skip
108        # re-uploading them.
109        earliest_file_cursor_value = self._get_cursor_key_from_file(
110            self._compute_earliest_file_in_history()
111        )
112        cursor_str = min(prev_cursor_str, earliest_file_cursor_value)
113        cursor_dt, cursor_uri = cursor_str.split("_", 1)
114        return datetime.strptime(cursor_dt, self.DATE_TIME_FORMAT), cursor_uri
115
116    def _get_cursor_key_from_file(self, file: Optional[RemoteFile]) -> str:
117        if file:
118            return f"{datetime.strftime(file.last_modified, self.DATE_TIME_FORMAT)}_{file.uri}"
119        return self.zero_cursor_value
120
121    def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
122        with self._state_lock:
123            if self._file_to_datetime_history:
124                filename, last_modified = min(
125                    self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
126                )
127                return RemoteFile(
128                    uri=filename,
129                    last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT),
130                )
131            else:
132                return None
133
134    def add_file(self, file: RemoteFile) -> None:
135        """
136        Add a file to the cursor. This method is called when a file is processed by the stream.
137        :param file: The file to add
138        """
139        if self._pending_files is None:
140            raise RuntimeError(
141                "Expected pending partitions to be set but it was not. This is unexpected. Please contact Support."
142            )
143        with self._pending_files_lock:
144            with self._state_lock:
145                if file.uri not in self._pending_files:
146                    self._message_repository.emit_message(
147                        AirbyteMessage(
148                            type=Type.LOG,
149                            log=AirbyteLogMessage(
150                                level=Level.WARN,
151                                message=f"The file {file.uri} was not found in the list of pending files. This is unexpected. Please contact Support",
152                            ),
153                        )
154                    )
155                else:
156                    self._pending_files.pop(file.uri)
157                self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
158                    self.DATE_TIME_FORMAT
159                )
160                if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
161                    # Get the earliest file based on its last modified date and its uri
162                    oldest_file = self._compute_earliest_file_in_history()
163                    if oldest_file:
164                        del self._file_to_datetime_history[oldest_file.uri]
165                    else:
166                        raise Exception(
167                            "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
168                        )
169                self.emit_state_message()
170
171    def emit_state_message(self) -> None:
172        with self._state_lock:
173            new_state = self.get_state()
174            self._connector_state_manager.update_state_for_stream(
175                self._stream_name,
176                self._stream_namespace,
177                new_state,
178            )
179            state_message = self._connector_state_manager.create_state_message(
180                self._stream_name, self._stream_namespace
181            )
182            self._message_repository.emit_message(state_message)
183
184    def _get_new_cursor_value(self) -> str:
185        with self._pending_files_lock:
186            with self._state_lock:
187                if self._pending_files:
188                    # If there are partitions that haven't been synced, we don't know whether the files that have been synced
189                    # represent a contiguous region.
190                    # To avoid missing files, we only increment the cursor up to the oldest pending file, because we know
191                    # that all older files have been synced.
192                    return self._get_cursor_key_from_file(self._compute_earliest_pending_file())
193                elif self._file_to_datetime_history:
194                    # If all partitions have been synced, we know that the sync is up-to-date and so can advance
195                    # the cursor to the newest file in history.
196                    return self._get_cursor_key_from_file(self._compute_latest_file_in_history())
197                else:
198                    return f"{self.zero_value.strftime(self.DATE_TIME_FORMAT)}_"
199
200    def _compute_earliest_pending_file(self) -> Optional[RemoteFile]:
201        if self._pending_files:
202            return min(self._pending_files.values(), key=lambda x: x.last_modified)
203        else:
204            return None
205
206    def _compute_latest_file_in_history(self) -> Optional[RemoteFile]:
207        with self._state_lock:
208            if self._file_to_datetime_history:
209                filename, last_modified = max(
210                    self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
211                )
212                return RemoteFile(
213                    uri=filename,
214                    last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT),
215                )
216            else:
217                return None
218
219    def get_files_to_sync(
220        self, all_files: Iterable[RemoteFile], logger: logging.Logger
221    ) -> Iterable[RemoteFile]:
222        """
223        Given the list of files in the source, return the files that should be synced.
224        :param all_files: All files in the source
225        :param logger:
226        :return: The files that should be synced
227        """
228        with self._state_lock:
229            if self._is_history_full():
230                logger.warning(
231                    f"The state history is full. "
232                    f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
233                    f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
234                )
235            for f in all_files:
236                if self._should_sync_file(f, logger):
237                    yield f
238
239    def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
240        with self._state_lock:
241            if file.uri in self._file_to_datetime_history:
242                # If the file's uri is in the history, we should sync the file if it has been modified since it was synced
243                updated_at_from_history = datetime.strptime(
244                    self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT
245                )
246                if file.last_modified < updated_at_from_history:
247                    self._message_repository.emit_message(
248                        AirbyteMessage(
249                            type=Type.LOG,
250                            log=AirbyteLogMessage(
251                                level=Level.WARN,
252                                message=f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file.",
253                            ),
254                        )
255                    )
256                    return False
257                else:
258                    return file.last_modified > updated_at_from_history
259
260            prev_cursor_timestamp, prev_cursor_uri = self._prev_cursor_value
261            if self._is_history_full():
262                if file.last_modified > prev_cursor_timestamp:
263                    # If the history is partial and the file's datetime is strictly greater than the cursor, we should sync it
264                    return True
265                elif file.last_modified == prev_cursor_timestamp:
266                    # If the history is partial and the file's datetime is equal to the earliest file in the history,
267                    # we should sync it if its uri is greater than or equal to the cursor value.
268                    return file.uri > prev_cursor_uri
269                else:
270                    return file.last_modified >= self._sync_start
271            else:
272                # The file is not in the history and the history is complete. We know we need to sync the file
273                return True
274
275    def _is_history_full(self) -> bool:
276        """
277        Returns true if the state's history is full, meaning new entries will start to replace old entries.
278        """
279        with self._state_lock:
280            if self._file_to_datetime_history is None:
281                raise RuntimeError(
282                    "The history object has not been set. This is unexpected. Please contact Support."
283                )
284            return len(self._file_to_datetime_history) >= self.DEFAULT_MAX_HISTORY_SIZE
285
286    def _compute_start_time(self) -> datetime:
287        if not self._file_to_datetime_history:
288            return datetime.min
289        else:
290            earliest = min(self._file_to_datetime_history.values())
291            earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT)
292            if self._is_history_full():
293                time_window = datetime.now() - self._time_window_if_history_is_full
294                earliest_dt = min(earliest_dt, time_window)
295            return earliest_dt
296
297    def get_start_time(self) -> datetime:
298        return self._sync_start
299
300    def get_state(self) -> MutableMapping[str, Any]:
301        """
302        Get the state of the cursor.
303        """
304        with self._state_lock:
305            return {
306                "history": self._file_to_datetime_history,
307                self._cursor_field.cursor_field_key: self._get_new_cursor_value(),
308            }
309
310    def set_initial_state(self, value: StreamState) -> None:
311        pass
312
313    def ensure_at_least_one_state_emitted(self) -> None:
314        self.emit_state_message()

Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.

FileBasedConcurrentCursor( stream_config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, stream_name: str, stream_namespace: Optional[str], stream_state: MutableMapping[str, Any], message_repository: airbyte_cdk.MessageRepository, connector_state_manager: airbyte_cdk.ConnectorStateManager, cursor_field: airbyte_cdk.CursorField)
41    def __init__(
42        self,
43        stream_config: FileBasedStreamConfig,
44        stream_name: str,
45        stream_namespace: Optional[str],
46        stream_state: MutableMapping[str, Any],
47        message_repository: MessageRepository,
48        connector_state_manager: ConnectorStateManager,
49        cursor_field: CursorField,
50    ) -> None:
51        super().__init__()
52        self._stream_name = stream_name
53        self._stream_namespace = stream_namespace
54        self._state = stream_state
55        self._message_repository = message_repository
56        self._connector_state_manager = connector_state_manager
57        self._cursor_field = cursor_field
58        self._time_window_if_history_is_full = timedelta(
59            days=stream_config.days_to_sync_if_history_is_full
60            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
61        )
62        self._state_lock = RLock()
63        self._pending_files_lock = RLock()
64        self._pending_files: Optional[Dict[str, RemoteFile]] = None
65        self._file_to_datetime_history = stream_state.get("history", {}) if stream_state else {}
66        self._prev_cursor_value = self._compute_prev_sync_cursor(stream_state)
67        self._sync_start = self._compute_start_time()

Common interface for all cursors.

CURSOR_FIELD = '_ab_source_file_last_modified'
DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
DEFAULT_MAX_HISTORY_SIZE = 10000
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
zero_value = datetime.datetime(1, 1, 1, 0, 0)
zero_cursor_value = '0001-01-01T00:00:00.000000Z_'
state: MutableMapping[str, Any]
69    @property
70    def state(self) -> MutableMapping[str, Any]:
71        return self._state
def observe(self, record: airbyte_cdk.Record) -> None:
73    def observe(self, record: Record) -> None:
74        pass

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
76    def close_partition(self, partition: Partition) -> None:
77        with self._pending_files_lock:
78            if self._pending_files is None:
79                raise RuntimeError(
80                    "Expected pending partitions to be set but it was not. This is unexpected. Please contact Support."
81                )

Indicate to the cursor that the partition has been successfully processed

def set_pending_partitions( self, partitions: List[airbyte_cdk.sources.file_based.stream.concurrent.adapters.FileBasedStreamPartition]) -> None:
83    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None:
84        with self._pending_files_lock:
85            self._pending_files = {}
86            for partition in partitions:
87                _slice = partition.to_slice()
88                if _slice is None:
89                    continue
90                for file in _slice["files"]:
91                    if file.uri in self._pending_files.keys():
92                        raise RuntimeError(
93                            f"Already found file {_slice} in pending files. This is unexpected. Please contact Support."
94                        )
95                self._pending_files.update({file.uri: file})
def add_file( self, file: airbyte_cdk.sources.file_based.RemoteFile) -> None:
134    def add_file(self, file: RemoteFile) -> None:
135        """
136        Add a file to the cursor. This method is called when a file is processed by the stream.
137        :param file: The file to add
138        """
139        if self._pending_files is None:
140            raise RuntimeError(
141                "Expected pending partitions to be set but it was not. This is unexpected. Please contact Support."
142            )
143        with self._pending_files_lock:
144            with self._state_lock:
145                if file.uri not in self._pending_files:
146                    self._message_repository.emit_message(
147                        AirbyteMessage(
148                            type=Type.LOG,
149                            log=AirbyteLogMessage(
150                                level=Level.WARN,
151                                message=f"The file {file.uri} was not found in the list of pending files. This is unexpected. Please contact Support",
152                            ),
153                        )
154                    )
155                else:
156                    self._pending_files.pop(file.uri)
157                self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
158                    self.DATE_TIME_FORMAT
159                )
160                if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
161                    # Get the earliest file based on its last modified date and its uri
162                    oldest_file = self._compute_earliest_file_in_history()
163                    if oldest_file:
164                        del self._file_to_datetime_history[oldest_file.uri]
165                    else:
166                        raise Exception(
167                            "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
168                        )
169                self.emit_state_message()

Add a file to the cursor. This method is called when a file is processed by the stream.

Parameters
  • file: The file to add
def emit_state_message(self) -> None:
171    def emit_state_message(self) -> None:
172        with self._state_lock:
173            new_state = self.get_state()
174            self._connector_state_manager.update_state_for_stream(
175                self._stream_name,
176                self._stream_namespace,
177                new_state,
178            )
179            state_message = self._connector_state_manager.create_state_message(
180                self._stream_name, self._stream_namespace
181            )
182            self._message_repository.emit_message(state_message)
def get_files_to_sync( self, all_files: Iterable[airbyte_cdk.sources.file_based.RemoteFile], logger: logging.Logger) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
219    def get_files_to_sync(
220        self, all_files: Iterable[RemoteFile], logger: logging.Logger
221    ) -> Iterable[RemoteFile]:
222        """
223        Given the list of files in the source, return the files that should be synced.
224        :param all_files: All files in the source
225        :param logger:
226        :return: The files that should be synced
227        """
228        with self._state_lock:
229            if self._is_history_full():
230                logger.warning(
231                    f"The state history is full. "
232                    f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
233                    f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
234                )
235            for f in all_files:
236                if self._should_sync_file(f, logger):
237                    yield f

Given the list of files in the source, return the files that should be synced.

Parameters
  • all_files: All files in the source
  • logger: The logger used to report warnings while selecting files
Returns

The files that should be synced

def get_start_time(self) -> datetime.datetime:
297    def get_start_time(self) -> datetime:
298        return self._sync_start

Returns the start time of the current sync.

def get_state(self) -> MutableMapping[str, Any]:
300    def get_state(self) -> MutableMapping[str, Any]:
301        """
302        Get the state of the cursor.
303        """
304        with self._state_lock:
305            return {
306                "history": self._file_to_datetime_history,
307                self._cursor_field.cursor_field_key: self._get_new_cursor_value(),
308            }

Get the state of the cursor.

def set_initial_state(self, value: MutableMapping[str, Any]) -> None:
310    def set_initial_state(self, value: StreamState) -> None:
311        pass

Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.

Parameters
  • value: The stream state
def ensure_at_least_one_state_emitted(self) -> None:
313    def ensure_at_least_one_state_emitted(self) -> None:
314        self.emit_state_message()

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.

class FileBasedFinalStateCursor(AbstractConcurrentFileBasedCursor):
    """Cursor that is used to guarantee at least one state message is emitted for a concurrent file-based stream."""

    def __init__(
        self,
        stream_config: FileBasedStreamConfig,
        message_repository: MessageRepository,
        stream_namespace: Optional[str],
        **kwargs: Any,
    ):
        self._stream_name = stream_config.name
        self._stream_namespace = stream_namespace
        self._message_repository = message_repository
        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
        self._connector_state_manager = ConnectorStateManager()

    @property
    def state(self) -> MutableMapping[str, Any]:
        # Sentinel state: marks the stream as having no real cursor.
        return {NO_CURSOR_STATE_KEY: True}

    def observe(self, record: Record) -> None:
        # No per-record bookkeeping is needed for a final-state cursor.
        return None

    def close_partition(self, partition: Partition) -> None:
        # Partitions require no cleanup here.
        return None

    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None:
        # Pending partitions are not tracked by this cursor.
        return None

    def add_file(self, file: RemoteFile) -> None:
        # Files are not recorded; there is no history to maintain.
        return None

    def get_files_to_sync(
        self, all_files: Iterable[RemoteFile], logger: logging.Logger
    ) -> Iterable[RemoteFile]:
        # Without incremental state, every file is eligible for syncing.
        return all_files

    def get_state(self) -> MutableMapping[str, Any]:
        # No incremental state is ever accumulated.
        return {}

    def set_initial_state(self, value: StreamState) -> None:
        # Incoming state is ignored; this cursor is stateless.
        return None

    def get_start_time(self) -> datetime:
        # A stateless cursor always starts from the beginning of time.
        return datetime.min

    def emit_state_message(self) -> None:
        # Intermediate state messages are suppressed; only the final sentinel is sent.
        return None

    def ensure_at_least_one_state_emitted(self) -> None:
        """Emit the single sentinel state message the platform expects for this stream."""
        manager = self._connector_state_manager
        manager.update_state_for_stream(
            self._stream_name, self._stream_namespace, self.state
        )
        final_message = manager.create_state_message(
            self._stream_name, self._stream_namespace
        )
        self._message_repository.emit_message(final_message)

Cursor that is used to guarantee at least one state message is emitted for a concurrent file-based stream.

FileBasedFinalStateCursor( stream_config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, message_repository: airbyte_cdk.MessageRepository, stream_namespace: Optional[str], **kwargs: Any)
29    def __init__(
30        self,
31        stream_config: FileBasedStreamConfig,
32        message_repository: MessageRepository,
33        stream_namespace: Optional[str],
34        **kwargs: Any,
35    ):
36        self._stream_name = stream_config.name
37        self._stream_namespace = stream_namespace
38        self._message_repository = message_repository
39        # Normally the connector state manager operates at the source-level. However, we only need it to write the sentinel
40        # state message rather than manage overall source state. This is also only temporary as we move to the resumable
41        # full refresh world where every stream uses a FileBasedConcurrentCursor with incremental state.
42        self._connector_state_manager = ConnectorStateManager()

Common interface for all cursors.

state: MutableMapping[str, Any]
44    @property
45    def state(self) -> MutableMapping[str, Any]:
46        return {NO_CURSOR_STATE_KEY: True}
def observe(self, record: airbyte_cdk.Record) -> None:
48    def observe(self, record: Record) -> None:
49        pass

Indicate to the cursor that the record has been emitted

def close_partition( self, partition: airbyte_cdk.sources.streams.concurrent.partitions.partition.Partition) -> None:
51    def close_partition(self, partition: Partition) -> None:
52        pass

Indicate to the cursor that the partition has been successfully processed

def set_pending_partitions( self, partitions: List[airbyte_cdk.sources.file_based.stream.concurrent.adapters.FileBasedStreamPartition]) -> None:
54    def set_pending_partitions(self, partitions: List["FileBasedStreamPartition"]) -> None:
55        pass
def add_file( self, file: airbyte_cdk.sources.file_based.RemoteFile) -> None:
57    def add_file(self, file: RemoteFile) -> None:
58        pass

Add a file to the cursor. This method is called when a file is processed by the stream.

Parameters
  • file: The file to add
def get_files_to_sync( self, all_files: Iterable[airbyte_cdk.sources.file_based.RemoteFile], logger: logging.Logger) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
60    def get_files_to_sync(
61        self, all_files: Iterable[RemoteFile], logger: logging.Logger
62    ) -> Iterable[RemoteFile]:
63        return all_files

Given the list of files in the source, return the files that should be synced.

Parameters
  • all_files: All files in the source
  • logger: The logger used to report warnings while selecting files
Returns

The files that should be synced

def get_state(self) -> MutableMapping[str, Any]:
65    def get_state(self) -> MutableMapping[str, Any]:
66        return {}

Get the state of the cursor.

def set_initial_state(self, value: MutableMapping[str, Any]) -> None:
68    def set_initial_state(self, value: StreamState) -> None:
69        return None

Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.

Parameters
  • value: The stream state
def get_start_time(self) -> datetime.datetime:
71    def get_start_time(self) -> datetime:
72        return datetime.min

Returns the start time of the current sync.

def emit_state_message(self) -> None:
74    def emit_state_message(self) -> None:
75        pass
def ensure_at_least_one_state_emitted(self) -> None:
77    def ensure_at_least_one_state_emitted(self) -> None:
78        self._connector_state_manager.update_state_for_stream(
79            self._stream_name, self._stream_namespace, self.state
80        )
81        state_message = self._connector_state_manager.create_state_message(
82            self._stream_name, self._stream_namespace
83        )
84        self._message_repository.emit_message(state_message)

State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called.