airbyte_cdk.sources.declarative.concurrency_level
@dataclass
class
ConcurrencyLevel:
13@dataclass 14class ConcurrencyLevel: 15 """ 16 Returns the number of worker threads that should be used when syncing concurrent streams in parallel 17 18 Attributes: 19 default_concurrency (Union[int, str]): The hardcoded integer or interpolation of how many worker threads to use during a sync 20 max_concurrency (Optional[int]): The maximum number of worker threads to use when the default_concurrency is exceeded 21 """ 22 23 default_concurrency: Union[int, str] 24 max_concurrency: Optional[int] 25 config: Config 26 parameters: InitVar[Mapping[str, Any]] 27 28 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 29 if isinstance(self.default_concurrency, int): 30 self._default_concurrency: Union[int, InterpolatedString] = self.default_concurrency 31 elif "config" in self.default_concurrency and not self.max_concurrency: 32 raise ValueError( 33 "ConcurrencyLevel requires that max_concurrency be defined if the default_concurrency can be used-specified" 34 ) 35 else: 36 self._default_concurrency = InterpolatedString.create( 37 self.default_concurrency, parameters=parameters 38 ) 39 40 def get_concurrency_level(self) -> int: 41 if isinstance(self._default_concurrency, InterpolatedString): 42 evaluated_default_concurrency = self._default_concurrency.eval(config=self.config) 43 if not isinstance(evaluated_default_concurrency, int): 44 raise ValueError("default_concurrency did not evaluate to an integer") 45 return ( 46 min(evaluated_default_concurrency, self.max_concurrency) 47 if self.max_concurrency 48 else evaluated_default_concurrency 49 ) 50 else: 51 return self._default_concurrency
Returns the number of worker threads that should be used when syncing concurrent streams in parallel
Attributes:
- default_concurrency (Union[int, str]): The hardcoded integer or interpolation of how many worker threads to use during a sync
- max_concurrency (Optional[int]): The maximum number of worker threads to use when the default_concurrency is exceeded
ConcurrencyLevel( default_concurrency: Union[int, str], max_concurrency: Optional[int], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
def
get_concurrency_level(self) -> int:
40 def get_concurrency_level(self) -> int: 41 if isinstance(self._default_concurrency, InterpolatedString): 42 evaluated_default_concurrency = self._default_concurrency.eval(config=self.config) 43 if not isinstance(evaluated_default_concurrency, int): 44 raise ValueError("default_concurrency did not evaluate to an integer") 45 return ( 46 min(evaluated_default_concurrency, self.max_concurrency) 47 if self.max_concurrency 48 else evaluated_default_concurrency 49 ) 50 else: 51 return self._default_concurrency