Monday, 29 January 2024

FastAPI with Celery: Minimal Example

job.py

from fastapi import FastAPI
from celery import Celery
import time
from celery.result import AsyncResult
import os

# Celery configuration: the same SQLite file is used for the broker URL
# and for the result backend URL.
celery_app = Celery(
    'tasks',
    backend='db+sqlite:///celerydb.sqlite',
    broker='sqla+sqlite:///celerydb.sqlite',
)

# JSON is used for task messages, accepted content and stored results;
# the timezone comes from the TZ environment variable, defaulting to UTC.
celery_app.conf.update(
    accept_content=['json'],
    task_serializer='json',
    result_serializer='json',
    enable_utc=True,
    timezone=os.environ.get('TZ', 'UTC'),
    broker_connection_retry_on_startup=True,
)

# Celery task
@celery_app.task
def sleep_task(duration):
    """Sleep for ``duration`` seconds and return a short summary string."""
    time.sleep(duration)
    summary = f"Slept for {duration} seconds"
    return summary

# FastAPI app: the application object the route decorators in this module
# register their handlers on.
app = FastAPI()

@app.post("/create-job/{duration}")
def create_job(duration: int):
    """Queue a ``sleep_task`` for ``duration`` seconds and return its task id."""
    queued = sleep_task.delay(duration)
    response_body = {"task_id": queued.id}
    return response_body

@app.get("/running-jobs")
def running_jobs():
    """Return whatever the Celery inspector reports as active tasks."""
    inspector = celery_app.control.inspect()
    return {"active_tasks": inspector.active()}

@app.get("/status/{task_id}")
def get_status(task_id: str):
    """Report the current state and result of the task ``task_id``.

    Returns:
        A dict with the keys ``task_id``, ``task_status`` and
        ``task_result``. If the stored result is an exception (which is
        what Celery exposes as the result of a failed task), it is
        rendered as ``"ExceptionType: message"``.
    """
    task_result = AsyncResult(task_id, app=celery_app)
    status = task_result.status
    result = task_result.result
    # An exception instance cannot be carried meaningfully in a JSON
    # response, so turn it into text; the client still sees why the task
    # failed.
    if isinstance(result, BaseException):
        result = f"{type(result).__name__}: {result}"
    return {
        "task_id": task_id,
        "task_status": status,
        "task_result": result
    }
Run each of the following commands in its own terminal:
celery -A job.celery_app worker --loglevel=info
uvicorn job:app --reload
Usage Example:
import requests
import time

# API base URL: every client helper in this script builds its endpoint
# from this prefix.
BASE_URL = "http://localhost:8000"

def create_job(duration):
    """Ask the API to create a sleep job and return the decoded JSON reply.

    Args:
        duration: Value placed in the ``/create-job/{duration}`` URL path.

    Returns:
        The JSON body of the response, decoded into Python objects.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    response = requests.post(f"{BASE_URL}/create-job/{duration}")
    # Surface HTTP errors here; otherwise an error payload would be
    # returned as if it were a job and fail later with a KeyError.
    response.raise_for_status()
    return response.json()

def get_running_jobs():
    """Fetch ``/running-jobs`` and return the decoded JSON reply.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    response = requests.get(f"{BASE_URL}/running-jobs")
    # Do not treat an error response as a valid job listing.
    response.raise_for_status()
    return response.json()

def get_job_status(task_id):
    """Fetch ``/status/{task_id}`` and return the decoded JSON reply.

    Args:
        task_id: Identifier placed in the URL path.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    response = requests.get(f"{BASE_URL}/status/{task_id}")
    # Do not treat an error response as a valid status report.
    response.raise_for_status()
    return response.json()

def main():
    """Create a short sleep job and poll its status once per second.

    Polling stops when the reported status is ``SUCCESS`` (the result is
    printed) or when it is ``FAILURE`` or ``REVOKED`` (the final status is
    printed), so a job that does not succeed cannot keep the loop running
    forever.
    """
    # Create a job with 2 seconds duration
    job = create_job(2)
    task_id = job['task_id']
    print(f"Created job with ID: {task_id}")

    while True:
        status = get_job_status(task_id)
        state = status['task_status']
        if state == "SUCCESS":
            print(f"Job result: {status['task_result']}")
            break
        if state in ("FAILURE", "REVOKED"):
            # Terminal states other than success: the status will not
            # change any more, so stop polling.
            print(f"Job ended with status {state}: {status['task_result']}")
            break
        print(f"Job status: {status}")
        time.sleep(1)

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

No comments:

Post a Comment

AI assistance for low value tasks only?

 Can AI only help with low-value tasks? That would be bad. I had this idea while thinking about the podcast with Terence Tao, where he says that AI ...