airbyte_cdk.legacy.sources.declarative.incremental
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import (
    DatetimeBasedCursor,
)
from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.legacy.sources.declarative.incremental.global_substream_cursor import (
    GlobalSubstreamCursor,
)
from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import (
    CursorFactory,
    PerPartitionCursor,
)
from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_with_global import (
    PerPartitionWithGlobalCursor,
)
from airbyte_cdk.legacy.sources.declarative.incremental.resumable_full_refresh_cursor import (
    ChildPartitionResumableFullRefreshCursor,
    ResumableFullRefreshCursor,
)

__all__ = [
    "CursorFactory",
    "DatetimeBasedCursor",
    "DeclarativeCursor",
    "GlobalSubstreamCursor",
    "PerPartitionCursor",
    "PerPartitionWithGlobalCursor",
    "ResumableFullRefreshCursor",
    "ChildPartitionResumableFullRefreshCursor",
]
class CursorFactory:
    def __init__(self, create_function: Callable[[], DeclarativeCursor]):
        self._create_function = create_function

    def create(self) -> DeclarativeCursor:
        return self._create_function()
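A minimal usage sketch; the builder function is hypothetical. A factory like this lets a per-partition cursor create an independent cursor instance for each partition:

def build_cursor() -> DeclarativeCursor:
    # hypothetical helper returning a fully configured DatetimeBasedCursor
    ...

factory = CursorFactory(build_cursor)
cursor_a = factory.create()  # a fresh cursor instance
cursor_b = factory.create()  # another, independent instance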
@dataclass
class DatetimeBasedCursor(DeclarativeCursor):
    """
    Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}

    Given a start time, end time, a step function, and an optional lookback window,
    the stream slicer will partition the date range from start time - lookback window to end time.

    The step is defined as an ISO 8601 duration string.

    The timestamp format accepts the same format codes as datetime.strftime, which are
    all the format codes required by the 1989 C standard.
    Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html

    Attributes:
        start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
        end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
        cursor_field (Union[InterpolatedString, str]): record's cursor field
        datetime_format (str): format of the datetime
        step (Optional[str]): size of the time window (ISO 8601 duration)
        cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
        config (Config): connection config
        start_time_option (Optional[RequestOption]): request option for start time
        end_time_option (Optional[RequestOption]): request option for end time
        partition_field_start (Optional[str]): partition start time field
        partition_field_end (Optional[str]): stream slice end time field
        lookback_window (Optional[InterpolatedString]): how far before start_datetime to read data for (ISO 8601 duration)
    """

    start_datetime: Union[MinMaxDatetime, str]
    cursor_field: Union[InterpolatedString, str]
    datetime_format: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    _highest_observed_cursor_field_value: Optional[str] = field(
        repr=False, default=None
    )  # tracks the latest observed datetime, which may not be safe to emit in the case of out-of-order records
    _cursor: Optional[str] = field(
        repr=False, default=None
    )  # tracks the latest observed datetime that is appropriate to emit as stream state
    end_datetime: Optional[Union[MinMaxDatetime, str]] = None
    step: Optional[Union[InterpolatedString, str]] = None
    cursor_granularity: Optional[str] = None
    start_time_option: Optional[RequestOption] = None
    end_time_option: Optional[RequestOption] = None
    partition_field_start: Optional[str] = None
    partition_field_end: Optional[str] = None
    lookback_window: Optional[Union[InterpolatedString, str]] = None
    message_repository: Optional[MessageRepository] = None
    is_compare_strictly: Optional[bool] = False
    cursor_datetime_formats: List[str] = field(default_factory=lambda: [])

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if (self.step and not self.cursor_granularity) or (
            not self.step and self.cursor_granularity
        ):
            raise ValueError(
                f"If step is defined, cursor_granularity should be as well and vice-versa. "
                f"Right now, step is `{self.step}` and cursor_granularity is `{self.cursor_granularity}`"
            )
        self._start_datetime = MinMaxDatetime.create(self.start_datetime, parameters)
        self._end_datetime = (
            None if not self.end_datetime else MinMaxDatetime.create(self.end_datetime, parameters)
        )

        self._timezone = datetime.timezone.utc
        self._interpolation = JinjaInterpolation()

        self._step = (
            self._parse_timedelta(
                InterpolatedString.create(self.step, parameters=parameters).eval(self.config)
            )
            if self.step
            else datetime.timedelta.max
        )
        self._cursor_granularity = self._parse_timedelta(self.cursor_granularity)
        self.cursor_field = InterpolatedString.create(self.cursor_field, parameters=parameters)
        self._lookback_window = (
            InterpolatedString.create(self.lookback_window, parameters=parameters)
            if self.lookback_window
            else None
        )
        self._partition_field_start = InterpolatedString.create(
            self.partition_field_start or "start_time", parameters=parameters
        )
        self._partition_field_end = InterpolatedString.create(
            self.partition_field_end or "end_time", parameters=parameters
        )
        self._parser = DatetimeParser()

        # If datetime format is not specified then start/end datetime should inherit it from the stream slicer
        if not self._start_datetime.datetime_format:
            self._start_datetime.datetime_format = self.datetime_format
        if self._end_datetime and not self._end_datetime.datetime_format:
            self._end_datetime.datetime_format = self.datetime_format

        if not self.cursor_datetime_formats:
            self.cursor_datetime_formats = [self.datetime_format]

        _validate_component_request_option_paths(
            self.config, self.start_time_option, self.end_time_option
        )

    def get_stream_state(self) -> StreamState:
        return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
        before calling anything else

        :param stream_state: The state of the stream as returned by get_stream_state
        """
        self._cursor = (
            stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
        )

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

        :param stream_slice: The current slice, which may or may not contain the most recently observed record
        :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
          stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
        """
        record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
        if not record_cursor_value:
            return

        start_field = self._partition_field_start.eval(self.config)
        end_field = self._partition_field_end.eval(self.config)
        is_highest_observed_cursor_value = (
            not self._highest_observed_cursor_field_value
            or self.parse_date(record_cursor_value)
            > self.parse_date(self._highest_observed_cursor_field_value)
        )
        if (
            self._is_within_daterange_boundaries(
                record,
                stream_slice.get(start_field),  # type: ignore [arg-type]
                stream_slice.get(end_field),  # type: ignore [arg-type]
            )
            and is_highest_observed_cursor_value
        ):
            self._highest_observed_cursor_field_value = record_cursor_value

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        if stream_slice.partition:
            raise ValueError(
                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
            )
        cursor_value_str_by_cursor_value_datetime = dict(
            map(
                # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
                # 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
                lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
                filter(
                    lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
                ),
            )
        )
        self._cursor = (
            cursor_value_str_by_cursor_value_datetime[
                max(cursor_value_str_by_cursor_value_datetime.keys())
            ]
            if cursor_value_str_by_cursor_value_datetime
            else None
        )

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Partition the daterange into slices of size = step.

        The start of the window is the maximum datetime between start_datetime and the stream_state's datetime - lookback_window.
        The end of the window is the minimum datetime between the current datetime and end_datetime.

        :return: the list of stream slices, one per step-sized window
        """
        end_datetime = self.select_best_end_datetime()
        start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
        return self._partition_daterange(start_datetime, end_datetime, self._step)

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
        # through each slice and does not belong to a specific slice. We just return stream state as it is.
        return self.get_stream_state()

    def _calculate_earliest_possible_value(
        self, end_datetime: datetime.datetime
    ) -> datetime.datetime:
        lookback_delta = self._parse_timedelta(
            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
        )
        earliest_possible_start_datetime = min(
            self._start_datetime.get_datetime(self.config), end_datetime
        )
        try:
            cursor_datetime = (
                self._calculate_cursor_datetime_from_state(self.get_stream_state()) - lookback_delta
            )
        except OverflowError:
            # cursor_datetime defers to the minimum date if it does not exist in the state. Trying to subtract
            # a timedelta from the minimum datetime results in an OverflowError
            cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
        return max(earliest_possible_start_datetime, cursor_datetime)

    def select_best_end_datetime(self) -> datetime.datetime:
        """
        Returns the optimal end datetime.
        This method compares the current datetime with a pre-configured end datetime
        and returns the earlier of the two. If no pre-configured end datetime is set,
        the current datetime is returned.

        :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
        """
        now = datetime.datetime.now(tz=self._timezone)
        if not self._end_datetime:
            return now
        return min(self._end_datetime.get_datetime(self.config), now)

    def _calculate_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> datetime.datetime:
        if self.cursor_field.eval(self.config, stream_state=stream_state) in stream_state:  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
            return self.parse_date(stream_state[self.cursor_field.eval(self.config)])  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        return datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

    def _format_datetime(self, dt: datetime.datetime) -> str:
        return self._parser.format(dt, self.datetime_format)

    def _partition_daterange(
        self,
        start: datetime.datetime,
        end: datetime.datetime,
        step: Union[datetime.timedelta, Duration],
    ) -> List[StreamSlice]:
        start_field = self._partition_field_start.eval(self.config)
        end_field = self._partition_field_end.eval(self.config)
        dates = []

        while self._is_within_date_range(start, end):
            next_start = self._evaluate_next_start_date_safely(start, step)
            end_date = self._get_date(next_start - self._cursor_granularity, end, min)
            dates.append(
                StreamSlice(
                    partition={},
                    cursor_slice={
                        start_field: self._format_datetime(start),
                        end_field: self._format_datetime(end_date),
                    },
                )
            )
            start = next_start
        return dates

    def _is_within_date_range(self, start: datetime.datetime, end: datetime.datetime) -> bool:
        if self.is_compare_strictly:
            return start < end
        return start <= end

    def _evaluate_next_start_date_safely(
        self, start: datetime.datetime, step: datetime.timedelta
    ) -> datetime.datetime:
        """
        Given that we set the default step at datetime.timedelta.max, we will generate an OverflowError when evaluating the next start_date
        This method assumes that users would never enter a step that would generate an overflow. Given that would be the case, the code
        would have broken anyway.
        """
        try:
            return start + step
        except OverflowError:
            return datetime.datetime.max.replace(tzinfo=datetime.timezone.utc)

    def _get_date(
        self,
        cursor_value: datetime.datetime,
        default_date: datetime.datetime,
        comparator: Callable[[datetime.datetime, datetime.datetime], datetime.datetime],
    ) -> datetime.datetime:
        cursor_date = cursor_value or default_date
        return comparator(cursor_date, default_date)

    def parse_date(self, date: str) -> datetime.datetime:
        for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
            try:
                return self._parser.parse(date, datetime_format)
            except ValueError:
                pass
        raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")

    @classmethod
    def _parse_timedelta(cls, time_str: Optional[str]) -> Union[datetime.timedelta, Duration]:
        """
        :return: the parsed ISO 8601 duration as a datetime.timedelta or Duration object.
        """
        if not time_str:
            return datetime.timedelta(0)
        return parse_duration(time_str)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.body_json, stream_slice)

    def request_kwargs(self) -> Mapping[str, Any]:
        # Never update kwargs
        return {}

    def _get_request_options(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        options: MutableMapping[str, Any] = {}
        if not stream_slice:
            return options

        if self.start_time_option and self.start_time_option.inject_into == option_type:
            start_time_value = stream_slice.get(self._partition_field_start.eval(self.config))
            self.start_time_option.inject_into_request(options, start_time_value, self.config)

        if self.end_time_option and self.end_time_option.inject_into == option_type:
            end_time_value = stream_slice.get(self._partition_field_end.eval(self.config))
            self.end_time_option.inject_into_request(options, end_time_value, self.config)

        return options

    def should_be_synced(self, record: Record) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        record_cursor_value = record.get(cursor_field)
        if not record_cursor_value:
            self._send_log(
                Level.WARN,
                f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
            )
            return True
        latest_possible_cursor_value = self.select_best_end_datetime()
        earliest_possible_cursor_value = self._calculate_earliest_possible_value(
            latest_possible_cursor_value
        )
        return self._is_within_daterange_boundaries(
            record, earliest_possible_cursor_value, latest_possible_cursor_value
        )

    def _is_within_daterange_boundaries(
        self,
        record: Record,
        start_datetime_boundary: Union[datetime.datetime, str],
        end_datetime_boundary: Union[datetime.datetime, str],
    ) -> bool:
        cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
        record_cursor_value = record.get(cursor_field)
        if not record_cursor_value:
            self._send_log(
                Level.WARN,
                f"Could not find cursor field `{cursor_field}` in record. The record will not be considered when emitting sync state",
            )
            return False
        if isinstance(start_datetime_boundary, str):
            start_datetime_boundary = self.parse_date(start_datetime_boundary)
        if isinstance(end_datetime_boundary, str):
            end_datetime_boundary = self.parse_date(end_datetime_boundary)
        return (
            start_datetime_boundary <= self.parse_date(record_cursor_value) <= end_datetime_boundary
        )

    def _send_log(self, level: Level, message: str) -> None:
        if self.message_repository:
            self.message_repository.emit_message(
                AirbyteMessage(
                    type=Type.LOG,
                    log=AirbyteLogMessage(level=level, message=message),
                )
            )

    def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
        """
        Updates the lookback window based on a given number of seconds if the new duration
        is greater than the currently configured lookback window.

        :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
        """
        runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
        config_lookback = parse_duration(
            self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
        )

        # Check if the new runtime lookback window is greater than the current config lookback
        if parse_duration(runtime_lookback_window) > config_lookback:
            self._lookback_window = InterpolatedString.create(
                runtime_lookback_window, parameters={}
            )
Slices the stream over a datetime range and creates a state with format {<cursor_field>: <datetime>}.
Given a start time, end time, a step function, and an optional lookback window, the stream slicer will partition the date range from start time - lookback window to end time.
The step is defined as an ISO 8601 duration string.
The timestamp format accepts the same format codes as datetime.strftime, which are all the format codes required by the 1989 C standard. Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html
Attributes:
- start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
- end_datetime (Optional[Union[MinMaxDatetime, str]]): the datetime that determines the last record that should be synced
- cursor_field (Union[InterpolatedString, str]): record's cursor field
- datetime_format (str): format of the datetime
- step (Optional[str]): size of the time window (ISO 8601 duration)
- cursor_granularity (Optional[str]): smallest increment the datetime_format has (ISO 8601 duration) that will be used to ensure that the start of a slice does not overlap with the end of the previous one
- config (Config): connection config
- start_time_option (Optional[RequestOption]): request option for start time
- end_time_option (Optional[RequestOption]): request option for end time
- partition_field_start (Optional[str]): partition start time field
- partition_field_end (Optional[str]): stream slice end time field
- lookback_window (Optional[InterpolatedString]): how far before start_datetime to read data for (ISO 8601 duration)
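As an illustration, a hedged sketch of constructing the cursor directly; in practice the declarative framework builds this component from a connector manifest, and the values below are hypothetical:

cursor = DatetimeBasedCursor(
    start_datetime="2024-01-01T00:00:00Z",
    cursor_field="updated_at",
    datetime_format="%Y-%m-%dT%H:%M:%SZ",
    config={},      # connection config; empty for illustration
    parameters={},
    step="P1D",                 # one-day windows; must be set together with
    cursor_granularity="PT1S",  # cursor_granularity, or __post_init__ raises
    lookback_window="P3D",
)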
def get_stream_state(self) -> StreamState:
    return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {}  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
Returns the current stream state. We would like to restrict its usage since it exposes internals of the state. As of 2023-06-14, it is used for three things:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
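Assuming the cursor field evaluates to updated_at, the returned mapping looks like this sketch (values illustrative):

cursor.get_stream_state()
# => {"updated_at": "2024-01-31T23:59:59Z"}  once a cursor value has been committed
# => {}                                      before any slice has been closed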
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called
    before calling anything else

    :param stream_state: The state of the stream as returned by get_stream_state
    """
    self._cursor = (
        stream_state.get(self.cursor_field.eval(self.config)) if stream_state else None  # type: ignore [union-attr]
    )
Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else
Parameters
- stream_state: The state of the stream as returned by get_stream_state
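For example, when resuming a sync, the state emitted by a previous run is fed back in before slicing begins (the value is illustrative):

cursor.set_initial_state({"updated_at": "2024-01-15T00:00:00Z"})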
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.

    :param stream_slice: The current slice, which may or may not contain the most recently observed record
    :param record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the
      stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
    """
    record_cursor_value = record.get(self.cursor_field.eval(self.config))  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
    # if the current record has no cursor value, we cannot meaningfully update the state based on it, so there is nothing more to do
    if not record_cursor_value:
        return

    start_field = self._partition_field_start.eval(self.config)
    end_field = self._partition_field_end.eval(self.config)
    is_highest_observed_cursor_value = (
        not self._highest_observed_cursor_field_value
        or self.parse_date(record_cursor_value)
        > self.parse_date(self._highest_observed_cursor_field_value)
    )
    if (
        self._is_within_daterange_boundaries(
            record,
            stream_slice.get(start_field),  # type: ignore [arg-type]
            stream_slice.get(end_field),  # type: ignore [arg-type]
        )
        and is_highest_observed_cursor_value
    ):
        self._highest_observed_cursor_field_value = record_cursor_value
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
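A sketch of the ordering behavior, with illustrative values: only records that fall inside the slice's window and move the cursor forward update the internally tracked highest value.

daily_slice = StreamSlice(
    partition={},
    cursor_slice={"start_time": "2024-01-01T00:00:00Z", "end_time": "2024-01-01T23:59:59Z"},
)
cursor.observe(
    daily_slice,
    Record(data={"updated_at": "2024-01-01T12:00:00Z"}, stream_name="users", associated_slice=daily_slice),
)
cursor.observe(
    daily_slice,
    Record(data={"updated_at": "2024-01-01T03:00:00Z"}, stream_name="users", associated_slice=daily_slice),
)
# The second record is older than the first, so the highest observed value stays at 12:00.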
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    if stream_slice.partition:
        raise ValueError(
            f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
        )
    cursor_value_str_by_cursor_value_datetime = dict(
        map(
            # we need to ensure the cursor value is preserved as is in the state else the CATs might complain of something like
            # 2023-01-04T17:30:19.000Z' <= '2023-01-04T17:30:19.000000Z'
            lambda datetime_str: (self.parse_date(datetime_str), datetime_str),  # type: ignore # because of the filter on the next line, this will only be called with a str
            filter(
                lambda item: item, [self._cursor, self._highest_observed_cursor_field_value]
            ),
        )
    )
    self._cursor = (
        cursor_value_str_by_cursor_value_datetime[
            max(cursor_value_str_by_cursor_value_datetime.keys())
        ]
        if cursor_value_str_by_cursor_value_datetime
        else None
    )
Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same, but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
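Continuing the sketch from observe: once the slice's records have been observed, closing the slice promotes the highest observed value into the emittable state.

cursor.close_slice(daily_slice)  # daily_slice has an empty partition, as required
cursor.get_stream_state()
# => {"updated_at": "2024-01-01T12:00:00Z"}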
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Partition the daterange into slices of size = step.

    The start of the window is the maximum datetime between start_datetime and the stream_state's datetime - lookback_window.
    The end of the window is the minimum datetime between the current datetime and end_datetime.

    :return: the list of stream slices, one per step-sized window
    """
    end_datetime = self.select_best_end_datetime()
    start_datetime = self._calculate_earliest_possible_value(self.select_best_end_datetime())
    return self._partition_daterange(start_datetime, end_datetime, self._step)
Partition the daterange into slices of size = step.
The start of the window is the maximum datetime between start_datetime and the stream_state's datetime - lookback_window. The end of the window is the minimum datetime between the current datetime and end_datetime.
Returns
The list of stream slices, one per step-sized window.
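As an illustration of the partitioning, a cursor configured with step "P1D", granularity "PT1S", and a window from 2024-01-01 to 2024-01-03 would produce slices along these lines:

for s in cursor.stream_slices():
    print(s.cursor_slice)
# {"start_time": "2024-01-01T00:00:00Z", "end_time": "2024-01-01T23:59:59Z"}
# {"start_time": "2024-01-02T00:00:00Z", "end_time": "2024-01-02T23:59:59Z"}
# {"start_time": "2024-01-03T00:00:00Z", "end_time": "2024-01-03T00:00:00Z"}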
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    # Datetime based cursors operate over slices made up of datetime ranges. Stream state is based on the progress
    # through each slice and does not belong to a specific slice. We just return stream state as it is.
    return self.get_stream_state()
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
def select_best_end_datetime(self) -> datetime.datetime:
    """
    Returns the optimal end datetime.
    This method compares the current datetime with a pre-configured end datetime
    and returns the earlier of the two. If no pre-configured end datetime is set,
    the current datetime is returned.

    :return datetime.datetime: The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
    """
    now = datetime.datetime.now(tz=self._timezone)
    if not self._end_datetime:
        return now
    return min(self._end_datetime.get_datetime(self.config), now)
Returns the optimal end datetime. This method compares the current datetime with a pre-configured end datetime and returns the earlier of the two. If no pre-configured end datetime is set, the current datetime is returned.
Returns
The best end datetime, which is either the current datetime or the pre-configured end datetime, whichever is earlier.
def parse_date(self, date: str) -> datetime.datetime:
    for datetime_format in self.cursor_datetime_formats + [self.datetime_format]:
        try:
            return self._parser.parse(date, datetime_format)
        except ValueError:
            pass
    raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_request_options(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g.: you might want to define query parameters for paging if next_page_token is not None.
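For example, configuring the start and end time options to inject into request parameters makes the slice boundaries appear as query parameters. The field names "since" and "until" below are hypothetical, but the wiring mirrors what _get_request_options expects:

start_option = RequestOption(
    inject_into=RequestOptionType.request_parameter, field_name="since", parameters={}
)
end_option = RequestOption(
    inject_into=RequestOptionType.request_parameter, field_name="until", parameters={}
)
# With these set as start_time_option/end_time_option on the cursor:
cursor.get_request_params(stream_slice=daily_slice)
# => {"since": "2024-01-01T00:00:00Z", "until": "2024-01-01T23:59:59Z"}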
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_request_options(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_request_options(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns plain text, the text will be sent as is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_request_options(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def should_be_synced(self, record: Record) -> bool:
    cursor_field = self.cursor_field.eval(self.config)  # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
    record_cursor_value = record.get(cursor_field)
    if not record_cursor_value:
        self._send_log(
            Level.WARN,
            f"Could not find cursor field `{cursor_field}` in record. The incremental sync will assume it needs to be synced",
        )
        return True
    latest_possible_cursor_value = self.select_best_end_datetime()
    earliest_possible_cursor_value = self._calculate_earliest_possible_value(
        latest_possible_cursor_value
    )
    return self._is_within_daterange_boundaries(
        record, earliest_possible_cursor_value, latest_possible_cursor_value
    )
Evaluating whether a record should be synced allows for record filtering and for stop conditions during pagination.
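Illustrative behavior: records outside the computed [earliest, latest] window are filtered out, while records missing the cursor field pass through with a warning.

stale = Record(data={"updated_at": "1999-01-01T00:00:00Z"}, stream_name="users", associated_slice=daily_slice)
cursor.should_be_synced(stale)
# => False when 1999-01-01 falls before the computed earliest possible cursor value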
def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
    """
    Updates the lookback window based on a given number of seconds if the new duration
    is greater than the currently configured lookback window.

    :param lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
    """
    runtime_lookback_window = duration_isoformat(timedelta(seconds=lookback_window_in_seconds))
    config_lookback = parse_duration(
        self._lookback_window.eval(self.config) if self._lookback_window else "P0D"
    )

    # Check if the new runtime lookback window is greater than the current config lookback
    if parse_duration(runtime_lookback_window) > config_lookback:
        self._lookback_window = InterpolatedString.create(
            runtime_lookback_window, parameters={}
        )
Updates the lookback window based on a given number of seconds if the new duration is greater than the currently configured lookback window.
Parameters
- lookback_window_in_seconds: The lookback duration in seconds to potentially update to.
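For example, after a sync that ran for two hours:

cursor.set_runtime_lookback_window(7200)  # 7200 seconds, formatted as an ISO 8601 duration (PT2H)
# The runtime value replaces the configured lookback only if it is longer,
# e.g. it would not override a configured lookback_window of "P1D".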
class DeclarativeCursor(Cursor, StreamSlicer, ABC):
    """
    DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of
    state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.
    """
DeclarativeCursors are components that allow for checkpointing syncs. In addition to managing the fetching and updating of state, declarative cursors also manage stream slicing and injecting slice values into outbound requests.
class GlobalSubstreamCursor(DeclarativeCursor):
    """
    The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor.
    This class is beneficial for streams with many partitions, as it allows the state to be managed globally
    instead of per partition, simplifying state management and reducing the size of state messages.

    This cursor is activated by setting the `global_substream_cursor` parameter for incremental sync.

    Warnings:
    - This class enforces a minimal lookback window for substreams based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
    - The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
    - When using the `incremental_dependency` option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the `global_substream_cursor` and `incremental_dependency` options to avoid data loss.
    """

    def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter):
        self._stream_cursor = stream_cursor
        self._partition_router = partition_router
        self._timer = Timer()
        self._lock = threading.Lock()
        self._slice_semaphore = threading.Semaphore(
            0
        )  # Start with 0, indicating no slices being tracked
        self._all_slices_yielded = False
        self._lookback_window: Optional[int] = None
        self._current_partition: Optional[Mapping[str, Any]] = None
        self._last_slice: bool = False
        self._parent_state: Optional[Mapping[str, Any]] = None

    def start_slices_generation(self) -> None:
        self._timer.start()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generates stream slices, ensuring the last slice is properly flagged and processed.

        This method creates a sequence of stream slices by iterating over partitions and cursor slices.
        It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the
        final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice`
        is called only after all slices have been processed.

        We expect the following events:
        * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
        * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded, so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
        * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
        * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
        """
        slice_generator = (
            StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
            )
            for partition in self._partition_router.stream_slices()
            for cursor_slice in self._stream_cursor.stream_slices()
        )

        self.start_slices_generation()
        for slice, last, state in iterate_with_last_flag_and_state(
            slice_generator, self._partition_router.get_stream_state
        ):
            self._parent_state = state
            self.register_slice(last)
            yield slice
        self._parent_state = self._partition_router.get_stream_state()

    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
        slice_generator = (
            StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
            )
            for cursor_slice in self._stream_cursor.stream_slices()
        )

        yield from slice_generator

    def register_slice(self, last: bool) -> None:
        """
        Tracks the processing of a stream slice.

        Releases the semaphore for each slice. If it's the last slice (`last=True`),
        sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.

        Args:
            last (bool): True if the current slice is the last in the sequence.
        """
        self._slice_semaphore.release()
        if last:
            self._all_slices_yielded = True

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Set the initial state for the cursors.

        This method initializes the state for the global cursor using the provided stream state.

        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
                {
                    "state": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    },
                    "lookback_window": 132
                }
        """
        if not stream_state:
            return

        if "lookback_window" in stream_state:
            self._lookback_window = stream_state["lookback_window"]
            self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])

        if "state" in stream_state:
            self._stream_cursor.set_initial_state(stream_state["state"])
        elif "states" not in stream_state:
            # We assume that `stream_state` is in the old global format
            # Example: {"global_state_format_key": "global_state_format_value"}
            self._stream_cursor.set_initial_state(stream_state)

        # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
        # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
        # We are still keeping this line as a comment to be explicit about the past behavior.
        # self._partition_router.set_initial_state(stream_state)

    def _inject_lookback_into_stream_cursor(self, lookback_window: int) -> None:
        """
        Modifies the stream cursor's lookback window based on the duration of the previous sync.
        This adjustment ensures the cursor is set to the minimal lookback window necessary for
        avoiding missing data.

        Parameters:
            lookback_window (int): The lookback duration in seconds to be set, derived from
                the previous sync.

        Raises:
            ValueError: If the cursor does not support dynamic lookback window adjustments.
        """
        if hasattr(self._stream_cursor, "set_runtime_lookback_window"):
            self._stream_cursor.set_runtime_lookback_window(lookback_window)
        else:
            raise ValueError(
                "The cursor class for Global Substream Cursor does not have a set_runtime_lookback_window method"
            )

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        self._stream_cursor.observe(
            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
        )

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        """
        Close the current stream slice.

        This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor
        only after reading all slices. This ensures that we do not miss any child records from a later parent record
        if the child cursor is earlier than a record from the first parent record.

        Args:
            stream_slice (StreamSlice): The stream slice to be closed.
            *args (Any): Additional arguments.
        """
        with self._lock:
            self._slice_semaphore.acquire()
            if self._all_slices_yielded and self._slice_semaphore._value == 0:
                self._lookback_window = self._timer.finish()
                self._stream_cursor.close_slice(
                    StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
                )

    def get_stream_state(self) -> StreamState:
        state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}

        if self._parent_state:
            state["parent_state"] = self._parent_state

        if self._lookback_window is not None:
            state["lookback_window"] = self._lookback_window

        return state

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # stream_slice is ignored as cursor is global
        return self._stream_cursor.get_stream_state()

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_params(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request params")

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_headers(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request headers")

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        if stream_slice:
            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_body_data(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request body data")

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._stream_cursor.get_request_body_json(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request body json")

    def should_be_synced(self, record: Record) -> bool:
        return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))

    @staticmethod
    def _convert_record_to_cursor_record(record: Record) -> Record:
        return Record(
            data=record.data,
            stream_name=record.stream_name,
            associated_slice=StreamSlice(
                partition={}, cursor_slice=record.associated_slice.cursor_slice
            )
            if record.associated_slice
            else None,
        )
The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor. This class is beneficial for streams with many partitions, as it allows the state to be managed globally instead of per partition, simplifying state management and reducing the size of state messages.
This cursor is activated by setting the global_substream_cursor parameter for incremental sync.
Warnings:
- This class enforces a minimal lookback window for substreams based on the duration of the previous sync to avoid losing records. This lookback ensures that any records added or updated during the sync are captured in subsequent syncs.
- The global cursor is updated only at the end of the sync. If the sync ends prematurely (e.g., due to an exception), the state will not be updated.
- When using the incremental_dependency option, the sync will progress through parent records, preventing the sync from getting infinitely stuck. However, it is crucial to understand the requirements for both the global_substream_cursor and incremental_dependency options to avoid data loss.
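A hedged construction sketch; both collaborators are assumed to be configured elsewhere:

global_cursor = GlobalSubstreamCursor(
    stream_cursor=datetime_cursor,      # a configured DatetimeBasedCursor
    partition_router=partition_router,  # e.g. a SubstreamPartitionRouter over a parent stream
)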
def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: PartitionRouter):
    self._stream_cursor = stream_cursor
    self._partition_router = partition_router
    self._timer = Timer()
    self._lock = threading.Lock()
    self._slice_semaphore = threading.Semaphore(
        0
    )  # Start with 0, indicating no slices being tracked
    self._all_slices_yielded = False
    self._lookback_window: Optional[int] = None
    self._current_partition: Optional[Mapping[str, Any]] = None
    self._last_slice: bool = False
    self._parent_state: Optional[Mapping[str, Any]] = None
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Generates stream slices, ensuring the last slice is properly flagged and processed.

    This method creates a sequence of stream slices by iterating over partitions and cursor slices.
    It holds onto one slice in memory to set `_all_slices_yielded` to `True` before yielding the
    final slice. A semaphore is used to track the processing of slices, ensuring that `close_slice`
    is called only after all slices have been processed.

    We expect the following events:
    * Yields all the slices except the last one. At this point, `close_slice` won't actually close the global slice as `self._all_slices_yielded == False`
    * Release the semaphore one last time before setting `self._all_slices_yielded = True`. This will cause `close_slice` to know about all the slices before we indicate that all slices have been yielded, so the left side of `if self._all_slices_yielded and self._slice_semaphore._value == 0` will be false if not everything is closed
    * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
    * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
    """
    slice_generator = (
        StreamSlice(
            partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
        )
        for partition in self._partition_router.stream_slices()
        for cursor_slice in self._stream_cursor.stream_slices()
    )

    self.start_slices_generation()
    for slice, last, state in iterate_with_last_flag_and_state(
        slice_generator, self._partition_router.get_stream_state
    ):
        self._parent_state = state
        self.register_slice(last)
        yield slice
    self._parent_state = self._partition_router.get_stream_state()
Generates stream slices, ensuring the last slice is properly flagged and processed.
This method creates a sequence of stream slices by iterating over partitions and cursor slices.
It holds onto one slice in memory to set _all_slices_yielded to True before yielding the final slice. A semaphore is used to track the processing of slices, ensuring that close_slice is called only after all slices have been processed.
We expect the following events:
- Yields all the slices except the last one. At this point, close_slice won't actually close the global slice as self._all_slices_yielded == False
- Release the semaphore one last time before setting self._all_slices_yielded = True. This will cause close_slice to know about all the slices before we indicate that all slices have been yielded, so the left side of if self._all_slices_yielded and self._slice_semaphore._value == 0 will be false if not everything is closed
- Setting self._all_slices_yielded = True. We do that before actually yielding the last slice, as the caller of stream_slices might stop iterating at any point and hence the code after yield might not be executed
- Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
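A standalone sketch of the bookkeeping described above (not the CDK code itself): the semaphore counts yielded-but-unclosed slices, and the last-slice flag is flipped before the final yield so that the final close can detect completion.

import threading
from typing import Iterable, Iterator

semaphore = threading.Semaphore(0)
all_slices_yielded = False

def yield_slices(slices: Iterable[str]) -> Iterator[str]:
    global all_slices_yielded
    items = list(slices)  # assumes a non-empty sequence
    for item in items[:-1]:
        semaphore.release()        # one release per yielded slice
        yield item
    semaphore.release()            # release for the last slice...
    all_slices_yielded = True      # ...then flag completion before yielding it
    yield items[-1]

def close_slice() -> None:
    semaphore.acquire()            # one acquire per closed slice
    if all_slices_yielded and semaphore._value == 0:
        print("every yielded slice is closed -> commit the global cursor state")

for s in yield_slices(["a", "b", "c"]):
    close_slice()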
def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
    slice_generator = (
        StreamSlice(
            partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
        )
        for cursor_slice in self._stream_cursor.stream_slices()
    )

    yield from slice_generator
def register_slice(self, last: bool) -> None:
    """
    Tracks the processing of a stream slice.

    Releases the semaphore for each slice. If it's the last slice (`last=True`),
    sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.

    Args:
        last (bool): True if the current slice is the last in the sequence.
    """
    self._slice_semaphore.release()
    if last:
        self._all_slices_yielded = True
Tracks the processing of a stream slice.
Releases the semaphore for each slice. If it's the last slice (last=True), sets _all_slices_yielded to True to indicate no more slices will be processed.
Arguments:
- last (bool): True if the current slice is the last in the sequence.
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Set the initial state for the cursors.

    This method initializes the state for the global cursor using the provided stream state.

    Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
    does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

    Args:
        stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
            {
                "state": {
                    "last_updated": "2023-05-27T00:00:00Z"
                },
                "parent_state": {
                    "parent_stream_name": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    }
                },
                "lookback_window": 132
            }
    """
    if not stream_state:
        return

    if "lookback_window" in stream_state:
        self._lookback_window = stream_state["lookback_window"]
        self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])

    if "state" in stream_state:
        self._stream_cursor.set_initial_state(stream_state["state"])
    elif "states" not in stream_state:
        # We assume that `stream_state` is in the old global format
        # Example: {"global_state_format_key": "global_state_format_value"}
        self._stream_cursor.set_initial_state(stream_state)

    # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
    # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
    # We are still keeping this line as a comment to be explicit about the past behavior.
    # self._partition_router.set_initial_state(stream_state)
Set the initial state for the cursors.
This method initializes the state for the global cursor using the provided stream state.
Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
    {
        "state": {
            "last_updated": "2023-05-27T00:00:00Z"
        },
        "parent_state": {
            "parent_stream_name": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        },
        "lookback_window": 132
    }
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    self._stream_cursor.observe(
        StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
    )
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    """
    Close the current stream slice.

    This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor
    only after reading all slices. This ensures that we do not miss any child records from a later parent record
    if the child cursor is earlier than a record from the first parent record.

    Args:
        stream_slice (StreamSlice): The stream slice to be closed.
        *args (Any): Additional arguments.
    """
    with self._lock:
        self._slice_semaphore.acquire()
        if self._all_slices_yielded and self._slice_semaphore._value == 0:
            self._lookback_window = self._timer.finish()
            self._stream_cursor.close_slice(
                StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
            )
Close the current stream slice.
This method is called when a stream slice is completed. For the global parent cursor, we close the child cursor only after reading all slices. This ensures that we do not miss any child records from a later parent record if the child cursor is earlier than a record from the first parent record.
Arguments:
- stream_slice (StreamSlice): The stream slice to be closed.
- *args (Any): Additional arguments.
def get_stream_state(self) -> StreamState:
    state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}

    if self._parent_state:
        state["parent_state"] = self._parent_state

    if self._lookback_window is not None:
        state["lookback_window"] = self._lookback_window

    return state
Returns the current stream state. We would like to restrict its usage since it exposes internals of the state. As of 2023-06-14, it is used for three things:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    # stream_slice is ignored as cursor is global
    return self._stream_cursor.get_stream_state()
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    if stream_slice:
        return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
            stream_state=stream_state,
            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
            next_page_token=next_page_token,
        ) | self._stream_cursor.get_request_params(
            stream_state=stream_state,
            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
            next_page_token=next_page_token,
        )
    else:
        raise ValueError("A partition needs to be provided in order to get request params")
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
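The implementation above combines the partition router's options with the stream cursor's options using Python's dict-merge operator, where the right-hand side wins on key collisions. A quick illustration with made-up keys:

```python
router_params = {"account_id": "123", "page_size": 50}
cursor_params = {"updated_after": "2023-05-27T00:00:00Z", "page_size": 100}

merged = router_params | cursor_params
# {'account_id': '123', 'page_size': 100, 'updated_after': '2023-05-27T00:00:00Z'}
# On a collision ("page_size"), the stream cursor's value takes precedence.
```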
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    if stream_slice:
        return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
            stream_state=stream_state,
            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
            next_page_token=next_page_token,
        ) | self._stream_cursor.get_request_headers(
            stream_state=stream_state,
            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
            next_page_token=next_page_token,
        )
    else:
        raise ValueError("A partition needs to be provided in order to get request headers")
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    if stream_slice:
        return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
            stream_state=stream_state,
            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
            next_page_token=next_page_token,
        ) | self._stream_cursor.get_request_body_data(
            stream_state=stream_state,
            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
            next_page_token=next_page_token,
        )
    else:
        raise ValueError("A partition needs to be provided in order to get request body data")
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, the text will be sent as-is; if it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
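The dict-to-form conversion described above behaves like urllib.parse.urlencode from the standard library; a small illustration:

```python
from urllib.parse import urlencode

body = {"key1": "value1", "key2": "value2"}
print(urlencode(body))  # key1=value1&key2=value2
```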
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    if stream_slice:
        return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
            stream_state=stream_state,
            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
            next_page_token=next_page_token,
        ) | self._stream_cursor.get_request_body_json(
            stream_state=stream_state,
            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
            next_page_token=next_page_token,
        )
    else:
        raise ValueError("A partition needs to be provided in order to get request body json")
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
class PerPartitionCursor(DeclarativeCursor):
    """
    Manages state per partition when a stream has many partitions, to prevent data loss or duplication.

    **Partition Limitation and Limit Reached Logic**

    - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000).
    - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition.
    - **_over_limit**: A counter that increments each time an oldest partition is removed when the limit is exceeded.

    The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage.

    - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly.
    - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors.

    This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.
    """

    DEFAULT_MAX_PARTITIONS_NUMBER = 10000
    _NO_STATE: Mapping[str, Any] = {}
    _NO_CURSOR_STATE: Mapping[str, Any] = {}
    _KEY = 0
    _VALUE = 1
    _state_to_migrate_from: Mapping[str, Any] = {}

    def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
        self._cursor_factory = cursor_factory
        self._partition_router = partition_router
        # The dict is ordered to ensure that once the maximum number of partitions is reached,
        # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
        self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
        self._over_limit = 0
        self._partition_serializer = PerPartitionKeySerializer()

    def stream_slices(self) -> Iterable[StreamSlice]:
        slices = self._partition_router.stream_slices()
        for partition in slices:
            yield from self.generate_slices_from_partition(partition)

    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
        # Ensure the maximum number of partitions is not exceeded
        self._ensure_partition_limit()

        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
        if not cursor:
            partition_state = (
                self._state_to_migrate_from
                if self._state_to_migrate_from
                else self._NO_CURSOR_STATE
            )
            cursor = self._create_cursor(partition_state)
            self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor

        for cursor_slice in cursor.stream_slices():
            yield StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
            )

    def _ensure_partition_limit(self) -> None:
        """
        Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped.
        """
        while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
            self._over_limit += 1
            oldest_partition = self._cursor_per_partition.popitem(last=False)[
                0
            ]  # Remove the oldest partition
            logger.warning(
                f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
            )

    def limit_reached(self) -> bool:
        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Set the initial state for the cursors.

        This method initializes the state for each partition cursor using the provided stream state.
        If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.

        Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
        does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

        Args:
            stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
                {
                    "states": [
                        {
                            "partition": {
                                "partition_key": "value"
                            },
                            "cursor": {
                                "last_updated": "2023-05-27T00:00:00Z"
                            }
                        }
                    ],
                    "parent_state": {
                        "parent_stream_name": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    }
                }
        """
        if not stream_state:
            return

        if "states" not in stream_state:
            # We assume that `stream_state` is in a global format that can be applied to all partitions.
            # Example: {"global_state_format_key": "global_state_format_value"}
            self._state_to_migrate_from = stream_state

        else:
            for state in stream_state["states"]:
                self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
                    self._create_cursor(state["cursor"])
                )

            # set default state for missing partitions if it is per partition with fallback to global
            if "state" in stream_state:
                self._state_to_migrate_from = stream_state["state"]

        # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
        # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
        # We are still keeping this line as a comment to be explicit about the past behavior.
        # self._partition_router.set_initial_state(stream_state)

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(
            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
        )

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        try:
            self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice(
                StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
            )
        except KeyError as exception:
            raise ValueError(
                f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because "
                f"we should only update state for partitions that were emitted during `stream_slices`"
            )

    def get_stream_state(self) -> StreamState:
        states = []
        for partition_tuple, cursor in self._cursor_per_partition.items():
            cursor_state = cursor.get_stream_state()
            if cursor_state:
                states.append(
                    {
                        "partition": self._to_dict(partition_tuple),
                        "cursor": cursor_state,
                    }
                )
        state: dict[str, Any] = {"states": states}

        parent_state = self._partition_router.get_stream_state()
        if parent_state:
            state["parent_state"] = parent_state
        return state

    def _get_state_for_partition(self, partition: Mapping[str, Any]) -> Optional[StreamState]:
        cursor = self._cursor_per_partition.get(self._to_partition_key(partition))
        if cursor:
            return cursor.get_stream_state()

        return None

    @staticmethod
    def _is_new_state(stream_state: Mapping[str, Any]) -> bool:
        return not bool(stream_state)

    def _to_partition_key(self, partition: Mapping[str, Any]) -> str:
        return self._partition_serializer.to_partition_key(partition)

    def _to_dict(self, partition_key: str) -> Mapping[str, Any]:
        return self._partition_serializer.to_partition(partition_key)

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        if not stream_slice:
            raise ValueError("A partition needs to be provided in order to extract a state")

        return self._get_state_for_partition(stream_slice.partition)

    def _create_cursor(self, cursor_state: Any) -> DeclarativeCursor:
        cursor = self._cursor_factory.create()
        cursor.set_initial_state(cursor_state)
        return cursor

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
            return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._cursor_per_partition[
                self._to_partition_key(stream_slice.partition)
            ].get_request_params(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request params")

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
            return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._cursor_per_partition[
                self._to_partition_key(stream_slice.partition)
            ].get_request_headers(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request headers")

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        if stream_slice:
            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
            return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._cursor_per_partition[
                self._to_partition_key(stream_slice.partition)
            ].get_request_body_data(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request body data")

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        if stream_slice:
            if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
                self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
            return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
                stream_state=stream_state,
                stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
                next_page_token=next_page_token,
            ) | self._cursor_per_partition[
                self._to_partition_key(stream_slice.partition)
            ].get_request_body_json(
                stream_state=stream_state,
                stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
                next_page_token=next_page_token,
            )
        else:
            raise ValueError("A partition needs to be provided in order to get request body json")

    def should_be_synced(self, record: Record) -> bool:
        return self._get_cursor(record).should_be_synced(
            self._convert_record_to_cursor_record(record)
        )

    @staticmethod
    def _convert_record_to_cursor_record(record: Record) -> Record:
        return Record(
            data=record.data,
            stream_name=record.stream_name,
            associated_slice=StreamSlice(
                partition={}, cursor_slice=record.associated_slice.cursor_slice
            )
            if record.associated_slice
            else None,
        )

    def _get_cursor(self, record: Record) -> DeclarativeCursor:
        if not record.associated_slice:
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        partition_key = self._to_partition_key(record.associated_slice.partition)
        if partition_key not in self._cursor_per_partition:
            self._create_cursor_for_partition(partition_key)
        cursor = self._cursor_per_partition[partition_key]
        return cursor

    def _create_cursor_for_partition(self, partition_key: str) -> None:
        """
        Dynamically creates and initializes a cursor for the specified partition.

        This method is required for `ConcurrentPerPartitionCursor`. For concurrent cursors,
        stream_slices is executed only for the concurrent cursor, so cursors per partition
        are not created for the declarative cursor. This method ensures that a cursor is available
        to create requests for the specified partition. The cursor is initialized
        with the per-partition state if present in the initial state, or with the global state
        adjusted by the lookback window, or with the state to migrate from.

        Note:
            This is a temporary workaround and should be removed once the declarative cursor
            is decoupled from the concurrent cursor implementation.

        Args:
            partition_key (str): The unique identifier for the partition for which the cursor
                needs to be created.
        """
        partition_state = (
            self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
        )
        cursor = self._create_cursor(partition_state)

        self._cursor_per_partition[partition_key] = cursor
Manages state per partition when a stream has many partitions, to prevent data loss or duplication.
Partition Limitation and Limit Reached Logic
- DEFAULT_MAX_PARTITIONS_NUMBER: The maximum number of partitions to keep in memory (default is 10,000).
- _cursor_per_partition: An ordered dictionary that stores cursors for each partition.
- _over_limit: A counter that increments each time an oldest partition is removed when the limit is exceeded.
The class ensures that the number of partitions tracked does not exceed the DEFAULT_MAX_PARTITIONS_NUMBER to prevent excessive memory usage.
- When the number of partitions exceeds the limit, the oldest partitions are removed from _cursor_per_partition, and _over_limit is incremented accordingly.
- The limit_reached method returns True when _over_limit exceeds DEFAULT_MAX_PARTITIONS_NUMBER, indicating that the global cursor should be used instead of per-partition cursors.
This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.
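A self-contained sketch of the eviction and over-limit bookkeeping described above, using a small limit so the behavior is visible; this mirrors _ensure_partition_limit and limit_reached but is not the CDK code itself:

```python
from collections import OrderedDict

MAX_PARTITIONS = 3  # the real default is 10,000

cursors = OrderedDict()  # partition key -> cursor (stubbed as a dict here)
over_limit = 0

def track_partition(key: str) -> None:
    global over_limit
    # Evict oldest partitions until there is room for the new one.
    while len(cursors) > MAX_PARTITIONS - 1:
        over_limit += 1
        oldest, _ = cursors.popitem(last=False)
        print(f"Dropping oldest partition: {oldest} (over limit: {over_limit})")
    cursors[key] = {}

def limit_reached() -> bool:
    # Only a sustained overflow (more evictions than the limit itself)
    # triggers the switch to a global cursor.
    return over_limit > MAX_PARTITIONS

for i in range(8):
    track_partition(f"partition_{i}")

print(limit_reached())  # True once over_limit exceeds MAX_PARTITIONS
```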
def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRouter):
    self._cursor_factory = cursor_factory
    self._partition_router = partition_router
    # The dict is ordered to ensure that once the maximum number of partitions is reached,
    # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
    self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
    self._over_limit = 0
    self._partition_serializer = PerPartitionKeySerializer()
def stream_slices(self) -> Iterable[StreamSlice]:
    slices = self._partition_router.stream_slices()
    for partition in slices:
        yield from self.generate_slices_from_partition(partition)
Defines stream slices
Returns
An iterable of stream slices
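Schematically, the slices produced here are a nested product: each partition from the partition router is paired with the slices of that partition's own cursor. A toy illustration with made-up fields (in the real class, each partition's cursor can produce different windows):

```python
partitions = [{"account_id": "a"}, {"account_id": "b"}]
windows = [
    {"start_time": "2023-01-01", "end_time": "2023-01-31"},
    {"start_time": "2023-02-01", "end_time": "2023-02-28"},
]

slices = [
    {"partition": partition, "cursor_slice": window}
    for partition in partitions
    for window in windows
]
# 4 slices: each partition is paired with each datetime window.
```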
def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
    # Ensure the maximum number of partitions is not exceeded
    self._ensure_partition_limit()

    cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
    if not cursor:
        partition_state = (
            self._state_to_migrate_from
            if self._state_to_migrate_from
            else self._NO_CURSOR_STATE
        )
        cursor = self._create_cursor(partition_state)
        self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor

    for cursor_slice in cursor.stream_slices():
        yield StreamSlice(
            partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields
        )
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Set the initial state for the cursors.

    This method initializes the state for each partition cursor using the provided stream state.
    If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.

    Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router
    does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.

    Args:
        stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:
            {
                "states": [
                    {
                        "partition": {
                            "partition_key": "value"
                        },
                        "cursor": {
                            "last_updated": "2023-05-27T00:00:00Z"
                        }
                    }
                ],
                "parent_state": {
                    "parent_stream_name": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    }
                }
            }
    """
    if not stream_state:
        return

    if "states" not in stream_state:
        # We assume that `stream_state` is in a global format that can be applied to all partitions.
        # Example: {"global_state_format_key": "global_state_format_value"}
        self._state_to_migrate_from = stream_state

    else:
        for state in stream_state["states"]:
            self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
                self._create_cursor(state["cursor"])
            )

        # set default state for missing partitions if it is per partition with fallback to global
        if "state" in stream_state:
            self._state_to_migrate_from = stream_state["state"]

    # We used to set the parent state through this method but since moving the SubstreamPartitionRouter to the
    # Concurrent CDK/AbstractStream, the state is passed at the __init__ stage and this does not need to be called.
    # We are still keeping this line as a comment to be explicit about the past behavior.
    # self._partition_router.set_initial_state(stream_state)
Set the initial state for the cursors.
This method initializes the state for each partition cursor using the provided stream state. If a partition state is provided in the stream state, it will update the corresponding partition cursor with this state.
Additionally, it sets the parent state for partition routers that are based on parent streams. If a partition router does not have parent streams, this step will be skipped due to the default PartitionRouter implementation.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The format of the stream state should be:

      {
          "states": [
              {
                  "partition": {"partition_key": "value"},
                  "cursor": {"last_updated": "2023-05-27T00:00:00Z"}
              }
          ],
          "parent_state": {
              "parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}
          }
      }
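To make the migration path above concrete: when state arrives without a "states" list, the whole mapping becomes the default applied to every newly seen partition; when a top-level "state" key accompanies "states", it fills in for partitions that are missing. A sketch with made-up values:

```python
# Legacy global state: no "states" key, so every partition seen for the
# first time starts from this state.
legacy_state = {"last_updated": "2023-05-27T00:00:00Z"}

# Per-partition with global fallback: known partitions get their own cursor,
# and "state" seeds any partition absent from "states".
per_partition_with_fallback = {
    "states": [
        {
            "partition": {"partition_key": "value"},
            "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
        }
    ],
    "state": {"last_updated": "2023-05-20T00:00:00Z"},
}
```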
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].observe(
        StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), record
    )
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    try:
        self._cursor_per_partition[self._to_partition_key(stream_slice.partition)].close_slice(
            StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args
        )
    except KeyError as exception:
        raise ValueError(
            f"Partition {str(exception)} could not be found in current state based on the record. This is unexpected because "
            f"we should only update state for partitions that were emitted during `stream_slices`"
        )
Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same, but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
def get_stream_state(self) -> StreamState:
    states = []
    for partition_tuple, cursor in self._cursor_per_partition.items():
        cursor_state = cursor.get_stream_state()
        if cursor_state:
            states.append(
                {
                    "partition": self._to_dict(partition_tuple),
                    "cursor": cursor_state,
                }
            )
    state: dict[str, Any] = {"states": states}

    parent_state = self._partition_router.get_stream_state()
    if parent_state:
        state["parent_state"] = parent_state
    return state
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    if not stream_slice:
        raise ValueError("A partition needs to be provided in order to extract a state")

    return self._get_state_for_partition(stream_slice.partition)
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    if stream_slice:
        if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
            self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
        return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
            stream_state=stream_state,
            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
            next_page_token=next_page_token,
        ) | self._cursor_per_partition[
            self._to_partition_key(stream_slice.partition)
        ].get_request_params(
            stream_state=stream_state,
            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
            next_page_token=next_page_token,
        )
    else:
        raise ValueError("A partition needs to be provided in order to get request params")
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    if stream_slice:
        if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
            self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
        return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
            stream_state=stream_state,
            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
            next_page_token=next_page_token,
        ) | self._cursor_per_partition[
            self._to_partition_key(stream_slice.partition)
        ].get_request_headers(
            stream_state=stream_state,
            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
            next_page_token=next_page_token,
        )
    else:
        raise ValueError("A partition needs to be provided in order to get request headers")
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    if stream_slice:
        if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
            self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
        return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
            stream_state=stream_state,
            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
            next_page_token=next_page_token,
        ) | self._cursor_per_partition[
            self._to_partition_key(stream_slice.partition)
        ].get_request_body_data(
            stream_state=stream_state,
            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
            next_page_token=next_page_token,
        )
    else:
        raise ValueError("A partition needs to be provided in order to get request body data")
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, the text will be sent as-is; if it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    if stream_slice:
        if self._to_partition_key(stream_slice.partition) not in self._cursor_per_partition:
            self._create_cursor_for_partition(self._to_partition_key(stream_slice.partition))
        return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
            stream_state=stream_state,
            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
            next_page_token=next_page_token,
        ) | self._cursor_per_partition[
            self._to_partition_key(stream_slice.partition)
        ].get_request_body_json(
            stream_state=stream_state,
            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
            next_page_token=next_page_token,
        )
    else:
        raise ValueError("A partition needs to be provided in order to get request body json")
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
class PerPartitionWithGlobalCursor(DeclarativeCursor):
    """
    Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.

    This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently.

    **Overview**

    - **Partition-Based State**: Initially manages state per partition to ensure accurate processing of each partition's data.
    - **Global Fallback**: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively.

    **Switching Logic**

    - Monitors the number of partitions.
    - If `PerPartitionCursor.limit_reached()` returns `True`, sets `_use_global_cursor` to `True`, activating the global cursor.

    **Active Cursor Selection**

    - Uses the `_get_active_cursor()` helper method to select the active cursor based on the `_use_global_cursor` flag.
    - This simplifies the logic and ensures consistent cursor usage across methods.

    **State Structure Example**

    ```json
    {
        "states": [
            {
                "partition": {"partition_key": "partition_1"},
                "cursor": {"cursor_field": "2021-01-15"}
            },
            {
                "partition": {"partition_key": "partition_2"},
                "cursor": {"cursor_field": "2021-02-14"}
            }
        ],
        "state": {
            "cursor_field": "2021-02-15"
        },
        "use_global_cursor": false
    }
    ```

    In this example, the cursor is using partition-based state management (`"use_global_cursor": false`), maintaining separate cursor states for each partition.

    **Usage Scenario**

    Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization.
    """

    def __init__(
        self,
        cursor_factory: CursorFactory,
        partition_router: PartitionRouter,
        stream_cursor: DatetimeBasedCursor,
    ):
        self._partition_router = partition_router
        self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router)
        self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router)
        self._use_global_cursor = False
        self._current_partition: Optional[Mapping[str, Any]] = None
        self._last_slice: bool = False
        self._parent_state: Optional[Mapping[str, Any]] = None

    def _get_active_cursor(self) -> Union[PerPartitionCursor, GlobalSubstreamCursor]:
        return self._global_cursor if self._use_global_cursor else self._per_partition_cursor

    def stream_slices(self) -> Iterable[StreamSlice]:
        self._global_cursor.start_slices_generation()

        # Iterate through partitions and process slices
        for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state(
            self._partition_router.stream_slices(), self._partition_router.get_stream_state
        ):
            # Generate slices for the current cursor and handle the last slice using the flag
            self._parent_state = parent_state
            for slice, is_last_slice, _ in iterate_with_last_flag_and_state(
                self._get_active_cursor().generate_slices_from_partition(partition=partition),
                lambda: None,
            ):
                self._global_cursor.register_slice(is_last_slice and is_last_partition)
                yield slice
        self._parent_state = self._partition_router.get_stream_state()

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Set the initial state for the cursors.
        """
        self._use_global_cursor = stream_state.get("use_global_cursor", False)

        self._parent_state = stream_state.get("parent_state", {})

        self._global_cursor.set_initial_state(stream_state)
        if not self._use_global_cursor:
            self._per_partition_cursor.set_initial_state(stream_state)

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        if not self._use_global_cursor and self._per_partition_cursor.limit_reached():
            self._use_global_cursor = True

        if not self._use_global_cursor:
            self._per_partition_cursor.observe(stream_slice, record)
        self._global_cursor.observe(stream_slice, record)

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        if not self._use_global_cursor:
            self._per_partition_cursor.close_slice(stream_slice, *args)
        self._global_cursor.close_slice(stream_slice, *args)

    def get_stream_state(self) -> StreamState:
        final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor}

        final_state.update(self._global_cursor.get_stream_state())
        if not self._use_global_cursor:
            final_state.update(self._per_partition_cursor.get_stream_state())

        final_state["parent_state"] = self._parent_state
        if not final_state.get("parent_state"):
            del final_state["parent_state"]

        return final_state

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        return self._get_active_cursor().select_state(stream_slice)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_active_cursor().get_request_params(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_active_cursor().get_request_headers(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        return self._get_active_cursor().get_request_body_data(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return self._get_active_cursor().get_request_body_json(
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
        )

    def should_be_synced(self, record: Record) -> bool:
        return self._get_active_cursor().should_be_synced(record)
Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.
This cursor handles partitioned streams by maintaining individual state per partition using PerPartitionCursor. If the number of partitions exceeds a defined limit, it switches to a global cursor (GlobalSubstreamCursor) to manage state more efficiently.
Overview
- Partition-Based State: Initially manages state per partition to ensure accurate processing of each partition's data.
- Global Fallback: Switches to a global cursor when the partition limit is exceeded to handle state management more effectively.
Switching Logic
- Monitors the number of partitions.
- If PerPartitionCursor.limit_reached() returns True, sets _use_global_cursor to True, activating the global cursor.
Active Cursor Selection
- Uses the _get_active_cursor() helper method to select the active cursor based on the _use_global_cursor flag.
- This simplifies the logic and ensures consistent cursor usage across methods.
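A compact sketch of the one-way switch and the active-cursor dispatch described above; the two cursor objects here are simple placeholders, not the CDK classes:

```python
class _StubCursor:
    def __init__(self, name: str) -> None:
        self.name = name

per_partition = _StubCursor("per-partition")
global_cursor = _StubCursor("global")
use_global_cursor = False

def on_record(limit_reached: bool) -> None:
    global use_global_cursor
    # The switch is one-way: once flipped, the sync stays on the global cursor
    # and per-partition state stops being updated.
    if not use_global_cursor and limit_reached:
        use_global_cursor = True

def get_active_cursor() -> _StubCursor:
    return global_cursor if use_global_cursor else per_partition

on_record(limit_reached=True)
print(get_active_cursor().name)  # global
```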
State Structure Example
{
"states": [
{
"partition": {"partition_key": "partition_1"},
"cursor": {"cursor_field": "2021-01-15"}
},
{
"partition": {"partition_key": "partition_2"},
"cursor": {"cursor_field": "2021-02-14"}
}
],
"state": {
"cursor_field": "2021-02-15"
},
"use_global_cursor": false
}
In this example, the cursor is using partition-based state management ("use_global_cursor": false), maintaining separate cursor states for each partition.
Usage Scenario
Suitable for streams where the number of partitions may vary significantly, requiring dynamic switching between per-partition and global state management to ensure data consistency and efficient synchronization.
def __init__(
    self,
    cursor_factory: CursorFactory,
    partition_router: PartitionRouter,
    stream_cursor: DatetimeBasedCursor,
):
    self._partition_router = partition_router
    self._per_partition_cursor = PerPartitionCursor(cursor_factory, partition_router)
    self._global_cursor = GlobalSubstreamCursor(stream_cursor, partition_router)
    self._use_global_cursor = False
    self._current_partition: Optional[Mapping[str, Any]] = None
    self._last_slice: bool = False
    self._parent_state: Optional[Mapping[str, Any]] = None
def stream_slices(self) -> Iterable[StreamSlice]:
    self._global_cursor.start_slices_generation()

    # Iterate through partitions and process slices
    for partition, is_last_partition, parent_state in iterate_with_last_flag_and_state(
        self._partition_router.stream_slices(), self._partition_router.get_stream_state
    ):
        # Generate slices for the current cursor and handle the last slice using the flag
        self._parent_state = parent_state
        for slice, is_last_slice, _ in iterate_with_last_flag_and_state(
            self._get_active_cursor().generate_slices_from_partition(partition=partition),
            lambda: None,
        ):
            self._global_cursor.register_slice(is_last_slice and is_last_partition)
            yield slice
    self._parent_state = self._partition_router.get_stream_state()
Defines stream slices
Returns
An iterable of stream slices
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Set the initial state for the cursors.
    """
    self._use_global_cursor = stream_state.get("use_global_cursor", False)

    self._parent_state = stream_state.get("parent_state", {})

    self._global_cursor.set_initial_state(stream_state)
    if not self._use_global_cursor:
        self._per_partition_cursor.set_initial_state(stream_state)
Set the initial state for the cursors.
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    if not self._use_global_cursor and self._per_partition_cursor.limit_reached():
        self._use_global_cursor = True

    if not self._use_global_cursor:
        self._per_partition_cursor.observe(stream_slice, record)
    self._global_cursor.observe(stream_slice, record)
Register a record with the cursor; the cursor instance can then use it to manage the state of the in-progress stream read.
Parameters
- stream_slice: The current slice, which may or may not contain the most recently observed record
- record: the most recently-read record, which the cursor can use to update the stream state. Outwardly-visible changes to the stream state may need to be deferred depending on whether the source reliably orders records by the cursor field.
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    if not self._use_global_cursor:
        self._per_partition_cursor.close_slice(stream_slice, *args)
    self._global_cursor.close_slice(stream_slice, *args)
Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same, but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
def get_stream_state(self) -> StreamState:
    final_state: MutableMapping[str, Any] = {"use_global_cursor": self._use_global_cursor}

    final_state.update(self._global_cursor.get_stream_state())
    if not self._use_global_cursor:
        final_state.update(self._per_partition_cursor.get_stream_state())

    final_state["parent_state"] = self._parent_state
    if not final_state.get("parent_state"):
        del final_state["parent_state"]

    return final_state
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    return self._get_active_cursor().select_state(stream_slice)
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_active_cursor().get_request_params(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_active_cursor().get_request_headers(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Union[Mapping[str, Any], str]:
    return self._get_active_cursor().get_request_body_data(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, the text will be sent as-is; if it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return self._get_active_cursor().get_request_body_json(
        stream_state=stream_state,
        stream_slice=stream_slice,
        next_page_token=next_page_token,
    )
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class ResumableFullRefreshCursor(DeclarativeCursor):
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._cursor: StreamState = {}

    def get_stream_state(self) -> StreamState:
        return self._cursor

    def set_initial_state(self, stream_state: StreamState) -> None:
        self._cursor = stream_state

    def observe(self, stream_slice: StreamSlice, record: Record) -> None:
        """
        Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
        """
        pass

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
        if stream_slice.partition:
            raise ValueError(
                f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
            )
        self._cursor = stream_slice.cursor_slice

    def should_be_synced(self, record: Record) -> bool:
        """
        Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
        that don't have filterable bounds. We should always return them.
        """
        return True

    def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
        # A top-level RFR cursor only manages the state of a single partition
        return self._cursor

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
        along an unbounded set.
        """
        yield from [StreamSlice(cursor_slice=self._cursor, partition={})]

    # This is an interesting pattern that might not seem obvious at first glance. This cursor itself has no functional need to
    # inject any request values into the outbound response because the up-to-date pagination state is already loaded and
    # maintained by the paginator component
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Returns the current stream state. We would like to restrict its usage since it exposes the internals of the state. As of 2023-06-14, it is used for the following:
- Interpolation of the requests
- Transformation of records
- Saving the state
For the first case, we are probably stuck with exposing the stream state. For the second, we can probably expose a method that allows for emitting the state to the platform.
Cursors are not initialized with their state. As state is needed in order to function properly, this method should be called before calling anything else.
Parameters
- stream_state: The state of the stream as returned by get_stream_state
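For example, with the ResumableFullRefreshCursor documented here, state must be installed before slicing begins so the sync resumes from the checkpoint. A minimal sketch; the state payload is a hypothetical page checkpoint, and the import path is this module's:

```python
from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor

cursor = ResumableFullRefreshCursor(parameters={})
cursor.set_initial_state({"next_page_token": 3})  # hypothetical checkpoint shape

# Only after the state is set does stream_slices() reflect the checkpoint:
slices = list(cursor.stream_slices())
# -> a single slice whose cursor_slice is the restored checkpoint
```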
def observe(self, stream_slice: StreamSlice, record: Record) -> None:
    """
    Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
    """
    pass
Resumable full refresh manages state using a page number so it does not need to update state by observing incoming records.
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    # The ResumableFullRefreshCursor doesn't support nested streams yet so receiving a partition is unexpected
    if stream_slice.partition:
        raise ValueError(
            f"Stream slice {stream_slice} should not have a partition. Got {stream_slice.partition}."
        )
    self._cursor = stream_slice.cursor_slice
Update state based on the stream slice. Note that stream_slice.cursor_slice and most_recent_record.associated_slice are expected to be the same, but we make it explicit here that stream_slice should be leveraged to update the state. We do not pass in the latest record, since cursor instances should maintain the relevant internal state on their own.
Parameters
- stream_slice: slice to close
def should_be_synced(self, record: Record) -> bool:
    """
    Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages
    that don't have filterable bounds. We should always return them.
    """
    return True
Unlike date-based cursors which filter out records outside slice boundaries, resumable full refresh records exist within pages that don't have filterable bounds. We should always return them.
def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
    # A top-level RFR cursor only manages the state of a single partition
    return self._cursor
Get the state value of a specific stream_slice. For incremental or resumable full refresh cursors which only manage state in a single dimension this is the entire state object. For per-partition cursors used by substreams, this returns the state of a specific parent delineated by the incoming slice's partition object.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page
    along an unbounded set.
    """
    yield from [StreamSlice(cursor_slice=self._cursor, partition={})]
Resumable full refresh cursors only return a single slice and can't perform partitioning because iteration is done per-page along an unbounded set.
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, the text will be sent as-is; if it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class ChildPartitionResumableFullRefreshCursor(ResumableFullRefreshCursor):
    """
    The Sub-stream Resumable Cursor for Full-Refresh substreams.
    Follows the parent type `ResumableFullRefreshCursor` with a small override,
    to provide the ability to close the substream's slice once it has finished processing.

    Check the `close_slice` method override for more info about the actual behaviour of this cursor.
    """

    def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
        """
        Once the current slice has finished syncing:
         - paginator returns None
         - no more slices to process

        we assume that the records have already been processed and emitted,
        thus we have to set the cursor to `__ab_full_refresh_sync_complete: true`,
        otherwise there is a risk of an infinite loop processing the same slice.
        """
        self._cursor = FULL_REFRESH_COMPLETE_STATE
The Sub-stream Resumable Cursor for Full-Refresh substreams. Follows the parent type ResumableFullRefreshCursor with a small override to provide the ability to close the substream's slice once it has finished processing.
Check the close_slice method override for more info about the actual behaviour of this cursor.
def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
    """
    Once the current slice has finished syncing:
     - paginator returns None
     - no more slices to process

    we assume that the records have already been processed and emitted,
    thus we have to set the cursor to `__ab_full_refresh_sync_complete: true`,
    otherwise there is a risk of an infinite loop processing the same slice.
    """
    self._cursor = FULL_REFRESH_COMPLETE_STATE
Once the current slice has finished syncing:
- the paginator returns None
- there are no more slices to process

we assume that the records have already been processed and emitted, so we have to set the cursor to __ab_full_refresh_sync_complete: true; otherwise there is a risk of an infinite loop processing the same slice.
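A short sketch of this terminal-state behavior; it assumes FULL_REFRESH_COMPLETE_STATE resolves to the completion marker named in the docstring, and it uses a plain dict rather than the CDK classes:

```python
# Illustrative stand-in for the CDK's FULL_REFRESH_COMPLETE_STATE constant,
# assumed to be the marker named in the docstring above.
FULL_REFRESH_COMPLETE_STATE = {"__ab_full_refresh_sync_complete": True}

cursor_state: dict = {}

def close_final_slice() -> None:
    """Mimics ChildPartitionResumableFullRefreshCursor.close_slice."""
    global cursor_state
    # Mark the substream slice as done so a resumed sync skips it instead of
    # looping over the same pages forever.
    cursor_state = FULL_REFRESH_COMPLETE_STATE

close_final_slice()
print(cursor_state)  # {'__ab_full_refresh_sync_complete': True}
```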