
Filtering Operators

Filtering operators decide which values flow through an Observable and which get dropped. Instead of subscribing and writing imperative if checks, you compose declarative operators inside pipe() to keep only the emissions you care about. In Angular this is the backbone of responsive search boxes, debounced form input, and one-shot HTTP calls — and it pairs naturally with signals via toSignal().

How filtering operators work

Every filtering operator returns a function that wraps the source in a new Observable. That Observable inspects each emission and either forwards it, drops it, or completes the stream early. Operators never mutate the source; each one produces a new Observable, which is why operator chains are reusable and leave the original stream untouched.

import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => n * 10),
  )
  .subscribe((value) => console.log(value));

Output:

20
40
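To make the "wraps the source" idea concrete, here is a minimal sketch of a hand-rolled filtering operator. The name keepIf is hypothetical and exists only for illustration; the built-in filter does more (for example, it reports errors thrown by the predicate), so treat this as a teaching aid rather than a replacement.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical operator for illustration; use the built-in filter() in real code.
function keepIf<T>(predicate: (value: T) => boolean) {
  // An operator is a function from a source Observable to a new Observable.
  return (source: Observable<T>): Observable<T> =>
    new Observable<T>((subscriber) => {
      // The source is subscribed only when someone subscribes to the result.
      const subscription = source.subscribe({
        next: (value) => {
          if (predicate(value)) {
            subscriber.next(value); // forward matching values unchanged
          }
          // non-matching values are simply dropped
        },
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Teardown: unsubscribing from the result unsubscribes from the source.
      return () => subscription.unsubscribe();
    });
}

const source$ = of(1, 2, 3, 4, 5);

const evens: number[] = [];
source$.pipe(keepIf((n: number) => n % 2 === 0)).subscribe((n) => evens.push(n));

// The source itself is unaffected and still emits every value.
const all: number[] = [];
source$.subscribe((n) => all.push(n));

console.log(evens); // [2, 4]
console.log(all); // [1, 2, 3, 4, 5]
```

Because the wrapper subscribes to the source lazily and returns a new Observable, the same source$ can feed any number of independent chains.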

filter

filter is the direct analogue of Array.prototype.filter. It runs a predicate against every value and re-emits only the values for which the predicate returns a truthy result. It never changes a value, only decides whether it passes.

import { fromEvent } from 'rxjs';
import { filter } from 'rxjs/operators';

const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');

keyup$
  .pipe(filter((event) => event.key === 'Enter'))
  .subscribe(() => console.log('Submitted via Enter'));
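In TypeScript, filter also accepts a type-guard predicate, which narrows the emitted type for the operators that follow. The sketch below assumes a hypothetical list of names where some entries are null; the data is made up for illustration.

```typescript
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

// Hypothetical input: some entries are missing.
const rawNames: Array<string | null> = ['alice', null, 'bob', null];

const upper: string[] = [];

from(rawNames)
  .pipe(
    // The `name is string` return type narrows Observable<string | null>
    // to Observable<string> for everything downstream.
    filter((name): name is string => name !== null),
  )
  // No null check needed here: `name` is typed as string.
  .subscribe((name) => upper.push(name.toUpperCase()));

console.log(upper); // ['ALICE', 'BOB']
```

Without the type guard the values would still be filtered at runtime, but the compiler would keep treating them as string | null.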

take, first, and takeWhile

These operators limit emissions by count or condition and then complete the stream automatically, which also tears down the subscription — handy for one-shot reads where you would otherwise need to unsubscribe manually.

| Operator | Keeps | Completes after |
| --- | --- | --- |
| take(n) | The first n emissions | n values (or when the source completes first) |
| first() | Only the first emission | 1 value |
| first(predicate) | First value matching the predicate | First match |
| takeWhile(predicate) | Values while the predicate is true | First failing value |

import { interval } from 'rxjs';
import { take, takeWhile } from 'rxjs/operators';

interval(1000)
  .pipe(take(3))
  .subscribe((n) => console.log('take:', n));

interval(1000)
  .pipe(takeWhile((n) => n < 3))
  .subscribe((n) => console.log('takeWhile:', n));

Output (both timers run concurrently, so the lines interleave):

take: 0
takeWhile: 0
take: 1
takeWhile: 1
take: 2
takeWhile: 2

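first() errors with an EmptyError if the source completes without emitting a matching value. The error arrives through the Observable's error channel rather than as a synchronous throw. Pass a default value, as in first(undefined, fallback), or use take(1) when an empty stream is acceptable.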
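The difference is easiest to see on a source that completes without emitting. This sketch uses an empty array as a stand-in for such a source, and the fallback value -1 is an arbitrary choice for illustration.

```typescript
import { EmptyError, from } from 'rxjs';
import { first, take } from 'rxjs/operators';

// A source that completes immediately without emitting anything.
const empty$ = from([] as number[]);

const log: string[] = [];

// first() with no default: the error callback receives an EmptyError.
empty$.pipe(first()).subscribe({
  next: (v) => log.push(`first(): ${v}`),
  error: (err) =>
    log.push(err instanceof EmptyError ? 'first(): EmptyError' : 'first(): other error'),
});

// first() with a default value: the fallback is emitted instead of an error.
empty$.pipe(first(undefined, -1)).subscribe({
  next: (v) => log.push(`first(undefined, -1): ${v}`),
});

// take(1): an empty source simply completes.
empty$.pipe(take(1)).subscribe({
  next: (v) => log.push(`take(1): ${v}`),
  complete: () => log.push('take(1): complete'),
});

console.log(log);
// ['first(): EmptyError', 'first(undefined, -1): -1', 'take(1): complete']
```

Which behaviour is right depends on whether "no value" is a bug in your flow (prefer first()) or a normal outcome (prefer take(1) or a default).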

debounceTime and throttleTime

Both operators rate-limit a noisy stream, but they sample it differently. debounceTime(ms) waits for a pause: it emits the most recent value only after the source has been quiet for the given duration. throttleTime(ms), with its default settings, emits a value immediately and then ignores everything else until the duration window has elapsed.

| Operator | Emits | Best for |
| --- | --- | --- |
| debounceTime | Latest value after a quiet gap | Search-as-you-type, autosave |
| throttleTime | First value, then a cooldown | Scroll, resize, rapid clicks |

import { Component, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { HttpClient } from '@angular/common/http';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { toSignal } from '@angular/core/rxjs-interop';

@Component({
  selector: 'app-search',
  standalone: true,
  imports: [ReactiveFormsModule],
  template: `
    <input [formControl]="query" placeholder="Search users…" />
    @if (results(); as users) {
      <ul>
        @for (user of users; track user.id) {
          <li>{{ user.name }}</li>
        }
      </ul>
    }
  `,
})
export class SearchComponent {
  private http = inject(HttpClient);
  query = new FormControl('', { nonNullable: true });

  results = toSignal(
    this.query.valueChanges.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap((term) =>
        this.http.get<{ id: number; name: string }[]>(
          `/api/users?q=${encodeURIComponent(term)}`,
        ),
      ),
    ),
    { initialValue: [] as { id: number; name: string }[] },
  );
}

This is the canonical typeahead: debounceTime(300) waits until the user stops typing, distinctUntilChanged() skips duplicate terms, and switchMap cancels any in-flight request when a new term arrives.
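To compare the two rate-limiting strategies on the same input without waiting on real timers, one option is RxJS's TestScheduler, which runs time-based operators in virtual time. The marble strings below are an illustrative input chosen for this sketch; inside run(), each character stands for one millisecond.

```typescript
import { TestScheduler } from 'rxjs/testing';
import { debounceTime, throttleTime } from 'rxjs/operators';

// The comparator receives recorded and expected notifications; throw on mismatch.
const scheduler = new TestScheduler((actual, expected) => {
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error(
      `Marble mismatch:\n${JSON.stringify(actual)}\n${JSON.stringify(expected)}`,
    );
  }
});

let marblesVerified = false;

scheduler.run(({ cold, expectObservable }) => {
  // A burst of three values, a pause, one more value, then completion.
  const source = '   a-b-c-----d|';
  // debounceTime(3): c is emitted 3 ms after the burst goes quiet;
  // d is still pending when the source completes, so it is flushed then.
  const debounced = '-------c---(d|)';
  // throttleTime(3): a passes, b falls inside the cooldown and is dropped,
  // c and d each arrive after the previous cooldown has ended.
  const throttled = 'a---c-----d|';

  expectObservable(cold(source).pipe(debounceTime(3))).toBe(debounced);
  expectObservable(cold(source).pipe(throttleTime(3))).toBe(throttled);
});

// run() flushes the virtual clock and throws if any expectation failed.
marblesVerified = true;
console.log('marbles verified:', marblesVerified);
```

Reading the two result diagrams side by side shows the trade-off: debounceTime delivers the settled value late, while throttleTime delivers an early value and skips the ones that follow it closely.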

distinctUntilChanged

distinctUntilChanged suppresses consecutive duplicate emissions. It compares each value to the previous one (using === by default) and drops it if they match. Note that it only compares against the immediately preceding value, not the entire history.

import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

of('a', 'a', 'b', 'b', 'a')
  .pipe(distinctUntilChanged())
  .subscribe((value) => console.log(value));

Output:

a
b
a

For objects, pass a comparator or use distinctUntilKeyChanged('id') so you compare a stable key rather than object identity:

import { of } from 'rxjs';
import { distinctUntilKeyChanged } from 'rxjs/operators';

of({ id: 1, n: 'x' }, { id: 1, n: 'y' }, { id: 2, n: 'z' })
  .pipe(distinctUntilKeyChanged('id'))
  .subscribe((u) => console.log(u.id));

Output:

1
2
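When no single key captures equality, a custom comparator is the alternative. The sketch below uses a hypothetical Point shape and compares both coordinates; it also shows that the default comparison treats structurally equal objects as different because they are separate references.

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Hypothetical value type for illustration.
interface Point {
  x: number;
  y: number;
}

const points: Point[] = [
  { x: 0, y: 0 },
  { x: 0, y: 0 }, // same coordinates, different object
  { x: 1, y: 0 },
  { x: 1, y: 0 },
  { x: 0, y: 0 }, // repeats an earlier value, but not the previous one
];

// Default comparison (===): every object is a new reference, so nothing is dropped.
const byIdentity: Point[] = [];
from(points)
  .pipe(distinctUntilChanged())
  .subscribe((p) => byIdentity.push(p));

// Custom comparator: return true when two values should count as equal.
const byValue: string[] = [];
from(points)
  .pipe(distinctUntilChanged((prev, curr) => prev.x === curr.x && prev.y === curr.y))
  .subscribe((p) => byValue.push(`${p.x},${p.y}`));

console.log(byIdentity.length); // 5
console.log(byValue); // ['0,0', '1,0', '0,0']
```

The final 0,0 still passes because only the immediately preceding value is compared.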

Best practices

  • Combine debounceTime → distinctUntilChanged → switchMap for search inputs; the order matters so duplicates are dropped before firing a request.
  • Prefer take(1) or first() for one-shot reads so the stream completes and self-unsubscribes — no manual teardown needed.
  • Use throttleTime for high-frequency UI events like scroll and debounceTime when you only care about the settled value.
  • Pass a comparator to distinctUntilChanged (or use distinctUntilKeyChanged) when emitting objects, since reference equality rarely does what you want.
  • Filtering operators are pure — keep predicates side-effect free and put logging in tap() instead.
  • For long-lived subscriptions that filtering operators do not complete, pair them with takeUntilDestroyed() to avoid leaks.
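
The last two points can be sketched without Angular. takeUntilDestroyed() needs an injection context, so this example substitutes the plain RxJS takeUntil with a hypothetical destroy$ notifier as a framework-free stand-in; clicks$ is likewise a made-up source.

```typescript
import { Subject } from 'rxjs';
import { filter, takeUntil, tap } from 'rxjs/operators';

const clicks$ = new Subject<number>(); // hypothetical long-lived source
const destroy$ = new Subject<void>(); // hypothetical teardown notifier

const audit: string[] = [];
const received: number[] = [];

clicks$
  .pipe(
    tap((n) => audit.push(`saw ${n}`)), // side effects such as logging live in tap
    filter((n) => n > 0), // the predicate only decides, it does nothing else
    takeUntil(destroy$), // placed last so the whole chain is torn down
  )
  .subscribe((n) => received.push(n));

clicks$.next(-1); // logged, then dropped by filter
clicks$.next(5); // logged and delivered
destroy$.next(); // completes the chain and unsubscribes from clicks$
clicks$.next(7); // never observed: the subscription is gone

console.log(audit); // ['saw -1', 'saw 5']
console.log(received); // [5]
```

In an Angular component or service, takeUntilDestroyed() plays the role of takeUntil(destroy$) here and removes the need to manage the notifier yourself.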
Last updated June 14, 2026