job.py
"""FastAPI front end for a Celery-backed "sleep" job.

Exposes three endpoints: one to enqueue a sleep task, one to list the tasks
currently being executed by workers, and one to look up a task's state and
result by id. Both the broker and the result backend are the same local
SQLite file.
"""
import os
import time

from celery import Celery
from celery.result import AsyncResult
from fastapi import FastAPI, Path

# Celery configuration
celery_app = Celery(
    'tasks',
    broker='sqla+sqlite:///celerydb.sqlite',
    backend='db+sqlite:///celerydb.sqlite',
)
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone=os.environ.get('TZ', 'UTC'),
    enable_utc=True,
    broker_connection_retry_on_startup=True,
)


# Celery task
@celery_app.task
def sleep_task(duration):
    """Block the worker for ``duration`` seconds and return a summary string."""
    time.sleep(duration)
    return f"Slept for {duration} seconds"


# FastAPI app
app = FastAPI()


@app.post("/create-job/{duration}")
def create_job(duration: int = Path(..., ge=0)):
    """Enqueue a sleep task and return its Celery task id.

    ``duration`` must be non-negative: ``time.sleep`` raises ``ValueError``
    for negative values, so such a job could only ever fail in the worker.
    Rejecting it here gives the caller an immediate validation error instead.
    """
    task = sleep_task.delay(duration)
    return {"task_id": task.id}


@app.get("/running-jobs")
def running_jobs():
    """Return the tasks currently executing, as reported by the workers."""
    inspector = celery_app.control.inspect()
    # ``active()`` yields None when no worker answers the broadcast; report
    # an empty mapping so clients can always treat the value as a dict.
    active_tasks = inspector.active() or {}
    return {"active_tasks": active_tasks}


@app.get("/status/{task_id}")
def get_status(task_id: str):
    """Return the state and result stored for ``task_id``."""
    task_result = AsyncResult(task_id, app=celery_app)
    result = task_result.result
    # For a failed task ``result`` is the raised exception instance, which
    # does not serialize to useful JSON; send a readable description instead.
    if isinstance(result, BaseException):
        result = f"{type(result).__name__}: {result}"
    return {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": result,
    }

# In two different terminals:
celery -A job.celery_app worker --loglevel=info
uvicorn job:app --reload

Usage Example:
"""Example client: submit a sleep job to the API and poll until it finishes."""
import time

import requests

# API base URL
BASE_URL = "http://localhost:8000"

# Seconds to wait for any single HTTP call; without a timeout ``requests``
# blocks indefinitely if the server stops responding.
REQUEST_TIMEOUT = 10

# Celery states (other than SUCCESS) after which a task never changes again.
FAILED_STATES = {"FAILURE", "REVOKED"}


def create_job(duration):
    """POST a new job lasting ``duration`` seconds; return the decoded JSON."""
    response = requests.post(
        f"{BASE_URL}/create-job/{duration}", timeout=REQUEST_TIMEOUT
    )
    response.raise_for_status()
    return response.json()


def get_running_jobs():
    """Return the decoded JSON listing of currently running jobs."""
    response = requests.get(f"{BASE_URL}/running-jobs", timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def get_job_status(task_id):
    """Return the decoded JSON status document for ``task_id``."""
    response = requests.get(
        f"{BASE_URL}/status/{task_id}", timeout=REQUEST_TIMEOUT
    )
    response.raise_for_status()
    return response.json()


def main():
    """Create a short job and print its status once per second until done."""
    # Create a job with 2 seconds duration
    job = create_job(2)
    task_id = job['task_id']
    print(f"Created job with ID: {task_id}")

    while True:
        status = get_job_status(task_id)
        state = status['task_status']
        if state == "SUCCESS":
            print(f"Job result: {status['task_result']}")
            break
        if state in FAILED_STATES:
            # Stop polling: a failed or revoked task will never reach SUCCESS,
            # so waiting for it would loop forever.
            print(f"Job ended with status {state}: {status['task_result']}")
            break
        print(f"Job status: {status}")
        time.sleep(1)


if __name__ == "__main__":
    main()
No comments:
Post a Comment