I want to download a list of URLs in n threads and post-process the results in m other threads. The bottleneck is the download. The first solution that comes to mind is obviously:
import threading
import requests
from queue import Queue, Empty
import time


class Downloader:
    def __init__(self, urls, handle, num_download_threads=5,
                 num_worker_threads=5, max_retries=3):
        self.urls_queue = Queue()
        self.processing_queue = Queue()
        self.max_retries = max_retries
        self.stop_workers = False
        self.handle = handle
        for url in urls:
            self.urls_queue.put((url, 0))  # initial retry count is 0
        self.download_threads = [threading.Thread(target=self.download_thread)
                                 for _ in range(num_download_threads)]
        self.worker_threads = [threading.Thread(target=self.worker_thread_fn)
                               for _ in range(num_worker_threads)]

    def download_thread(self):
        while True:
            try:
                # get_nowait instead of an empty()/get() pair: another thread may
                # grab the last URL between the check and the get, which would block forever
                url, retry_count = self.urls_queue.get_nowait()
            except Empty:
                break
            try:
                response = requests.get(url)
                response.raise_for_status()
                self.processing_queue.put((url, response))
            except requests.exceptions.RequestException as e:
                print(f"Error downloading {url}: {e}")
                if retry_count < self.max_retries:
                    # Re-queue the URL with an incremented retry count
                    self.urls_queue.put((url, retry_count + 1))
                else:
                    print(f"Failed to download {url} after {self.max_retries} attempts.")
            finally:
                self.urls_queue.task_done()

    def worker_thread_fn(self):
        while True:
            try:
                url, response = self.processing_queue.get(timeout=0.1)
            except Empty:
                if self.stop_workers:
                    break
                continue
            self.handle(url, response)
            self.processing_queue.task_done()

    def start(self):
        for thread in self.worker_threads:
            thread.start()
        for thread in self.download_threads:
            thread.start()
        for thread in self.download_threads:
            thread.join()
        self.stop_workers = True
        for thread in self.worker_threads:
            thread.join()


#####################################
# Test harness: mock requests.get and time a run of the Downloader.
import unittest.mock as mock
import timeit
import random


def mock_requests_get(mean_sleep):
    def inner(url, *args, **kwargs):
        class MockResponse:
            def __init__(self, content, status_code):
                self.content = content
                self.status_code = status_code

            def raise_for_status(self):
                if self.status_code != 200:
                    raise requests.exceptions.RequestException(f"Status code: {self.status_code}")

        # Draw the simulated download time from a normal distribution
        # (std dev is 10% of the mean), clamped so it is never negative
        sleep_time = max(0, random.normalvariate(mu=mean_sleep, sigma=mean_sleep / 10))
        time.sleep(sleep_time)

        fail_probability = 0.10
        # Simulate a failed download roughly 10% of the time
        if random.random() < fail_probability:
            status_code = random.choice([400, 404, 500, 502])
            return MockResponse(b"", status_code)
        return MockResponse(b"fake content", 200)
    return inner


def handle(url, response):
    time.sleep(0.5)
    print(f"Processed {url} with {len(response.content)} bytes")


def main():
    # Generate 100 fake URLs
    urls = [f"http://example.com/resource{i}" for i in range(100)]
    start = timeit.default_timer()
    # Mock requests.get so the benchmark runs without network access
    with mock.patch('requests.get', side_effect=mock_requests_get(3)):
        downloader = Downloader(urls, handle,
                                num_download_threads=30,
                                num_worker_threads=10,
                                max_retries=3)
        downloader.start()
    end = timeit.default_timer()
    print(f"Time taken: {end - start} seconds")


if __name__ == "__main__":
    main()
But I am not happy with this solution at all. Too much can go wrong, and making it stable feels very hard and like a waste of time. Too many edge cases. What happens on a random error? On a programming mistake? When the computer shuts down? When I run out of RAM? There is no self-healing and no delegated stability. I'd rather reuse proven stability from somewhere else.
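For comparison, here is a minimal sketch of how far "reuse proven code" gets you with the standard library alone: the same n-download / m-process pipeline on top of concurrent.futures, which at least delegates thread lifecycle and shutdown to battle-tested code. The fetch, process, and run helpers and the retry loop are illustrative assumptions, not part of the code above, and this still does nothing for persistence or self-healing.

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


def fetch(url, max_retries=3):
    # Illustrative helper: retry a few times before giving up;
    # the executor owns the threads.
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return url, response
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise


def process(url, response):
    # Illustrative stand-in for the real post-processing step.
    print(f"Processed {url} with {len(response.content)} bytes")


def run(urls, num_download_threads=30, num_worker_threads=10):
    with ThreadPoolExecutor(num_download_threads) as downloaders, \
         ThreadPoolExecutor(num_worker_threads) as workers:
        futures = [downloaders.submit(fetch, url) for url in urls]
        for future in as_completed(futures):
            try:
                url, response = future.result()
            except requests.exceptions.RequestException:
                continue  # retries exhausted, give up on this URL
            workers.submit(process, url, response)
        # Leaving the with-block waits for the remaining processing to finish.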