airbyte_cdk.sources.declarative.requesters.paginators.strategies
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pagination_strategy import ( 6 CursorPaginationStrategy, 7) 8from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import ( 9 OffsetIncrement, 10) 11from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import ( 12 PageIncrement, 13) 14from airbyte_cdk.sources.declarative.requesters.paginators.strategies.stop_condition import ( 15 CursorStopCondition, 16 StopConditionPaginationStrategyDecorator, 17) 18 19__all__ = [ 20 "CursorPaginationStrategy", 21 "CursorStopCondition", 22 "OffsetIncrement", 23 "PageIncrement", 24 "StopConditionPaginationStrategyDecorator", 25]
@dataclass
class CursorPaginationStrategy(PaginationStrategy):
    """
    Pagination strategy whose next page token is obtained by evaluating an interpolated string.

    Attributes:
        cursor_value (Union[InterpolatedString, str]): template string evaluating to the cursor value
        config (Config): connection config
        page_size (Optional[int]): the number of records to request
        stop_condition (Optional[Union[InterpolatedBoolean, str]]): template that evaluates to true when
            pagination should stop
        decoder (Decoder): decoder to decode the response
    """

    cursor_value: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    page_size: Optional[int] = None
    stop_condition: Optional[Union[InterpolatedBoolean, str]] = None
    decoder: Decoder = field(
        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Plain strings are turned into interpolation objects once, at construction time;
        # objects that are already interpolation instances (or None) are stored untouched.
        self._cursor_value = (
            InterpolatedString.create(self.cursor_value, parameters=parameters)
            if isinstance(self.cursor_value, str)
            else self.cursor_value
        )
        self._stop_condition: Optional[InterpolatedBoolean] = (
            InterpolatedBoolean(condition=self.stop_condition, parameters=parameters)
            if isinstance(self.stop_condition, str)
            else self.stop_condition
        )

    @property
    def initial_token(self) -> Optional[Any]:
        """
        Always None: the next cursor is typically only known once the first response has been received.
        For Resumable Full Refresh streams that checkpoint the page cursor, the next cursor should be
        read from the state or stream slice object.
        """
        return None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Evaluate the stop condition (if any) and then the cursor template against the response.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: accepted for interface compatibility; not read by this strategy
        :return: the next page token, or None when the stop condition holds or the token is falsy
        """
        decoded_response = next(self.decoder.decode(response))

        # requests exposes the Link header as one raw string (last, next, etc.), which cannot be indexed.
        # The parsed dictionary from response.links is substituted so templates can look links up by name.
        headers: Dict[str, Any] = dict(response.headers)
        headers.update(link=response.links)

        # Both templates are evaluated against the very same context.
        context: Dict[str, Any] = {
            "response": decoded_response,
            "headers": headers,
            "last_record": last_record,
            "last_page_size": last_page_size,
        }

        if self._stop_condition and self._stop_condition.eval(self.config, **context):
            return None

        token = self._cursor_value.eval(config=self.config, **context)
        return token or None

    def get_page_size(self) -> Optional[int]:
        """Return the configured page size, or None if unspecified."""
        return self.page_size
Pagination strategy that evaluates an interpolated string to define the next page token
Attributes:
- page_size (Optional[int]): the number of records to request
- cursor_value (Union[InterpolatedString, str]): template string evaluating to the cursor value
- config (Config): connection config
- stop_condition (Optional[Union[InterpolatedBoolean, str]]): template string that evaluates to true when pagination should stop
- decoder (Decoder): decoder to decode the response
@property
def initial_token(self) -> Optional[Any]:
    """
    Always None: the next cursor is typically only known once the first response has been received.
    For Resumable Full Refresh streams that checkpoint the page cursor, the next cursor should be
    read from the state or stream slice object.
    """
    return None
CursorPaginationStrategy does not have an initial value because the next cursor is typically included in the response of the first request. For Resumable Full Refresh streams that checkpoint the page cursor, the next cursor should be read from the state or stream slice object.
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Evaluate the stop condition (if any) and then the cursor template against the response.

    :param response: response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: accepted for interface compatibility; not read by this strategy
    :return: the next page token, or None when the stop condition holds or the token is falsy
    """
    decoded_response = next(self.decoder.decode(response))

    # requests exposes the Link header as one raw string (last, next, etc.), which cannot be indexed.
    # The parsed dictionary from response.links is substituted so templates can look links up by name.
    headers: Dict[str, Any] = dict(response.headers)
    headers.update(link=response.links)

    # Both templates are evaluated against the very same context.
    context: Dict[str, Any] = {
        "response": decoded_response,
        "headers": headers,
        "last_record": last_record,
        "last_page_size": last_page_size,
    }

    if self._stop_condition and self._stop_condition.eval(self.config, **context):
        return None

    token = self._cursor_value.eval(config=self.config, **context)
    return token or None
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: the page token value that was used for the last request
Returns
next page token. Returns None if there are no more pages to fetch
class CursorStopCondition(PaginationStopCondition):
    """Stop condition that is met when the wrapped cursor says a record should not be synced."""

    def __init__(
        self,
        cursor: DeclarativeCursor
        | ConcurrentCursor,  # migrate to use both old and concurrent versions
    ):
        self._cursor = cursor

    def is_met(self, record: Record) -> bool:
        """Return True when the cursor reports that `record` should not be synced."""
        should_sync = self._cursor.should_be_synced(record)
        return not should_sync
Stop condition that is met when the wrapped cursor reports that a record should not be synced.
@dataclass
class OffsetIncrement(PaginationStrategy):
    """
    Pagination strategy that uses the number of records read so far as the next page token.

    Examples:
        # page_size to be a constant integer value
        pagination_strategy:
          type: OffsetIncrement
          page_size: 2

        # page_size to be a constant string value
        pagination_strategy:
          type: OffsetIncrement
          page_size: "2"

        # page_size to be an interpolated string value
        pagination_strategy:
          type: OffsetIncrement
          page_size: "{{ parameters['items_per_page'] }}"

    Attributes:
        config (Config): connection config
        page_size (Optional[Union[str, int]]): the number of records to request
        decoder (Decoder): decoder to decode the response
        inject_on_first_request (bool): when true, the initial token is 0 instead of None
    """

    config: Config
    page_size: Optional[Union[str, int]]
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(
        default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
    )
    inject_on_first_request: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Integers are stringified so that every page size goes through the same interpolation path.
        raw_page_size = self.page_size
        if isinstance(raw_page_size, int):
            raw_page_size = str(raw_page_size)
        self._page_size: Optional[InterpolatedString] = (
            InterpolatedString(raw_page_size, parameters=parameters) if raw_page_size else None
        )

    @property
    def initial_token(self) -> Optional[Any]:
        """Return 0 when the offset is injected on the first request, otherwise None."""
        return 0 if self.inject_on_first_request else None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Compute the offset to use for the next request.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the offset that was used for the last request
        :return: the next offset, or None if there are no more pages to fetch
        :raises ValueError: if last_page_token_value is neither None nor an integer
        """
        decoded_response = next(self.decoder.decode(response))

        # Pagination ends on a page shorter than the page size, or on a page with no records at all.
        is_short_page = self._page_size and last_page_size < self._page_size.eval(
            self.config, response=decoded_response
        )
        if is_short_page or last_page_size == 0:
            return None

        if last_page_token_value is None:
            # If the OffsetIncrement strategy does not inject on the first request, the incoming
            # last_page_token_value will be None. That request is treated as offset 0.
            return 0 + last_page_size

        if not isinstance(last_page_token_value, int):
            raise ValueError(
                f"Last page token value {last_page_token_value} for OffsetIncrement pagination strategy was not an integer"
            )

        return last_page_token_value + last_page_size

    def get_page_size(self) -> Optional[int]:
        """
        :return: the number of records to fetch in a page, or None if unspecified
        :raises Exception: if the interpolated page size does not evaluate to an integer
        """
        if not self._page_size:
            return None

        evaluated = self._page_size.eval(self.config)
        if isinstance(evaluated, int):
            return evaluated
        raise Exception(f"{evaluated} is of type {type(evaluated)}. Expected {int}")
Pagination strategy that returns the number of records read so far as the next page token
Examples:
page_size to be a constant integer value
pagination_strategy: type: OffsetIncrement page_size: 2
page_size to be a constant string value
pagination_strategy: type: OffsetIncrement page_size: "2"
page_size to be an interpolated string value
pagination_strategy: type: OffsetIncrement page_size: "{{ parameters['items_per_page'] }}"
Attributes:
- page_size (Optional[Union[str, int]]): the number of records to request, given as an integer, a numeric string, or an interpolated string
@property
def initial_token(self) -> Optional[Any]:
    """Return 0 when the offset is injected on the first request, otherwise None."""
    return 0 if self.inject_on_first_request else None
Return the initial value of the token
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Compute the offset to use for the next request.

    :param response: response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: the offset that was used for the last request
    :return: the next offset, or None if there are no more pages to fetch
    :raises ValueError: if last_page_token_value is neither None nor an integer
    """
    decoded_response = next(self.decoder.decode(response))

    # Pagination ends on a page shorter than the page size, or on a page with no records at all.
    is_short_page = self._page_size and last_page_size < self._page_size.eval(
        self.config, response=decoded_response
    )
    if is_short_page or last_page_size == 0:
        return None

    if last_page_token_value is None:
        # If the OffsetIncrement strategy does not inject on the first request, the incoming
        # last_page_token_value will be None. That request is treated as offset 0.
        return 0 + last_page_size

    if not isinstance(last_page_token_value, int):
        raise ValueError(
            f"Last page token value {last_page_token_value} for OffsetIncrement pagination strategy was not an integer"
        )

    return last_page_token_value + last_page_size
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: the page token value that was used for the last request
Returns
next page token. Returns None if there are no more pages to fetch
def get_page_size(self) -> Optional[int]:
    """
    :return: the number of records to fetch in a page, or None if unspecified
    :raises Exception: if the interpolated page size does not evaluate to an integer
    """
    if not self._page_size:
        return None

    evaluated = self._page_size.eval(self.config)
    if isinstance(evaluated, int):
        return evaluated
    raise Exception(f"{evaluated} is of type {type(evaluated)}. Expected {int}")
Returns
page size: The number of records to fetch in a page. Returns None if unspecified
@dataclass
class PageIncrement(PaginationStrategy):
    """
    Pagination strategy that uses a page number, incremented by one per request, as the next page token.

    Attributes:
        config (Config): connection config
        page_size (Optional[Union[str, int]]): the number of records to request; a string is interpolated
            at construction time and must evaluate to an integer
        start_from_page (int): number of the initial page
        inject_on_first_request (bool): when true, the initial token is start_from_page instead of None
    """

    config: Config
    page_size: Optional[Union[str, int]]
    parameters: InitVar[Mapping[str, Any]]
    start_from_page: int = 0
    inject_on_first_request: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if isinstance(self.page_size, int) or (self.page_size is None):
            self._page_size = self.page_size
        else:
            # String page sizes are resolved eagerly, so a bad value fails at construction time.
            page_size = InterpolatedString(self.page_size, parameters=parameters).eval(self.config)
            if not isinstance(page_size, int):
                raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
            self._page_size = page_size

    @property
    def initial_token(self) -> Optional[Any]:
        """Return start_from_page when the page is injected on the first request, otherwise None."""
        if self.inject_on_first_request:
            return self.start_from_page
        return None

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        # Defaulted to None to match the signatures of the sibling pagination strategies.
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Compute the page number to use for the next request.

        :param response: response to process (not read by this strategy)
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response (not read by this strategy)
        :param last_page_token_value: the page number that was used for the last request
        :return: the next page number, or None if there are no more pages to fetch
        :raises ValueError: if last_page_token_value is neither None nor an integer
        """
        # Stop paginating when there are fewer records than the page size or the current page has no records
        if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
            return None
        elif last_page_token_value is None:
            # If the PageIncrement strategy does not inject on the first request, the incoming last_page_token_value
            # may be None. When this is the case, we assume we've already requested the first page specified by
            # start_from_page and must now get the next page
            return self.start_from_page + 1
        elif not isinstance(last_page_token_value, int):
            raise ValueError(
                f"Last page token value {last_page_token_value} for PageIncrement pagination strategy was not an integer"
            )
        else:
            return last_page_token_value + 1

    def get_page_size(self) -> Optional[int]:
        """Return the resolved page size, or None if unspecified."""
        return self._page_size
Pagination strategy that returns the number of pages read so far as the next page token
Attributes:
- page_size (Optional[Union[str, int]]): the number of records to request, given as an integer or an interpolated string that evaluates to an integer
- start_from_page (int): number of the initial page
@property
def initial_token(self) -> Optional[Any]:
    """Return start_from_page when the page is injected on the first request, otherwise None."""
    return self.start_from_page if self.inject_on_first_request else None
Return the initial value of the token
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    # Defaulted to None to match the signatures of the sibling pagination strategies.
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Compute the page number to use for the next request.

    :param response: response to process (not read by this strategy)
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response (not read by this strategy)
    :param last_page_token_value: the page number that was used for the last request
    :return: the next page number, or None if there are no more pages to fetch
    :raises ValueError: if last_page_token_value is neither None nor an integer
    """
    # Stop paginating when there are fewer records than the page size or the current page has no records
    if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
        return None
    elif last_page_token_value is None:
        # If the PageIncrement strategy does not inject on the first request, the incoming last_page_token_value
        # may be None. When this is the case, we assume we've already requested the first page specified by
        # start_from_page and must now get the next page
        return self.start_from_page + 1
    elif not isinstance(last_page_token_value, int):
        raise ValueError(
            f"Last page token value {last_page_token_value} for PageIncrement pagination strategy was not an integer"
        )
    else:
        return last_page_token_value + 1
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: the page token value that was used for the last request
Returns
next page token. Returns None if there are no more pages to fetch
class StopConditionPaginationStrategyDecorator(PaginationStrategy):
    """Wraps a pagination strategy and ends pagination once a stop condition is met by the last record."""

    def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStopCondition):
        self._delegate = _delegate
        self._stop_condition = stop_condition

    def next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any] = None,
    ) -> Optional[Any]:
        """
        Return None when the stop condition is met by the last record, else defer to the delegate.

        :param response: response to process
        :param last_page_size: the number of records read from the response
        :param last_record: the last record extracted from the response
        :param last_page_token_value: the page token value that was used for the last request
        :return: the next page token, or None if there are no more pages to fetch
        """
        # Only the last record of the page is tested against the stop condition; when there is no
        # last record (or it is falsy) the decision is left entirely to the wrapped strategy.
        if not last_record or not self._stop_condition.is_met(last_record):
            return self._delegate.next_page_token(
                response, last_page_size, last_record, last_page_token_value
            )
        return None

    def get_page_size(self) -> Optional[int]:
        """Return the page size of the wrapped strategy."""
        return self._delegate.get_page_size()

    @property
    def initial_token(self) -> Optional[Any]:
        """Return the initial token of the wrapped strategy."""
        return self._delegate.initial_token
Defines how to get the next page token
def next_page_token(
    self,
    response: requests.Response,
    last_page_size: int,
    last_record: Optional[Record],
    last_page_token_value: Optional[Any] = None,
) -> Optional[Any]:
    """
    Return None when the stop condition is met by the last record, else defer to the delegate.

    :param response: response to process
    :param last_page_size: the number of records read from the response
    :param last_record: the last record extracted from the response
    :param last_page_token_value: the page token value that was used for the last request
    :return: the next page token, or None if there are no more pages to fetch
    """
    # Only the last record of the page is tested against the stop condition; when there is no
    # last record (or it is falsy) the decision is left entirely to the wrapped strategy.
    if not last_record or not self._stop_condition.is_met(last_record):
        return self._delegate.next_page_token(
            response, last_page_size, last_record, last_page_token_value
        )
    return None
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: the page token value that was used for the last request
Returns
next page token. Returns None if there are no more pages to fetch