I want to download a list of URLs in n threads and post-process the results in m other threads. The bottleneck is the download. The first solution that comes to mind is obviously:
import threading
import requests
from queue import Queue, Empty
import time
class Downloader:
    def __init__(self, urls, handle, num_download_threads=5, num_worker_threads=5, max_retries=3):
        self.urls_queue = Queue()
        self.processing_queue = Queue()
        self.max_retries = max_retries
        self.stop_workers = False
        self.handle = handle
        for url in urls:
            self.urls_queue.put((url, 0))  # Initial retry count is 0
        self.download_threads = [threading.Thread(target=self.download_thread) for _ in range(num_download_threads)]
        self.worker_threads = [threading.Thread(target=self.worker_thread_fn) for _ in range(num_worker_threads)]
    def download_thread(self):
        while True:
            try:
                # Non-blocking get: checking empty() and then calling a blocking
                # get() can hang forever if another thread grabs the last URL in
                # between, so take items until the queue is drained instead.
                url, retry_count = self.urls_queue.get_nowait()
            except Empty:
                break
            try:
                response = requests.get(url)
                response.raise_for_status()
                self.processing_queue.put((url, response))
            except requests.exceptions.RequestException as e:
                print(f"Error downloading {url}: {e}")
                if retry_count < self.max_retries:
                    self.urls_queue.put((url, retry_count + 1))  # Re-queue the URL with an incremented retry count
                else:
                    print(f"Failed to download {url} after {self.max_retries} attempts.")
            finally:
                self.urls_queue.task_done()
    def worker_thread_fn(self):
        while True:
            try:
                url, response = self.processing_queue.get(timeout=0.1)
            except Empty:  # a bare except would also swallow KeyboardInterrupt
                if self.stop_workers:
                    break
                else:
                    continue
            self.handle(url, response)
            self.processing_queue.task_done()
    def start(self):
        for thread in self.worker_threads:
            thread.start()
        for thread in self.download_threads:
            thread.start()
        for thread in self.download_threads:
            thread.join()
        # All downloads have finished; let the workers drain the queue and exit.
        self.stop_workers = True
        for thread in self.worker_threads:
            thread.join()
#####################################
import unittest.mock as mock
import time
import timeit
import random
def mock_requests_get(mean_sleep):
    def inner(url, *args, **kwargs):
        class MockResponse:
            def __init__(self, content, status_code):
                self.content = content
                self.status_code = status_code
            def raise_for_status(self):
                if self.status_code != 200:
                    raise requests.exceptions.RequestException(f"Status code: {self.status_code}")
        # Draw the simulated download time from a normal distribution
        # centred on mean_sleep with a 10% standard deviation
        sleep_time = random.normalvariate(mu=mean_sleep, sigma=mean_sleep / 10)
        sleep_time = max(0, sleep_time)  # Ensure sleep time is not negative
        time.sleep(sleep_time)
        fail_probability = 0.10
        # Simulate a failure with probability fail_probability (10%)
        if random.random() < fail_probability:
            status_code = random.choice([400, 404, 500, 502])
            return MockResponse(b"", status_code)  # Simulating a failure
        else:
            return MockResponse(b"fake content", 200)  # Simulating a success
    return inner
def handle(url, response):
    time.sleep(0.5)
    print(f"Processed {url} with {len(response.content)} bytes")
def main():
    # Generating 100 fake URLs
    urls = [f"http://example.com/resource{i}" for i in range(100)]
    start = timeit.default_timer()
    # Mocking requests.get
    with mock.patch('requests.get', side_effect=mock_requests_get(3)):
        # Create and start the downloader
        downloader = Downloader(
            urls,
            handle,
            num_download_threads=30,
            num_worker_threads=10,
            max_retries=3)
        downloader.start()
    end = timeit.default_timer()
    print(f"Time taken: {end - start} seconds")

if __name__ == "__main__":
    main()
But I am not happy with this solution at all. Too much can go wrong; making it stable feels very hard and like a waste of time. Too many edge cases. What happens on a random error? On a programming mistake? When the computer shuts down? When I run out of RAM? There is no self-healing and no delegated stability. I'd rather reuse proven stability from somewhere else.
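Just to make the contrast concrete, here is a minimal sketch of one small step in that direction: letting the standard library's concurrent.futures own the thread lifecycle instead of hand-rolling queues, stop flags and joins. The names download_with_retries and run are hypothetical, the handle callback has the same signature as above, and this is still an in-process solution, so it does nothing about crashes, shutdowns or running out of RAM.

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def download_with_retries(url, max_retries=3):
    # One plain function per URL; the executor owns the threads.
    for attempt in range(max_retries + 1):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return url, response
        except requests.exceptions.RequestException as e:
            print(f"Error downloading {url} (attempt {attempt + 1}): {e}")
    return url, None

def run(urls, handle, num_download_threads=5, num_worker_threads=5):
    with ThreadPoolExecutor(max_workers=num_download_threads) as downloaders, \
         ThreadPoolExecutor(max_workers=num_worker_threads) as workers:
        futures = [downloaders.submit(download_with_retries, url) for url in urls]
        for future in as_completed(futures):
            url, response = future.result()
            if response is not None:
                workers.submit(handle, url, response)
        # Leaving the with-block shuts both pools down and waits for all
        # submitted work to finish; no manual stop flags or joins.

The appeal is that thread creation, shutdown and joining are delegated to code that is already well tested; what remains is only the per-URL work and the hand-off to the post-processing pool.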