airbyte_cdk.sources.declarative.async_job.job_tracker
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import logging
import threading
import uuid
from dataclasses import dataclass, field
from typing import Any, Mapping, Set, Union

from airbyte_cdk.logger import lazy_log
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString

LOGGER = logging.getLogger("airbyte")


class ConcurrentJobLimitReached(Exception):
    """Raised by `JobTracker.try_to_get_intent` when every job slot is already taken."""


@dataclass
class JobTracker:
    """
    Keeps track of the ids (intents or job ids) that currently hold a job slot and
    refuses new intents once `limit` ids are tracked. The tracked set is guarded by a lock.
    """

    # Maximum number of tracked ids; a string value is interpolated against `config` and cast to int.
    limit: Union[int, str]
    # Config passed to the interpolation when `limit` is a string.
    config: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """
        Create the tracked-id set and the lock, then resolve the limit that is enforced.

        A string `limit` is interpolated and converted to int; if that fails for any reason a
        warning is logged and 1 is used. A limit below 1 is logged and enforced as 1.
        """
        self._jobs: Set[str] = set()
        self._lock = threading.Lock()
        if isinstance(self.limit, str):
            try:
                self.limit = int(
                    InterpolatedString(self.limit, parameters={}).eval(config=self.config)
                )
            except Exception as e:
                LOGGER.warning(
                    f"Error interpolating max job count: {self.limit}. Setting to 1. {e}"
                )
                self.limit = 1
        if self.limit < 1:
            LOGGER.warning(
                f"The `max_concurrent_async_job_count` property is less than 1: {self.limit}. Setting to 1. Please update the source manifest to set a valid value."
            )
        # `limit` keeps the resolved value as-is; `_limit` is the clamped value used for enforcement.
        self._limit = self.limit if self.limit >= 1 else 1

    def try_to_get_intent(self) -> str:
        """
        Reserve a slot and return the generated intent id (`intent_<uuid4>`).

        The intent is tracked exactly like a job id until it is replaced through `add_job`
        or released through `remove_job`.

        :raises ConcurrentJobLimitReached: if the number of tracked ids already reached the limit
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Trying to acquire lock by thread {threading.get_native_id()}...",
        )
        with self._lock:
            if self._has_reached_limit():
                raise ConcurrentJobLimitReached(
                    "Can't allocate more jobs right now: limit already reached"
                )
            intent = f"intent_{uuid.uuid4()}"
            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} has acquired {intent}!",
            )
            self._jobs.add(intent)
            return intent

    def add_job(self, intent_or_job_id: str, job_id: str) -> None:
        """
        Replace a tracked intent (or job id) by `job_id`.

        The membership check and the replacement happen under the same lock, so a concurrent
        `remove_job` can't drop `intent_or_job_id` between the check and the swap.

        :param intent_or_job_id: id that is currently tracked
        :param job_id: id that should be tracked instead
        :raises ValueError: if `intent_or_job_id` is not currently tracked
        """
        with self._lock:
            if intent_or_job_id not in self._jobs:
                raise ValueError(
                    f"Can't add job: Unknown intent or job id, known values are {self._jobs}"
                )

            if intent_or_job_id == job_id:
                # Nothing to do here as the ID to replace is the same
                return

            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} replacing job {intent_or_job_id} by {job_id}!",
            )
            self._jobs.add(job_id)
            self._jobs.remove(intent_or_job_id)

    def remove_job(self, job_id: str) -> None:
        """
        Stop tracking `job_id`.

        If the job is not allocated as a running job, this method does nothing and it won't raise.
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} removing job {job_id}",
        )
        with self._lock:
            self._jobs.discard(job_id)

    def _has_reached_limit(self) -> bool:
        """Tell whether the number of tracked ids is at or above the enforced limit."""
        return len(self._jobs) >= self._limit
