Monday, 25 March 2024

Python: Check if a string matches at least one glob condition

import fnmatch

def matches_one_glob_condition(item: str, conditions: list[str]) -> bool:
    """
    Check if the given item matches at least one of the specified glob patterns.

    Args:
    item (str): The item to check.
    conditions (list[str]): A list of glob patterns to match against the item.

    Returns:
    bool: True if the item matches at least one condition, False otherwise.
    """
    for condition in conditions:
        if fnmatch.fnmatch(item, condition):
            return True
    return False
  
print(matches_one_glob_condition('beir/scifacts/test', ['beir/*/test']))

Python: Check if the given semantic version matches all the specified conditions.

import re
from packaging.version import Version

def matches_condition(semver: str, conditions: list[str]) -> bool:
    """
    Check if the given semantic version matches all the specified conditions.

    Args:
    semver (str): The semantic version to check.
    conditions (list[str]): A list of conditions to match against the semantic version.

    Returns:
    bool: True if the semantic version matches all conditions, False otherwise.
    """
    semver_version = Version(semver.replace(" ", ""))
    for condition in conditions:
        condition = condition.replace(" ", "")
        # Use regex to extract operator and version
        match = re.match(r'([><=!]*)(.*)', condition)
        operator, version_str = match.groups()
        version_to_compare = Version(version_str)

        # Default to "==" if no operator is found
        if operator == "" or operator == "==":
            if not (semver_version == version_to_compare):
                return False
        elif operator == ">":
            if not (semver_version > version_to_compare):
                return False
        elif operator == "<":
            if not (semver_version < version_to_compare):
                return False
        elif operator == ">=":
            if not (semver_version >= version_to_compare):
                return False
        elif operator == "<=":
            if not (semver_version <= version_to_compare):
                return False
        elif operator == "!":
            if not (semver_version != version_to_compare):
                return False

    return True

print(matches_condition("1.0.1", ['>=1.0.0', '<2.0.0']))
print(matches_condition("1.0.1", ['>=1.0.0']))
print(matches_condition("1.0.1", ['==1.0.1']))
print(matches_condition("1.0.1", ['1.0.1']))

Sunday, 24 March 2024

Get the cache base path in Python: get_cache_path

from pathlib import Path
import os

def get_cache_path(app_name: str) -> Path:
    """Return the path for the app's cache directory, automatically adjusted for the OS.

    Args:
        app_name (str): The name of the application.

    Returns:
        Path: The absolute path to the app's cache directory.
    """
    if os.name == 'nt':  # Windows
        cache_dir = Path(os.getenv('LOCALAPPDATA', Path.home())) / app_name / "Cache"
    else:
        cache_dir = Path.home() / ".cache" / app_name
    cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir

# Example usage
cache_path = get_cache_path("my_app")
print(cache_path)
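
If a third-party dependency is acceptable, the platformdirs package handles more platforms (macOS uses ~/Library/Caches, for example). A minimal sketch assuming platformdirs is installed:

from pathlib import Path
from platformdirs import user_cache_dir

cache_path = Path(user_cache_dir("my_app"))
cache_path.mkdir(parents=True, exist_ok=True)
print(cache_path)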

Wednesday, 20 March 2024

Managing processes in python (and kill them on script exit)

'''
This example demonstrates how to manage processes in a singleton.
The main problem is that we also have to kill all children of a process.
This means we cannot lose the reference to the process object.

Also, no matter what, we want to terminate our child processes when this script ends.
We do this via atexit. Otherwise, e.g. when we start a server, it keeps running even after this
script ends.

You can use Processes either as a tool to kill processes properly (recursively) or as a
context manager to define the lifetime of a process.
'''

import subprocess
import psutil
import atexit
import shlex
import os
from threading import Lock
from typing import Optional
from pathlib import Path

class Processes:
    """A singleton class for managing subprocesses.

    Provides methods to run commands or scripts as subprocesses, with support for
    terminating individual processes or all subprocesses upon script exit or
    context block exit.
    """
    _instance = None
    _lock = Lock()

    def __new__(cls):
        """Ensure only one instance of the class is created."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(Processes, cls).__new__(cls)
                cls._subprocesses: list[subprocess.Popen] = []
                atexit.register(cls._instance.terminate_all)
        return cls._instance

    def run(self, command: str | list[str], cwd: Optional[str] = None, shell: bool = False) -> subprocess.Popen:
        """Starts a subprocess with the given command.

        Args:
            command: The command to execute. If it's a string, it will be split using shlex.
            cwd: The working directory to execute the command in.
            shell: Whether to execute the command through the shell.

        Returns:
            The subprocess.Popen object for the started subprocess.
        """
        if isinstance(command, str) and not shell:
            # With shell=True, Popen expects the command as a single string.
            command = shlex.split(command)

        process = subprocess.Popen(command, cwd=cwd, shell=shell)
        self._subprocesses.append(process)
        return process

    def run_scripts(self, path: Path|str) -> list[subprocess.Popen]:
        """Executes each script found in the specified directory path.

        Only files with execute permissions are considered.

        Args:
            path: The directory path containing scripts to execute.

        Returns:
            A list of subprocess.Popen objects for the started subprocesses.
        """

        path = Path(path)  # Ensure path is a Path object
        processes = []

        for script_path in path.iterdir():
            if script_path.is_file() and os.access(script_path, os.X_OK):
                process = self.run(str(script_path), shell=True)
                processes.append(process)
        return processes



    def terminate(self, process: subprocess.Popen) -> None:
        """Terminates a specific subprocess.

        Args:
            process: The subprocess.Popen object to terminate.
        """
        if process.poll() is None:  # Process is still running
            try:
                parent = psutil.Process(process.pid)
                for child in parent.children(recursive=True):
                    child.terminate()
                parent.terminate()
                parent.wait(3)  # Give the process tree a chance to exit cleanly
            except psutil.TimeoutExpired:
                parent.kill()  # Force kill if it did not exit in time
            except psutil.NoSuchProcess:
                pass  # Process already terminated
        if process in self._subprocesses:
            self._subprocesses.remove(process)

    def terminate_all(self) -> None:
        """Terminates all subprocesses started by this class."""
        for process in list(self._subprocesses):  # Create a copy of the list for iteration
            self.terminate(process)

    def __enter__(self) -> 'Processes':
        """Enables use of the class as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Ensures all subprocesses are terminated when exiting a context block."""
        self.terminate_all()

# Example usage as a singleton:
process_manager = Processes()
proc = process_manager.run('ls -l')

import time
time.sleep(2)

process_manager.terminate(proc)

# Running scripts from a specified directory path and terminating one
# procs = process_manager.run_scripts('/path/to/scripts')
# if procs:
#     process_manager.terminate(procs[0])

# Example usage as a context manager:
with Processes() as pm:
    proc = pm.run('sleep 10')
    time.sleep(2)


# pm.run('clear', shell=True)

Wednesday, 13 March 2024

Python: Awaiting the end of progress. Wait until progress bars finish.

await_progress_completion monitors asynchronous tasks and signals completion or a stall based on configurable thresholds. It uses timeout_seconds for the maximum wait time, stall_check_interval to check for progress during stalls, and progress_check_interval for regular updates. Differentiating between ongoing progress and inactivity prevents wasting resources on stalled tasks.

from datetime import datetime, timedelta
import time
import logging

logger = logging.getLogger(__name__)

def await_progress_completion(fn, expected_value=None, timeout_seconds: float = 10.0, stall_check_interval: float = 0.5, progress_check_interval: float = 0.3) -> bool:
    """
    Monitors and waits for a numerical progress indicator from a specified function (fn) to stop increasing.
    The function fn is expected to return a numerical value (e.g., an integer or float) that increases over time to indicate progress.
    The purpose of this function is to detect when this number stops increasing, indicating that progress may have stalled or completed.
    
    Parameters:
    - fn: The function to monitor for increasing numerical progress.
    - expected_value: If given, return True as soon as fn() returns this value.
    - timeout_seconds: The maximum amount of time to wait for progress before considering the process stalled and timing out.
    - stall_check_interval: The interval, in seconds, at which to check for progress during a stall condition.
    - progress_check_interval: The interval, in seconds, at which to regularly check for progress to avoid continuous polling.

    Returns:
    - bool: True if expected_value is reached; False once progress stalls and no further increase is observed within the timeout period.
    """

    last_progress = None  # Track the last known progress value to detect when progress stalls (stops increasing).
    
    def stall_for_progress():
        """
        Handles situations where progress has stalled by waiting for an increase in the progress indicator, checking at set intervals, and implementing a timeout.
        This approach helps manage scenarios where progress may temporarily halt but is expected to resume.
        """
        nonlocal last_progress
        logger.debug("Stalled. Waiting for progress.")
        wait_until = datetime.now() + timedelta(seconds=timeout_seconds)
        while datetime.now() <= wait_until:
            current_progress = fn()
            if current_progress != last_progress:
                logging.info("Progress made.")
                last_progress = current_progress
                return True
            time.sleep(stall_check_interval)
        logger.debug("Timeout. No progress.")
        return False

    while True:
        progress = fn()
        if expected_value is not None and progress == expected_value:
            logger.debug("Progress complete.")
            return True
        if progress != last_progress:
            logger.debug(f"Progress: {progress}")
            last_progress = progress
        else:
            if not stall_for_progress():
                return False
        time.sleep(progress_check_interval)  # Provides a breather between checks to reduce the likelihood of overwhelming the monitored process or system.



if __name__ == '__main__':
    from rich import print

    class Service:

        def __init__(self, count_up_to: int):
            self.start = datetime.now()
            self.count_up_to = count_up_to

        def count(self) -> int:
            seconds_since_start = (datetime.now() - self.start).total_seconds()
            res = int(min(seconds_since_start, self.count_up_to))
            print(f'counting... {res=}')
            return res

    service = Service(3)
    await_progress_completion(service.count, expected_value=3, timeout_seconds=3.0)

Tuesday, 12 March 2024

Python: A simple thread-based Autoscaler

The problem: I want to process a stream of jobs in parallel, but I don't know how many there are (e.g. if I get the jobs via web requests).

The solution: Have an autoscaler that scales threads up as needed and scales them down afterwards.


import threading
import queue
import multiprocessing as mp

class AutoScaler:
    """
    A dynamic worker pool manager that automatically scales the number of worker threads
    based on the job queue size and a cooldown period. It ensures that the actual number of workers
    does not exceed the specified maximum or drop below the minimum.

    Attributes:
        min_workers (int): The minimum number of worker threads that the scaler maintains, defaulting to 0.
        max_workers (int): The maximum number of worker threads that the scaler can scale up to. Defaults to
                           the number of CPU cores available on the system.
        cooldown_seconds (int): The time (in seconds) a worker waits for a new job before terminating,
                                assuming no new jobs are available.
        pbar (optional): An optional progress bar instance (e.g., from tqdm) that can be updated as jobs
                         are completed. Defaults to None.

    Methods:
        work(job): Adds a job to the queue and scales up workers if necessary.
        wait(): Blocks until all jobs are completed and then gracefully shuts down all workers.

    Example:
        >>> from tqdm import tqdm
        >>> scaler = AutoScaler(min_workers=1, max_workers=5, cooldown_seconds=2, pbar=tqdm(total=100))
        >>> for _ in range(100):
        ...     scaler.work(lambda: do_something())
        >>> scaler.wait()
    """

    def __init__(self, min_workers=0, max_workers=None, cooldown_seconds=2, pbar=None):
        """
        Parameters:
            min_workers (int): Minimum number of worker threads. Defaults to 0.
            max_workers (int or None): Maximum number of worker threads. Defaults to the number of CPU cores.
            cooldown_seconds (int): Cooldown time in seconds for a worker before it terminates if no new job is received.
            pbar (optional): A progress bar instance from an external library like tqdm.
        """

        self.min_workers = min_workers
        self.max_workers = max_workers if max_workers is not None else mp.cpu_count()
        
        assert self.min_workers >= 0, 'min_workers must be non-negative'
        assert self.max_workers > 0, 'max_workers must be positive'
        assert self.min_workers <= self.max_workers, 'min_workers must be less than or equal to max_workers'
        assert cooldown_seconds >= 0, 'cooldown_seconds must be non-negative'
        
        self.pbar = pbar
        self.cooldown_seconds = cooldown_seconds
        self.jobs_queue = queue.Queue(maxsize=self.max_workers)  # TODO: make the queue size a parameter
        self.workers = []
        self.lock = threading.Lock()

        # Initialize the minimum number of workers
        for _ in range(self.min_workers):
            self._add_worker()

    def _add_worker(self):
        """
        Starts a new worker thread, ensuring the total number does not exceed `max_workers`.
        This method is thread-safe.
        """
        with self.lock:
            if len(self.workers) < self.max_workers:
                worker = threading.Thread(target=self._worker)
                worker.start()
                self.workers.append(worker)

    def _worker(self):
        """
        Worker thread that processes jobs from the queue.
        Shuts down when it receives a None job or when the queue is empty for a certain period of time 
        (cooldown_seconds).
        """
        while True:
            try:
                job = self.jobs_queue.get(timeout=self.cooldown_seconds)  # Adjust timeout as needed
                if job is None:
                    break
                job()
                if self.pbar:
                    self.pbar.update(1)
                self.jobs_queue.task_done()
            except queue.Empty:
                break  # Exit the thread if no job is received within the timeout period

    def work(self, job):
        """
        Adds a job to the queue and attempts to scale up the number of workers if the queue is not empty.

        Parameters:
            job (callable): The job function to be executed by a worker.
        """
        self.jobs_queue.put(job)
        if not self.jobs_queue.empty():
            self._add_worker()

    def wait(self):
        """
        Blocks until all jobs in the queue have been completed and then shuts down all worker threads gracefully.
        """
        self.jobs_queue.join()  # Wait for the queue to be empty

        # Send a stop signal (None) to the remaining workers so they exit immediately
        # instead of waiting for the cooldown timeout.
        while any(thread.is_alive() for thread in self.workers):
            self.jobs_queue.put(None)

        for worker in self.workers:
            worker.join()


if __name__ == '__main__':
    # Example usage
    import time
    from timeit import default_timer as timer
    from tqdm import tqdm

    def job():
        time.sleep(3)  # Simulate a task

    total = 10
    max_workers = 50

    with tqdm(total=total) as pbar:
        scaler = AutoScaler(max_workers=max_workers, pbar=pbar)

        start = timer()
        for _ in range(total):
            time.sleep(1)
            active_workers = sum(1 for thread in scaler.workers if thread.is_alive())
            print(f'{active_workers=}')

            scaler.work(job)

        scaler.wait()
        end = timer()

    print(f'done in {end - start} seconds')


Python: Download file in chunks with progress bar

import requests
from tqdm import tqdm

def download_file(url, filename):
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        total_size_in_bytes = int(response.headers.get('content-length', 0))
        block_size = 1024 # 1 Kibibyte
        progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True)
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=block_size):
                progress_bar.update(len(chunk))
                file.write(chunk)
        progress_bar.close()

Monday, 11 March 2024

Inversion of Control approaches

 

0. Dependency Injection (DI)

  • Description: DI is a design pattern that decouples object creation from usage by injecting dependencies at runtime, enhancing modularity and testability.
  • Use Cases: Ideal for complex software systems requiring easy maintenance, testing, and flexibility, such as web applications.
  • Drawbacks: Can introduce setup complexity, require a learning curve, and potentially lead to runtime errors if configurations are incorrect.
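
As an illustration, a minimal constructor-injection sketch (the Mailer/UserService names are made up):

class Mailer:
    def send(self, to: str, body: str) -> None:
        print(f"sending to {to}: {body}")

class UserService:
    def __init__(self, mailer: Mailer):  # the dependency is injected, not created here
        self.mailer = mailer

    def register(self, email: str) -> None:
        self.mailer.send(email, "welcome")

# The composition root (or a DI container) wires everything together;
# tests can simply inject a fake Mailer.
service = UserService(Mailer())
service.register("alice@example.com")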

1. Service Locator Pattern

  • Description: Unlike DI, where dependencies are pushed into an object, the Service Locator pattern involves an object requesting its dependencies from a central registry or locator.
  • Use Cases: It can be useful in situations where DI might be overly complex or when there's a need for more dynamic resolution of dependencies.
  • Drawbacks: It tends to hide the class dependencies, making them less explicit, which can lead to harder-to-read code and potentially more difficult debugging and testing.

2. (Abstract) Factory Pattern

  • Description: The Factory Pattern is a creational pattern used to create objects without specifying the exact class of object that will be created. This is achieved by defining an interface for creating an object, but letting subclasses decide which class to instantiate.
  • Use Cases: Useful when the creation process is complex or when it should be decoupled from the system's business logic.
  • Drawbacks: It doesn't automatically manage dependencies like DI, potentially leading to more boilerplate code if dependencies are complex.
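
A minimal factory sketch (Storage/DiskStorage/MemoryStorage are illustrative names):

from abc import ABC, abstractmethod

class Storage(ABC):
    @abstractmethod
    def save(self, data: str) -> None: ...

class DiskStorage(Storage):
    def save(self, data: str) -> None:
        print(f"writing {data!r} to disk")

class MemoryStorage(Storage):
    def __init__(self):
        self.items: list[str] = []

    def save(self, data: str) -> None:
        self.items.append(data)

def storage_factory(kind: str) -> Storage:
    # Callers get a Storage without knowing which concrete class is used.
    return DiskStorage() if kind == "disk" else MemoryStorage()

store = storage_factory("memory")
store.save("hello")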

3. Singleton Pattern

  • Description: Ensures a class has only one instance and provides a global point of access to it. This can be used to manage shared resources or configurations.
  • Use Cases: Suitable for logging, driver objects, caching, thread pools, and configuration settings.
  • Drawbacks: Overuse can lead to issues similar to global variables, making code tightly coupled, harder to test, and parallelize.
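
A minimal singleton sketch (a hypothetical Config class):

class Config:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.settings = {"debug": False}
        return cls._instance

assert Config() is Config()  # every call returns the same instance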

4. Constructor Injection without a Container

  • Description: This approach manually injects dependencies through constructors without using a DI container.
  • Use Cases: It's a simpler alternative for applications with fewer dependencies or where introducing a DI container would be considered overkill.
  • Drawbacks: Can become cumbersome as the application grows, leading to complex constructor signatures and manual management of dependency graphs.
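
A sketch of manual wiring at the application entry point (Database/Repository/Service are illustrative names):

class Database:
    pass

class Repository:
    def __init__(self, db: Database):
        self.db = db

class Service:
    def __init__(self, repo: Repository):
        self.repo = repo

def main() -> None:
    # The whole dependency graph is built by hand, no container involved.
    db = Database()
    service = Service(Repository(db))

main()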

5. Static Methods/Classes

  • Description: Using static methods or classes to provide services or functionalities without instantiating objects.
  • Use Cases: Suitable for utility functions or when state management is not a concern.
  • Drawbacks: Static methods can lead to tightly coupled code, making testing and mocking more difficult.
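
A small sketch of a static utility class (hypothetical PathUtils):

class PathUtils:
    @staticmethod
    def normalize(p: str) -> str:
        return p.rstrip("/").lower()

print(PathUtils.normalize("/Data/Reports/"))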

6. Registry Pattern

  • Description: A registry pattern provides a system-wide registry of services or objects that can be accessed from anywhere in the application.
  • Use Cases: Can be used for global access points to services or configurations, similar to service locators but with a simpler implementation.
  • Drawbacks: Global access can lead to issues with maintainability and testability, as it introduces global state into an application.
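
A minimal registry sketch (module-level dictionary, hypothetical names):

_registry: dict[str, object] = {}

def register(name: str, obj: object) -> None:
    _registry[name] = obj

def lookup(name: str) -> object:
    return _registry[name]

register("config", {"debug": True})
print(lookup("config"))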

Service Locator: Quick and Dirty Prototype

from __future__ import annotations
from typing import TypeVar, Type
import inspect
import typing


T = TypeVar('T')

class Services:
    
    def __init__(self):
        self._singletons = {}
        self._factories = {}

    def singleton(self, type, service):
        self._singletons[type] = service
        type.instance = lambda: self.get(type)

    def factory(self, type, factory):
        self._factories[type] = factory
        type.instance = lambda: self.get(type)

    def get(self, type: Type[T]) -> T:
        if type in self._singletons:
            return self._singletons[type] 

        if type in self._factories:
            return self._factories[type](self)

        raise Exception(f'No service found for {type}')

    def __call__(self, type: Type[T]) -> T:
        return self.get(type)

    def __getitem__(self, type: Type[T]) -> T:
        return self.get(type)

    def __setitem__(self, type: Type[T], service: T):
        # functions/lambdas are registered as factories, anything else as a singleton
        if inspect.isfunction(service):
            self.factory(type, service)
        else:
            self.singleton(type, service)

    def contains(self, type: Type[T]) -> bool:
        return type in self._singletons or type in self._factories

    
    def new(self, type: Type[T], **kwargs) -> T:
        if self.contains(type):
            return self.get(type)
        return self.call(type, **kwargs)

    def call(self, callable, **kwargs):
        signature = inspect.signature(callable)
        params = dict(signature.parameters)
        if 'self' in params:
            del params['self']
        
        fn_kwargs = {}
        type_hints = typing.get_type_hints(callable)
        
        for param_name in params:
            if param_name in kwargs:
                fn_kwargs[param_name] = kwargs[param_name]
            else:
                required_type = type_hints[param_name]
                instance = self.get(required_type)
                fn_kwargs[param_name] = instance

        return callable(**fn_kwargs)


services = Services()

# Registration (excerpt from the CLI entry point):
@click.command()
def cli():
    services[EvaluationRepository] = SqliteEvaluationRepository(session)
    services[BenchmarkRepository] = SqliteBenchmarkRepository(session)
    services[RetrieverRepository] = SqliteRetrieverRepository(session)

# Consumption (excerpt from a class elsewhere that pulls its dependencies from the locator):
def __init__(self, projects_path: Path):
    self.retriever_repository = services[RetrieverRepository]
    self.projects_path = projects_path
    self.evaluation_repository = services[EvaluationRepository]
    self.benchmark_repository = services[BenchmarkRepository]

Conclusion: I did not use it, as it removes seams, which makes future refactoring more complicated (see Working Effectively with Legacy Code) and decreases changeability - my metric for good code.

Parse Wikipedia dump

""" This module processes Wikipedia dump files by extracting individual articles and parsing them into a structured format, ...