Filtering Operators
Filtering operators decide which values flow through an Observable and which get dropped. Instead of subscribing and writing imperative if checks, you compose declarative operators inside pipe() to keep only the emissions you care about. In Angular this is the backbone of responsive search boxes, debounced form input, and one-shot HTTP calls — and it pairs naturally with signals via toSignal().
How filtering operators work
Every filtering operator returns a new Observable that wraps the source. It inspects each emission and chooses to forward it, drop it, or complete the stream early. Operators never mutate the source; each one returns a new Observable that subscribes to it, which is why an operator chain can be reused and composed without affecting other subscribers.
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';
of(1, 2, 3, 4, 5)
.pipe(
filter((n) => n % 2 === 0),
map((n) => n * 10),
)
.subscribe((value) => console.log(value));
Output:
20
40
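To make the "wraps the source" idea concrete, here is a minimal sketch of a hand-written filtering operator. The name keepIf is hypothetical and exists only for illustration; in real code you would use the built-in filter. The sketch assumes RxJS 7 and is not how the library implements filter internally.

```typescript
import { Observable, of } from 'rxjs';

// Hypothetical operator for illustration only; prefer RxJS's built-in filter.
function keepIf<T>(predicate: (value: T) => boolean) {
  // An operator is a function from a source Observable to a new Observable.
  return (source: Observable<T>): Observable<T> =>
    new Observable<T>((subscriber) => {
      // Subscribing to the result subscribes to the source underneath.
      const inner = source.subscribe({
        next: (value) => {
          if (predicate(value)) {
            subscriber.next(value); // forward the value unchanged
          }
          // otherwise the value is silently dropped
        },
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Teardown: unsubscribing from the result unsubscribes from the source.
      return () => inner.unsubscribe();
    });
}

const numbers$ = of(1, 2, 3, 4, 5);
const evens: number[] = [];
const everything: number[] = [];

numbers$.pipe(keepIf((n: number) => n % 2 === 0)).subscribe((n) => evens.push(n));
numbers$.subscribe((n) => everything.push(n)); // the source itself is unchanged

console.log(evens); // [2, 4]
console.log(everything); // [1, 2, 3, 4, 5]
```

The second subscription still receives all five values, which illustrates that the operator produced a new Observable rather than modifying numbers$.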
filter
filter is the direct analogue of Array.prototype.filter. It runs a predicate against every value and re-emits only those for which the predicate returns a truthy value. It does not change the value, only whether it passes.
import { fromEvent } from 'rxjs';
import { filter } from 'rxjs/operators';
const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');
keyup$
.pipe(filter((event) => event.key === 'Enter'))
.subscribe(() => console.log('Submitted via Enter'));
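Although filter leaves values untouched, in TypeScript it can narrow their static type when the predicate is written as a type guard. The following is a small sketch under that assumption; the variable names (rawNames, upperNames) are illustrative and not part of any API.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Illustrative data: some entries are missing.
const rawNames: Array<string | null> = ['ada', null, 'grace', null];
const upperNames: string[] = [];

from(rawNames)
  .pipe(
    // The type guard tells the compiler that only strings pass through.
    filter((value): value is string => value !== null),
    // value is typed as string here, so toUpperCase() is safe to call.
    map((value) => value.toUpperCase()),
  )
  .subscribe((value) => upperNames.push(value));

console.log(upperNames); // ['ADA', 'GRACE']
```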
take, first, and takeWhile
These operators limit emissions by count or condition and then complete the stream automatically, which also tears down the subscription — handy for one-shot reads where you would otherwise need to unsubscribe manually.
| Operator | Keeps | Completes after |
|---|---|---|
| take(n) | The first n emissions | n values (or when the source completes first) |
| first() | Only the first emission | 1 value |
| first(predicate) | First value matching the predicate | First match |
| takeWhile(predicate) | Values while the predicate is true | First failing value |
import { interval } from 'rxjs';
import { take, takeWhile } from 'rxjs/operators';
interval(1000)
.pipe(take(3))
.subscribe((n) => console.log('take:', n));
interval(1000)
.pipe(takeWhile((n) => n < 3))
.subscribe((n) => console.log('takeWhile:', n));
Output (both intervals run concurrently, so the lines interleave):
take: 0
takeWhile: 0
take: 1
takeWhile: 1
take: 2
takeWhile: 2
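One point that is easy to miss is that takeWhile ends the stream at the first failing value, whereas filter keeps evaluating later values. The sketch below contrasts the two on a synchronous source, and also shows the optional second argument of takeWhile (inclusive), which emits the failing value before completing. The names readings$ and below3 are illustrative.

```typescript
import { of } from 'rxjs';
import { filter, takeWhile } from 'rxjs/operators';

const readings$ = of(1, 2, 3, 4, 1);
const below3 = (n: number): boolean => n < 3;

const viaFilter: number[] = [];
const viaTakeWhile: number[] = [];
const viaInclusive: number[] = [];

// filter checks every value, so the trailing 1 still passes.
readings$.pipe(filter(below3)).subscribe((n) => viaFilter.push(n));

// takeWhile completes at the first failure (3); the trailing 1 is never seen.
readings$.pipe(takeWhile(below3)).subscribe((n) => viaTakeWhile.push(n));

// inclusive = true also emits the value that failed the predicate.
readings$.pipe(takeWhile(below3, true)).subscribe((n) => viaInclusive.push(n));

console.log(viaFilter); // [1, 2, 1]
console.log(viaTakeWhile); // [1, 2]
console.log(viaInclusive); // [1, 2, 3]
```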
first() errors with an EmptyError if the source completes without emitting. Pass a default value, first(undefined, fallback), or use take(1) when an empty stream is acceptable.
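The difference on an empty source can be observed directly. This is a small sketch using EMPTY as the source; the record helper and the outcomes array are hypothetical names used only to capture what each subscription receives.

```typescript
import { EMPTY, EmptyError } from 'rxjs';
import { first, take } from 'rxjs/operators';

const outcomes: string[] = [];

// Hypothetical helper: builds an observer that records every notification.
function record(label: string) {
  return {
    next: (value: unknown) => outcomes.push(`${label} -> next(${String(value)})`),
    error: (err: unknown) =>
      outcomes.push(`${label} -> error(${err instanceof EmptyError ? 'EmptyError' : 'other'})`),
    complete: () => outcomes.push(`${label} -> complete`),
  };
}

// No value and no default: the error callback receives an EmptyError.
EMPTY.pipe(first()).subscribe(record('first()'));

// A default value is emitted instead of the error.
EMPTY.pipe(first(undefined, 'fallback')).subscribe(record('first(undefined, fallback)'));

// take(1) simply completes when nothing was emitted.
EMPTY.pipe(take(1)).subscribe(record('take(1)'));

console.log(outcomes);
// [
//   'first() -> error(EmptyError)',
//   'first(undefined, fallback) -> next(fallback)',
//   'first(undefined, fallback) -> complete',
//   'take(1) -> complete'
// ]
```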
debounceTime and throttleTime
Both operators rate-limit a noisy stream, but they sample it differently. debounceTime(ms) waits for a pause: it emits the most recent value only after the source has been quiet for the given duration. throttleTime(ms), with its default configuration, emits the first value immediately, then ignores everything else until the duration window has elapsed.
| Operator | Emits | Best for |
|---|---|---|
| debounceTime | Latest value after a quiet gap | Search-as-you-type, autosave |
| throttleTime | First value, then a cooldown | Scroll, resize, rapid clicks |
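The contrast between the two is easier to see on a fixed timeline. The sketch below is a simplified model, not RxJS's implementation: it replays timestamped events through two pure functions so the result is deterministic and needs no timers. The Keystroke type and the debounceModel and throttleModel functions are hypothetical names, and the model assumes the source stays open and that throttleTime uses its default leading-only behaviour.

```typescript
// Hypothetical event shape: `at` is a timestamp in milliseconds.
interface Keystroke {
  at: number;
  value: string;
}

// Model of debounceTime(ms): an event survives only if no other event
// follows it within `ms`; it is then emitted `ms` after it occurred.
function debounceModel(events: Keystroke[], ms: number): Keystroke[] {
  return events
    .filter((current, i) => {
      const following = events[i + 1];
      return following === undefined || following.at - current.at >= ms;
    })
    .map((current) => ({ at: current.at + ms, value: current.value }));
}

// Model of throttleTime(ms): emit an event right away, then ignore
// everything until the cooldown window has elapsed.
function throttleModel(events: Keystroke[], ms: number): Keystroke[] {
  const emitted: Keystroke[] = [];
  let windowEnd = Number.NEGATIVE_INFINITY;
  for (const current of events) {
    if (current.at >= windowEnd) {
      emitted.push(current);
      windowEnd = current.at + ms;
    }
  }
  return emitted;
}

// A burst of typing, a pause, then a second burst.
const typing: Keystroke[] = [
  { at: 0, value: 'a' },
  { at: 100, value: 'an' },
  { at: 200, value: 'ang' },
  { at: 900, value: 'angu' },
  { at: 1000, value: 'angul' },
];

const debounced = debounceModel(typing, 300);
const throttled = throttleModel(typing, 300);

console.log(debounced); // 'ang' at 500, 'angul' at 1300
console.log(throttled); // 'a' at 0, 'angu' at 900
```

In this model debouncing yields the last value of each burst (the settled input), while throttling yields the first value of each burst, which matches the "Emits" column of the table above.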
import { Component, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { HttpClient } from '@angular/common/http';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { toSignal } from '@angular/core/rxjs-interop';
@Component({
selector: 'app-search',
standalone: true,
imports: [ReactiveFormsModule],
template: `
<input [formControl]="query" placeholder="Search users…" />
@if (results(); as users) {
<ul>
@for (user of users; track user.id) {
<li>{{ user.name }}</li>
}
</ul>
}
`,
})
export class SearchComponent {
private http = inject(HttpClient);
query = new FormControl('', { nonNullable: true });
results = toSignal(
this.query.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap((term) =>
this.http.get<{ id: number; name: string }[]>(
`/api/users?q=${encodeURIComponent(term)}`,
),
),
),
{ initialValue: [] as { id: number; name: string }[] },
);
}
This is the canonical typeahead: debounceTime(300) waits until the user stops typing, distinctUntilChanged() skips duplicate terms, and switchMap cancels any in-flight request when a new term arrives.
distinctUntilChanged
distinctUntilChanged suppresses consecutive duplicate emissions. It compares each value to the previous one (using === by default) and drops it if they match. Note that it only compares against the immediately preceding value, not the entire history.
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
of('a', 'a', 'b', 'b', 'a')
.pipe(distinctUntilChanged())
.subscribe((value) => console.log(value));
Output:
a
b
a
For objects, pass a comparator or use distinctUntilKeyChanged('id') so you compare a stable key rather than object identity:
import { of } from 'rxjs';
import { distinctUntilKeyChanged } from 'rxjs/operators';
of({ id: 1, n: 'x' }, { id: 1, n: 'y' }, { id: 2, n: 'z' })
.pipe(distinctUntilKeyChanged('id'))
.subscribe((u) => console.log(u.id));
Output:
1
2
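The comparator form mentioned above can be sketched as follows. The example assumes a hypothetical User shape and compares both fields; with the default reference check, freshly created objects are never considered equal even when their contents match.

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Hypothetical shape for illustration.
interface User {
  id: number;
  n: string;
}

// Each literal is a distinct object, even when the contents repeat.
const users: User[] = [
  { id: 1, n: 'x' },
  { id: 1, n: 'x' },
  { id: 1, n: 'y' },
  { id: 2, n: 'z' },
];

const byReference: string[] = [];
const byContent: string[] = [];

// Default comparison (===): no two objects are identical, so nothing is dropped.
from(users)
  .pipe(distinctUntilChanged())
  .subscribe((u) => byReference.push(`${u.id}${u.n}`));

// Custom comparator: returning true means "same as previous, drop it".
from(users)
  .pipe(distinctUntilChanged((prev, curr) => prev.id === curr.id && prev.n === curr.n))
  .subscribe((u) => byContent.push(`${u.id}${u.n}`));

console.log(byReference); // ['1x', '1x', '1y', '2z']
console.log(byContent); // ['1x', '1y', '2z']
```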
Best practices
- Combine debounceTime → distinctUntilChanged → switchMap for search inputs; the order matters so duplicates are dropped before firing a request.
- Prefer take(1) or first() for one-shot reads so the stream completes and self-unsubscribes, with no manual teardown needed.
- Use throttleTime for high-frequency UI events like scroll, and debounceTime when you only care about the settled value.
- Pass a comparator to distinctUntilChanged (or use distinctUntilKeyChanged) when emitting objects, since reference equality rarely does what you want.
- Filtering operators are pure: keep predicates side-effect free and put logging in tap() instead.
- For long-lived subscriptions that filtering operators do not complete, pair them with takeUntilDestroyed() to avoid leaks.
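As a small illustration of the advice about pure predicates, the sketch below keeps the predicate free of side effects and moves the logging into tap(). The names auditLog, kept, and isEven are illustrative, and an in-memory array stands in for a real logger.

```typescript
import { of } from 'rxjs';
import { filter, tap } from 'rxjs/operators';

const auditLog: string[] = []; // stand-in for a real logger
const kept: number[] = [];

// Pure predicate: no logging, no mutation, same answer for the same input.
const isEven = (n: number): boolean => n % 2 === 0;

of(1, 2, 3, 4)
  .pipe(
    tap((n) => auditLog.push(`saw ${n}`)), // side effect before filtering
    filter(isEven),
    tap((n) => auditLog.push(`kept ${n}`)), // side effect after filtering
  )
  .subscribe((n) => kept.push(n));

console.log(kept); // [2, 4]
console.log(auditLog);
// ['saw 1', 'saw 2', 'kept 2', 'saw 3', 'saw 4', 'kept 4']
```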