airbyte_cdk.sources.concurrent_source.thread_pool_manager

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import logging
  5import threading
  6from concurrent.futures import Future, ThreadPoolExecutor
  7from typing import Any, Callable, List, Optional
  8
  9
 10class ThreadPoolManager:
 11    """
 12    Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed.
 13    """
 14
 15    DEFAULT_MAX_QUEUE_SIZE = 10_000
 16
 17    def __init__(
 18        self,
 19        threadpool: ThreadPoolExecutor,
 20        logger: logging.Logger,
 21        max_concurrent_tasks: int = DEFAULT_MAX_QUEUE_SIZE,
 22    ):
 23        """
 24        :param threadpool: The threadpool to use
 25        :param logger: The logger to use
 26        :param max_concurrent_tasks: The maximum number of tasks that can be pending at the same time
 27        """
 28        self._threadpool = threadpool
 29        self._logger = logger
 30        self._max_concurrent_tasks = max_concurrent_tasks
 31        self._futures: List[Future[Any]] = []
 32        self._lock = threading.Lock()
 33        self._most_recently_seen_exception: Optional[Exception] = None
 34
 35        self._logging_threshold = max_concurrent_tasks * 2
 36
 37    def prune_to_validate_has_reached_futures_limit(self) -> bool:
 38        self._prune_futures(self._futures)
 39        if len(self._futures) > self._logging_threshold:
 40            self._logger.warning(
 41                f"ThreadPoolManager: The list of futures is getting bigger than expected ({len(self._futures)})"
 42            )
 43        return len(self._futures) >= self._max_concurrent_tasks
 44
 45    def submit(self, function: Callable[..., Any], *args: Any) -> None:
 46        self._futures.append(self._threadpool.submit(function, *args))
 47
 48    def _prune_futures(self, futures: List[Future[Any]]) -> None:
 49        """
 50        Take a list in input and remove the futures that are completed. If a future has an exception, it'll raise and kill the stream
 51        operation.
 52
 53        We are using a lock here as without it, the algorithm would not be thread safe
 54        """
 55        with self._lock:
 56            if len(futures) < self._max_concurrent_tasks:
 57                return
 58
 59            for index in reversed(range(len(futures))):
 60                future = futures[index]
 61
 62                if future.done():
 63                    # Only call future.exception() if the future is known to be done because it will block until the future is done.
 64                    # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.exception
 65                    optional_exception = future.exception()
 66                    if optional_exception:
 67                        # Exception handling should be done in the main thread. Hence, we only store the exception and expect the main
 68                        # thread to call raise_if_exception
 69                        # We do not expect this error to happen. The futures created during concurrent syncs should catch the exception and
 70                        # push it to the queue. If this exception occurs, please review the futures and how they handle exceptions.
 71                        self._most_recently_seen_exception = RuntimeError(
 72                            f"Failed processing a future: {optional_exception}. Please contact the Airbyte team."
 73                        )
 74                    futures.pop(index)
 75
 76    def _shutdown(self) -> None:
 77        # Without a way to stop the threads that have already started, this will not stop the Python application. We are fine today with
 78        # this imperfect approach because we only do this in case of `self._most_recently_seen_exception` which we don't expect to happen
 79        self._threadpool.shutdown(wait=False, cancel_futures=True)
 80
 81    def is_done(self) -> bool:
 82        return all([f.done() for f in self._futures])
 83
 84    def check_for_errors_and_shutdown(self) -> None:
 85        """
 86        Check if any of the futures have an exception, and raise it if so. If all futures are done, shutdown the threadpool.
 87        If the futures are not done, raise an exception.
 88        :return:
 89        """
 90        if self._most_recently_seen_exception:
 91            self._logger.exception(
 92                "An unknown exception has occurred while reading concurrently",
 93                exc_info=self._most_recently_seen_exception,
 94            )
 95            self._stop_and_raise_exception(self._most_recently_seen_exception)
 96
 97        exceptions_from_futures = [
 98            f for f in [future.exception() for future in self._futures] if f is not None
 99        ]
