Meridian Recipes
Celery primer
Celery is the distributed task queue that powers most of Meridian's async pipelines. This primer walks through a minimal installation, a production-shaped worker config, and the retry semantics you want for anything touching the public network. Use it as a baseline before you wire your own enrichment, billing, or webhook fan-out jobs.
1. Install and lay out the project
Pin Celery and its Redis extra to a known-good version. Keep the broker app separate from the rest of your codebase so it can be imported by both the web process and the worker process without triggering Django or FastAPI side effects.
# Install Celery with Redis broker pip install "celery[redis]==5.4.0" # Project layout myapp/ __init__.py celery_app.py tasks.py config.py
2. Configure the worker for safety
The defaults are too optimistic for real workloads. Turn on late acknowledgements, drop the prefetch multiplier to one, and set both a hard and a soft time limit so a stuck task cannot wedge the entire queue. The values below are what we run in production for Meridian.
# celery_app.py
from celery import Celery
app = Celery(
"myapp",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
include=["myapp.tasks"],
)
app.conf.update(
task_acks_late=True,
task_reject_on_worker_lost=True,
worker_prefetch_multiplier=1,
task_time_limit=300,
task_soft_time_limit=240,
)3. Write retry-aware tasks
Every task that hits the network should be idempotent and should distinguish transient from permanent failures. Bind the task, cap retries, and use a delay long enough to ride out a brief upstream outage without burning your queue depth.
# tasks.py
from .celery_app import app
@app.task(bind=True, max_retries=3, default_retry_delay=30)
def enrich_lead(self, lead_id: int) -> dict:
try:
record = fetch_lead(lead_id)
return score_and_persist(record)
except TransientError as exc:
raise self.retry(exc=exc)
# Start a worker
# celery -A myapp.celery_app worker --loglevel=INFO --concurrency=8