Skip to content

Queue

The queue layer takes jobs (currently: serialized events bound for queued listeners) and persists them to a backend. A worker process consumes them later.

Drivers

DriverBacked byWhen to useCargo feature
syncruns inline at push time, no persistencedev, tests, “I want events but not actual queueing”always available
redisa Redis list (LPUSH / BRPOP)productionfeatures = ["redis"]

The sync driver is always compiled in. The redis driver is gated behind a cargo feature so apps that don’t need Redis don’t pay the compile cost (the redis crate pulls in TLS deps).

The default scaffold from oxide new enables the feature in Cargo.toml:

oxide = { version = "...", registry = "com7-innovation", features = ["redis"] }

Drop features = ["redis"] if your app will only ever use the sync driver — saves ~30s on cold builds.

Configure connections

config/queue.toml
default = "${QUEUE_CONNECTION:-sync}"
[connections.sync]
driver = "sync"
[connections.redis]
driver = "redis"
url = "${REDIS_URL:-redis://127.0.0.1:6379}"
queue = "default" # default queue name when caller doesn't pass one
block_timeout_secs = 5 # worker BRPOP timeout per cycle

default picks which connection callers get when no name is specified. Add as many [connections.<name>] blocks as you need — different Redis instances, different queues per region, separate sync/redis for testing.

Push a job

In normal use you don’t push directly — register a listener with Mode::queue("redis") and Oxide pushes the event for you when you dispatch(event). See Events.

For diagnostics or custom job producers:

use oxide::queue::{push, Job};
let job = Job {
event_name: "ops.diagnostic_ping".into(),
listener_type: "MyApp::PingListener".into(),
payload: serde_json::json!({ "from": "cli" }),
};
push("redis", "default", &job).await?;

Run a worker

Workers consume one (connection, queues) at a time. The --queue flag accepts a single name or a comma-separated priority list:

Terminal window
# default connection from config/queue.toml, "default" queue
oxide queue:work
# single explicit queue
oxide queue:work --connection=redis --queue=emails
# priority order — drains "high" before "default" before "low"
oxide queue:work --connection=redis --queue=high,default,low
# all the queues you have, one worker
oxide queue:work --connection=redis --queue=emails,payments,analytics

Internally this maps to Redis’s native BRPOP key1 key2 ... timeout — Redis itself walks the keys left-to-right and returns the first non-empty list’s tail. No per-queue round-trips, true priority ordering at the broker level.

Run multiple workers across different queues if you want isolation (slow queue can’t block fast queue) instead of priority:

Terminal window
oxide queue:work --queue=emails & # 2 workers for emails
oxide queue:work --queue=emails &
oxide queue:work --queue=payments & # dedicated payment worker
oxide queue:work --queue=analytics & # dedicated analytics worker

The worker:

  1. BRPOPs every queue in priority order with block_timeout_secs
  2. Decodes the JSON job
  3. Looks up the listener by listener_type in the global event dispatcher
  4. Runs the listener; logs the outcome via tracing (with queue= for which queue served it)
  5. Loops; exits cleanly on SIGINT / SIGTERM

Multiple custom queues

You can route different listener types to different queues — useful when one queue’s work is slow and you don’t want it blocking everything else:

// EventServiceProvider
app.listen::<UserRegistered, SendWelcomeEmail>(Mode::queue("redis").on_queue("emails"));
app.listen::<OrderPaid, ChargeCard> (Mode::queue("redis").on_queue("payments"));
app.listen::<OrderShipped, AnalyticsBlast> (Mode::queue("redis").on_queue("analytics"));

Then run dedicated workers:

Terminal window
oxide queue:work --connection=redis --queue=emails &
oxide queue:work --connection=redis --queue=payments &
oxide queue:work --connection=redis --queue=analytics &

Wire format

Jobs go on the wire as JSON in the Redis list oxide:queue:<queue_name>:

{
"event_name": "auth.user_registered",
"listener_type": "myapp::listeners::SendWelcomeEmail",
"payload": { "user_id": 42, "email": "[email protected]" }
}

listener_type is std::any::type_name::<L>() — stable within a single binary build. The same code that produces and consumes the job; if you rename a listener, redeploy producers and consumers together (drain the queue first).

Dev workflow

Most useful local setup:

  1. Set QUEUE_CONNECTION=sync in .env while developing — listeners run inline, no Redis needed, no worker process.
  2. Switch to QUEUE_CONNECTION=redis (or unset, since redis is the typical prod default) when integration-testing the worker path.
  3. In Docker compose, run a queue service alongside app that runs oxide queue:work.

Production deployment

Run worker(s) as separate container(s) — same image as the app, different command:

compose.prod.yml
services:
app:
image: ghcr.io/your-org/your-app:latest
command: ["serve"]
# ...
queue-default:
image: ghcr.io/your-org/your-app:latest
command: ["queue:work", "--connection=redis", "--queue=default"]
deploy:
replicas: 2
queue-emails:
image: ghcr.io/your-org/your-app:latest
command: ["queue:work", "--connection=redis", "--queue=emails"]
deploy:
replicas: 1

SIGTERM triggers a clean shutdown — the worker finishes its current job, then exits.

Failure handling — failed_jobs table

When a queued listener returns Err(...), the worker writes a row to failed_jobs (in the default DB connection) and continues consuming. You can inspect, retry, or flush failed jobs from the CLI.

One-time setup

Terminal window
oxide vendor:publish queue # publish the failed_jobs migration
oxide migrate # creates the failed_jobs table

Inspect

Terminal window
oxide queue:failed
# id uuid conn queue event
# 1 d7e83f12-... redis default auth.user_registered
# 2 3a91be40-... redis emails orders.shipped

Retry

Terminal window
oxide queue:retry 1 # re-push job id=1 to its original (connection, queue)
oxide queue:retry --all # re-push every failed job

Retrying re-pushes the original payload and removes the failed_jobs row. Listener resolves on the worker side as usual — if a listener no longer exists in the deployed binary, retry will fail again.

Flush

Terminal window
oxide queue:flush # truncate the table

Schema

CREATE TABLE failed_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
uuid VARCHAR(36) UNIQUE,
connection VARCHAR(255),
queue VARCHAR(255),
event_name VARCHAR(255),
listener_type TEXT,
payload TEXT, -- JSON of the original event
exception TEXT, -- formatted error from the listener
failed_at TIMESTAMP
);

Opt out

If your app doesn’t have a sea-orm DB (or you genuinely want log-and-drop behavior), set [failed].driver = "none" in config/queue.toml:

[failed]
driver = "${QUEUE_FAILED_DRIVER:-none}"

The worker will continue logging failures via tracing::error! without writing to any table.

What’s still on the roadmap

  • Exponential-backoff retryMode::queue("redis").tries(3).backoff(Duration::from_secs(2)). Worker re-pushes with attempt counter; only writes to failed_jobs after tries is exhausted.
  • Job-level timeout — per-listener duration cap; worker aborts and re-queues.
  • Reservations — visibility timeout so a crashed worker’s job becomes visible to siblings (sorted-set storage instead of plain list).