airbyte_cdk.sources.concurrent_source.thread_pool_manager
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List, Optional


class ThreadPoolManager:
    """
    Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed.

    Submitted tasks are tracked as futures. Completed futures are pruned once the number of tracked futures
    reaches ``max_concurrent_tasks``; ``check_for_errors_and_shutdown`` surfaces any failure and shuts the
    threadpool down.
    """

    DEFAULT_MAX_QUEUE_SIZE = 10_000

    def __init__(
        self,
        threadpool: ThreadPoolExecutor,
        logger: logging.Logger,
        max_concurrent_tasks: int = DEFAULT_MAX_QUEUE_SIZE,
    ):
        """
        :param threadpool: The threadpool to use
        :param logger: The logger to use
        :param max_concurrent_tasks: The maximum number of tasks that can be pending at the same time
        """
        self._threadpool = threadpool
        self._logger = logger
        self._max_concurrent_tasks = max_concurrent_tasks
        self._futures: List[Future[Any]] = []
        # Guards _prune_futures, which removes entries from self._futures in place.
        self._lock = threading.Lock()
        # Set by _prune_futures when a completed future raised; surfaced by check_for_errors_and_shutdown.
        self._most_recently_seen_exception: Optional[Exception] = None

        # Past twice the limit, prune_to_validate_has_reached_futures_limit logs a warning.
        self._logging_threshold = max_concurrent_tasks * 2

    def prune_to_validate_has_reached_futures_limit(self) -> bool:
        """
        Drop completed futures and report whether the pending-task limit has been reached.

        Logs a warning when more than twice ``max_concurrent_tasks`` futures are still tracked after pruning.

        :return: True if at least ``max_concurrent_tasks`` futures are still tracked after pruning.
        """
        self._prune_futures(self._futures)
        if len(self._futures) > self._logging_threshold:
            self._logger.warning(
                f"ThreadPoolManager: The list of futures is getting bigger than expected ({len(self._futures)})"
            )
        return len(self._futures) >= self._max_concurrent_tasks

    def submit(self, function: Callable[..., Any], *args: Any) -> None:
        """Submit ``function(*args)`` to the threadpool and track the resulting future."""
        self._futures.append(self._threadpool.submit(function, *args))

    def _prune_futures(self, futures: List[Future[Any]]) -> None:
        """
        Remove completed futures from ``futures`` in place.

        Nothing is pruned while fewer than ``max_concurrent_tasks`` futures are tracked. If a completed future
        raised, the error is NOT raised here: it is wrapped and stored in ``self._most_recently_seen_exception``
        so that the main thread can surface it through ``check_for_errors_and_shutdown``. Cancelled futures are
        dropped without inspection because ``Future.exception()`` raises ``CancelledError`` for them.

        We are using a lock here as without it, the algorithm would not be thread safe
        """
        with self._lock:
            if len(futures) < self._max_concurrent_tasks:
                return

            # Walk backwards so each pop leaves the indices still to be visited untouched. Popping in place,
            # rather than rebuilding the list, also keeps futures appended by submit(), which takes no lock.
            for index in reversed(range(len(futures))):
                future = futures[index]
                if not future.done():
                    continue

                # Only call future.exception() on a done, non-cancelled future: it blocks until the future is done
                # and raises CancelledError if the future was cancelled.
                # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.exception
                optional_exception = None if future.cancelled() else future.exception()
                if optional_exception is not None:
                    # Exception handling should be done in the main thread. Hence, we only store the exception and
                    # expect the main thread to call check_for_errors_and_shutdown.
                    # We do not expect this error to happen. The futures created during concurrent syncs should
                    # catch the exception and push it to the queue. If this exception occurs, please review the
                    # futures and how they handle exceptions.
                    self._most_recently_seen_exception = RuntimeError(
                        f"Failed processing a future: {optional_exception}. Please contact the Airbyte team."
                    )
                futures.pop(index)

    def _shutdown(self) -> None:
        """Shut the threadpool down without waiting, cancelling tasks that have not started yet."""
        # cancel_futures=True only cancels queued tasks; tasks already running keep running, and wait=False returns
        # without joining them, so this alone will not stop the Python application.
        self._threadpool.shutdown(wait=False, cancel_futures=True)

    def is_done(self) -> bool:
        """Return True if every tracked future is done (cancelled ones included); never blocks."""
        return all(future.done() for future in self._futures)

    def check_for_errors_and_shutdown(self) -> None:
        """
        Check if any of the futures have an exception, and raise it if so. If all futures are done, shut down the
        threadpool. If the futures are not done, raise an exception.

        Never blocks on a pending future: only futures that are already done are inspected for exceptions, so a
        pending (or cancelled) future is reported as "not done" instead of being waited on. The threadpool is shut
        down on every path.

        :raises Exception: the exception stored in ``self._most_recently_seen_exception``, if any.
        :raises RuntimeError: if a finished future raised, or if any future is still pending or was cancelled.
        """
        if self._most_recently_seen_exception is not None:
            self._logger.exception(
                "An unknown exception has occurred while reading concurrently",
                exc_info=self._most_recently_seen_exception,
            )
            self._stop_and_raise_exception(self._most_recently_seen_exception)

        # Future.exception() blocks on pending futures and raises CancelledError on cancelled ones, so only
        # inspect futures that finished by themselves.
        finished = [future for future in self._futures if future.done() and not future.cancelled()]
        exceptions_from_futures = [
            exception for exception in (future.exception() for future in finished) if exception is not None
        ]
        if exceptions_from_futures:
            exception = RuntimeError(f"Failed reading with errors: {exceptions_from_futures}")
            self._stop_and_raise_exception(exception)
        else:
            # A cancelled future never ran its task to completion, so report it alongside pending ones rather than
            # treating it as success.
            futures_not_done = [f for f in self._futures if not f.done() or f.cancelled()]
            if futures_not_done:
                exception = RuntimeError(
                    f"Failed reading with futures not done: {futures_not_done}"
                )
                self._stop_and_raise_exception(exception)
            else:
                self._shutdown()

    def _stop_and_raise_exception(self, exception: BaseException) -> None:
        """Shut the threadpool down, then raise ``exception``."""
        self._shutdown()
        raise exception
