airbyte_cdk.sources.file_based.stream.cursor

# Public API of the cursor package: the abstract cursor interface and its default implementation.
from .abstract_file_based_cursor import AbstractFileBasedCursor
from .default_file_based_cursor import DefaultFileBasedCursor

__all__ = ["AbstractFileBasedCursor", "DefaultFileBasedCursor"]
class AbstractFileBasedCursor(ABC):
    """
    Abstract base class for cursors used by file-based streams.
    """

    @abstractmethod
    def __init__(self, stream_config: FileBasedStreamConfig, **kwargs: Any):
        """
        Common interface for all cursors.
        :param stream_config: The configuration of the stream the cursor belongs to
        """
        ...

    @abstractmethod
    def add_file(self, file: RemoteFile) -> None:
        """
        Add a file to the cursor. This method is called when a file is processed by the stream.
        :param file: The file to add
        """
        ...

    @abstractmethod
    def set_initial_state(self, value: StreamState) -> None:
        """
        Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
        :param value: The stream state
        """
        ...

    @abstractmethod
    def get_state(self) -> MutableMapping[str, Any]:
        """
        Get the state of the cursor.
        """
        ...

    @abstractmethod
    def get_start_time(self) -> datetime:
        """
        Returns the start time of the current sync.
        """
        ...

    @abstractmethod
    def get_files_to_sync(
        self, all_files: Iterable[RemoteFile], logger: logging.Logger
    ) -> Iterable[RemoteFile]:
        """
        Given the list of files in the source, return the files that should be synced.
        :param all_files: All files in the source
        :param logger: Logger for reporting issues encountered while selecting files
        :return: The files that should be synced
        """
        ...

Abstract base class for cursors used by file-based streams.

@abstractmethod
AbstractFileBasedCursor( stream_config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, **kwargs: Any)
@abstractmethod
def __init__(self, stream_config: FileBasedStreamConfig, **kwargs: Any):
    """
    Common interface for all cursors.
    :param stream_config: The configuration of the stream the cursor belongs to
    """
    ...

Common interface for all cursors.

@abstractmethod
def add_file( self, file: airbyte_cdk.sources.file_based.RemoteFile) -> None:
@abstractmethod
def add_file(self, file: RemoteFile) -> None:
    """
    Add a file to the cursor. This method is called when a file is processed by the stream.
    :param file: The file to add
    """
    ...

Add a file to the cursor. This method is called when a file is processed by the stream.

Parameters
  • file: The file to add
@abstractmethod
def set_initial_state(self, value: MutableMapping[str, Any]) -> None:
@abstractmethod
def set_initial_state(self, value: StreamState) -> None:
    """
    Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.
    :param value: The stream state
    """
    ...

Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.

Parameters
  • value: The stream state
@abstractmethod
def get_state(self) -> MutableMapping[str, Any]:
@abstractmethod
def get_state(self) -> MutableMapping[str, Any]:
    """
    Get the state of the cursor.
    :return: A mutable mapping describing the cursor's state
    """
    ...

Get the state of the cursor.

@abstractmethod
def get_start_time(self) -> datetime.datetime:
@abstractmethod
def get_start_time(self) -> datetime:
    """
    Returns the start time of the current sync.
    """
    ...

Returns the start time of the current sync.

@abstractmethod
def get_files_to_sync( self, all_files: Iterable[airbyte_cdk.sources.file_based.RemoteFile], logger: logging.Logger) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
@abstractmethod
def get_files_to_sync(
    self, all_files: Iterable[RemoteFile], logger: logging.Logger
) -> Iterable[RemoteFile]:
    """
    Given the list of files in the source, return the files that should be synced.
    :param all_files: All files in the source
    :param logger: Logger for reporting issues encountered while selecting files
    :return: The files that should be synced
    """
    ...

Given the list of files in the source, return the files that should be synced.

Parameters
  • all_files: All files in the source
  • logger: Logger for reporting issues encountered while selecting files
Returns

The files that should be synced

 18class DefaultFileBasedCursor(AbstractFileBasedCursor):
 19    DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3
 20    DEFAULT_MAX_HISTORY_SIZE = 10_000
 21    DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
 22    CURSOR_FIELD = "_ab_source_file_last_modified"
 23
 24    def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
 25        super().__init__(stream_config)  # type: ignore [safe-super]
 26        self._file_to_datetime_history: MutableMapping[str, str] = {}
 27        self._time_window_if_history_is_full = timedelta(
 28            days=stream_config.days_to_sync_if_history_is_full
 29            or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
 30        )
 31
 32        if self._time_window_if_history_is_full <= timedelta():
 33            raise ValueError(
 34                f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
 35            )
 36
 37        self._start_time = self._compute_start_time()
 38        self._initial_earliest_file_in_history: Optional[RemoteFile] = None
 39
 40    def set_initial_state(self, value: StreamState) -> None:
 41        self._file_to_datetime_history = value.get("history", {})
 42        self._start_time = self._compute_start_time()
 43        self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()
 44
 45    def add_file(self, file: RemoteFile) -> None:
 46        self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
 47            self.DATE_TIME_FORMAT
 48        )
 49        if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
 50            # Get the earliest file based on its last modified date and its uri
 51            oldest_file = self._compute_earliest_file_in_history()
 52            if oldest_file:
 53                del self._file_to_datetime_history[oldest_file.uri]
 54            else:
 55                raise Exception(
 56                    "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
 57                )
 58
 59    def get_state(self) -> StreamState:
 60        state = {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}
 61        return state
 62
 63    def _get_cursor(self) -> Optional[str]:
 64        """
 65        Returns the cursor value.
 66
 67        Files are synced in order of last-modified with secondary sort on filename, so the cursor value is
 68        a string joining the last-modified timestamp of the last synced file and the name of the file.
 69        """
 70        if self._file_to_datetime_history.items():
 71            filename, timestamp = max(
 72                self._file_to_datetime_history.items(), key=lambda x: (x[1], x[0])
 73            )
 74            return f"{timestamp}_{filename}"
 75        return None
 76
 77    def _is_history_full(self) -> bool:
 78        """
 79        Returns true if the state's history is full, meaning new entries will start to replace old entries.
 80        """
 81        return len(self._file_to_datetime_history) >= self.DEFAULT_MAX_HISTORY_SIZE
 82
 83    def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
 84        if file.uri in self._file_to_datetime_history:
 85            # If the file's uri is in the history, we should sync the file if it has been modified since it was synced
 86            updated_at_from_history = datetime.strptime(
 87                self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT
 88            )
 89            if file.last_modified < updated_at_from_history:
 90                logger.warning(
 91                    f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file."
 92                )
 93            else:
 94                return file.last_modified > updated_at_from_history
 95            return file.last_modified > updated_at_from_history
 96        if self._is_history_full():
 97            if self._initial_earliest_file_in_history is None:
 98                return True
 99            if file.last_modified > self._initial_earliest_file_in_history.last_modified:
100                # If the history is partial and the file's datetime is strictly greater than the earliest file in the history,
101                # we should sync it
102                return True
103            elif file.last_modified == self._initial_earliest_file_in_history.last_modified:
104                # If the history is partial and the file's datetime is equal to the earliest file in the history,
105                # we should sync it if its uri is strictly greater than the earliest file in the history
106                return file.uri > self._initial_earliest_file_in_history.uri
107            else:
108                # Otherwise, only sync the file if it has been modified since the start of the time window
109                return file.last_modified >= self.get_start_time()
110        else:
111            # The file is not in the history and the history is complete. We know we need to sync the file
112            return True
113
114    def get_files_to_sync(
115        self, all_files: Iterable[RemoteFile], logger: logging.Logger
116    ) -> Iterable[RemoteFile]:
117        if self._is_history_full():
118            logger.warning(
119                f"The state history is full. "
120                f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
121                f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
122            )
123        for f in all_files:
124            if self._should_sync_file(f, logger):
125                yield f
126
127    def get_start_time(self) -> datetime:
128        return self._start_time
129
130    def _compute_earliest_file_in_history(self) -> Optional[RemoteFile]:
131        if self._file_to_datetime_history:
132            filename, last_modified = min(
133                self._file_to_datetime_history.items(), key=lambda f: (f[1], f[0])
134            )
135            return RemoteFile(
136                uri=filename, last_modified=datetime.strptime(last_modified, self.DATE_TIME_FORMAT)
137            )
138        else:
139            return None
140
141    def _compute_start_time(self) -> datetime:
142        if not self._file_to_datetime_history:
143            return datetime.min
144        else:
145            earliest = min(self._file_to_datetime_history.values())
146            earliest_dt = datetime.strptime(earliest, self.DATE_TIME_FORMAT)
147            if self._is_history_full():
148                time_window = datetime.now() - self._time_window_if_history_is_full
149                earliest_dt = min(earliest_dt, time_window)
150            return earliest_dt

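Default cursor for file-based streams. It keeps a bounded history (at most DEFAULT_MAX_HISTORY_SIZE entries) that maps each synced file's URI to its last-modified timestamp, and uses it to decide which files still need to be synced. When the history is full, it falls back to a time window of days_to_sync_if_history_is_full days.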

DefaultFileBasedCursor( stream_config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, **_: Any)
def __init__(self, stream_config: FileBasedStreamConfig, **_: Any):
    """
    Create a cursor with an empty history.

    :param stream_config: The stream configuration; ``days_to_sync_if_history_is_full``
        is read to size the fallback time window.
    :raises ValueError: If the resulting time window is not positive.
    """
    super().__init__(stream_config)  # type: ignore [safe-super]
    self._file_to_datetime_history: MutableMapping[str, str] = {}
    # A falsy setting (e.g. None or 0) falls back to the default number of days,
    # so a value of 0 never reaches the check below.
    self._time_window_if_history_is_full = timedelta(
        days=stream_config.days_to_sync_if_history_is_full
        or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL
    )

    if self._time_window_if_history_is_full <= timedelta():
        raise ValueError(
            f"days_to_sync_if_history_is_full must be a positive timedelta, got {self._time_window_if_history_is_full}"
        )

    self._start_time = self._compute_start_time()
    self._initial_earliest_file_in_history: Optional[RemoteFile] = None

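Create a cursor with an empty history. A falsy days_to_sync_if_history_is_full falls back to DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL. A ValueError is raised if the resulting time window is not positive.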

# Class attributes of DefaultFileBasedCursor.
DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = 3  # fallback window, in days
DEFAULT_MAX_HISTORY_SIZE = 10_000  # max entries kept in the history
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # serialization format of history timestamps
CURSOR_FIELD = "_ab_source_file_last_modified"  # state key holding the cursor value
def set_initial_state(self, value: MutableMapping[str, Any]) -> None:
def set_initial_state(self, value: StreamState) -> None:
    """
    Load the history from a previously emitted state and recompute derived values.

    :param value: The stream state; its ``"history"`` entry, if present, maps file
        URIs to last-modified timestamps.
    """
    # Copy so that later add_file() calls do not mutate the caller's state object.
    self._file_to_datetime_history = dict(value.get("history", {}))
    self._start_time = self._compute_start_time()
    self._initial_earliest_file_in_history = self._compute_earliest_file_in_history()

Set the initial state of the cursor. The cursor cannot be initialized at construction time because the stream doesn't know its state yet.

Parameters
  • value: The stream state
def add_file( self, file: airbyte_cdk.sources.file_based.RemoteFile) -> None:
def add_file(self, file: RemoteFile) -> None:
    """
    Record ``file`` as synced, evicting the earliest history entry if the history overflows.

    :param file: The file that was just processed.
    :raises Exception: If the history overflows but no earliest entry can be found
        (not expected to happen).
    """
    self._file_to_datetime_history[file.uri] = file.last_modified.strftime(
        self.DATE_TIME_FORMAT
    )
    if len(self._file_to_datetime_history) > self.DEFAULT_MAX_HISTORY_SIZE:
        # Get the earliest file based on its last modified date and its uri
        oldest_file = self._compute_earliest_file_in_history()
        if oldest_file:
            del self._file_to_datetime_history[oldest_file.uri]
        else:
            raise Exception(
                "The history is full but there is no files in the history. This should never happen and might be indicative of a bug in the CDK."
            )

Add a file to the cursor. This method is called when a file is processed by the stream.

Parameters
  • file: The file to add
def get_state(self) -> MutableMapping[str, Any]:
def get_state(self) -> StreamState:
    """
    Return the state to persist: the history and the cursor value under ``CURSOR_FIELD``.

    The returned ``"history"`` is the cursor's own mapping, not a copy.
    """
    return {"history": self._file_to_datetime_history, self.CURSOR_FIELD: self._get_cursor()}

Get the state of the cursor.

def get_files_to_sync( self, all_files: Iterable[airbyte_cdk.sources.file_based.RemoteFile], logger: logging.Logger) -> Iterable[airbyte_cdk.sources.file_based.RemoteFile]:
def get_files_to_sync(
    self, all_files: Iterable[RemoteFile], logger: logging.Logger
) -> Iterable[RemoteFile]:
    """
    Lazily yield the files from ``all_files`` that should be synced.

    :param all_files: All files in the source.
    :param logger: Used to warn when the history is full and for per-file warnings.
    :return: A generator over the files to sync.
    """
    if self._is_history_full():
        logger.warning(
            f"The state history is full. "
            f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
            f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
        )
    for f in all_files:
        if self._should_sync_file(f, logger):
            yield f

Given the list of files in the source, return the files that should be synced.

Parameters
  • all_files: All files in the source
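  • logger: Logger used to warn when the history is full and about files whose last-modified date went backwards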
Returns

The files that should be synced

def get_start_time(self) -> datetime.datetime:
def get_start_time(self) -> datetime:
    """
    Returns the start time computed from the current history.
    """
    return self._start_time

Returns the start time of the current sync.