LOGGER =
<Logger airbyte (INFO)>
class
ConcurrentJobLimitReached(builtins.Exception):
Common base class for all non-exit exceptions.
@dataclass
class
JobTracker:
@dataclass
class JobTracker:
    """
    Keeps track of the ids (intents or job ids) that currently hold a job slot and
    refuses new intents once `limit` ids are tracked. The tracked set is guarded by a lock.
    """

    # Maximum number of tracked ids; a string value is interpolated against `config` and cast to int.
    limit: Union[int, str]
    # Config passed to the interpolation when `limit` is a string.
    config: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """
        Create the tracked-id set and the lock, then resolve the limit that is enforced.

        A string `limit` is interpolated and converted to int; if that fails for any reason a
        warning is logged and 1 is used. A limit below 1 is logged and enforced as 1.
        """
        self._jobs: Set[str] = set()
        self._lock = threading.Lock()
        if isinstance(self.limit, str):
            try:
                self.limit = int(
                    InterpolatedString(self.limit, parameters={}).eval(config=self.config)
                )
            except Exception as e:
                LOGGER.warning(
                    f"Error interpolating max job count: {self.limit}. Setting to 1. {e}"
                )
                self.limit = 1
        if self.limit < 1:
            LOGGER.warning(
                f"The `max_concurrent_async_job_count` property is less than 1: {self.limit}. Setting to 1. Please update the source manifest to set a valid value."
            )
        # `limit` keeps the resolved value as-is; `_limit` is the clamped value used for enforcement.
        self._limit = self.limit if self.limit >= 1 else 1

    def try_to_get_intent(self) -> str:
        """
        Reserve a slot and return the generated intent id (`intent_<uuid4>`).

        The intent is tracked exactly like a job id until it is replaced through `add_job`
        or released through `remove_job`.

        :raises ConcurrentJobLimitReached: if the number of tracked ids already reached the limit
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Trying to acquire lock by thread {threading.get_native_id()}...",
        )
        with self._lock:
            if self._has_reached_limit():
                raise ConcurrentJobLimitReached(
                    "Can't allocate more jobs right now: limit already reached"
                )
            intent = f"intent_{uuid.uuid4()}"
            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} has acquired {intent}!",
            )
            self._jobs.add(intent)
            return intent

    def add_job(self, intent_or_job_id: str, job_id: str) -> None:
        """
        Replace a tracked intent (or job id) by `job_id`.

        The membership check and the replacement happen under the same lock, so a concurrent
        `remove_job` can't drop `intent_or_job_id` between the check and the swap.

        :param intent_or_job_id: id that is currently tracked
        :param job_id: id that should be tracked instead
        :raises ValueError: if `intent_or_job_id` is not currently tracked
        """
        with self._lock:
            if intent_or_job_id not in self._jobs:
                raise ValueError(
                    f"Can't add job: Unknown intent or job id, known values are {self._jobs}"
                )

            if intent_or_job_id == job_id:
                # Nothing to do here as the ID to replace is the same
                return

            lazy_log(
                LOGGER,
                logging.DEBUG,
                lambda: f"JobTracker - Thread {threading.get_native_id()} replacing job {intent_or_job_id} by {job_id}!",
            )
            self._jobs.add(job_id)
            self._jobs.remove(intent_or_job_id)

    def remove_job(self, job_id: str) -> None:
        """
        Stop tracking `job_id`.

        If the job is not allocated as a running job, this method does nothing and it won't raise.
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} removing job {job_id}",
        )
        with self._lock:
            self._jobs.discard(job_id)

    def _has_reached_limit(self) -> bool:
        """Tell whether the number of tracked ids is at or above the enforced limit."""
        return len(self._jobs) >= self._limit
def
try_to_get_intent(self) -> str:
def try_to_get_intent(self) -> str:
    """
    Reserve a slot and hand back the freshly generated intent id (`intent_<uuid4>`).

    The intent is stored alongside the tracked job ids, so it counts against the limit.

    :raises ConcurrentJobLimitReached: if the number of tracked ids already reached the limit
    """
    lazy_log(
        LOGGER,
        logging.DEBUG,
        lambda: f"JobTracker - Trying to acquire lock by thread {threading.get_native_id()}...",
    )
    with self._lock:
        slots_exhausted = self._has_reached_limit()
        if slots_exhausted:
            raise ConcurrentJobLimitReached(
                "Can't allocate more jobs right now: limit already reached"
            )

        new_intent = f"intent_{uuid.uuid4()}"
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} has acquired {new_intent}!",
        )
        self._jobs.add(new_intent)
    return new_intent
def
add_job(self, intent_or_job_id: str, job_id: str) -> None:
def add_job(self, intent_or_job_id: str, job_id: str) -> None:
    """
    Replace a tracked intent (or job id) by `job_id`.

    The membership check and the replacement happen under the same lock, so a concurrent
    `remove_job` can't drop `intent_or_job_id` between the check and the swap (which would
    surface as a `KeyError` from `set.remove` instead of the `ValueError` raised here).

    :param intent_or_job_id: id that is currently tracked
    :param job_id: id that should be tracked instead
    :raises ValueError: if `intent_or_job_id` is not currently tracked
    """
    with self._lock:
        if intent_or_job_id not in self._jobs:
            raise ValueError(
                f"Can't add job: Unknown intent or job id, known values are {self._jobs}"
            )

        if intent_or_job_id == job_id:
            # Nothing to do here as the ID to replace is the same
            return

        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"JobTracker - Thread {threading.get_native_id()} replacing job {intent_or_job_id} by {job_id}!",
        )
        self._jobs.add(job_id)
        self._jobs.remove(intent_or_job_id)
def
remove_job(self, job_id: str) -> None:
def remove_job(self, job_id: str) -> None:
    """
    Stop tracking `job_id`.

    Ids that are not tracked are ignored: the call does nothing and does not raise.
    """

    def _removal_message() -> str:
        return f"JobTracker - Thread {threading.get_native_id()} removing job {job_id}"

    lazy_log(LOGGER, logging.DEBUG, _removal_message)
    with self._lock:
        tracked_ids = self._jobs
        tracked_ids.discard(job_id)
If the job is not allocated as a running job, this method does nothing and it won't raise.