class
ThreadPoolManager:
class ThreadPoolManager:
    """
    Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed.

    Submitted tasks are tracked as futures. Completed futures are pruned once the number of tracked futures
    reaches ``max_concurrent_tasks``; ``check_for_errors_and_shutdown`` surfaces any failure and shuts the
    threadpool down.
    """

    DEFAULT_MAX_QUEUE_SIZE = 10_000

    def __init__(
        self,
        threadpool: ThreadPoolExecutor,
        logger: logging.Logger,
        max_concurrent_tasks: int = DEFAULT_MAX_QUEUE_SIZE,
    ):
        """
        :param threadpool: The threadpool to use
        :param logger: The logger to use
        :param max_concurrent_tasks: The maximum number of tasks that can be pending at the same time
        """
        self._threadpool = threadpool
        self._logger = logger
        self._max_concurrent_tasks = max_concurrent_tasks
        self._futures: List[Future[Any]] = []
        # Guards _prune_futures, which removes entries from self._futures in place.
        self._lock = threading.Lock()
        # Set by _prune_futures when a completed future raised; surfaced by check_for_errors_and_shutdown.
        self._most_recently_seen_exception: Optional[Exception] = None

        # Past twice the limit, prune_to_validate_has_reached_futures_limit logs a warning.
        self._logging_threshold = max_concurrent_tasks * 2

    def prune_to_validate_has_reached_futures_limit(self) -> bool:
        """
        Drop completed futures and report whether the pending-task limit has been reached.

        Logs a warning when more than twice ``max_concurrent_tasks`` futures are still tracked after pruning.

        :return: True if at least ``max_concurrent_tasks`` futures are still tracked after pruning.
        """
        self._prune_futures(self._futures)
        if len(self._futures) > self._logging_threshold:
            self._logger.warning(
                f"ThreadPoolManager: The list of futures is getting bigger than expected ({len(self._futures)})"
            )
        return len(self._futures) >= self._max_concurrent_tasks

    def submit(self, function: Callable[..., Any], *args: Any) -> None:
        """Submit ``function(*args)`` to the threadpool and track the resulting future."""
        self._futures.append(self._threadpool.submit(function, *args))

    def _prune_futures(self, futures: List[Future[Any]]) -> None:
        """
        Remove completed futures from ``futures`` in place.

        Nothing is pruned while fewer than ``max_concurrent_tasks`` futures are tracked. If a completed future
        raised, the error is NOT raised here: it is wrapped and stored in ``self._most_recently_seen_exception``
        so that the main thread can surface it through ``check_for_errors_and_shutdown``. Cancelled futures are
        dropped without inspection because ``Future.exception()`` raises ``CancelledError`` for them.

        We are using a lock here as without it, the algorithm would not be thread safe
        """
        with self._lock:
            if len(futures) < self._max_concurrent_tasks:
                return

            # Walk backwards so each pop leaves the indices still to be visited untouched. Popping in place,
            # rather than rebuilding the list, also keeps futures appended by submit(), which takes no lock.
            for index in reversed(range(len(futures))):
                future = futures[index]
                if not future.done():
                    continue

                # Only call future.exception() on a done, non-cancelled future: it blocks until the future is done
                # and raises CancelledError if the future was cancelled.
                # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.exception
                optional_exception = None if future.cancelled() else future.exception()
                if optional_exception is not None:
                    # Exception handling should be done in the main thread. Hence, we only store the exception and
                    # expect the main thread to call check_for_errors_and_shutdown.
                    # We do not expect this error to happen. The futures created during concurrent syncs should
                    # catch the exception and push it to the queue. If this exception occurs, please review the
                    # futures and how they handle exceptions.
                    self._most_recently_seen_exception = RuntimeError(
                        f"Failed processing a future: {optional_exception}. Please contact the Airbyte team."
                    )
                futures.pop(index)

    def _shutdown(self) -> None:
        """Shut the threadpool down without waiting, cancelling tasks that have not started yet."""
        # cancel_futures=True only cancels queued tasks; tasks already running keep running, and wait=False returns
        # without joining them, so this alone will not stop the Python application.
        self._threadpool.shutdown(wait=False, cancel_futures=True)

    def is_done(self) -> bool:
        """Return True if every tracked future is done (cancelled ones included); never blocks."""
        return all(future.done() for future in self._futures)

    def check_for_errors_and_shutdown(self) -> None:
        """
        Check if any of the futures have an exception, and raise it if so. If all futures are done, shut down the
        threadpool. If the futures are not done, raise an exception.

        Never blocks on a pending future: only futures that are already done are inspected for exceptions, so a
        pending (or cancelled) future is reported as "not done" instead of being waited on. The threadpool is shut
        down on every path.

        :raises Exception: the exception stored in ``self._most_recently_seen_exception``, if any.
        :raises RuntimeError: if a finished future raised, or if any future is still pending or was cancelled.
        """
        if self._most_recently_seen_exception is not None:
            self._logger.exception(
                "An unknown exception has occurred while reading concurrently",
                exc_info=self._most_recently_seen_exception,
            )
            self._stop_and_raise_exception(self._most_recently_seen_exception)

        # Future.exception() blocks on pending futures and raises CancelledError on cancelled ones, so only
        # inspect futures that finished by themselves.
        finished = [future for future in self._futures if future.done() and not future.cancelled()]
        exceptions_from_futures = [
            exception for exception in (future.exception() for future in finished) if exception is not None
        ]
        if exceptions_from_futures:
            exception = RuntimeError(f"Failed reading with errors: {exceptions_from_futures}")
            self._stop_and_raise_exception(exception)
        else:
            # A cancelled future never ran its task to completion, so report it alongside pending ones rather than
            # treating it as success.
            futures_not_done = [f for f in self._futures if not f.done() or f.cancelled()]
            if futures_not_done:
                exception = RuntimeError(
                    f"Failed reading with futures not done: {futures_not_done}"
                )
                self._stop_and_raise_exception(exception)
            else:
                self._shutdown()

    def _stop_and_raise_exception(self, exception: BaseException) -> None:
        """Shut the threadpool down, then raise ``exception``."""
        self._shutdown()
        raise exception
