`await_progress_completion` monitors an asynchronous task by polling a progress function, returning when the task completes or when it stalls. `timeout_seconds` is the maximum time to wait for progress to resume once a stall is detected, `stall_check_interval` is the polling interval used during a stall, and `progress_check_interval` is the polling interval used while progress is being made. This approach prevents resource waste on stalled tasks by differentiating between ongoing progress and inactivity, which is essential for efficient task management in computational systems.
from datetime import datetime, timedelta
import logging
import time
from typing import Any, Callable

logger = logging.getLogger(__name__)


def await_progress_completion(
    fn: Callable[[], Any],
    expected_value: Any = None,
    timeout_seconds: float = 10.0,
    stall_check_interval: float = 0.5,
    progress_check_interval: float = 0.3,
) -> bool:
    """Poll ``fn`` until it reports completion or stops making progress.

    ``fn`` is called repeatedly and its return value is compared (with
    ``!=``) against the previously observed value. Any change counts as
    progress. When two consecutive polls return the same value the task is
    considered stalled, and the function waits up to ``timeout_seconds``
    for the value to change again.

    Args:
        fn: Zero-argument callable returning the current progress value.
        expected_value: Value that signals completion. When ``None`` (the
            default) completion is never detected, so the function can only
            return ``False`` once a stall times out.
        timeout_seconds: Maximum time, in seconds, to wait for progress to
            resume after a stall is detected. The timer restarts on every
            new stall; it is not a limit on total run time.
        stall_check_interval: Seconds to sleep between polls while stalled.
        progress_check_interval: Seconds to sleep between polls while
            progress is being made.

    Returns:
        ``True`` if ``fn`` returned ``expected_value``; ``False`` if the
        progress value did not change for ``timeout_seconds``.
    """
    # Last observed progress value; None until the first poll, so a first
    # reading of None is treated as "no progress yet".
    last_progress = None

    def stall_for_progress() -> bool:
        """Wait for the progress value to change; False if the wait times out."""
        nonlocal last_progress
        logger.debug("Stalled. Waiting for progress.")
        # Monotonic clock: immune to wall-clock adjustments (NTP, DST).
        deadline = time.monotonic() + timeout_seconds
        while time.monotonic() <= deadline:
            current_progress = fn()
            if current_progress != last_progress:
                # Use the module logger, not the root logger, so the
                # message honours this module's logging configuration.
                logger.info("Progress made.")
                last_progress = current_progress
                return True
            time.sleep(stall_check_interval)
        logger.debug("Timeout. No progress.")
        return False

    while True:
        progress = fn()
        if expected_value is not None and progress == expected_value:
            logger.debug("Progress complete.")
            return True
        if progress != last_progress:
            logger.debug("Progress: %s", progress)
            last_progress = progress
        elif not stall_for_progress():
            return False
        # Breather between polls so the monitored process is not hammered.
        time.sleep(progress_check_interval)


if __name__ == '__main__':
    from rich import print

    class Service:
        """Demo progress source: counts elapsed whole seconds up to a cap."""

        def __init__(self, count_up_to: int):
            self.start = datetime.now()
            self.count_up_to = count_up_to

        def count(self) -> int:
            """Return whole seconds since start, capped at ``count_up_to``."""
            seconds_since_start = (datetime.now() - self.start).total_seconds()
            res = int(min(seconds_since_start, self.count_up_to))
            print(f'counting... {res=}')
            return res

    service = Service(3)
    await_progress_completion(service.count, expected_value=3, timeout_seconds=3.0)
No comments:
Post a Comment