Queues with BullMQ

Some work has no business running inside a request/response cycle: sending emails, transcoding video, generating reports, or calling rate-limited third-party APIs. Pushing that work onto a background queue keeps your HTTP handlers fast and lets you retry, rate-limit, and scale processing independently. NestJS integrates with BullMQ, a Redis-backed queue library, through the @nestjs/bullmq package, which lets you inject queues into producers and define workers with decorators.

Installing and registering BullMQ

BullMQ stores its queues in Redis, so you need a running Redis instance and the bullmq runtime alongside the Nest wrapper. Register the connection once with BullModule.forRoot() in the root module, then register each named queue with BullModule.registerQueue() in the feature module that owns it.

npm install @nestjs/bullmq bullmq

// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { MediaModule } from './media/media.module';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: process.env.REDIS_HOST ?? 'localhost',
        port: Number(process.env.REDIS_PORT ?? 6379),
      },
    }),
    MediaModule,
  ],
})
export class AppModule {}

// media/media.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { MediaController } from './media.controller';
import { MediaProcessor } from './media.processor';

@Module({
  imports: [
    BullModule.registerQueue({ name: 'media' }),
  ],
  controllers: [MediaController],
  providers: [MediaProcessor],
})
export class MediaModule {}

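Use forRoot() for the shared Redis connection settings and registerQueue() per feature module. Every queue registered this way inherits those settings, so host and port are configured in one place. Sharing settings is not the same as sharing a socket: as the BullMQ connection documentation describes it, each Queue and Worker still opens its own Redis connection, and workers need an additional blocking one. To target a different Redis instance for a particular queue, pass a connection override directly on registerQueue().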
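The forRoot() example above reads the host and port straight from the environment, where a malformed REDIS_PORT would silently become NaN. A minimal sketch of a validating helper follows; redisConnectionFromEnv and the RedisConnection interface are hypothetical names introduced for illustration, not part of @nestjs/bullmq.

```typescript
// Hypothetical helper (not part of @nestjs/bullmq): validates Redis settings
// before they are handed to BullModule.forRoot({ connection }).
interface RedisConnection {
  host: string;
  port: number;
}

function redisConnectionFromEnv(
  env: Record<string, string | undefined>,
): RedisConnection {
  const host = env.REDIS_HOST ?? 'localhost';
  const port = Number(env.REDIS_PORT ?? 6379);

  // Number('abc') is NaN and Number('') is 0, so reject anything
  // that is not a valid TCP port instead of passing it to Redis.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return { host, port };
}
```

Under these assumptions, the root module could call BullModule.forRoot({ connection: redisConnectionFromEnv(process.env) }) so that a bad value fails at startup rather than at the first queue operation.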

Adding jobs from a producer

A producer is any controller or provider that injects the queue with @InjectQueue('<name>') and calls queue.add(). The first argument is a job name (useful for routing inside the worker), the second is the JSON-serializable payload, and the third is per-job options such as retries and backoff.

// media/media.controller.ts
import { Body, Controller, Post } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

interface TranscodeDto {
  videoId: string;
  format: 'mp4' | 'webm';
}

@Controller('media')
export class MediaController {
  constructor(@InjectQueue('media') private readonly mediaQueue: Queue) {}

  @Post('transcode')
  async transcode(@Body() dto: TranscodeDto) {
    const job = await this.mediaQueue.add('transcode', dto, {
      attempts: 3,
      backoff: { type: 'exponential', delay: 1000 },
      removeOnComplete: 100,
      removeOnFail: 500,
    });

    return { jobId: job.id, status: 'queued' };
  }
}

The controller returns immediately with a job id while the heavy lifting happens out of band. Common job options:

Option            Purpose
attempts          Total tries before the job is marked failed
backoff           Delay strategy between retries (fixed or exponential)
delay             Milliseconds to wait before the job becomes active
priority          Lower number runs first when workers are saturated
removeOnComplete  Keep only the last N successful jobs (or true to delete all)
repeat            Cron-like recurring schedule for the job
Processing jobs in a worker

A consumer is a class decorated with @Processor('<name>') that extends WorkerHost and implements process(). NestJS spins up a BullMQ worker bound to that queue and routes each job to your handler. Branch on job.name to handle multiple job types in one processor.

// media/media.processor.ts
import { Logger } from '@nestjs/common';
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('media')
export class MediaProcessor extends WorkerHost {
  private readonly logger = new Logger(MediaProcessor.name);

  async process(job: Job): Promise<{ output: string }> {
    switch (job.name) {
      case 'transcode': {
        this.logger.log(`Transcoding ${job.data.videoId} -> ${job.data.format}`);
        await job.updateProgress(50);
        // Placeholder for the real transcoding step; only the output location is built here.
        const output = `s3://media/${job.data.videoId}.${job.data.format}`;
        await job.updateProgress(100);
        return { output };
      }
      default:
        throw new Error(`Unknown job name: ${job.name}`);
    }
  }

  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    this.logger.log(`Job ${job.id} completed`);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job, err: Error) {
    this.logger.error(`Job ${job.id} failed: ${err.message}`);
  }
}

Output:

[Nest] 5120  - 06/14/2026, 9:14:02 AM   LOG [MediaProcessor] Transcoding clip-42 -> mp4
[Nest] 5120  - 06/14/2026, 9:14:05 AM   LOG [MediaProcessor] Job 7 completed

The value returned from process() is stored as the job’s returnvalue in Redis, and any thrown error triggers the configured retry/backoff. Use job.updateProgress() to report progress that clients can poll.
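One way a client-facing endpoint might expose that progress and return value is sketched below. To keep it runnable without Redis, it uses structural stand-ins (QueueLike, JobLike) for the few BullMQ members it touches: queue.getJob(), job.getState(), job.progress, job.returnvalue, and job.failedReason. The getJobStatus helper and the JobStatus shape are hypothetical.

```typescript
// Structural stand-ins for the parts of BullMQ's Queue and Job used here.
// A real Queue from 'bullmq' should satisfy the same shape.
interface JobLike {
  progress: unknown;
  returnvalue: unknown;
  failedReason?: string;
  getState(): Promise<string>;
}
interface QueueLike {
  getJob(id: string): Promise<JobLike | undefined>;
}

// Hypothetical response shape for a status endpoint.
interface JobStatus {
  id: string;
  state: string;
  progress: unknown;
  result?: unknown;
  error?: string;
}

// Hypothetical helper that a GET handler could call with the injected queue.
async function getJobStatus(
  queue: QueueLike,
  id: string,
): Promise<JobStatus | null> {
  const job = await queue.getJob(id);
  // Unknown id, or the job was already trimmed by removeOnComplete/removeOnFail.
  if (!job) return null;

  const state = await job.getState();
  const status: JobStatus = { id, state, progress: job.progress };
  if (state === 'completed') status.result = job.returnvalue;
  if (state === 'failed') status.error = job.failedReason;
  return status;
}
```

Because finished jobs are eventually trimmed from Redis, a null result here does not necessarily mean the job never existed; a caller that needs durable results should persist them elsewhere.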

Controlling concurrency and rate limits

By default a worker processes one job at a time. Pass options through the @Processor decorator to raise concurrency or cap the throughput against a downstream API.

import { Processor, WorkerHost } from '@nestjs/bullmq';

@Processor('media', {
  concurrency: 5,
  limiter: { max: 10, duration: 1000 }, // at most 10 jobs/second
})
export class MediaProcessor extends WorkerHost {
  // ...
}

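Concurrency above 1 means a worker keeps several jobs in flight at once, interleaved on a single Node.js event loop rather than running on separate threads. That suits I/O-bound handlers well. Keep each handler CPU-light or move truly CPU-bound work to a separate sandboxed processor, since a blocking job stalls the event loop for every other job in that process.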
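Limiter values are easier to justify when they are derived from the downstream quota rather than guessed. The sketch below does that arithmetic; limiterForQuota is a hypothetical helper, and the Limiter interface simply mirrors the { max, duration } shape shown in the decorator options above.

```typescript
// Same shape as the `limiter` option: at most `max` jobs per `duration` ms.
interface Limiter {
  max: number;
  duration: number;
}

// Hypothetical helper: converts a downstream API quota into limiter settings.
// requestsPerJob is how many downstream calls a single job makes.
function limiterForQuota(
  requestsPerWindow: number,
  windowMs: number,
  requestsPerJob = 1,
): Limiter {
  if (requestsPerWindow <= 0 || windowMs <= 0 || requestsPerJob <= 0) {
    throw new RangeError('Quota, window, and requests per job must be positive');
  }
  // Round down so the worker never exceeds the quota.
  const max = Math.floor(requestsPerWindow / requestsPerJob);
  if (max < 1) {
    throw new RangeError('Quota is too small for even one job per window');
  }
  return { max, duration: windowMs };
}

// An API allowing 600 requests per minute, with 2 requests per job:
console.log(limiterForQuota(600, 60000, 2)); // { max: 300, duration: 60000 }
```

The result can be passed as the limiter value in the @Processor options. The BullMQ documentation describes the rate limiter as global to the queue, so adding more worker processes should not multiply the allowed rate.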

Best Practices

  • Register the Redis connection once with forRoot() and scope each queue to the feature module that produces and consumes it.
  • Always set attempts and a backoff strategy so transient failures (network blips, locked rows) recover automatically.
  • Keep payloads small and JSON-serializable: store large data in a database or object store and pass a reference id, not the blob.
  • Use removeOnComplete/removeOnFail limits so Redis does not grow unbounded with finished jobs.
  • Make handlers idempotent; BullMQ guarantees at-least-once delivery, so a job may run more than once after a crash or retry.
  • Tune concurrency and limiter together to respect downstream rate limits instead of overwhelming third-party APIs.
  • Listen to @OnWorkerEvent('failed') and ship the errors to your monitoring so failing jobs do not go unnoticed.
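The idempotency advice above can be sketched as a guard keyed on a business identifier. Everything here is illustrative: the in-memory Set stands in for a durable store (a database row or Redis key), and handleTranscode and doWork are hypothetical names. It also does not protect against two deliveries of the same job running at the same moment, which would need an atomic check in the store.

```typescript
// Hypothetical ledger of finished work. In production this would be a
// durable store shared by all workers, not a per-process Set.
const processedKeys = new Set<string>();

interface TranscodeJob {
  videoId: string;
  format: 'mp4' | 'webm';
}

// Returns true if the work ran, false if it was skipped as a duplicate.
async function handleTranscode(
  data: TranscodeJob,
  doWork: (data: TranscodeJob) => Promise<void>,
): Promise<boolean> {
  const key = `${data.videoId}:${data.format}`;
  if (processedKeys.has(key)) {
    return false; // an earlier delivery already finished this work
  }
  await doWork(data);
  // Record the key only after success so a failed attempt can still retry.
  processedKeys.add(key);
  return true;
}
```

Inside a processor, the 'transcode' branch of process() could delegate to a function like this so that a redelivered job becomes a cheap no-op instead of a second transcode.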
Last updated June 14, 2026