Wrapper that abstracts away the thread pool and the logic for waiting until pending tasks complete.
ThreadPoolManager( threadpool: concurrent.futures.thread.ThreadPoolExecutor, logger: logging.Logger, max_concurrent_tasks: int = 10000)
def __init__(
    self,
    threadpool: ThreadPoolExecutor,
    logger: logging.Logger,
    max_concurrent_tasks: int = DEFAULT_MAX_QUEUE_SIZE,
):
    """
    Store the collaborators and set up the bookkeeping used to track submitted futures.

    :param threadpool: The threadpool to use
    :param logger: The logger to use
    :param max_concurrent_tasks: The maximum number of tasks that can be pending at the same time
    """
    # Collaborators supplied by the caller.
    self._threadpool = threadpool
    self._logger = logger

    # Limits: the pending-task cap, and the size beyond which a warning is logged (twice the cap).
    self._max_concurrent_tasks = max_concurrent_tasks
    self._logging_threshold = self._max_concurrent_tasks * 2

    # Tracked futures, the lock that serialises pruning them, and the last error seen while pruning.
    self._futures: List[Future[Any]] = []
    self._lock = threading.Lock()
    self._most_recently_seen_exception: Optional[Exception] = None
Parameters
- threadpool: The threadpool to use
- logger: The logger to use
- max_concurrent_tasks: The maximum number of tasks that can be pending at the same time
def
prune_to_validate_has_reached_futures_limit(self) -> bool:
def prune_to_validate_has_reached_futures_limit(self) -> bool:
    """
    Prune completed futures, warn if the backlog is unusually large, and report saturation.

    :return: True once the number of tracked futures is at least ``max_concurrent_tasks``.
    """
    self._prune_futures(self._futures)
    tracked = self._futures

    backlog_too_large = len(tracked) > self._logging_threshold
    if backlog_too_large:
        self._logger.warning(
            "ThreadPoolManager: The list of futures is getting bigger than expected "
            f"({len(tracked)})"
        )

    return self._max_concurrent_tasks <= len(tracked)
