Skip to content
Angular ng libraries 5 min read

RxJS

RxJS (Reactive Extensions for JavaScript) is a standalone library for composing asynchronous and event-based programs using observable sequences. Angular ships with it as a first-class dependency: HttpClient, the router, reactive forms, and EventEmitter all emit observables. Understanding RxJS is therefore not optional for serious Angular work — it is the substrate that everything async flows through. This page covers the core primitives, the operator catalog, and how to wire observables into modern signal-based Angular.

Observables, observers, and subscriptions

An Observable is a lazy, push-based producer of zero, one, or many values over time. Nothing happens until you subscribe — that call starts the producer and returns a Subscription you must eventually tear down to avoid leaks.

import { Observable } from 'rxjs';

const clock$ = new Observable<number>((subscriber) => {
  let count = 0;
  const id = setInterval(() => subscriber.next(count++), 1000);
  // teardown runs on unsubscribe / complete / error
  return () => clearInterval(id);
});

const sub = clock$.subscribe({
  next: (n) => console.log('tick', n),
  error: (err) => console.error(err),
  complete: () => console.log('done'),
});

setTimeout(() => sub.unsubscribe(), 3500);

Output:

tick 0
tick 1
tick 2

By convention observable variables end in $. The trailing unsubscribe() is what stops the interval and runs the teardown function.

Creating observables

RxJS exposes creation functions instead of constructing new Observable by hand. These cover the vast majority of real cases.

FunctionEmits
of(...values)The given values synchronously, then completes
from(arrayOrPromise)Each element / the resolved value
fromEvent(el, 'click')DOM events as they fire
interval(ms)Incrementing numbers on a timer
timer(delay, period?)After a delay, then optionally repeats
EMPTY / throwError(fn)Nothing / an error
import { of, from, interval } from 'rxjs';

of(1, 2, 3).subscribe(console.log);          // 1 2 3
from(fetch('/api/ping')).subscribe(console.log); // the Response

Operators and pipe

Operators are pure functions that transform an observable into a new one. You compose them with pipe(). They never mutate the source — each returns a fresh observable.

import { interval } from 'rxjs';
import { filter, map, take } from 'rxjs';

interval(500)
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => n * 10),
    take(3),
  )
  .subscribe(console.log);

Output:

0
20
40

The most-used operators fall into a few families:

CategoryOperators
Transformationmap, scan, pluck, bufferTime
Filteringfilter, take, takeUntil, debounceTime, distinctUntilChanged
FlatteningswitchMap, mergeMap, concatMap, exhaustMap
CombinationcombineLatest, forkJoin, withLatestFrom, merge
Error handlingcatchError, retry, retryWhen

Choosing a flattening operator

The “higher-order” operators map each value to an inner observable and flatten the result. Picking the right one matters:

  • switchMap — cancels the previous inner observable. Ideal for type-ahead search where stale requests should be dropped.
  • mergeMap — runs all inner observables concurrently. Use for independent parallel work.
  • concatMap — queues them in order, one at a time. Use when ordering matters.
  • exhaustMap — ignores new values while one is in flight. Great for preventing double form submits.

Using mergeMap for an autocomplete is a classic bug: out-of-order responses can overwrite newer results. Reach for switchMap there.

Subjects and multicasting

A plain Observable is unicast — each subscriber gets its own execution. A Subject is both an observer and an observable, so it multicasts one execution to many subscribers, making it perfect for cross-component event buses and simple state stores.

TypeBehaviour
SubjectNo initial value; emits only to current subscribers
BehaviorSubjectRequires a seed; replays the latest value to new subscribers
ReplaySubjectReplays the last N values
AsyncSubjectEmits only the final value on complete
import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class CartService {
  private readonly count$ = new BehaviorSubject<number>(0);
  readonly itemCount$ = this.count$.asObservable();

  add(): void {
    this.count$.next(this.count$.value + 1);
  }
}

RxJS in modern Angular

In a component, prefer the async pipe so Angular subscribes and unsubscribes for you. For everything else, use takeUntilDestroyed() to tie subscriptions to the component lifecycle, and bridge to signals with toSignal().

import { Component, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { toSignal } from '@angular/core/rxjs-interop';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs';
import { FormControl, ReactiveFormsModule } from '@angular/forms';

@Component({
  selector: 'app-search',
  standalone: true,
  imports: [ReactiveFormsModule],
  template: `
    <input [formControl]="query" placeholder="Search users" />
    @if (results(); as users) {
      <ul>
        @for (u of users; track u.id) {
          <li>{{ u.name }}</li>
        }
      </ul>
    }
  `,
})
export class SearchComponent {
  private readonly http = inject(HttpClient);
  readonly query = new FormControl('', { nonNullable: true });

  readonly results = toSignal(
    this.query.valueChanges.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap((q) => this.http.get<User[]>(`/api/users?q=${q}`)),
    ),
    { initialValue: [] as User[] },
  );
}

interface User { id: number; name: string; }

toSignal() subscribes under the hood and cleans up automatically when the component is destroyed — no manual unsubscribe and no async pipe needed in the template.

Error handling

Errors terminate an observable. Use catchError to recover with a fallback stream, and retry to re-subscribe after transient failures.

import { of } from 'rxjs';
import { catchError, retry } from 'rxjs';

this.http.get<User[]>('/api/users').pipe(
  retry({ count: 2, delay: 1000 }),
  catchError(() => of([])), // fall back to an empty list
);

Best Practices

  • Suffix observable variables with $ and keep Subject instances private, exposing them via asObservable().
  • Prefer the async pipe or toSignal() over manual subscribe() in components; when you must subscribe, pair it with takeUntilDestroyed().
  • Use switchMap for cancel-on-new flows (search), exhaustMap to guard against double submits, and concatMap when order must be preserved.
  • Always provide a catchError near HttpClient calls so a single failure does not silently kill the stream.
  • Import operators from the top-level rxjs entry point for the smallest tree-shaken bundle.
  • Avoid nesting subscribe inside subscribe — flatten with a higher-order operator instead.
Last updated June 14, 2026
Was this helpful?