100        if exceptions_from_futures:
101            exception = RuntimeError(f"Failed reading with errors: {exceptions_from_futures}")
102            self._stop_and_raise_exception(exception)
103        else:
104            futures_not_done = [f for f in self._futures if not f.done()]
105            if futures_not_done:
106                exception = RuntimeError(
107                    f"Failed reading with futures not done: {futures_not_done}"
108                )
109                self._stop_and_raise_exception(exception)
110            else:
111                self._shutdown()
112
113    def _stop_and_raise_exception(self, exception: BaseException) -> None:
114        self._shutdown()
115        raise exception
class ThreadPoolManager:
 11class ThreadPoolManager:
 12    """
 13    Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed.
 14    """
 15
 16    DEFAULT_MAX_QUEUE_SIZE = 10_000
 17
 18    def __init__(
 19        self,
 20        threadpool: ThreadPoolExecutor,
 21        logger: logging.Logger,
 22        max_concurrent_tasks: int = DEFAULT_MAX_QUEUE_SIZE,
 23    ):
 24        """
 25        :param threadpool: The threadpool to use
 26        :param logger: The logger to use
 27        :param max_concurrent_tasks: The maximum number of tasks that can be pending at the same time
 28        """
 29        self._threadpool = threadpool
 30        self._logger = logger
 31        self._max_concurrent_tasks = max_concurrent_tasks
 32        self._futures: List[Future[Any]] = []
 33        self._lock = threading.Lock()
 34        self._most_recently_seen_exception: Optional[Exception] = None
 35
 36        self._logging_threshold = max_concurrent_tasks * 2
 37
 38    def prune_to_validate_has_reached_futures_limit(self) -> bool:
 39        self._prune_futures(self._futures)
 40        if len(self._futures) > self._logging_threshold:
 41            self._logger.warning(
 42                f"ThreadPoolManager: The list of futures is getting bigger than expected ({len(self._futures)})"
 43            )
 44        return len(self._futures) >= self._max_concurrent_tasks
 45
 46    def submit(self, function: Callable[..., Any], *args: Any) -> None:
 47        self._futures.append(self._threadpool.submit(function, *args))
 48
 49    def _prune_futures(self, futures: List[Future[Any]]) -> None:
 50        """
 51        Take a list in input and remove the futures that are completed. If a future has an exception, it'll raise and kill the stream
 52        operation.
 53
 54        We are using a lock here as without it, the algorithm would not be thread safe
 55        """
 56        with self._lock:
 57            if len(futures) < self._max_concurrent_tasks:
 58                return
 59
 60            for index in reversed(range(len(futures))):
 61                future = futures[index]
 62
 63                if future.done():
 64                    # Only call future.exception() if the future is known to be done because it will block until the future is done.
 65                    # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.exception
 66                    optional_exception = future.exception()
 67                    if optional_exception:
 68                        # Exception handling should be done in the main thread. Hence, we only store the exception and expect the main
 69                        # thread to call raise_if_exception
 70                        # We do not expect this error to happen. The futures created during concurrent syncs should catch the exception and
 71                        # push it to the queue. If this exception occurs, please review the futures and how they handle exceptions.
 72                        self._most_recently_seen_exception = RuntimeError(
 73                            f"Failed processing a future: {optional_exception}. Please contact the Airbyte team."
 74                        )
 75                    futures.pop(index)
 76
 77    def _shutdown(self) -> None:
 78        # Without a way to stop the threads that have already started, this will not stop the Python application. We are fine today with
 79        # this imperfect approach because we only do this in case of `self._most_recently_seen_exception` which we don't expect to happen
 80        self._threadpool.shutdown(wait=False, cancel_futures=True)
 81
 82    def is_done(self) -> bool:
 83        return all([f.done() for f in self._futures])
 84
 85    def check_for_errors_and_shutdown(self) -> None:
 86        """
 87        Check if any of the futures have an exception, and raise it if so. If all futures are done, shutdown the threadpool.
 88        If the futures are not done, raise an exception.
 89        :return:
 90        """
 91        if self._most_recently_seen_exception:
 92            self._logger.exception(
 93                "An unknown exception has occurred while reading concurrently",
 94                exc_info=self._most_recently_seen_exception,
 95            )
 96            self._stop_and_raise_exception(self._most_recently_seen_exception)
 97
 98        exceptions_from_futures = [
 99            f for f in [future.exception() for future in self._futures] if f is not None
100        ]
101        if exceptions_from_futures:
102            exception = RuntimeError(f"Failed reading with errors: {exceptions_from_futures}")
103            self._stop_and_raise_exception(exception)
104        else:
105            futures_not_done = [f for f in self._futures if not f.done()]
106            if futures_not_done:
107                exception = RuntimeError(
108                    f"Failed reading with futures not done: {futures_not_done}"
109                )
110                self._stop_and_raise_exception(exception)
111            else:
112                self._shutdown()
113
114    def _stop_and_raise_exception(self, exception: BaseException) -> None:
115        self._shutdown()
116        raise exception

