Monday, 25 March 2024

Python: Check if a string matches at least one glob condition

import fnmatch

def matches_one_glob_condition(item: str, conditions: list[str]) -> bool:
    """
    Check if the given item matches at least one of the specified glob patterns.

    Args:
        item (str): The item to check.
        conditions (list[str]): A list of glob patterns to match against the item.

    Returns:
        bool: True if the item matches at least one condition, False otherwise.
    """
    for condition in conditions:
        if fnmatch.fnmatch(item, condition):
            return True
    return False

print(matches_one_glob_condition('beir/scifacts/test', ['beir/*/test']))
Python: Check if the given semantic version matches all the specified conditions.
import re
from packaging.version import Version

def matches_condition(semver: str, conditions: list[str]) -> bool:
    """
    Check if the given semantic version matches all the specified conditions.

    Args:
        semver (str): The semantic version to check.
        conditions (list[str]): A list of conditions to match against the semantic version.

    Returns:
        bool: True if the semantic version matches all conditions, False otherwise.
    """
    semver_version = Version(semver.replace(" ", ""))
    for condition in conditions:
        condition = condition.replace(" ", "")
        # Use regex to extract operator and version
        match = re.match(r'([><=!]*)(.*)', condition)
        operator, version_str = match.groups()
        version_to_compare = Version(version_str)
        # Default to "==" if no operator is found
        if operator == "" or operator == "==":
            if not (semver_version == version_to_compare):
                return False
        elif operator == ">":
            if not (semver_version > version_to_compare):
                return False
        elif operator == "<":
            if not (semver_version < version_to_compare):
                return False
        elif operator == ">=":
            if not (semver_version >= version_to_compare):
                return False
        elif operator == "<=":
            if not (semver_version <= version_to_compare):
                return False
        elif operator in ("!", "!="):  # accept both spellings of the inequality operator
            if not (semver_version != version_to_compare):
                return False
    return True

print(matches_condition("1.0.1", ['>=1.0.0', '<2.0.0']))
print(matches_condition("1.0.1", ['>=1.0.0']))
print(matches_condition("1.0.1", ['==1.0.1']))
print(matches_condition("1.0.1", ['1.0.1']))
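As an aside (not part of the original snippet), the packaging library can also evaluate such constraints directly via SpecifierSet, which parses comma-separated PEP 440 specifiers; note that a bare version like '1.0.1' needs an explicit operator there. A minimal sketch:

from packaging.specifiers import SpecifierSet
from packaging.version import Version

# SpecifierSet checks whether a version satisfies all constraints at once.
spec = SpecifierSet(">=1.0.0,<2.0.0")
print(Version("1.0.1") in spec)   # True
print(spec.contains("2.1.0"))     # False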
Sunday, 24 March 2024
Get the cache base path in Python: get_cache_path
from pathlib import Path
import os

def get_cache_path(app_name: str) -> Path:
    """Return the path for the app's cache directory, automatically adjusted for the OS.

    Args:
        app_name (str): The name of the application.

    Returns:
        Path: The absolute path to the app's cache directory.
    """
    if os.name == 'nt':  # Windows
        cache_dir = Path(os.getenv('LOCALAPPDATA', Path.home())) / app_name / "Cache"
    else:
        cache_dir = Path.home() / ".cache" / app_name
    if not cache_dir.exists():
        cache_dir.mkdir(parents=True)
    return cache_dir

# Example usage
cache_path = get_cache_path("my_app")
print(cache_path)
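As an alternative (not used in the original snippet), the third-party platformdirs package resolves the per-OS cache directory, including macOS; a small sketch, assuming platformdirs is installed:

from pathlib import Path
from platformdirs import user_cache_dir

# user_cache_dir returns the platform-appropriate cache directory for the app,
# e.g. ~/.cache/my_app on Linux or ~/Library/Caches/my_app on macOS.
cache_dir = Path(user_cache_dir("my_app"))
cache_dir.mkdir(parents=True, exist_ok=True)
print(cache_dir)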
Wednesday, 20 March 2024
Managing processes in Python (and killing them on script exit)
'''
This example demonstrates how to manage processes in a singleton.

The main problem is that we have to kill all children of a process as well.
This means we cannot lose the reference to the process object.
Also, no matter what, we want to terminate our child processes when this script ends.
We do this via atexit. Otherwise, e.g. when we start a server, it keeps running
even if this script ends.

You can use the Processes class either as a tool to kill processes properly (recursively)
or as a context manager to define the lifetime of a process.
'''
import subprocess
import psutil
import atexit
import shlex
import os
from threading import Lock
from typing import Optional
from pathlib import Path


class Processes:
    """A singleton class for managing subprocesses.

    Provides methods to run commands or scripts as subprocesses, with support for
    terminating individual processes or all subprocesses upon script exit or
    context block exit.
    """

    _instance = None
    _lock = Lock()

    def __new__(cls):
        """Ensure only one instance of the class is created."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(Processes, cls).__new__(cls)
                cls._subprocesses: list[subprocess.Popen] = []
                atexit.register(cls._instance.terminate_all)
        return cls._instance

    def run(self, command: str | list[str], cwd: Optional[str] = None, shell: bool = False) -> subprocess.Popen:
        """Starts a subprocess with the given command.

        Args:
            command: The command to execute. If it's a string, it will be split using shlex.
            cwd: The working directory to execute the command in.
            shell: Whether to execute the command through the shell.

        Returns:
            The subprocess.Popen object for the started subprocess.
        """
        if isinstance(command, str) and not shell:
            # Only split when not using the shell; with shell=True Popen expects the raw command string.
            command = shlex.split(command)
        print(command, cwd, shell)
        process = subprocess.Popen(command, cwd=cwd, shell=shell)
        self._subprocesses.append(process)
        return process

    def run_scripts(self, path: Path | str) -> list[subprocess.Popen]:
        """Executes each script found in the specified directory path.

        Only files with execute permissions are considered.

        Args:
            path: The directory path containing scripts to execute.

        Returns:
            A list of subprocess.Popen objects for the started subprocesses.
        """
        path = Path(path)  # Ensure path is a Path object
        processes = []
        for script_path in path.iterdir():
            if script_path.is_file() and os.access(script_path, os.X_OK):
                process = self.run(str(script_path), shell=True)
                processes.append(process)
        return processes

    def terminate(self, process: subprocess.Popen) -> None:
        """Terminates a specific subprocess.

        Args:
            process: The subprocess.Popen object to terminate.
        """
        if process.poll() is None:  # Process is still running
            try:
                parent = psutil.Process(process.pid)
                for child in parent.children(recursive=True):
                    child.terminate()
                parent.terminate()
                parent.wait(3)  # Wait before force killing
            except psutil.NoSuchProcess:
                pass  # Process already terminated
        self._subprocesses.remove(process)

    def terminate_all(self) -> None:
        """Terminates all subprocesses started by this class."""
        for process in list(self._subprocesses):  # Create a copy of the list for iteration
            self.terminate(process)

    def __enter__(self) -> 'Processes':
        """Enables use of the class as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Ensures all subprocesses are terminated when exiting a context block."""
        self.terminate_all()


# Example usage as a singleton:
process_manager = Processes()
proc = process_manager.run('ls -l')
import time
time.sleep(2)
process_manager.terminate(proc)

# Running scripts from a specified directory path and terminating one
# procs = process_manager.run_scripts('/path/to/scripts')
# if procs:
#     process_manager.terminate(procs[0])

# Example usage as a context manager:
with Processes() as pm:
    proc = pm.run('sleep 10')
    time.sleep(2)
    # pm.run('clear', shell=True)
Wednesday, 13 March 2024
Python: Awaiting the end of progress. Wait until progress bars finish.
await_progress_completion monitors asynchronous tasks, signaling completion or stall based on set thresholds. It uses timeout_seconds for the maximum wait time, stall_check_interval to check for progress during stalls, and progress_check_interval for regular updates. This approach prevents resource waste on stalled tasks by differentiating between ongoing progress and inactivity, which is essential for efficient task management.
from datetime import datetime, timedelta
import time
import logging

logger = logging.getLogger(__name__)


def await_progress_completion(fn, expected_value=None, timeout_seconds: float = 10.0,
                              stall_check_interval: float = 0.5,
                              progress_check_interval: float = 0.3) -> bool:
    """
    Monitors and waits for a numerical progress indicator from a specified function (fn)
    to stop increasing.

    The function fn is expected to return a numerical value (e.g., an integer or float)
    that increases over time to indicate progress. The purpose of this function is to
    detect when this number stops increasing, indicating that progress may have stalled
    or completed.

    Parameters:
    - fn: The function to monitor for increasing numerical progress.
    - expected_value: If provided, return True as soon as fn() reports this value.
    - timeout_seconds: The maximum amount of time to wait for progress before considering
      the process stalled and timing out.
    - stall_check_interval: The interval, in seconds, at which to check for progress
      during a stall condition.
    - progress_check_interval: The interval, in seconds, at which to regularly check for
      progress to avoid continuous polling.

    Returns:
    - bool: True if progress continues without stalling for the duration of the timeout
      period; False if progress stalls and no further increase is observed within the
      timeout period.
    """
    last_progress = None  # Track the last known progress value to detect when progress stalls (stops increasing).

    def stall_for_progress():
        """
        Handles situations where progress has stalled by waiting for an increase in the
        progress indicator, checking at set intervals, and implementing a timeout. This
        approach helps manage scenarios where progress may temporarily halt but is
        expected to resume.
        """
        nonlocal last_progress
        logger.debug("Stalled. Waiting for progress.")
        wait_until = datetime.now() + timedelta(seconds=timeout_seconds)
        while datetime.now() <= wait_until:
            current_progress = fn()
            if current_progress != last_progress:
                logger.info("Progress made.")
                last_progress = current_progress
                return True
            time.sleep(stall_check_interval)
        logger.debug("Timeout. No progress.")
        return False

    while True:
        progress = fn()
        if expected_value is not None and progress == expected_value:
            logger.debug("Progress complete.")
            return True
        if progress != last_progress:
            logger.debug(f"Progress: {progress}")
            last_progress = progress
        else:
            if not stall_for_progress():
                return False
        # Provides a breather between checks to reduce the likelihood of
        # overwhelming the monitored process or system.
        time.sleep(progress_check_interval)


if __name__ == '__main__':
    from rich import print

    class Service:
        def __init__(self, count_up_to: int):
            self.start = datetime.now()
            self.count_up_to = count_up_to

        def count(self) -> int:
            seconds_since_start = (datetime.now() - self.start).total_seconds()
            res = int(min(seconds_since_start, self.count_up_to))
            print(f'counting... {res=}')
            return res

    service = Service(3)
    await_progress_completion(service.count, expected_value=3, timeout_seconds=3.0)
Tuesday, 12 March 2024
Python: A simple thread-based Autoscaler
The Problem: I want to work through a stream of jobs in parallel, but don't know how many there are (e.g. if I get the jobs via web requests).
The solution: Have an autoscaler that scales the threads up as needed and scales them down afterwards.
import threading
import queue
import multiprocessing as mp


class AutoScaler:
    """
    A dynamic worker pool manager that automatically scales the number of worker threads
    based on the job queue size and a cooldown period. It ensures that the actual number
    of workers does not exceed the specified maximum or drop below the minimum.

    Attributes:
        min_workers (int): The minimum number of worker threads that the scaler maintains, defaulting to 0.
        max_workers (int): The maximum number of worker threads that the scaler can scale up to.
            Defaults to the number of CPU cores available on the system.
        cooldown_seconds (int): The time (in seconds) a worker waits for a new job before terminating,
            assuming no new jobs are available.
        pbar (optional): An optional progress bar instance (e.g., from tqdm) that can be updated
            as jobs are completed. Defaults to None.

    Methods:
        work(job): Adds a job to the queue and scales up workers if necessary.
        wait(): Blocks until all jobs are completed and then gracefully shuts down all workers.

    Example:
        >>> from tqdm import tqdm
        >>> scaler = AutoScaler(min_workers=1, max_workers=5, cooldown_seconds=2, pbar=tqdm(total=100))
        >>> for _ in range(100):
        ...     scaler.work(lambda: do_something())
        >>> scaler.wait()
    """

    def __init__(self, min_workers=0, max_workers=None, cooldown_seconds=2, pbar=None):
        """
        Parameters:
            min_workers (int): Minimum number of worker threads. Defaults to 0.
            max_workers (int or None): Maximum number of worker threads. Defaults to the number of CPU cores.
            cooldown_seconds (int): Cooldown time in seconds for a worker before it terminates
                if no new job is received.
            pbar (optional): A progress bar instance from an external library like tqdm.
        """
        self.min_workers = min_workers
        self.max_workers = max_workers if max_workers is not None else mp.cpu_count()
        assert self.min_workers >= 0, 'min_workers must be non-negative'
        assert self.max_workers > 0, 'max_workers must be positive'
        assert self.min_workers <= self.max_workers, 'min_workers must be less than or equal to max_workers'
        assert cooldown_seconds >= 0, 'cooldown_seconds must be non-negative'
        self.pbar = pbar
        self.cooldown_seconds = cooldown_seconds
        self.jobs_queue = queue.Queue(maxsize=self.max_workers)  # TODO: make param
        self.workers = []
        self.lock = threading.Lock()
        # Initialize the minimum number of workers
        for _ in range(self.min_workers):
            self._add_worker()

    def _add_worker(self):
        """
        Starts a new worker thread, ensuring the total number does not exceed `max_workers`.
        This method is thread-safe.
        """
        with self.lock:
            if len(self.workers) < self.max_workers:
                worker = threading.Thread(target=self._worker)
                worker.start()
                self.workers.append(worker)

    def _worker(self):
        """
        Worker thread that processes jobs from the queue. Shuts down when it receives a None job
        or when the queue is empty for a certain period of time (cooldown_seconds).
        """
        while True:
            try:
                job = self.jobs_queue.get(timeout=self.cooldown_seconds)  # Adjust timeout as needed
                if job is None:
                    break
                job()
                if self.pbar:
                    self.pbar.update(1)
                self.jobs_queue.task_done()
            except queue.Empty:
                break  # Exit the thread if no job is received within the timeout period

    def work(self, job):
        """
        Adds a job to the queue and attempts to scale up the number of workers if the queue is not empty.

        Parameters:
            job (callable): The job function to be executed by a worker.
        """
        self.jobs_queue.put(job)
        if not self.jobs_queue.empty():
            self._add_worker()

    def wait(self):
        """
        Blocks until all jobs in the queue have been completed and then shuts down all
        worker threads gracefully.
        """
        self.jobs_queue.join()  # Wait for the queue to be empty
        # Send a signal to the workers to stop and don't use the
        # cooldown mechanism, which would be a waste of time
        while any(thread.is_alive() for thread in self.workers):
            self.jobs_queue.put(None)
        for worker in self.workers:
            worker.join()


if __name__ == '__main__':
    # Example usage
    import time
    from timeit import default_timer as timer
    from tqdm import tqdm

    def job():
        time.sleep(3)  # Simulate a task

    total = 10
    max_workers = 50

    with tqdm(total=total) as pbar:
        scaler = AutoScaler(max_workers=max_workers, pbar=pbar)
        start = timer()
        for _ in range(total):
            time.sleep(1)
            active_workers = sum(1 for thread in scaler.workers if thread.is_alive())
            print(f'{active_workers=}')
            scaler.work(job)
        scaler.wait()
        end = timer()
        print(f'done in {end - start} seconds')
Python: Download file in chunks with progress bar
import requests
from tqdm import tqdm


def download_file(url, filename):
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        total_size_in_bytes = int(response.headers.get('content-length', 0))
        block_size = 1024  # 1 Kibibyte
        progress_bar = tqdm(total=total_size_in_bytes, unit='iB', unit_scale=True)
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=block_size):
                progress_bar.update(len(chunk))
                file.write(chunk)
        progress_bar.close()
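A usage sketch (the URL and target filename are placeholders, not from the original post):

if __name__ == '__main__':
    # Placeholder URL and filename for illustration only.
    download_file('https://example.com/large-file.bin', 'large-file.bin')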
Monday, 11 March 2024
Inversion of Control approaches
0. Dependency Injection (DI)
- Description: DI is a design pattern that decouples object creation from usage by injecting dependencies at runtime, enhancing modularity and testability.
- Use Cases: Ideal for complex software systems requiring easy maintenance, testing, and flexibility, such as web applications.
- Drawbacks: Can introduce setup complexity, require a learning curve, and potentially lead to runtime errors if configurations are incorrect.
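A minimal constructor-injection sketch of the idea (EmailSender, SmtpEmailSender and UserService are made-up names for illustration):

from typing import Protocol

class EmailSender(Protocol):
    def send(self, to: str, body: str) -> None: ...

class SmtpEmailSender:
    def send(self, to: str, body: str) -> None:
        print(f"SMTP -> {to}: {body}")

class UserService:
    # The collaborator is injected; UserService never constructs it itself.
    def __init__(self, sender: EmailSender):
        self._sender = sender

    def register(self, email: str) -> None:
        self._sender.send(email, "Welcome!")

# In production a DI container would typically do this wiring;
# in a test you would inject a fake EmailSender instead.
UserService(SmtpEmailSender()).register("alice@example.com")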
1. Service Locator Pattern
- Description: Unlike DI, where dependencies are pushed into an object, the Service Locator pattern involves an object requesting its dependencies from a central registry or locator.
- Use Cases: It can be useful in situations where DI might be overly complex or when there's a need for more dynamic resolution of dependencies.
- Drawbacks: It tends to hide the class dependencies, making them less explicit, which can lead to harder-to-read code and potentially more difficult debugging and testing.
2. (Abstract) Factory Pattern
- Description: The Factory Pattern is a creational pattern used to create objects without specifying the exact class of object that will be created. This is achieved by defining an interface for creating an object, but letting subclasses decide which class to instantiate.
- Use Cases: Useful when the creation process is complex or when it should be decoupled from the system's business logic.
- Drawbacks: It doesn't automatically manage dependencies like DI, potentially leading to more boilerplate code if dependencies are complex.
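A small abstract-factory-style sketch (Storage, LocalStorage and S3Storage are illustrative names, not from the original post):

from abc import ABC, abstractmethod

class Storage(ABC):
    @abstractmethod
    def save(self, key: str, data: bytes) -> None:
        ...

class LocalStorage(Storage):
    def save(self, key: str, data: bytes) -> None:
        print(f"writing {len(data)} bytes to ./{key}")

class S3Storage(Storage):
    def save(self, key: str, data: bytes) -> None:
        print(f"uploading {len(data)} bytes to s3://bucket/{key}")

def storage_factory(kind: str) -> Storage:
    # Callers depend only on the Storage interface; the factory decides the concrete class.
    return {"local": LocalStorage, "s3": S3Storage}[kind]()

storage_factory("local").save("report.txt", b"hello")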
3. Singleton Pattern
- Description: Ensures a class has only one instance and provides a global point of access to it. This can be used to manage shared resources or configurations.
- Use Cases: Suitable for logging, driver objects, caching, thread pools, and configuration settings.
- Drawbacks: Overuse can lead to issues similar to global variables, making code tightly coupled and harder to test and parallelize.
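The Processes class further up already uses this pattern; a stripped-down, thread-safe sketch of the same idea (Config is an illustrative name):

import threading

class Config:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Guarded creation so every caller gets the same instance.
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance.settings = {"debug": False}
        return cls._instance

assert Config() is Config()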
4. Constructor Injection without a Container
- Description: This approach manually injects dependencies through constructors without using a DI container.
- Use Cases: It's a simpler alternative for applications with fewer dependencies or where introducing a DI container would be considered overkill.
- Drawbacks: Can become cumbersome as the application grows, leading to complex constructor signatures and manual management of dependency graphs.
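A sketch of wiring a small object graph by hand in the composition root (Database, ReportRepository and ReportService are illustrative names):

class Database:
    def query(self, sql: str) -> list:
        return []

class ReportRepository:
    def __init__(self, db: Database):
        self.db = db

class ReportService:
    def __init__(self, repository: ReportRepository):
        self.repository = repository

def main() -> None:
    # The whole dependency graph is assembled manually in one place.
    db = Database()
    service = ReportService(ReportRepository(db))
    print(service.repository.db)

if __name__ == "__main__":
    main()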
5. Static Methods/Classes
- Description: Using static methods or classes to provide services or functionalities without instantiating objects.
- Use Cases: Suitable for utility functions or when state management is not a concern.
- Drawbacks: Static methods can lead to tightly coupled code, making testing and mocking more difficult.
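A tiny sketch of a static utility (PriceUtils is an illustrative name); the hard-coded reference at the call site is exactly what makes swapping it out in tests awkward:

class PriceUtils:
    @staticmethod
    def add_vat(net: float, rate: float = 0.19) -> float:
        # Pure function: no state, trivially reusable.
        return round(net * (1 + rate), 2)

print(PriceUtils.add_vat(100.0))  # 119.0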
6. Registry Pattern
- Description: A registry pattern provides a system-wide registry of services or objects that can be accessed from anywhere in the application.
- Use Cases: Can be used for global access points to services or configurations, similar to service locators but with a simpler implementation.
- Drawbacks: Global access can lead to issues with maintainability and testability, as it introduces global state into an application.
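A minimal global-registry sketch (the module-level dict and names are illustrative):

_registry: dict[str, object] = {}

def register(name: str, obj: object) -> None:
    _registry[name] = obj

def resolve(name: str) -> object:
    # Any part of the application can look services up by name: convenient,
    # but it introduces global state.
    return _registry[name]

register("config", {"debug": True})
print(resolve("config"))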
Service Locator: Quick and Dirty Prototype
from __future__ import annotations
from typing import TypeVar, Type
import inspect
import typing

T = TypeVar('T')


class Services:
    def __init__(self):
        self._singletons = {}
        self._factories = {}

    def singleton(self, type, service):
        self._singletons[type] = service
        type.instance = lambda: self.get(type)

    def factory(self, type, factory):
        self._factories[type] = factory
        type.instance = lambda: self.get(type)

    def get(self, type: Type[T]) -> T:
        if type in self._singletons:
            return self._singletons[type]
        if type in self._factories:
            return self._factories[type](self)
        raise Exception(f'No service found for {type}')

    def __call__(self, type: Type[T]) -> T:
        return self.get(type)

    def __getitem__(self, type: Type[T]) -> T:
        return self.get(type)

    def __setitem__(self, type: Type[T], service: T):
        # if is lambda make it a factory or else a singleton
        if inspect.isfunction(service):
            self.factory(type, service)
        else:
            self.singleton(type, service)

    def contains(self, type: Type[T]) -> bool:
        return type in self._singletons or type in self._factories

    def new(self, type: Type[T], **kwargs) -> T:
        if self.contains(type):
            return self.get(type)
        return self.call(type, **kwargs)

    def call(self, callable, **kwargs):
        signature = inspect.signature(callable)
        params = dict(signature.parameters)
        if 'self' in params:
            del params['self']
        fn_kwargs = {}
        type_hints = typing.get_type_hints(callable)
        for param_name in params:
            if param_name in kwargs:
                fn_kwargs[param_name] = kwargs[param_name]
            else:
                required_type = type_hints[param_name]
                instance = self.get(required_type)
                fn_kwargs[param_name] = instance
        return callable(**fn_kwargs)


services = Services()
@click.command()
def cli():
    services[EvaluationRepository] = SqliteEvaluationRepository(session)
    services[BenchmarkRepository] = SqliteBenchmarkRepository(session)
    services[RetrieverRepository] = SqliteRetrieverRepository(session)
def __init__(self, projects_path: Path):
    self.retriever_repository = services[RetrieverRepository]
    self.projects_path = projects_path
    self.evaluation_repository = services[EvaluationRepository]
    self.benchmark_repository = services[BenchmarkRepository]
Conclusion: I did not use it, as it removes seams, which makes future refactoring more complicated (see Working Effectively with Legacy Code) and thus decreases changeability, my metric for good code.