Friday, 5 January 2024

Python, Snippet: Download and post-process in parallel

I want to download a list of URLs in n threads and post-process the results in m other threads, with the download being the bottleneck. The first solution that comes to mind is obviously:

import threading
import requests
from queue import Queue, Empty
import time

class Downloader:

    def __init__(self, urls, handle, num_download_threads=5, num_worker_threads=5, max_retries=3):
        self.urls_queue = Queue()
        self.processing_queue = Queue()
        self.max_retries = max_retries
        self.stop_workers = False
        self.handle = handle

        for url in urls:
            self.urls_queue.put((url, 0))  # Initial retry count is 0

        self.download_threads = [threading.Thread(target=self.download_thread) for _ in range(num_download_threads)]
        self.worker_threads = [threading.Thread(target=self.worker_thread_fn) for _ in range(num_worker_threads)]

    def download_thread(self):
        while True:
            try:
                # Non-blocking get avoids the race between empty() and get():
                # another thread may drain the queue between the two calls.
                url, retry_count = self.urls_queue.get_nowait()
            except Empty:
                break
            try:
                response = requests.get(url)
                response.raise_for_status()
                self.processing_queue.put((url, response))
            except requests.exceptions.RequestException as e:
                print(f"Error downloading {url}: {e}")
                if retry_count < self.max_retries:
                    self.urls_queue.put((url, retry_count + 1))  # Re-queue the URL with an incremented retry count
                else:
                    print(f"Failed to download {url} after {self.max_retries} attempts.")
            finally:
                self.urls_queue.task_done()

    def worker_thread_fn(self):
        while True:
            try:
                url, response = self.processing_queue.get(timeout=0.1)
            except Empty:
                if self.stop_workers:
                    break
                else:
                    continue

            self.handle(url, response)
            self.processing_queue.task_done()

    def start(self):
        # Start the workers first so results are consumed as soon as they arrive.
        for thread in self.worker_threads:
            thread.start()

        for thread in self.download_threads:
            thread.start()

        # Wait for all downloads to finish, then signal the workers to drain and stop.
        for thread in self.download_threads:
            thread.join()

        self.stop_workers = True

        for thread in self.worker_threads:
            thread.join()



#####################################


import unittest.mock as mock
import time
import timeit
import random

def mock_requests_get(mean_sleep):
    def inner(url, *args, **kwargs):
        class MockResponse:
            def __init__(self, content, status_code):
                self.content = content
                self.status_code = status_code

            def raise_for_status(self):
                if self.status_code != 200:
                    raise requests.exceptions.RequestException(f"Status code: {self.status_code}")

        # Simulate network latency: sleep time drawn from a normal distribution
        # centred on mean_sleep, with a standard deviation of 10% of the mean.
        sleep_time = random.normalvariate(mu=mean_sleep, sigma=mean_sleep / 10)
        sleep_time = max(0, sleep_time)  # Ensure the sleep time is not negative
        time.sleep(sleep_time)

        fail_probability = 0.10
        # Simulate a failure with probability fail_probability (here 10%)
        if random.random() < fail_probability:
            status_code = random.choice([400, 404, 500, 502])
            return MockResponse(b"", status_code)  # Simulating a failure
        else:
            return MockResponse(b"fake content", 200)  # Simulating a success
    return inner


def handle(url, response):
    time.sleep(0.5)
    print(f"Processed {url} with {len(response.content)} bytes")

def main():
    # Generating 100 fake URLs
    urls = [f"http://example.com/resource{i}" for i in range(100)]

    start = timeit.default_timer()
    # Mocking requests.get
    with mock.patch('requests.get', side_effect=mock_requests_get(3)):
        # Create and start the downloader
        downloader = Downloader(
                urls, 
                handle,
                num_download_threads=30, 
                num_worker_threads=10, 
                max_retries=3)
        downloader.start()
    end = timeit.default_timer()
    print(f"Time taken: {end - start} seconds")

if __name__ == "__main__":
    main()

But I am not happy with this solution at all. Too much can go wrong, and making it stable feels very hard and like a waste of time. Too many edge cases. What happens on a random error? On a programming mistake? When the computer shuts down? When I run out of RAM? There is no self-healing and no delegated stability. I'd rather reuse proven stability from somewhere else.
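For comparison, here is a rough sketch of the same two-stage pipeline on top of concurrent.futures, which at least delegates the thread and queue bookkeeping to the standard library. It is only a sketch, not a drop-in replacement: the function names download_one and process_one are placeholders, and the retry policy is compressed into a simple loop.

import concurrent.futures
import requests

def download_one(url, max_retries=3):
    # Placeholder download with a simple retry loop.
    for attempt in range(max_retries + 1):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return url, response
        except requests.exceptions.RequestException as e:
            print(f"Error downloading {url} (attempt {attempt + 1}): {e}")
    return url, None

def process_one(url, response):
    # Placeholder post-processing step.
    print(f"Processed {url} with {len(response.content)} bytes")

def run(urls, num_download_threads=30, num_worker_threads=10):
    with concurrent.futures.ThreadPoolExecutor(num_download_threads) as downloaders, \
         concurrent.futures.ThreadPoolExecutor(num_worker_threads) as workers:
        futures = [downloaders.submit(download_one, url) for url in urls]
        for future in concurrent.futures.as_completed(futures):
            url, response = future.result()
            if response is not None:
                # Hand each result to the processing pool as soon as it arrives.
                workers.submit(process_one, url, response)
        # Both executors are drained and shut down when the with-block exits.

This gets rid of the hand-rolled queues and stop flags, but it still offers none of the persistence or self-healing I am after.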

