airbyte_cdk.sources.declarative.async_job.job_tracker

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3import logging
 4import threading
 5import uuid
 6from dataclasses import dataclass, field
 7from typing import Any, Mapping, Set, Union
 8
 9from airbyte_cdk.logger import lazy_log
10from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
11
# Shared "airbyte" logger used for job-tracker diagnostics and warnings.
LOGGER = logging.getLogger(name="airbyte")
13
14
class ConcurrentJobLimitReached(Exception):
    """Raised by ``JobTracker.try_to_get_intent`` when no further job slot can be reserved."""
17
18
@dataclass
class JobTracker:
    """Track in-flight async jobs and cap how many may be tracked at once.

    Lifecycle of a slot: ``try_to_get_intent`` reserves it under a placeholder
    id (``"intent_<uuid4>"``), ``add_job`` swaps that placeholder for the real
    job id, and ``remove_job`` releases it. Every read or mutation of the
    tracked-id set inside these methods happens while holding ``self._lock``.

    Attributes:
        limit: Maximum number of concurrently tracked jobs, given either as an
            int or as a string that is interpolated against ``config`` and then
            converted with ``int``.
        config: Mapping passed to the interpolation when ``limit`` is a string.
    """

    limit: Union[int, str]
    config: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Initialise internal state and resolve ``limit`` to an effective cap >= 1."""
        self._jobs: Set[str] = set()
        self._lock = threading.Lock()
        if isinstance(self.limit, str):
            try:
                self.limit = int(
                    InterpolatedString(self.limit, parameters={}).eval(config=self.config)
                )
            except Exception as e:
                # Best-effort: any interpolation/parsing failure degrades to a limit
                # of 1 instead of failing construction.
                LOGGER.warning(
                    f"Error interpolating max job count: {self.limit}. Setting to 1. {e}"
                )
                self.limit = 1
        if self.limit < 1:
            LOGGER.warning(
                f"The `max_concurrent_async_job_count` property is less than 1: {self.limit}. Setting to 1. Please update the source manifest to set a valid value."
            )
        # Effective cap used by `_has_reached_limit`; clamped so at least one job can run.
        self._limit = self.limit if self.limit >= 1 else 1

    def try_to_get_intent(self) -> str:
        """Reserve a job slot and return a fresh placeholder id ("intent").

        Returns:
            A new id of the form ``"intent_<uuid4>"`` that is now tracked.

        Raises:
            ConcurrentJobLimitReached: if ``_limit`` ids are already tracked.
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Trying to acquire lock by thread {threading.get_native_id()}...",
        )
        with self._lock:
            if self._has_reached_limit():
                raise ConcurrentJobLimitReached(
                    "Can't allocate more jobs right now: limit already reached"
                )
            intent = f"intent_{str(uuid.uuid4())}"
            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} has acquired {intent}!",
            )
            self._jobs.add(intent)
            return intent

    def add_job(self, intent_or_job_id: str, job_id: str) -> None:
        """Replace a tracked intent (or job id) with ``job_id``.

        The membership check and the replacement happen under the same lock, so a
        concurrent ``remove_job`` cannot discard ``intent_or_job_id`` between the
        check and the ``remove`` call (which would otherwise raise ``KeyError``).

        Args:
            intent_or_job_id: An id currently tracked by this instance.
            job_id: The id that should replace it.

        Raises:
            ValueError: if ``intent_or_job_id`` is not currently tracked.
        """
        with self._lock:
            if intent_or_job_id not in self._jobs:
                raise ValueError(
                    f"Can't add job: Unknown intent or job id, known values are {self._jobs}"
                )

            if intent_or_job_id == job_id:
                # Nothing to do here as the ID to replace is the same
                return

            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} replacing job {intent_or_job_id} by {job_id}!",
            )
            self._jobs.add(job_id)
            self._jobs.remove(intent_or_job_id)

    def remove_job(self, job_id: str) -> None:
        """
        If the job is not allocated as a running job, this method does nothing and it won't raise.
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} removing job {job_id}",
        )
        with self._lock:
            self._jobs.discard(job_id)

    def _has_reached_limit(self) -> bool:
        """Return True when the number of tracked ids is at or above the effective cap."""
        return len(self._jobs) >= self._limit
LOGGER = <Logger airbyte (INFO)>
class ConcurrentJobLimitReached(builtins.Exception):
class ConcurrentJobLimitReached(Exception):
    """Raised by ``JobTracker.try_to_get_intent`` when no further job slot can be reserved."""

Common base class for all non-exit exceptions.

@dataclass
class JobTracker:
@dataclass
class JobTracker:
    """Track in-flight async jobs and cap how many may be tracked at once.

    Lifecycle of a slot: ``try_to_get_intent`` reserves it under a placeholder
    id (``"intent_<uuid4>"``), ``add_job`` swaps that placeholder for the real
    job id, and ``remove_job`` releases it. Every read or mutation of the
    tracked-id set inside these methods happens while holding ``self._lock``.

    Attributes:
        limit: Maximum number of concurrently tracked jobs, given either as an
            int or as a string that is interpolated against ``config`` and then
            converted with ``int``.
        config: Mapping passed to the interpolation when ``limit`` is a string.
    """

    limit: Union[int, str]
    config: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Initialise internal state and resolve ``limit`` to an effective cap >= 1."""
        self._jobs: Set[str] = set()
        self._lock = threading.Lock()
        if isinstance(self.limit, str):
            try:
                self.limit = int(
                    InterpolatedString(self.limit, parameters={}).eval(config=self.config)
                )
            except Exception as e:
                # Best-effort: any interpolation/parsing failure degrades to a limit
                # of 1 instead of failing construction.
                LOGGER.warning(
                    f"Error interpolating max job count: {self.limit}. Setting to 1. {e}"
                )
                self.limit = 1
        if self.limit < 1:
            LOGGER.warning(
                f"The `max_concurrent_async_job_count` property is less than 1: {self.limit}. Setting to 1. Please update the source manifest to set a valid value."
            )
        # Effective cap used by `_has_reached_limit`; clamped so at least one job can run.
        self._limit = self.limit if self.limit >= 1 else 1

    def try_to_get_intent(self) -> str:
        """Reserve a job slot and return a fresh placeholder id ("intent").

        Returns:
            A new id of the form ``"intent_<uuid4>"`` that is now tracked.

        Raises:
            ConcurrentJobLimitReached: if ``_limit`` ids are already tracked.
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Trying to acquire lock by thread {threading.get_native_id()}...",
        )
        with self._lock:
            if self._has_reached_limit():
                raise ConcurrentJobLimitReached(
                    "Can't allocate more jobs right now: limit already reached"
                )
            intent = f"intent_{str(uuid.uuid4())}"
            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} has acquired {intent}!",
            )
            self._jobs.add(intent)
            return intent

    def add_job(self, intent_or_job_id: str, job_id: str) -> None:
        """Replace a tracked intent (or job id) with ``job_id``.

        The membership check and the replacement happen under the same lock, so a
        concurrent ``remove_job`` cannot discard ``intent_or_job_id`` between the
        check and the ``remove`` call (which would otherwise raise ``KeyError``).

        Args:
            intent_or_job_id: An id currently tracked by this instance.
            job_id: The id that should replace it.

        Raises:
            ValueError: if ``intent_or_job_id`` is not currently tracked.
        """
        with self._lock:
            if intent_or_job_id not in self._jobs:
                raise ValueError(
                    f"Can't add job: Unknown intent or job id, known values are {self._jobs}"
                )

            if intent_or_job_id == job_id:
                # Nothing to do here as the ID to replace is the same
                return

            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} replacing job {intent_or_job_id} by {job_id}!",
            )
            self._jobs.add(job_id)
            self._jobs.remove(intent_or_job_id)

    def remove_job(self, job_id: str) -> None:
        """
        If the job is not allocated as a running job, this method does nothing and it won't raise.
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} removing job {job_id}",
        )
        with self._lock:
            self._jobs.discard(job_id)

    def _has_reached_limit(self) -> bool:
        """Return True when the number of tracked ids is at or above the effective cap."""
        return len(self._jobs) >= self._limit
