airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from abc import abstractmethod 6from dataclasses import dataclass 7from typing import Any, Optional 8 9import requests 10 11from airbyte_cdk.sources.types import Record 12 13 14@dataclass 15class PaginationStrategy: 16 """ 17 Defines how to get the next page token 18 """ 19 20 @property 21 @abstractmethod 22 def initial_token(self) -> Optional[Any]: 23 """ 24 Return the initial value of the token 25 """ 26 27 @abstractmethod 28 def next_page_token( 29 self, 30 response: requests.Response, 31 last_page_size: int, 32 last_record: Optional[Record], 33 last_page_token_value: Optional[Any], 34 ) -> Optional[Any]: 35 """ 36 :param response: response to process 37 :param last_page_size: the number of records read from the response 38 :param last_record: the last record extracted from the response 39 :param last_page_token_value: The current value of the page token made on the last request 40 :return: next page token. Returns None if there are no more pages to fetch 41 """ 42 pass 43 44 @abstractmethod 45 def get_page_size(self) -> Optional[int]: 46 """ 47 :return: page size: The number of records to fetch in a page. Returns None if unspecified 48 """
@dataclass
class
PaginationStrategy:
15@dataclass 16class PaginationStrategy: 17 """ 18 Defines how to get the next page token 19 """ 20 21 @property 22 @abstractmethod 23 def initial_token(self) -> Optional[Any]: 24 """ 25 Return the initial value of the token 26 """ 27 28 @abstractmethod 29 def next_page_token( 30 self, 31 response: requests.Response, 32 last_page_size: int, 33 last_record: Optional[Record], 34 last_page_token_value: Optional[Any], 35 ) -> Optional[Any]: 36 """ 37 :param response: response to process 38 :param last_page_size: the number of records read from the response 39 :param last_record: the last record extracted from the response 40 :param last_page_token_value: The current value of the page token made on the last request 41 :return: next page token. Returns None if there are no more pages to fetch 42 """ 43 pass 44 45 @abstractmethod 46 def get_page_size(self) -> Optional[int]: 47 """ 48 :return: page size: The number of records to fetch in a page. Returns None if unspecified 49 """
Defines how to get the next page token
initial_token: Optional[Any]
21 @property 22 @abstractmethod 23 def initial_token(self) -> Optional[Any]: 24 """ 25 Return the initial value of the token 26 """
Return the initial value of the token
@abstractmethod
def
next_page_token( self, response: requests.models.Response, last_page_size: int, last_record: Optional[airbyte_cdk.Record], last_page_token_value: Optional[Any]) -> Optional[Any]:
28 @abstractmethod 29 def next_page_token( 30 self, 31 response: requests.Response, 32 last_page_size: int, 33 last_record: Optional[Record], 34 last_page_token_value: Optional[Any], 35 ) -> Optional[Any]: 36 """ 37 :param response: response to process 38 :param last_page_size: the number of records read from the response 39 :param last_record: the last record extracted from the response 40 :param last_page_token_value: The current value of the page token made on the last request 41 :return: next page token. Returns None if there are no more pages to fetch 42 """ 43 pass
Parameters
- response: response to process
- last_page_size: the number of records read from the response
- last_record: the last record extracted from the response
- last_page_token_value: The current value of the page token made on the last request
Returns
next page token. Returns None if there are no more pages to fetch
@abstractmethod
def
get_page_size(self) -> Optional[int]:
45 @abstractmethod 46 def get_page_size(self) -> Optional[int]: 47 """ 48 :return: page size: The number of records to fetch in a page. Returns None if unspecified 49 """
Returns
page size: The number of records to fetch in a page. Returns None if unspecified