The problem: I want to run a stream of jobs in parallel, but I don't know in advance how many there will be (e.g. if the jobs arrive via web requests).
The solution: an autoscaler that scales the number of threads up as needed and scales it back down afterwards.
import threading
import queue
import multiprocessing as mp
class AutoScaler:
    """
    A dynamic worker pool manager that automatically scales the number of worker threads
    with demand. A new worker is started when a job is submitted and no idle worker is
    available to take it; an idle worker retires after ``cooldown_seconds`` without a job.
    The number of live workers never exceeds ``max_workers``, and idle workers do not
    retire below ``min_workers`` (until ``wait()`` shuts every worker down).

    Attributes:
        min_workers (int): The minimum number of worker threads kept alive, defaulting to 0.
        max_workers (int): The maximum number of worker threads that the scaler can scale up
            to. Defaults to the number of CPU cores available on the system.
        cooldown_seconds (int or float): The time (in seconds) an idle worker waits for a
            new job before retiring.
        pbar (optional): An optional progress bar instance (e.g., from tqdm) whose
            ``update(1)`` is called after each job that completes without raising.
            Defaults to None.
        jobs_queue (queue.Queue): Submitted jobs. It holds at most ``max_workers`` items, so
            ``work()`` blocks once that many jobs are waiting (back-pressure).
        workers (list of threading.Thread): The currently live worker threads.
        lock (threading.Lock): Guards ``workers`` and the internal idle/pending counters.

    Methods:
        work(job): Adds a job to the queue and scales up workers if necessary.
        wait(): Blocks until all jobs are completed and then shuts down all workers.
            It must not run concurrently with ``work()``.

    Example:
        >>> from tqdm import tqdm
        >>> scaler = AutoScaler(min_workers=1, max_workers=5, cooldown_seconds=2, pbar=tqdm(total=100))
        >>> for _ in range(100):
        ...     scaler.work(lambda: do_something())
        >>> scaler.wait()
    """

    # Private shutdown sentinel; unlike None it can never collide with a submitted job.
    _STOP = object()

    def __init__(self, min_workers=0, max_workers=None, cooldown_seconds=2, pbar=None):
        """
        Parameters:
            min_workers (int): Minimum number of worker threads. Defaults to 0.
            max_workers (int or None): Maximum number of worker threads. Defaults to the
                number of CPU cores.
            cooldown_seconds (int or float): Time in seconds an idle worker waits for a new
                job before it retires.
            pbar (optional): A progress bar instance from an external library like tqdm.

        Raises:
            ValueError: If min_workers < 0, max_workers <= 0, min_workers > max_workers,
                or cooldown_seconds < 0.
        """
        self.min_workers = min_workers
        self.max_workers = max_workers if max_workers is not None else mp.cpu_count()
        # Explicit checks rather than assert, which is stripped under `python -O`.
        if self.min_workers < 0:
            raise ValueError('min_workers must be non-negative')
        if self.max_workers <= 0:
            raise ValueError('max_workers must be positive')
        if self.min_workers > self.max_workers:
            raise ValueError('min_workers must be less than or equal to max_workers')
        if cooldown_seconds < 0:
            raise ValueError('cooldown_seconds must be non-negative')
        self.pbar = pbar
        self.cooldown_seconds = cooldown_seconds
        # Bounded: work() blocks once max_workers jobs are waiting.
        self.jobs_queue = queue.Queue(maxsize=self.max_workers)  # TODO: make param
        self.workers = []
        self.lock = threading.Lock()
        self._idle = 0  # live workers currently waiting for a job (guarded by lock)
        self._pending = 0  # submitted jobs not yet taken by a worker (guarded by lock)
        # Initialize the minimum number of workers
        for _ in range(self.min_workers):
            self._add_worker()

    def _add_worker(self):
        """
        Starts a new worker thread, ensuring the number of live workers does not exceed
        `max_workers`. This method is thread-safe.
        """
        with self.lock:
            self._spawn_locked()

    def _spawn_locked(self):
        """Start one idle worker unless `max_workers` are alive. Caller must hold `self.lock`."""
        # Workers remove themselves from `self.workers` when they exit, so this counts
        # only live threads and retired workers never block future scale-ups.
        if len(self.workers) >= self.max_workers:
            return
        worker = threading.Thread(target=self._worker)
        # Started while we still hold the lock: the new thread cannot touch the shared
        # bookkeeping until it has been registered below.
        worker.start()
        self.workers.append(worker)
        self._idle += 1

    def _worker(self):
        """
        Worker thread that processes jobs from the queue.

        Exits when it receives the shutdown sentinel, or when no job arrives within
        `cooldown_seconds`, more than `min_workers` workers are alive, and the remaining
        idle workers still cover all pending jobs. A job that raises is reported on
        stderr and does not stop the worker.
        """
        me = threading.current_thread()
        idle = True  # whether this worker is currently counted in self._idle
        timeout = self.cooldown_seconds
        try:
            while True:
                try:
                    job = self.jobs_queue.get(timeout=timeout)
                except queue.Empty:
                    with self.lock:
                        at_minimum = len(self.workers) <= self.min_workers
                        # Retire only if the other idle workers can still take every job
                        # work() has already counted; this closes the race with a
                        # submission arriving right after the timeout.
                        if not at_minimum and self._pending < self._idle:
                            self.workers.remove(me)
                            self._idle -= 1
                            idle = False
                            return
                    # At the min_workers floor: block until a job or the sentinel arrives
                    # instead of polling every cooldown period.
                    timeout = None if at_minimum else self.cooldown_seconds
                    continue
                timeout = self.cooldown_seconds
                if job is self._STOP:
                    self.jobs_queue.task_done()
                    return
                with self.lock:
                    self._idle -= 1
                    self._pending -= 1
                    idle = False
                try:
                    job()
                except Exception:
                    # Report the failure but keep the worker alive; the task_done() below
                    # still runs, so wait() cannot hang on a failed job.
                    import traceback
                    traceback.print_exc()
                else:
                    if self.pbar is not None:  # tqdm's __bool__ is unreliable (total=0/None)
                        self.pbar.update(1)
                finally:
                    self.jobs_queue.task_done()
                with self.lock:
                    self._idle += 1
                    idle = True
        finally:
            with self.lock:
                if me in self.workers:
                    self.workers.remove(me)
                if idle:
                    self._idle -= 1
                # If this worker died unexpectedly (e.g. SystemExit from a job), make sure
                # queued jobs still have someone to run them.
                if self._pending > self._idle:
                    self._spawn_locked()

    def work(self, job):
        """
        Adds a job to the queue, starting a new worker if no idle worker can take it.
        Blocks while the queue already holds `max_workers` waiting jobs.

        Parameters:
            job (callable): The zero-argument job function to be executed by a worker.

        Raises:
            TypeError: If job is not callable.
        """
        if not callable(job):
            raise TypeError('job must be callable')
        with self.lock:
            # Count the job before queueing it, so a worker that is timing out right now
            # sees the demand and does not retire.
            self._pending += 1
            if self._pending > self._idle:
                self._spawn_locked()
        self.jobs_queue.put(job)

    def wait(self):
        """
        Blocks until all submitted jobs have completed, then shuts down all worker threads
        without waiting for their cooldown. Must not run concurrently with `work()`.
        """
        self.jobs_queue.join()  # Wait until every job has been processed
        with self.lock:
            workers = list(self.workers)
        # One sentinel per live worker. The queue is empty after join() and holds up to
        # max_workers >= len(workers) items, so these puts do not block.
        for _ in workers:
            self.jobs_queue.put(self._STOP)
        for worker in workers:
            worker.join()
        # A worker that retired on cooldown before its sentinel arrived leaves that
        # sentinel behind; drain it so it neither stops a future worker nor keeps a
        # later join() waiting.
        while True:
            try:
                self.jobs_queue.get_nowait()
            except queue.Empty:
                break
            self.jobs_queue.task_done()
if __name__ == '__main__':
    # Demo: jobs trickle in one per second (as if arriving via web requests) while each
    # takes three seconds, so the pool has to grow to keep up and shrink afterwards.
    import time
    from timeit import default_timer as timer

    from tqdm import tqdm

    def job():
        """Stand-in for real work: sleep for three seconds."""
        time.sleep(3)

    def _run_demo(total=10, max_workers=50):
        """Submit `total` jobs, printing the live worker count before each submission."""
        with tqdm(total=total) as pbar:
            scaler = AutoScaler(max_workers=max_workers, pbar=pbar)
            start = timer()
            for _submission in range(total):
                time.sleep(1)
                active_workers = len([t for t in scaler.workers if t.is_alive()])
                print(f'{active_workers=}')
                scaler.work(job)
            scaler.wait()
            end = timer()
        print(f'done in {end - start} seconds')

    _run_demo()
No comments:
Post a Comment