Monday, 29 January 2024

FastAPI with Celery: Minimal Example

job.py

from fastapi import FastAPI
from celery import Celery
import time
from celery.result import AsyncResult
import os

# Celery configuration
celery_app = Celery(
        'tasks', 
        broker='sqla+sqlite:///celerydb.sqlite', 
        backend='db+sqlite:///celerydb.sqlite')
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone=os.environ.get('TZ', 'UTC'),
    enable_utc=True,
    broker_connection_retry_on_startup=True
)

# Celery task
@celery_app.task
def sleep_task(duration):
    time.sleep(duration)
    return f"Slept for {duration} seconds"

# FastAPI app
app = FastAPI()

@app.post("/create-job/{duration}")
def create_job(duration: int):
    task = sleep_task.delay(duration)
    return {"task_id": task.id}

@app.get("/running-jobs")
def running_jobs():
    i = celery_app.control.inspect()
    active_tasks = i.active()
    return {"active_tasks": active_tasks}

@app.get("/status/{task_id}")
def get_status(task_id: str):
    task_result = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
In two different terminals
celery -A job.celery_app worker --loglevel=info
uvicorn job:app --reload
Usage Example:
import requests
import time

# API base URL
BASE_URL = "http://localhost:8000"

def create_job(duration):
    response = requests.post(f"{BASE_URL}/create-job/{duration}")
    return response.json()

def get_running_jobs():
    response = requests.get(f"{BASE_URL}/running-jobs")
    return response.json()

def get_job_status(task_id):
    response = requests.get(f"{BASE_URL}/status/{task_id}")
    return response.json()

def main():
    # Create a job with 5 seconds duration
    job = create_job(2)
    task_id = job['task_id']
    print(f"Created job with ID: {task_id}")
    
    while True:
        status = get_job_status(task_id)
        if status['task_status'] == "SUCCESS":
            print(f"Job result: {status['task_result']}")
            break
        print(f"Job status: {status}")
        time.sleep(1)

if __name__ == "__main__":
    main()

No comments:

Post a Comment

Parse Wikipedia dump

""" This module processes Wikipedia dump files by extracting individual articles and parsing them into a structured format, ...