job.py
from fastapi import FastAPI
from celery import Celery
import time
from celery.result import AsyncResult
import os
# ---------------------------------------------------------------------------
# Celery setup.  SQLite serves double duty here: as the message broker
# (through the SQLAlchemy transport, 'sqla+sqlite') and as the result
# backend ('db+sqlite'), so the demo needs no external broker process.
# ---------------------------------------------------------------------------
celery_app = Celery(
    'tasks',
    broker='sqla+sqlite:///celerydb.sqlite',
    backend='db+sqlite:///celerydb.sqlite',
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    # Honor the host's TZ environment variable, defaulting to UTC.
    timezone=os.environ.get('TZ', 'UTC'),
    enable_utc=True,
    # Keep retrying the broker connection while the worker starts up.
    broker_connection_retry_on_startup=True,
)
# Celery task
@celery_app.task
def sleep_task(duration):
    """Block the worker for ``duration`` seconds, then confirm it.

    Returns a human-readable string reporting how long the task slept;
    this string becomes the task's stored result.
    """
    time.sleep(duration)
    return f"Slept for {duration} seconds"
# FastAPI application exposing a tiny job-management API.
app = FastAPI()


@app.post("/create-job/{duration}")
def create_job(duration: int):
    """Enqueue a sleep task of ``duration`` seconds; return its task id."""
    async_result = sleep_task.delay(duration)
    return {"task_id": async_result.id}
@app.get("/running-jobs")
def running_jobs():
    """List the tasks currently executing on any connected worker.

    Uses Celery's remote-control inspection API.  When no worker is
    running (or none replies in time), ``inspect().active()`` returns
    ``None``; normalize that to an empty mapping so the endpoint always
    yields a JSON object rather than ``null``.
    """
    inspector = celery_app.control.inspect()
    # active() -> {worker_name: [task_info, ...]} or None if no replies.
    active_tasks = inspector.active() or {}
    return {"active_tasks": active_tasks}
@app.get("/status/{task_id}")
def get_status(task_id: str):
    """Report the current state and (if available) result of a task.

    For a FAILURE task, ``AsyncResult.result`` holds the raised exception
    instance, which is not JSON-serializable; convert it to a string so
    the endpoint can always render its response instead of erroring.
    """
    task_result = AsyncResult(task_id, app=celery_app)
    result = task_result.result
    if isinstance(result, BaseException):
        result = str(result)
    return {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": result,
    }
Run these in two different terminals:

    celery -A job.celery_app worker --loglevel=info
    uvicorn job:app --reload

Usage example:
import requests
import time
# Base URL of the FastAPI server started with `uvicorn job:app --reload`.
BASE_URL = "http://localhost:8000"
def create_job(duration):
    """POST a new sleep job of ``duration`` seconds; return the JSON reply."""
    resp = requests.post(f"{BASE_URL}/create-job/{duration}")
    return resp.json()
def get_running_jobs():
    """Fetch the set of tasks currently active on the workers."""
    resp = requests.get(f"{BASE_URL}/running-jobs")
    return resp.json()
def get_job_status(task_id):
    """Query the status/result of the task identified by ``task_id``."""
    resp = requests.get(f"{BASE_URL}/status/{task_id}")
    return resp.json()
def main():
    """Drive the API end-to-end: create one job and poll it to completion."""
    # Create a job that sleeps for 2 seconds.
    job = create_job(2)
    task_id = job['task_id']
    print(f"Created job with ID: {task_id}")

    # Poll once per second until the task reaches a terminal state.
    while True:
        status = get_job_status(task_id)
        if status['task_status'] == "SUCCESS":
            print(f"Job result: {status['task_result']}")
            break
        if status['task_status'] in ("FAILURE", "REVOKED"):
            # Without this check the loop would spin forever on a failed
            # task, since only SUCCESS broke out of the original loop.
            print(f"Job ended without success: {status}")
            break
        print(f"Job status: {status}")
        time.sleep(1)


if __name__ == "__main__":
    main()
No comments:
Post a Comment