RabbitMQ Transport
The RabbitMQ transporter connects NestJS microservices over AMQP, giving you durable, broker-backed messaging instead of fire-and-forget broadcasts. Messages are written to a named queue that, when declared durable, survives broker restarts and consumer downtime, and with manual acknowledgement each message is removed only after it has been processed successfully. This combination of persistence and acknowledgement makes RabbitMQ a common choice when losing a message (an order, a payment, a job) would be a real problem.
How the RabbitMQ transporter works
Both the client and the server bind to a single named queue. A ClientProxy publishes messages to that queue, and one or more microservice instances consume from it. RabbitMQ distributes messages across the queue's consumers in round-robin fashion, so adding replicas of a service increases throughput without any change to the producer.
For request/response (@MessagePattern / send), NestJS uses a reply queue and a correlation ID to route the response back to the original caller. For events (@EventPattern / emit), the message is published and no reply is expected. Crucially, RabbitMQ holds messages in the queue until a consumer acknowledges them, so a service that is down or restarting picks up its backlog on reconnect rather than missing it.
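The correlation-ID mechanism can be pictured with a small in-memory model. This is only a sketch of the idea, not the transporter's internal code: ReplyRouter, register and dispatch are hypothetical names, and no broker is involved.

```typescript
// Toy model of correlation-ID reply routing (illustrative names, no broker).
interface Reply<T> {
  correlationId: string;
  payload: T;
}

class ReplyRouter<T> {
  // One pending callback per in-flight request, keyed by correlation ID.
  private readonly pending = new Map<string, (payload: T) => void>();
  private counter = 0;

  // Requester side: reserve an ID and remember who is waiting for the reply.
  register(onReply: (payload: T) => void): string {
    this.counter += 1;
    const correlationId = `req-${this.counter}`;
    this.pending.set(correlationId, onReply);
    return correlationId;
  }

  // Reply-queue side: hand the payload to the caller that owns this ID.
  // Returns false for unknown IDs, for example a reply that arrives twice.
  dispatch(message: Reply<T>): boolean {
    const onReply = this.pending.get(message.correlationId);
    if (!onReply) return false;
    this.pending.delete(message.correlationId);
    onReply(message.payload);
    return true;
  }
}

const router = new ReplyRouter<number>();
const sumId = router.register((total) => console.log(`sum reply: ${total}`));
const otherId = router.register((total) => console.log(`other reply: ${total}`));

// Replies may come back in any order; the ID still finds the right caller.
router.dispatch({ correlationId: otherId, payload: 7 });
router.dispatch({ correlationId: sumId, payload: 10 });
```

Because every reply carries the ID of the request that produced it, many callers can share one reply queue without receiving each other's responses.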
Installing dependencies
The transporter is driven by amqplib with the amqp-connection-manager wrapper, which handles reconnection.
npm install @nestjs/microservices amqplib amqp-connection-manager
Configuring the server
Bootstrap the microservice with Transport.RMQ. The urls array points at the broker, queue names the queue to consume, and queueOptions.durable makes the queue survive a broker restart.
// main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.RMQ,
      options: {
        urls: ['amqp://guest:guest@localhost:5672'],
        queue: 'orders_queue',
        queueOptions: { durable: true },
        noAck: false,
        prefetchCount: 10,
      },
    },
  );
  await app.listen();
}
bootstrap();
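Hard-coded URLs like the one above break as soon as a password contains characters such as @ or /. The helper below is a minimal sketch of assembling an entry for urls from separate parts; buildAmqpUrl and BrokerAddress are hypothetical names, not part of NestJS or amqplib.

```typescript
// Hypothetical helper: builds one entry for the `urls` array.
interface BrokerAddress {
  user: string;
  password: string;
  host: string;
  port?: number;  // assumed default: 5672, the standard AMQP port
  vhost?: string; // assumed default: "/", RabbitMQ's default virtual host
}

function buildAmqpUrl({ user, password, host, port = 5672, vhost = '/' }: BrokerAddress): string {
  // Percent-encode credentials so reserved characters cannot break the URL.
  const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
  // The vhost is a single path segment, so "/" itself is written as %2F.
  return `amqp://${credentials}@${host}:${port}/${encodeURIComponent(vhost)}`;
}

console.log(buildAmqpUrl({ user: 'guest', password: 'guest', host: 'localhost' }));
// amqp://guest:guest@localhost:5672/%2F
```

The printed URL names the default virtual host explicitly, which should be equivalent to the shorter amqp://guest:guest@localhost:5672 form used in main.ts.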
Handlers use the standard decorators. With manual acknowledgement enabled (noAck: false), you control exactly when a message is removed from the queue.
// orders.controller.ts
import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, MessagePattern, Payload, RmqContext } from '@nestjs/microservices';
interface OrderCreatedEvent {
  orderId: string;
  total: number;
}

@Controller()
export class OrdersController {
  // Request/response: the return value is sent back to the caller.
  @MessagePattern({ cmd: 'sum' })
  accumulate(@Payload() data: number[], @Ctx() context: RmqContext): number {
    const result = (data || []).reduce((acc, n) => acc + n, 0);
    // noAck is false, so request messages must be acknowledged as well
    context.getChannelRef().ack(context.getMessage());
    return result;
  }

  // Event: no reply is sent; the handler only acknowledges or rejects.
  @EventPattern('order.created')
  async handleOrderCreated(
    @Payload() event: OrderCreatedEvent,
    @Ctx() context: RmqContext,
  ): Promise<void> {
    const channel = context.getChannelRef();
    const originalMessage = context.getMessage();
    try {
      console.log(`Fulfilling order ${event.orderId} ($${event.total.toFixed(2)})`);
      channel.ack(originalMessage);
    } catch (err) {
      // requeue: false discards the message, or dead-letters it if the queue has a DLX configured
      channel.nack(originalMessage, false, false);
    }
  }
}
Output:
Fulfilling order 1a2b-3c4d ($129.50)
Message acknowledgement
Acknowledgement is what separates RabbitMQ from at-most-once transports. Two modes are available:
- Automatic (noAck: true): RabbitMQ removes a message the moment it is delivered. Fast, but a crash mid-processing loses the message.
- Manual (noAck: false): the message stays “unacknowledged” until your handler calls channel.ack(). If the consumer dies first, RabbitMQ redelivers the message to another consumer.
Always pair manual mode with explicit ack/nack calls via RmqContext. Forgetting to ack leaks unacknowledged messages, which eventually stall the queue once you hit the prefetch limit.
Warning: With noAck: false you MUST acknowledge every message. A handler that returns without calling ack or nack leaves the message unacknowledged for as long as the channel stays open, and once prefetchCount messages are outstanding the consumer stops receiving new ones.
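One way to make a missed acknowledgement harder to write is to route every handler through a small wrapper. The sketch below assumes a hypothetical helper named settle and a minimal AckChannel interface that mirrors only the ack and nack calls used in this guide; it is not a NestJS API.

```typescript
// Structural subset of the channel methods used in this guide.
interface AckChannel<M> {
  ack(message: M): void;
  nack(message: M, allUpTo?: boolean, requeue?: boolean): void;
}

// Runs `work`, then settles the message exactly once:
// ack on success, nack without requeue on failure.
async function settle<M>(
  channel: AckChannel<M>,
  message: M,
  work: () => Promise<void> | void,
): Promise<boolean> {
  try {
    await work();
  } catch {
    channel.nack(message, false, false);
    return false;
  }
  // Outside the try block, so a failing ack is never followed by a nack.
  channel.ack(message);
  return true;
}
```

Inside a handler this could be called as settle(context.getChannelRef(), context.getMessage(), () => fulfil(event)), where fulfil stands in for your own business logic.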
Prefetch and fair dispatch
By default RabbitMQ pushes as many messages as it can at a consumer, which can overload a slow worker while others sit idle. Setting prefetchCount limits how many unacknowledged messages a single consumer may hold at once, enabling fair, back-pressured dispatch.
options: {
  urls: ['amqp://localhost:5672'],
  queue: 'orders_queue',
  queueOptions: { durable: true },
  noAck: false,
  prefetchCount: 10, // at most 10 in-flight messages per consumer
}
A low prefetch (1–10) is ideal for heavy, slow jobs; a higher value improves throughput for fast handlers.
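The effect of a prefetch limit is easiest to see in a toy model. The simulation below is a deliberately simplified, tick-based sketch and not RabbitMQ's real scheduler: simulateDispatch is a hypothetical function, each consumer handles one message at a time, and costs[i] is the number of ticks consumer i needs per message.

```typescript
// Toy tick-based model of dispatch; prefetch = 0 means "no limit".
function simulateDispatch(messages: number, costs: number[], prefetch: number): number[] {
  if (costs.length === 0 || costs.some((c) => !Number.isInteger(c) || c < 1)) {
    throw new Error('costs must be a non-empty list of positive integers');
  }
  const limit = prefetch === 0 ? Infinity : prefetch;
  const received = costs.map(() => 0);  // messages delivered to each consumer
  const unacked = costs.map(() => 0);   // delivered but not yet acknowledged
  const remaining = costs.map(() => 0); // ticks left on the current message
  let undelivered = messages;
  let next = 0; // round-robin pointer

  while (undelivered > 0 || unacked.some((n) => n > 0)) {
    // Broker side: deliver round-robin to consumers below their limit.
    while (undelivered > 0) {
      let target = -1;
      for (let k = 0; k < costs.length; k++) {
        const i = (next + k) % costs.length;
        if (unacked[i] < limit) {
          target = i;
          break;
        }
      }
      if (target === -1) break; // every consumer is at its prefetch limit
      received[target]++;
      unacked[target]++;
      undelivered--;
      next = (target + 1) % costs.length;
    }
    // Consumer side: work one tick, acknowledge when the message is done.
    for (let i = 0; i < costs.length; i++) {
      if (unacked[i] === 0) continue;
      if (remaining[i] === 0) remaining[i] = costs[i];
      remaining[i]--;
      if (remaining[i] === 0) unacked[i]--;
    }
  }
  return received;
}

// One fast consumer (1 tick per message) and one slow consumer (4 ticks).
console.log(simulateDispatch(10, [1, 4], 0)); // no limit: 5 and 5
console.log(simulateDispatch(10, [1, 4], 1)); // prefetch 1: 8 and 2
```

Without a limit the slow consumer is handed half the work up front; with a prefetch of 1 it only receives a new message after acknowledging the previous one, so the fast consumer ends up with most of the queue.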
Dead-letter handling
Messages that repeatedly fail should not loop forever. Configure a dead-letter exchange (DLX) on the queue via queueOptions.arguments, then nack failures with requeue: false so they are routed to the DLX instead of being redelivered.
options: {
  urls: ['amqp://localhost:5672'],
  queue: 'orders_queue',
  queueOptions: {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': 'orders_dlx',
      'x-dead-letter-routing-key': 'orders_failed',
      'x-message-ttl': 60000,
    },
  },
  noAck: false,
}
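Bind an orders_dlq queue to the orders_dlx exchange to capture failures for inspection, alerting, or manual replay. RabbitMQ silently drops dead-lettered messages when the target exchange does not exist, so make sure orders_dlx is declared as well.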
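When inspecting the dead-letter queue, it helps to know why each message ended up there. RabbitMQ records this in the x-death header, and with the configuration above a message can arrive either because it was rejected or because its TTL expired. The helper below is a sketch with hypothetical names (XDeathEntry, summarizeDeaths) that reads only the queue, reason and count fields of that header.

```typescript
// Fields of one "x-death" entry that this sketch relies on.
interface XDeathEntry {
  queue: string;
  reason: string; // e.g. "rejected" (nack without requeue) or "expired" (TTL)
  count: number;
}

// Turns the x-death header into short, log-friendly lines.
function summarizeDeaths(headers: Record<string, unknown> | undefined): string[] {
  const raw = headers?.['x-death'];
  if (!Array.isArray(raw)) return []; // the message was never dead-lettered
  return (raw as XDeathEntry[]).map(
    (death) => `${death.queue}: ${death.reason} x${Number(death.count)}`,
  );
}

console.log(
  summarizeDeaths({
    'x-death': [{ queue: 'orders_queue', reason: 'rejected', count: 1 }],
  }),
);
```

In a NestJS handler consuming orders_dlq, the headers would typically come from context.getMessage().properties.headers.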
Configuring the client
A producer registers a ClientProxy with matching connection options, typically via ClientsModule.
// app.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { GatewayController } from './gateway.controller';
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'ORDERS_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://guest:guest@localhost:5672'],
          queue: 'orders_queue',
          queueOptions: { durable: true },
        },
      },
    ]),
  ],
  controllers: [GatewayController],
})
export class AppModule {}
Inject the client and use send for request/response or emit for durable events.
// gateway.controller.ts
import { Body, Controller, Get, Inject, Post } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { Observable } from 'rxjs';

// Same shape the consumer expects; in a real project this would live in a shared module.
interface OrderCreatedEvent {
  orderId: string;
  total: number;
}

@Controller('orders')
export class GatewayController {
  constructor(@Inject('ORDERS_SERVICE') private readonly client: ClientProxy) {}

  @Get('sum')
  sum(): Observable<number> {
    return this.client.send<number, number[]>({ cmd: 'sum' }, [1, 2, 3, 4]);
  }

  @Post()
  create(@Body() body: OrderCreatedEvent): void {
    this.client.emit('order.created', body);
  }
}
Connection options
These options live under the options key.
| Option | Type | Default | Purpose |
|---|---|---|---|
| urls | string[] | ['amqp://localhost'] | Broker connection URLs; multiple enable failover. |
| queue | string | — | Name of the queue to publish to and consume from. |
| queueOptions | object | {} | Passed to assertQueue (e.g. durable, arguments). |
| noAck | boolean | true | false enables manual acknowledgement. |
| prefetchCount | number | 0 | Max unacknowledged messages per consumer. |
| persistent | boolean | false | Mark published messages as persistent on disk. |
| replyQueue | string | amq.rabbitmq.reply-to | Queue used for request/response replies. |
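The defaults matter most for the options you leave out. The sketch below simply merges a partial configuration with the defaults listed in the table; RmqOptionsSketch, TABLE_DEFAULTS and resolveOptions are hypothetical names that mirror the table, not the library's internal code.

```typescript
// Option shape and defaults copied from the table above (illustrative only).
interface RmqOptionsSketch {
  urls?: string[];
  queue?: string;
  queueOptions?: Record<string, unknown>;
  noAck?: boolean;
  prefetchCount?: number;
  persistent?: boolean;
  replyQueue?: string;
}

const TABLE_DEFAULTS: Required<Omit<RmqOptionsSketch, 'queue'>> = {
  urls: ['amqp://localhost'],
  queueOptions: {},
  noAck: true,
  prefetchCount: 0,
  persistent: false,
  replyQueue: 'amq.rabbitmq.reply-to',
};

// Explicit options win; everything else falls back to the table's defaults.
function resolveOptions(options: RmqOptionsSketch) {
  return { ...TABLE_DEFAULTS, ...options };
}

const effective = resolveOptions({
  queue: 'orders_queue',
  queueOptions: { durable: true },
});
console.log(effective.noAck, effective.persistent); // true false
```

A configuration that sets only queue and queueOptions therefore still runs with automatic acknowledgement and non-persistent messages, which is why the durable setups in this guide set noAck and persistent explicitly.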
Best practices
- Set queueOptions.durable: true and persistent: true so both the queue and its messages survive a broker restart.
- Use noAck: false and explicitly ack on success, nack(msg, false, false) on unrecoverable failure.
- Tune prefetchCount to match handler speed (low for slow jobs, higher for fast ones) to get fair dispatch.
- Configure a dead-letter exchange so poison messages are isolated instead of redelivered forever.
- Make handlers idempotent: at-least-once delivery means a message can legitimately arrive more than once.
- Run multiple consumer replicas on the same queue to scale throughput and gain automatic load balancing.
- Provide several entries in urls and rely on amqp-connection-manager to reconnect through broker outages.
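The idempotency advice above can be sketched with a small guard that remembers which message IDs have already been handled. Deduplicator and runOnce are hypothetical names, and the in-memory Set is only for illustration: with several replicas, the record of processed IDs would need to live in a store they all share.

```typescript
// Illustrative guard against redelivered messages.
class Deduplicator {
  private readonly seen = new Set<string>();

  // Runs `effect` only the first time an ID is seen; returns whether it ran.
  runOnce(id: string, effect: () => void): boolean {
    if (this.seen.has(id)) return false;
    effect();
    // Recorded only after the effect succeeds, so a failed attempt can be retried.
    this.seen.add(id);
    return true;
  }
}

const dedup = new Deduplicator();
let shipments = 0;

// The same order arrives twice, as at-least-once delivery allows.
dedup.runOnce('1a2b-3c4d', () => { shipments += 1; });
dedup.runOnce('1a2b-3c4d', () => { shipments += 1; });

console.log(shipments); // 1
```

The redelivered message should still be acknowledged; the guard only prevents the side effect from running a second time.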