Tuesday, 12 March 2024

Python: A simple thread-based Autoscaler

The Problem: I want to do a steam of jobs in parallel, but don't know how many there are (e.g. if I get the jobs via web-requests).

The solution: Have an autoscaler, that scales the threads up, as needed and scales down afterwards. 


import threading
import queue
import multiprocessing as mp

class AutoScaler:
    """
    A dynamic worker pool manager that automatically scales the number of worker threads
    based on the job queue size and a cooldown period. It ensures that the actual number of workers
    does not exceed the specified maximum or drop below the minimum.

    Attributes:
        min_workers (int): The minimum number of worker threads that the scaler maintains, defaulting to 0.
        max_workers (int): The maximum number of worker threads that the scaler can scale up to. Defaults to
                           the number of CPU cores available on the system.
        cooldown_seconds (int): The time (in seconds) a worker waits for a new job before terminating,
                                assuming no new jobs are available.
        pbar (optional): An optional progress bar instance (e.g., from tqdm) that can be updated as jobs
                         are completed. Defaults to None.

    Methods:
        work(job): Adds a job to the queue and scales up workers if necessary.
        wait(): Blocks until all jobs are completed and then gracefully shuts down all workers.

    Example:
        >>> from tqdm import tqdm
        >>> scaler = AutoScaler(min_workers=1, max_workers=5, cooldown_seconds=2, pbar=tqdm(total=100))
        >>> for _ in range(100):
        ...     scaler.work(lambda: do_something())
        >>> scaler.wait()
    """

    def __init__(self, min_workers=0, max_workers=None, cooldown_seconds=2, pbar=None):
        """
        Parameters:
            min_workers (int): Minimum number of worker threads. Defaults to 0.
            max_workers (int or None): Maximum number of worker threads. Defaults to the number of CPU cores.
            cooldown_seconds (int): Cooldown time in seconds for a worker before it terminates if no new job is received.
            pbar (optional): A progress bar instance from an external library like tqdm.
        """

        self.min_workers = min_workers
        self.max_workers = max_workers if max_workers is not None else mp.cpu_count()
        
        assert self.min_workers >= 0, 'min_workers must be non-negative'
        assert self.max_workers > 0, 'max_workers must be positive'
        assert self.min_workers <= self.max_workers, 'min_workers must be less than or equal to max_workers'
        assert cooldown_seconds >= 0, 'cooldown_seconds must be non-negative'
        
        self.pbar = pbar
        self.cooldown_seconds = cooldown_seconds
        self.jobs_queue = queue.Queue(max_size=self.max_workers) # TODO: make param
        self.workers = []
        self.lock = threading.Lock()

        # Initialize the minimum number of workers
        for _ in range(self.min_workers):
            self._add_worker()

    def _add_worker(self):
        """
        Starts a new worker thread, ensuring the total number does not exceed `max_workers`.
        This method is thread-safe.
        """
        with self.lock:
            if len(self.workers) < self.max_workers:
                worker = threading.Thread(target=self._worker)
                worker.start()
                self.workers.append(worker)

    def _worker(self):
        """
        Worker thread that processes jobs from the queue.
        Shuts down when it receives a None job or when the queue is empty for a certain period of time 
        (cooldown_seconds).
        """
        while True:
            try:
                job = self.jobs_queue.get(timeout=self.cooldown_seconds)  # Adjust timeout as needed
                if job is None:
                    break
                job()
                if self.pbar:
                    self.pbar.update(1)
                self.jobs_queue.task_done()
            except queue.Empty:
                break  # Exit the thread if no job is received within the timeout period

    def work(self, job):
        """
        Adds a job to the queue and attempts to scale up the number of workers if the queue is not empty.

        Parameters:
            job (callable): The job function to be executed by a worker.
        """
        self.jobs_queue.put(job)
        if not self.jobs_queue.empty():
            self._add_worker()

    def wait(self):
        """
        Blocks until all jobs in the queue have been completed and then shuts down all worker threads gracefully.
        """
        self.jobs_queue.join()  # Wait for the queue to be empty

        # Send a signal to the workers to stop and don't use the 
        # cooldown mechanism, which would be a waste of time 
        while any(thread.is_alive() for thread in self.workers):
            self.jobs_queue.put(None)

        for worker in self.workers:
            worker.join()


if __name__ == '__main__':
    # Example usage
    import time
    from timeit import default_timer as timer
    from tqdm import tqdm

    def job():
        time.sleep(3)  # Simulate a task

    total = 10
    max_workers = 50

    with tqdm(total=total) as pbar:
        scaler = AutoScaler(max_workers=max_workers, pbar=pbar)

        start = timer()
        for _ in range(total):
            time.sleep(1)
            active_workers = sum(1 for thread in scaler.workers if thread.is_alive())
            print(f'{active_workers=}')

            scaler.work(job)

        scaler.wait()
        end = timer()

    print(f'done in {end - start} seconds')


No comments:

Post a Comment

Parse Wikipedia dump

""" This module processes Wikipedia dump files by extracting individual articles and parsing them into a structured format, ...