def
check_for_errors_and_shutdown(self) -> None:
def check_for_errors_and_shutdown(self) -> None:
    """
    Check if any of the futures have an exception, and raise it if so. If all futures are done, shut down the
    threadpool. If the futures are not done, raise an exception.

    Never blocks on a pending future: only futures that are already done are inspected for exceptions, so a
    pending (or cancelled) future is reported as "not done" instead of being waited on. The threadpool is shut
    down on every path.

    :raises Exception: the exception stored in ``self._most_recently_seen_exception``, if any.
    :raises RuntimeError: if a finished future raised, or if any future is still pending or was cancelled.
    """
    if self._most_recently_seen_exception is not None:
        self._logger.exception(
            "An unknown exception has occurred while reading concurrently",
            exc_info=self._most_recently_seen_exception,
        )
        self._stop_and_raise_exception(self._most_recently_seen_exception)

    # Future.exception() blocks on pending futures and raises CancelledError on cancelled ones, so only
    # inspect futures that finished by themselves.
    finished = [future for future in self._futures if future.done() and not future.cancelled()]
    exceptions_from_futures = [
        exception for exception in (future.exception() for future in finished) if exception is not None
    ]
    if exceptions_from_futures:
        exception = RuntimeError(f"Failed reading with errors: {exceptions_from_futures}")
        self._stop_and_raise_exception(exception)
    else:
        # A cancelled future never ran its task to completion, so report it alongside pending ones rather than
        # treating it as success.
        futures_not_done = [f for f in self._futures if not f.done() or f.cancelled()]
        if futures_not_done:
            exception = RuntimeError(
                f"Failed reading with futures not done: {futures_not_done}"
            )
            self._stop_and_raise_exception(exception)
        else:
            self._shutdown()
Check whether any of the futures raised an exception, and raise if so. If all futures are done, shut down the thread pool. If any futures are not done, raise an exception.