Wrapper to abstract away the threadpool and the logic to wait for pending tasks to be completed.

ThreadPoolManager( threadpool: concurrent.futures.thread.ThreadPoolExecutor, logger: logging.Logger, max_concurrent_tasks: int = 10000)
18    def __init__(
19        self,
20        threadpool: ThreadPoolExecutor,
21        logger: logging.Logger,
22        max_concurrent_tasks: int = DEFAULT_MAX_QUEUE_SIZE,
23    ):
24        """
25        :param threadpool: The threadpool to use
26        :param logger: The logger to use
27        :param max_concurrent_tasks: The maximum number of tasks that can be pending at the same time
28        """
29        self._threadpool = threadpool
30        self._logger = logger
31        self._max_concurrent_tasks = max_concurrent_tasks
32        self._futures: List[Future[Any]] = []
33        self._lock = threading.Lock()
34        self._most_recently_seen_exception: Optional[Exception] = None
35
36        self._logging_threshold = max_concurrent_tasks * 2
Parameters
  • threadpool: The threadpool to use
  • logger: The logger to use
  • max_concurrent_tasks: The maximum number of tasks that can be pending at the same time
DEFAULT_MAX_QUEUE_SIZE = 10000
def prune_to_validate_has_reached_futures_limit(self) -> bool:
38    def prune_to_validate_has_reached_futures_limit(self) -> bool:
39        self._prune_futures(self._futures)
40        if len(self._futures) > self._logging_threshold:
41            self._logger.warning(
42                f"ThreadPoolManager: The list of futures is getting bigger than expected ({len(self._futures)})"
43            )
44        return len(self._futures) >= self._max_concurrent_tasks
def submit(self, function: Callable[..., Any], *args: Any) -> None:
46    def submit(self, function: Callable[..., Any], *args: Any) -> None:
47        self._futures.append(self._threadpool.submit(function, *args))
def is_done(self) -> bool:
82    def is_done(self) -> bool:
83        return all([f.done() for f in self._futures])
def check_for_errors_and_shutdown(self) -> None:
 85    def check_for_errors_and_shutdown(self) -> None:
 86        """
 87        Check if any of the futures have an exception, and raise it if so. If all futures are done, shutdown the threadpool.
 88        If the futures are not done, raise an exception.
 89        :return:
 90        """
 91        if self._most_recently_seen_exception:
 92            self._logger.exception(
 93                "An unknown exception has occurred while reading concurrently",
 94                exc_info=self._most_recently_seen_exception,
 95            )
 96            self._stop_and_raise_exception(self._most_recently_seen_exception)
 97
 98        exceptions_from_futures = [
 99            f for f in [future.exception() for future in self._futures] if f is not None
100        ]
101        if exceptions_from_futures:
102            exception = RuntimeError(f"Failed reading with errors: {exceptions_from_futures}")
103            self._stop_and_raise_exception(exception)
104        else:
105            futures_not_done = [f for f in self._futures if not f.done()]
106            if futures_not_done:
107                exception = RuntimeError(
108                    f"Failed reading with futures not done: {futures_not_done}"
109                )
110                self._stop_and_raise_exception(exception)
111            else:
112                self._shutdown()

Check if any of the futures have an exception, and raise it if so. If all futures are done, shutdown the threadpool. If the futures are not done, raise an exception.

Returns