Message Queues with RabbitMQ
In a microservices architecture, services often need to talk to each other without waiting for an immediate response. A message queue lets a service hand off work and move on, while another service picks up that work whenever it is ready. RabbitMQ is a battle-tested message broker that implements the AMQP protocol, and with the amqplib client it integrates cleanly into Node.js. This decoupling improves resilience: if a consumer is slow or temporarily down, messages wait safely in the queue instead of being lost.
Why asynchronous messaging
Synchronous calls (REST or gRPC) tightly couple the caller to the callee — the caller blocks until the response arrives, and a failure downstream cascades upstream. Asynchronous messaging breaks that coupling. The producer publishes a message and returns instantly; the broker durably stores it and delivers it to one or more consumers. This buys you load leveling (absorbing traffic spikes), retries, and the ability to add new consumers without touching the producer.
| Concept | Role |
|---|---|
| Producer | Publishes messages, never talks to consumers directly |
| Exchange | Receives messages and routes them to queues |
| Routing key | A label the exchange uses to decide where a message goes |
| Queue | Buffers messages until a consumer acknowledges them |
| Consumer | Subscribes to a queue and processes messages |
Exchanges, queues, and routing keys
A producer never publishes directly to a queue — it publishes to an exchange. The exchange inspects the message’s routing key and the bindings between itself and queues, then decides where to deliver. The exchange type governs that logic:
| Exchange type | Routing behaviour |
|---|---|
direct | Delivers to queues whose binding key exactly matches the routing key |
topic | Matches routing keys against patterns with * (one word) and # (zero or more) |
fanout | Broadcasts to every bound queue, ignoring the routing key |
headers | Routes on message header attributes instead of the routing key |
Always declare exchanges and queues with
durable: trueand publish withpersistent: trueif messages must survive a broker restart. A durable queue holding non-persistent messages still loses them on restart.
Running RabbitMQ
The fastest way to get a broker locally is Docker. The management plugin gives you a web UI at http://localhost:15672 (login guest/guest).
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3-management
Install the client into your project:
npm install amqplib
Building a producer
The producer opens a connection, creates a channel, declares the exchange, and publishes. A channel is a lightweight virtual connection multiplexed over the single TCP connection — create one per logical task, not per message.
// producer.js
import amqp from "amqplib";
const EXCHANGE = "orders";
async function publishOrder(order) {
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
await channel.assertExchange(EXCHANGE, "topic", { durable: true });
const routingKey = `order.${order.region}.created`;
const payload = Buffer.from(JSON.stringify(order));
channel.publish(EXCHANGE, routingKey, payload, {
persistent: true,
contentType: "application/json",
});
console.log(`Published ${routingKey} -> #${order.id}`);
await channel.close();
await connection.close();
}
await publishOrder({ id: 101, region: "eu", total: 49.9 });
Output:
Published order.eu.created -> #101
In CommonJS, swap the import for
const amqp = require("amqplib");. The rest of the API is identical.
Building a consumer
The consumer declares the same exchange, creates its own queue, binds the queue with a pattern, and then consumes. Setting prefetch to a small number ensures the broker won’t flood a worker with more messages than it can handle at once — the foundation of fair, round-robin dispatch across multiple workers.
// consumer.js
import amqp from "amqplib";
const EXCHANGE = "orders";
async function startConsumer() {
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
await channel.assertExchange(EXCHANGE, "topic", { durable: true });
// Empty queue name => broker generates a durable, named queue.
const { queue } = await channel.assertQueue("order-processor", {
durable: true,
});
// Receive every "created" event from the EU region.
await channel.bindQueue(queue, EXCHANGE, "order.eu.*");
await channel.prefetch(10);
console.log(`Waiting on queue "${queue}"...`);
channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const order = JSON.parse(msg.content.toString());
await processOrder(order);
channel.ack(msg); // confirm success
} catch (err) {
console.error("Processing failed:", err.message);
// requeue: false sends it to a dead-letter queue if configured
channel.nack(msg, false, false);
}
});
}
async function processOrder(order) {
console.log(`Processing order #${order.id} for ${order.total}`);
}
await startConsumer();
Output:
Waiting on queue "order-processor"...
Processing order #101 for 49.9
Acknowledgements and reliability
By default consume runs in manual-ack mode. RabbitMQ keeps a delivered message in an “unacknowledged” state until you call channel.ack(msg). If the consumer crashes before acking, the broker redelivers the message to another consumer — at-least-once delivery. Use channel.nack(msg, false, false) to reject a poisoned message without requeueing; pair it with a dead-letter exchange so failures are captured for inspection rather than silently dropped.
| Call | Effect |
|---|---|
channel.ack(msg) | Mark processed; broker removes it |
channel.nack(msg, false, true) | Reject and requeue for retry |
channel.nack(msg, false, false) | Reject and drop (or dead-letter) |
{ noAck: true } | Fire-and-forget; no delivery guarantee |
Decoupling services
Because producers and consumers only share the exchange contract (name, type, routing-key convention), you can evolve them independently. Add an analytics consumer that binds order.# to the same exchange and it receives a copy of every order event without the producer knowing it exists. Scale throughput by running multiple instances of the same consumer on one queue — RabbitMQ load-balances deliveries across them automatically.
Best Practices
- Reuse one connection per process and create channels per task; opening a connection per message is expensive.
- Declare exchanges and queues as
durableand publish messages aspersistentfor any data you cannot afford to lose. - Always use manual acknowledgements in production so a crash cannot drop in-flight work.
- Set a sensible
prefetchto enable fair dispatch and protect workers from overload. - Configure dead-letter exchanges to capture failed or rejected messages instead of losing them.
- Handle
connection.on("error")andconnection.on("close")and reconnect with backoff — networks fail. - Keep payloads small and self-describing (JSON with a
contentType); push large blobs to object storage and send a reference.