The Problem: I want to run a stream of jobs in parallel, but I don't know in advance how many there will be (e.g. when the jobs arrive via web requests).
The Solution: An autoscaler that scales the number of worker threads up as needed and back down once the work dries up.
import logging
import multiprocessing as mp
import queue
import threading

logger = logging.getLogger(__name__)


class AutoScaler:
    """
    A dynamic worker pool manager that automatically scales the number of worker
    threads based on the job queue and a cooldown period. It ensures that the
    number of live workers does not exceed the specified maximum or drop below
    the minimum (until `wait()` shuts the pool down).

    Attributes:
        min_workers (int): The minimum number of worker threads that the scaler
            keeps alive, defaulting to 0. Workers at this floor never time out.
        max_workers (int): The maximum number of worker threads that the scaler
            can scale up to. Defaults to the number of CPU cores available on
            the system.
        cooldown_seconds (float): The time (in seconds) a worker above the
            floor waits for a new job before terminating.
        pbar (optional): An optional progress bar instance (e.g., from tqdm)
            whose `update(1)` is called after every job, successful or not.
            Defaults to None.
        jobs_queue (queue.Queue): Pending jobs. `None` is used internally as a
            stop sentinel, so it is not accepted as a job.
        workers (list[threading.Thread]): The currently live worker threads;
            a worker removes itself from this list when it exits.
        lock (threading.Lock): Guards `workers` and the internal shutdown flag.

    Methods:
        work(job): Adds a job to the queue and scales up workers if necessary.
            Safe to call from several threads, but not concurrently with wait().
        wait(): Blocks until all jobs are completed and then gracefully shuts
            down all workers. The scaler can be reused afterwards.

    Example:
        >>> from tqdm import tqdm
        >>> scaler = AutoScaler(min_workers=1, max_workers=5, cooldown_seconds=2, pbar=tqdm(total=100))
        >>> for _ in range(100):
        ...     scaler.work(lambda: do_something())
        >>> scaler.wait()
    """

    def __init__(self, min_workers=0, max_workers=None, cooldown_seconds=2,
                 pbar=None, max_queue_size=None):
        """
        Parameters:
            min_workers (int): Minimum number of worker threads. Defaults to 0.
            max_workers (int or None): Maximum number of worker threads.
                Defaults to the number of CPU cores.
            cooldown_seconds (float): Cooldown time in seconds for a worker
                before it terminates if no new job is received.
            pbar (optional): A progress bar instance from an external library
                like tqdm.
            max_queue_size (int or None): Capacity of the job queue; `work()`
                blocks while it is full. Defaults to `max_workers`. A value
                <= 0 makes the queue unbounded.

        Raises:
            ValueError: If any of the numeric limits is out of range.
        """
        self.min_workers = min_workers
        self.max_workers = max_workers if max_workers is not None else mp.cpu_count()
        # Explicit raises instead of `assert`: asserts vanish under `python -O`.
        if self.min_workers < 0:
            raise ValueError('min_workers must be non-negative')
        if self.max_workers <= 0:
            raise ValueError('max_workers must be positive')
        if self.min_workers > self.max_workers:
            raise ValueError('min_workers must be less than or equal to max_workers')
        if cooldown_seconds < 0:
            raise ValueError('cooldown_seconds must be non-negative')
        self.pbar = pbar
        self.cooldown_seconds = cooldown_seconds
        if max_queue_size is None:
            max_queue_size = self.max_workers
        # The keyword is `maxsize`; `max_size` made this line raise TypeError.
        self.jobs_queue = queue.Queue(maxsize=max_queue_size)
        self.workers = []
        self.lock = threading.Lock()
        # While True, idle workers must wait for their stop sentinel instead of
        # exiting on cooldown, so wait() can send exactly one sentinel each.
        self._stopping = False
        # Initialize the minimum number of workers
        for _ in range(self.min_workers):
            self._add_worker()

    def _add_worker(self):
        """
        Starts a new worker thread unless `max_workers` live workers already
        exist. Thread-safe: the count check and the registration happen under
        `self.lock`.
        """
        with self.lock:
            if len(self.workers) < self.max_workers:
                worker = threading.Thread(target=self._worker)
                worker.start()
                # The new thread reads `self.workers` under the same lock,
                # so it only ever sees the list with itself already appended.
                self.workers.append(worker)

    def _worker(self):
        """
        Worker thread that processes jobs from the queue.

        Exits when it receives the `None` sentinel, or when it has been idle
        for `cooldown_seconds` while the pool is above `min_workers`. Before
        exiting it removes itself from `self.workers`, so the pool size always
        reflects live workers only. Exceptions raised by a job are logged and
        do not terminate the worker.
        """
        me = threading.current_thread()
        while True:
            with self.lock:
                # Floor workers block until a job or sentinel arrives; a finite
                # timeout would only make them spin (cooldown_seconds may be 0).
                above_floor = len(self.workers) > self.min_workers
            timeout = self.cooldown_seconds if above_floor else None
            try:
                job = self.jobs_queue.get(timeout=timeout)
            except queue.Empty:
                with self.lock:
                    # Re-check under the lock that work() uses to decide on
                    # spawning: if a job slipped in meanwhile, or the pool is at
                    # its floor, or wait() is collecting workers, stay alive.
                    keep_running = (
                        self._stopping
                        or not self.jobs_queue.empty()
                        or len(self.workers) <= self.min_workers
                    )
                    if not keep_running:
                        self.workers.remove(me)
                if keep_running:
                    continue
                return

            if job is None:  # stop sentinel sent by wait()
                with self.lock:
                    self.workers.remove(me)
                self.jobs_queue.task_done()
                return

            try:
                job()
            except Exception:
                # Keep the worker alive; an escaping exception would also skip
                # task_done() and make wait() block forever.
                logger.exception('AutoScaler job %r raised an exception', job)
            finally:
                try:
                    # `is not None`: tqdm bars without a total raise on bool().
                    if self.pbar is not None:
                        self.pbar.update(1)
                finally:
                    self.jobs_queue.task_done()

    def work(self, job):
        """
        Adds a job to the queue and attempts to scale up the number of workers
        if the queue is not empty. Blocks while the queue is full.

        Parameters:
            job (callable): The zero-argument job function to be executed by a
                worker.

        Raises:
            TypeError: If `job` is not callable (`None` is reserved as the
                internal stop sentinel).
        """
        if not callable(job):
            raise TypeError('job must be callable')
        self.jobs_queue.put(job)
        # If no idle worker has picked the job up yet, try to add one; the
        # count is re-checked against max_workers under the lock.
        if not self.jobs_queue.empty():
            self._add_worker()

    def wait(self):
        """
        Blocks until all jobs in the queue have been completed and then shuts
        down all worker threads gracefully. Must not be called concurrently
        with `work()`. The scaler can be reused afterwards.
        """
        self.jobs_queue.join()  # Wait until every job has called task_done()
        # Stop the workers with one sentinel each instead of waiting out the
        # cooldown. While _stopping is set no worker exits on its own, so every
        # snapshotted worker consumes exactly one sentinel and none is left over.
        with self.lock:
            self._stopping = True
            workers = list(self.workers)
        try:
            for _ in workers:
                self.jobs_queue.put(None)
            for worker in workers:
                worker.join()
        finally:
            with self.lock:
                self._stopping = False


if __name__ == '__main__':
    # Example usage
    import time
    from timeit import default_timer as timer

    from tqdm import tqdm

    def job():
        time.sleep(3)  # Simulate a task

    total = 10
    max_workers = 50
    with tqdm(total=total) as pbar:
        scaler = AutoScaler(max_workers=max_workers, pbar=pbar)
        start = timer()
        for _ in range(total):
            time.sleep(1)
            active_workers = sum(1 for thread in scaler.workers if thread.is_alive())
            print(f'{active_workers=}')
            scaler.work(job)
        scaler.wait()
        end = timer()
    print(f'done in {end - start} seconds')
No comments:
Post a Comment