JobTracker(limit: Union[int, str], config: Mapping[str, Any] = <factory>)
limit: Union[int, str]
config: Mapping[str, Any]
def try_to_get_intent(self) -> str:
def try_to_get_intent(self) -> str:
    """Reserve one job slot and hand back a freshly generated placeholder id.

    The placeholder has the form ``"intent_<uuid4>"`` and is recorded in the
    tracked-id set before being returned.

    Raises:
        ConcurrentJobLimitReached: when the tracked-id set is already at capacity.
    """
    lazy_log(
        LOGGER,
        logging.DEBUG,
        lambda: f"JobTracker - Trying to acquire lock by thread {threading.get_native_id()}...",
    )
    with self._lock:
        at_capacity = self._has_reached_limit()
        if at_capacity:
            raise ConcurrentJobLimitReached(
                "Can't allocate more jobs right now: limit already reached"
            )
        new_intent = "intent_" + str(uuid.uuid4())
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} has acquired {new_intent}!",
        )
        self._jobs.add(new_intent)
    return new_intent
def add_job(self, intent_or_job_id: str, job_id: str) -> None:
def add_job(self, intent_or_job_id: str, job_id: str) -> None:
    """Replace a tracked intent (or job id) with ``job_id``.

    The membership check and the replacement happen under the same lock, so a
    concurrent ``remove_job`` cannot discard ``intent_or_job_id`` between the
    check and the ``remove`` call (which would otherwise raise ``KeyError``).

    Args:
        intent_or_job_id: An id currently tracked by this instance.
        job_id: The id that should replace it.

    Raises:
        ValueError: if ``intent_or_job_id`` is not currently tracked.
    """
    with self._lock:
        if intent_or_job_id not in self._jobs:
            raise ValueError(
                f"Can't add job: Unknown intent or job id, known values are {self._jobs}"
            )

        if intent_or_job_id == job_id:
            # Nothing to do here as the ID to replace is the same
            return

        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} replacing job {intent_or_job_id} by {job_id}!",
        )
        self._jobs.add(job_id)
        self._jobs.remove(intent_or_job_id)
def remove_job(self, job_id: str) -> None:
def remove_job(self, job_id: str) -> None:
    """Stop tracking ``job_id``, freeing its slot.

    Unknown ids are tolerated: if ``job_id`` is not tracked, nothing changes and
    no exception is raised.
    """
    lazy_log(
        LOGGER,
        logging.DEBUG,
        lambda: f"JobTracker - Thread {threading.get_native_id()} removing job {job_id}",
    )
    with self._lock:
        if job_id in self._jobs:
            self._jobs.remove(job_id)

If the job is not allocated as a running job, this method does nothing and it won't raise.