Queue
The queue layer takes jobs (currently: serialized events bound for queued listeners) and persists them to a backend. A worker process consumes them later.
Drivers
| Driver | Backed by | When to use | Cargo feature |
|---|---|---|---|
| sync | runs inline at push time, no persistence | dev, tests, “I want events but not actual queueing” | always available |
| redis | a Redis list (LPUSH / BRPOP) | production | features = ["redis"] |
The sync driver is always compiled in. The redis driver is gated
behind a cargo feature so apps that don’t need Redis don’t pay the
compile cost (the redis crate pulls in TLS deps).
The default scaffold from oxide new enables the feature in
Cargo.toml:

    oxide = { version = "...", registry = "com7-innovation", features = ["redis"] }

Drop features = ["redis"] if your app will only ever use the sync
driver; this saves ~30s on cold builds.
Configure connections
    default = "${QUEUE_CONNECTION:-sync}"

    [connections.sync]
    driver = "sync"

    [connections.redis]
    driver = "redis"
    url = "${REDIS_URL:-redis://127.0.0.1:6379}"
    queue = "default"          # default queue name when caller doesn't pass one
    block_timeout_secs = 5     # worker BRPOP timeout per cycle

default picks which connection callers get when no name is specified. Add as many [connections.<name>] blocks as you need: different Redis instances, different queues per region, separate sync/redis for testing.
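The ${NAME:-fallback} placeholders in the config read like the shell convention: use the variable when it is set, otherwise the fallback. A minimal sketch of that resolution follows. It assumes Oxide treats an empty value like an unset one, as shells do; resolve_placeholder is a hypothetical helper for illustration, not Oxide's config loader.

```rust
use std::collections::HashMap;

/// Resolve a single `${NAME:-fallback}` placeholder against an environment map.
/// Hypothetical helper for illustration; not part of Oxide's API.
fn resolve_placeholder(raw: &str, env: &HashMap<String, String>) -> String {
    // Anything that is not exactly `${...}` is returned unchanged.
    let Some(inner) = raw.strip_prefix("${").and_then(|s| s.strip_suffix('}')) else {
        return raw.to_string();
    };
    // Split `NAME:-fallback`; without `:-` the fallback is the empty string.
    let (name, fallback) = inner.split_once(":-").unwrap_or((inner, ""));
    match env.get(name) {
        // Shell semantics for `:-` (assumed here): an empty value counts as unset.
        Some(value) if !value.is_empty() => value.clone(),
        _ => fallback.to_string(),
    }
}
```

With an empty environment, "${QUEUE_CONNECTION:-sync}" resolves to sync; once QUEUE_CONNECTION=redis is set, the same placeholder resolves to redis.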
Push a job
In normal use you don’t push directly — register a listener with Mode::queue("redis") and Oxide pushes the event for you when you dispatch(event). See Events.
For diagnostics or custom job producers:
    use oxide::queue::{push, Job};

    let job = Job {
        event_name: "ops.diagnostic_ping".into(),
        listener_type: "MyApp::PingListener".into(),
        payload: serde_json::json!({ "from": "cli" }),
    };

    push("redis", "default", &job).await?;

Run a worker
Each worker consumes from one connection and one or more of its queues.
The --queue flag accepts a single name or a comma-separated priority list:
    # default connection from config/queue.toml, "default" queue
    oxide queue:work

    # single explicit queue
    oxide queue:work --connection=redis --queue=emails

    # priority order: drains "high" before "default" before "low"
    oxide queue:work --connection=redis --queue=high,default,low

    # all the queues you have, one worker
    oxide queue:work --connection=redis --queue=emails,payments,analytics

Internally this maps to Redis’s native BRPOP key1 key2 ... timeout.
Redis itself walks the keys left-to-right and returns the tail of the
first non-empty list. No per-queue round-trips, true priority
ordering at the broker level.
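The priority behavior can be modeled without a Redis server. The sketch below is an in-memory stand-in, not Oxide code: FakeBroker and its methods are hypothetical names, and rpop_first returns immediately where the real BRPOP would block until the timeout.

```rust
use std::collections::{HashMap, VecDeque};

/// In-memory stand-in for the Redis lists; keys are queue names.
#[derive(Default)]
struct FakeBroker {
    lists: HashMap<String, VecDeque<String>>,
}

impl FakeBroker {
    /// LPUSH: insert at the head of the list.
    fn lpush(&mut self, queue: &str, job: &str) {
        self.lists
            .entry(queue.to_string())
            .or_default()
            .push_front(job.to_string());
    }

    /// Non-blocking model of `BRPOP key1 key2 ...`: check the keys in the
    /// order given and pop the tail of the first non-empty list.
    fn rpop_first(&mut self, queues: &[&str]) -> Option<(String, String)> {
        for queue in queues {
            if let Some(job) = self.lists.get_mut(*queue).and_then(|l| l.pop_back()) {
                return Some(((*queue).to_string(), job));
            }
        }
        None // the real BRPOP would block here until the timeout expires
    }
}
```

Because jobs enter at the head and leave from the tail, each queue is first-in, first-out, and a job on a later key is only returned once every earlier key is empty.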
Run multiple workers across different queues if you want isolation (slow queue can’t block fast queue) instead of priority:
    oxide queue:work --queue=emails &      # 2 workers for emails
    oxide queue:work --queue=emails &
    oxide queue:work --queue=payments &    # dedicated payment worker
    oxide queue:work --queue=analytics &   # dedicated analytics worker

The worker:
- BRPOPs every queue in priority order with block_timeout_secs
- Decodes the JSON job
- Looks up the listener by listener_type in the global event dispatcher
- Runs the listener; logs the outcome via tracing (with queue= for which queue served it)
- Loops; exits cleanly on SIGINT/SIGTERM
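The lookup and run steps of that loop can be sketched as follows. This is an illustration under assumptions, not the dispatcher Oxide ships: the Listener type alias, the Outcome enum, and treating an unknown listener_type as a failure are all hypothetical, and the payload is kept as raw JSON text to stay within the standard library.

```rust
use std::collections::HashMap;

/// Decoded job, mirroring the three fields pushed by producers.
struct Job {
    event_name: String,
    listener_type: String,
    payload: String, // kept as raw JSON text in this sketch
}

/// Assumption: a listener is modeled as a plain function over the payload.
type Listener = fn(&str) -> Result<(), String>;

/// Outcome of handling one job.
#[derive(Debug, PartialEq)]
enum Outcome {
    Done,
    Failed(String),
}

/// Resolve the listener by `listener_type`, run it, and report the outcome.
fn handle(job: &Job, registry: &HashMap<&str, Listener>) -> Outcome {
    let Some(listener) = registry.get(job.listener_type.as_str()) else {
        // Assumption: an unknown listener is reported as a failed job.
        return Outcome::Failed(format!("no listener registered for {}", job.listener_type));
    };
    match listener(&job.payload) {
        Ok(()) => Outcome::Done,
        Err(e) => Outcome::Failed(format!("{} failed: {e}", job.event_name)),
    }
}

/// Two toy listeners: one succeeds, one always errors.
fn ping(_payload: &str) -> Result<(), String> {
    Ok(())
}

fn broken(_payload: &str) -> Result<(), String> {
    Err("boom".into())
}
```

A real worker would wrap handle in the pop-decode-log loop and stop when a shutdown signal arrives; the point here is only that the string in listener_type is the sole link between the job and the code that runs it.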
Multiple custom queues
You can route different listener types to different queues — useful when one queue’s work is slow and you don’t want it blocking everything else:
    // EventServiceProvider
    app.listen::<UserRegistered, SendWelcomeEmail>(Mode::queue("redis").on_queue("emails"));
    app.listen::<OrderPaid, ChargeCard>(Mode::queue("redis").on_queue("payments"));
    app.listen::<OrderShipped, AnalyticsBlast>(Mode::queue("redis").on_queue("analytics"));

Then run dedicated workers:

    oxide queue:work --connection=redis --queue=emails &
    oxide queue:work --connection=redis --queue=payments &
    oxide queue:work --connection=redis --queue=analytics &

Wire format
Jobs go on the wire as JSON in the Redis list oxide:queue:<queue_name>:
    {
      "event_name": "auth.user_registered",
      "listener_type": "myapp::listeners::SendWelcomeEmail",
      "payload": { ... }
    }

listener_type is std::any::type_name::<L>(), which is stable only within a single binary build. Producers and consumers must therefore run the same code: if you rename a listener, drain the queue first, then redeploy producers and consumers together.
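A small sketch of how the two identifiers are derived. The helper names queue_key and listener_type are hypothetical; only the oxide:queue:<queue_name> layout and the use of std::any::type_name come from the text above.

```rust
use std::any::type_name;

/// Redis list key for a queue, following the `oxide:queue:<queue_name>` layout.
fn queue_key(queue_name: &str) -> String {
    format!("oxide:queue:{queue_name}")
}

/// The string a producer would store in `listener_type` for listener `L`.
/// The exact text includes the module path and depends on the build.
fn listener_type<L>() -> &'static str {
    type_name::<L>()
}

/// Placeholder listener type for the example.
struct SendWelcomeEmail;
```

Moving or renaming SendWelcomeEmail changes the string returned by listener_type, which is why jobs already sitting in the list would no longer resolve after such a change.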
Dev workflow
Most useful local setup:
- Set QUEUE_CONNECTION=sync in .env while developing: listeners run inline, no Redis needed, no worker process.
- Switch to QUEUE_CONNECTION=redis when integration-testing the worker path. Set it explicitly: with the ${QUEUE_CONNECTION:-sync} default shown under Configure connections, an unset variable falls back to sync.
- In Docker compose, run a queue service alongside app that runs oxide queue:work.
Production deployment
Run worker(s) as separate container(s) — same image as the app, different command:
    services:
      app:
        image: ghcr.io/your-org/your-app:latest
        command: ["serve"]
        # ...

      queue-default:
        image: ghcr.io/your-org/your-app:latest
        command: ["queue:work", "--connection=redis", "--queue=default"]
        deploy:
          replicas: 2

      queue-emails:
        image: ghcr.io/your-org/your-app:latest
        command: ["queue:work", "--connection=redis", "--queue=emails"]
        deploy:
          replicas: 1

SIGTERM triggers a clean shutdown: the worker finishes its current job, then exits.
Failure handling — failed_jobs table
When a queued listener returns Err(...), the worker writes a row to
failed_jobs (in the default DB connection) and continues consuming.
You can inspect, retry, or flush failed jobs from the CLI.
One-time setup
    oxide vendor:publish queue   # publish the failed_jobs migration
    oxide migrate                # creates the failed_jobs table

Inspect

    oxide queue:failed
    # id  uuid          conn   queue    event
    # 1   d7e83f12-...  redis  default  auth.user_registered
    # 2   3a91be40-...  redis  emails   orders.shipped

Retry

    oxide queue:retry 1       # re-push job id=1 to its original (connection, queue)
    oxide queue:retry --all   # re-push every failed job

Retrying re-pushes the original payload and removes the failed_jobs
row. The listener is resolved on the worker side as usual, so if a
listener no longer exists in the deployed binary, the retry will fail again.
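The retry semantics (re-push, then delete the row) can be modeled in memory. Store, FailedJob, and the method names below are hypothetical stand-ins for the table and the broker, and only a subset of the columns is modeled.

```rust
/// In-memory model of one failed_jobs row (subset of the columns).
#[derive(Debug, Clone, PartialEq)]
struct FailedJob {
    id: u64,
    connection: String,
    queue: String,
    payload: String,
}

/// Stand-in for the table plus the broker, for illustration only.
#[derive(Default)]
struct Store {
    failed: Vec<FailedJob>,
    pushed: Vec<(String, String, String)>, // (connection, queue, payload)
}

impl Store {
    /// Model of `queue:retry <id>`: re-push the payload, then drop the row.
    /// Returns false when no row has that id.
    fn retry(&mut self, id: u64) -> bool {
        let Some(pos) = self.failed.iter().position(|row| row.id == id) else {
            return false;
        };
        let row = self.failed.remove(pos);
        self.pushed.push((row.connection, row.queue, row.payload));
        true
    }

    /// Model of `queue:retry --all`; returns how many jobs were re-pushed.
    fn retry_all(&mut self) -> usize {
        let ids: Vec<u64> = self.failed.iter().map(|row| row.id).collect();
        ids.into_iter().filter(|id| self.retry(*id)).count()
    }
}
```

Since the row is removed at retry time, a job that fails again shows up as a fresh failed_jobs row rather than an updated one, at least in this model.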
Flush
    oxide queue:flush   # truncate the table

Schema

    CREATE TABLE failed_jobs (
        id            BIGINT PRIMARY KEY AUTO_INCREMENT,
        uuid          VARCHAR(36) UNIQUE,
        connection    VARCHAR(255),
        queue         VARCHAR(255),
        event_name    VARCHAR(255),
        listener_type TEXT,
        payload       TEXT,      -- JSON of the original event
        exception     TEXT,      -- formatted error from the listener
        failed_at     TIMESTAMP
    );

Opt out
If your app doesn’t have a sea-orm DB (or you genuinely want
log-and-drop behavior), set [failed].driver = "none" in
config/queue.toml:
    [failed]
    driver = "${QUEUE_FAILED_DRIVER:-none}"

The worker will continue logging failures via tracing::error!
without writing to any table.
What’s still on the roadmap
- Exponential-backoff retry: Mode::queue("redis").tries(3).backoff(Duration::from_secs(2)). Worker re-pushes with attempt counter; only writes to failed_jobs after tries is exhausted.
- Job-level timeout: per-listener duration cap; worker aborts and re-queues.
- Reservations: visibility timeout so a crashed worker’s job becomes visible to siblings (sorted-set storage instead of plain list).
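Since exponential backoff has not shipped, the following is only a sketch of the arithmetic such a feature might use. The doubling factor, the 1-based attempt counter, and the next_delay function are all assumptions, not Oxide behavior.

```rust
use std::time::Duration;

/// Delay to wait after `attempt` failed runs (1-based), doubling each time.
/// Returns None once `tries` is exhausted, meaning the job would be
/// written to failed_jobs instead of being re-pushed.
fn next_delay(attempt: u32, tries: u32, base: Duration) -> Option<Duration> {
    if attempt >= tries {
        return None;
    }
    // base * 2^(attempt - 1), saturating instead of overflowing on large inputs.
    let factor = 2u32
        .checked_pow(attempt.saturating_sub(1))
        .unwrap_or(u32::MAX);
    Some(base.saturating_mul(factor))
}
```

With tries(3) and a 2-second base, this gives a 2-second wait after the first failure, 4 seconds after the second, and no further retry after the third.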