airbyte_cdk.sources.declarative.concurrency_level

1#
2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3#
4
5from airbyte_cdk.sources.declarative.concurrency_level.concurrency_level import ConcurrencyLevel
6
7__all__ = ["ConcurrencyLevel"]
@dataclass
class ConcurrencyLevel:
@dataclass
class ConcurrencyLevel:
    """
    Returns the number of worker threads that should be used when syncing concurrent streams in parallel

    Attributes:
        default_concurrency (Union[int, str]): The hardcoded integer or interpolation of how many worker threads to use during a sync
        max_concurrency (Optional[int]): Upper bound applied to an interpolated default_concurrency. It must be set
            (and non-zero) when the interpolation string mentions "config"; a hardcoded integer default_concurrency
            is returned unchanged and is not capped by this value
        config (Config): Passed to the interpolation when default_concurrency is evaluated
        parameters (Mapping[str, Any]): Init-only value forwarded to InterpolatedString.create
    """

    default_concurrency: Union[int, str]
    max_concurrency: Optional[int]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Normalize default_concurrency into either a plain int or an InterpolatedString.

        Raises:
            ValueError: If default_concurrency is a string containing "config" and max_concurrency is unset (or 0).
        """
        if isinstance(self.default_concurrency, int):
            # Hardcoded value: stored as-is, no interpolation required.
            self._default_concurrency: Union[int, InterpolatedString] = self.default_concurrency
        elif "config" in self.default_concurrency and not self.max_concurrency:
            # Plain substring check: a value that can come from the config must have an upper bound.
            raise ValueError(
                "ConcurrencyLevel requires that max_concurrency be defined if the default_concurrency can be user-specified"
            )
        else:
            self._default_concurrency = InterpolatedString.create(
                self.default_concurrency, parameters=parameters
            )

    def get_concurrency_level(self) -> int:
        """
        Resolve the number of worker threads to use.

        Returns:
            The hardcoded integer, or the evaluated interpolation limited to max_concurrency when that is set.

        Raises:
            ValueError: If the interpolated default_concurrency does not evaluate to an int.
        """
        if isinstance(self._default_concurrency, InterpolatedString):
            evaluated_default_concurrency = self._default_concurrency.eval(config=self.config)
            if not isinstance(evaluated_default_concurrency, int):
                raise ValueError("default_concurrency did not evaluate to an integer")
            # The cap only applies when max_concurrency is truthy (None and 0 both disable it).
            return (
                min(evaluated_default_concurrency, self.max_concurrency)
                if self.max_concurrency
                else evaluated_default_concurrency
            )
        else:
            return self._default_concurrency

Returns the number of worker threads that should be used when syncing concurrent streams in parallel

Attributes:
  • default_concurrency (Union[int, str]): The hardcoded integer or interpolation of how many worker threads to use during a sync
  • max_concurrency (Optional[int]): The maximum number of worker threads to use when the default_concurrency is exceeded
ConcurrencyLevel( default_concurrency: Union[int, str], max_concurrency: Optional[int], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
default_concurrency: Union[int, str]
max_concurrency: Optional[int]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_concurrency_level(self) -> int:
40    def get_concurrency_level(self) -> int:
41        if isinstance(self._default_concurrency, InterpolatedString):
42            evaluated_default_concurrency = self._default_concurrency.eval(config=self.config)
43            if not isinstance(evaluated_default_concurrency, int):
44                raise ValueError("default_concurrency did not evaluate to an integer")
45            return (
46                min(evaluated_default_concurrency, self.max_concurrency)
47                if self.max_concurrency
48                else evaluated_default_concurrency
49            )
50        else:
51            return self._default_concurrency