Creating Observables
Most of the time you consume observables that Angular hands you (HttpClient calls, route params, form value changes), but creating your own lets you bring any other source into the same reactive pipelines. RxJS ships a family of creation functions that turn ordinary values, arrays, promises, timers, and DOM events into observable streams. This page walks through the ones you reach for daily and shows when each fits.
The Observable constructor
The lowest-level way to build a stream is the Observable constructor. You pass a subscriber function that receives an observer and decides when to emit values (next), signal completion (complete), or raise an error (error).
import { Observable } from 'rxjs';
const clock$ = new Observable<number>((subscriber) => {
  let count = 0;
  const id = setInterval(() => subscriber.next(count++), 1000);
  // Teardown: runs on unsubscribe, complete, or error
  return () => clearInterval(id);
});
const sub = clock$.subscribe((value) => console.log('tick', value));
setTimeout(() => sub.unsubscribe(), 3500);
Output:
tick 0
tick 1
tick 2
The function you return is the teardown logic — RxJS calls it automatically when the subscription ends, which is how you avoid leaking timers, listeners, or sockets.
You rarely need the raw constructor. Prefer the built-in creation functions below: they are concise, well-tested, and already handle teardown. Reach for new Observable only when wrapping an API none of them cover.
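As an illustration of wrapping an API that no creation function covers, here is a minimal sketch. The PriceFeed class and the prices helper are hypothetical, invented only for this example; the point is that subscribing registers the callback and the returned teardown removes it.

```typescript
import { Observable } from 'rxjs';

// Hypothetical callback-based API, invented for this example.
class PriceFeed {
  private listeners = new Set<(price: number) => void>();

  // Registers a callback and returns a function that removes it again.
  watch(listener: (price: number) => void): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  publish(price: number): void {
    this.listeners.forEach((listener) => listener(price));
  }

  get listenerCount(): number {
    return this.listeners.size;
  }
}

// Wrap the feed: subscribing registers the callback, teardown removes it.
function prices(feed: PriceFeed): Observable<number> {
  return new Observable<number>((subscriber) => {
    const stop = feed.watch((price) => subscriber.next(price));
    return () => stop();
  });
}

const feed = new PriceFeed();
const seen: number[] = [];
const sub = prices(feed).subscribe((price) => seen.push(price));

feed.publish(101);
feed.publish(102);
sub.unsubscribe(); // runs the teardown, so the callback is removed
feed.publish(103); // nobody is listening any more

console.log(seen); // [ 101, 102 ]
console.log(feed.listenerCount); // 0
```

Because the observable is lazy, nothing is registered on the feed until subscribe is called, and each subscriber gets its own callback and its own teardown.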
of — emit a fixed set of values
of takes any number of arguments and emits each one in order, then completes. It is the observable equivalent of a literal.
import { of } from 'rxjs';
of(1, 2, 3).subscribe({
  next: (v) => console.log(v),
  complete: () => console.log('done'),
});
Output:
1
2
3
done
Note that of([1, 2, 3]) emits the array itself as a single value — it does not iterate it. To stream the elements, use from.
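The difference is easiest to see side by side. The sketch below collects emissions into arrays (the variable names are just for illustration) and relies on of and from emitting synchronously for array input.

```typescript
import { from, of } from 'rxjs';

const viaOf: unknown[] = [];
const viaSpread: number[] = [];
const viaFrom: number[] = [];

// One emission: the whole array is a single value.
of([1, 2, 3]).subscribe((v) => viaOf.push(v));

// Three emissions: spreading turns the elements into separate arguments.
of(...[1, 2, 3]).subscribe((v) => viaSpread.push(v));

// Three emissions: from iterates the array for you.
from([1, 2, 3]).subscribe((v) => viaFrom.push(v));

console.log(viaOf.length); // 1
console.log(viaSpread); // [ 1, 2, 3 ]
console.log(viaFrom); // [ 1, 2, 3 ]
```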
from — arrays, iterables, and promises
from converts an array, iterable, or Promise into a stream. Each array element becomes a separate emission, and a promise resolution becomes a single emission followed by completion.
import { from } from 'rxjs';
from([10, 20, 30]).subscribe((v) => console.log('array', v));
from(fetch('/api/health').then((r) => r.json()))
  .subscribe((data) => console.log('promise', data));
Output:
array 10
array 20
array 30
promise { status: 'ok' }
Because from accepts promises, it is a clean bridge when you must adapt a promise-based library into your RxJS pipeline.
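The following sketch covers the two input kinds the example above leaves out or only touches on: a non-array iterable (a Set) and the timing of a promise. The loadGreeting function is a hypothetical stand-in for a promise-based library call. Iterables are emitted synchronously, while a promise always delivers its value asynchronously, even if it is already resolved.

```typescript
import { from } from 'rxjs';

// Hypothetical promise-based call, standing in for a real library.
function loadGreeting(name: string): Promise<string> {
  return Promise.resolve(`hello ${name}`);
}

// Any iterable works: a Set emits each unique entry in insertion order.
const tags: string[] = [];
from(new Set(['rxjs', 'angular', 'rxjs'])).subscribe((tag) => tags.push(tag));
console.log(tags); // [ 'rxjs', 'angular' ]

// A promise emits its resolved value once, then completes.
const greetings: string[] = [];
from(loadGreeting('Ada')).subscribe({
  next: (greeting) => greetings.push(greeting),
  complete: () => console.log('promise stream done', greetings),
});

// Still empty here: the promise value arrives on a later microtask.
const greetingsRightAfterSubscribe = greetings.length;
console.log(greetingsRightAfterSubscribe); // 0
```

Note that the promise starts its work when loadGreeting is called, not when the observable is subscribed; from only adapts how the result is delivered.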
interval and timer — time-driven streams
interval(period) emits an incrementing integer every period milliseconds, forever. timer(delay, period?) waits delay ms before its first emission, then optionally repeats every period ms.
import { interval, timer } from 'rxjs';
import { take } from 'rxjs/operators';
interval(1000).pipe(take(3)).subscribe((v) => console.log('interval', v));
// Wait 2s, then emit every 1s
timer(2000, 1000).pipe(take(2)).subscribe((v) => console.log('timer', v));
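Output (grouped by stream for readability; at runtime the two streams interleave, with timer 0 arriving around the 2-second mark):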
interval 0
interval 1
interval 2
timer 0
timer 1
Both run indefinitely unless bounded by an operator like take or terminated by unsubscribing — important in components to prevent leaks.
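To see how the emissions interleave and when each stream completes, the sketch below runs the same kind of streams on RxJS's VirtualTimeScheduler, so the whole timeline executes instantly. Passing a scheduler is an assumption made only for this demonstration; in application code you would normally omit it. The timings (500, 1000, 2500) are chosen so that no two emissions share the same instant.

```typescript
import { interval, timer, VirtualTimeScheduler } from 'rxjs';
import { take } from 'rxjs/operators';

const scheduler = new VirtualTimeScheduler();
const log: string[] = [];

// No period: emits once after 500 virtual ms, then completes on its own.
timer(500, scheduler).subscribe({
  next: (v) => log.push(`once ${v}`),
  complete: () => log.push('once done'),
});

// Infinite by nature; take(3) completes it after three values.
interval(1000, scheduler).pipe(take(3)).subscribe({
  next: (v) => log.push(`interval ${v}`),
  complete: () => log.push('interval done'),
});

// First value at 2500 virtual ms, then every 1000; bounded by take(2).
timer(2500, 1000, scheduler).pipe(take(2)).subscribe((v) => log.push(`timer ${v}`));

// Run all scheduled work in virtual-time order.
scheduler.flush();

console.log(log);
// [ 'once 0', 'once done', 'interval 0', 'interval 1',
//   'timer 0', 'interval 2', 'interval done', 'timer 1' ]
```

Without take, the flush call would never return for the two repeating streams, which is the virtual-time equivalent of a leaked timer in a component.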
fromEvent — DOM and EventEmitter streams
fromEvent wraps an event source (a DOM element, window, anything else with addEventListener, or a Node.js-style EventEmitter) into an observable. Inside a standalone component, grab the element via a viewChild signal or ElementRef.
import { Component, DestroyRef, ElementRef, afterNextRender, inject, viewChild } from '@angular/core';
import { fromEvent } from 'rxjs';
import { throttleTime, map } from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
@Component({
  selector: 'app-search',
  standalone: true,
  template: `<input #box placeholder="Search…" />`,
})
export class SearchComponent {
  private readonly destroyRef = inject(DestroyRef);
  box = viewChild.required<ElementRef<HTMLInputElement>>('box');
  constructor() {
    // The callback runs outside the injection context, so destroyRef is passed explicitly
    afterNextRender(() => {
      fromEvent(this.box().nativeElement, 'input')
        .pipe(
          map((e) => (e.target as HTMLInputElement).value),
          throttleTime(300),
          takeUntilDestroyed(this.destroyRef),
        )
        .subscribe((term) => console.log('search', term));
    });
  }
}
fromEvent automatically removes the listener on unsubscribe, and takeUntilDestroyed ties that to the component lifecycle.
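The listener cleanup can be observed directly with a Node.js EventEmitter, which fromEvent also accepts. This is a small sketch meant to run in Node rather than the browser; the 'data' event name and the variable names are arbitrary choices for the example.

```typescript
import { EventEmitter } from 'node:events';
import { fromEvent } from 'rxjs';

const emitter = new EventEmitter();
const received: unknown[] = [];

// Creating the observable does nothing yet: the listener is added on subscribe.
const data$ = fromEvent(emitter, 'data');
const listenersBefore = emitter.listenerCount('data'); // 0

const sub = data$.subscribe((payload) => received.push(payload));
const listenersDuring = emitter.listenerCount('data'); // 1

emitter.emit('data', 'a');
emitter.emit('data', 'b');

// Unsubscribing removes the listener that fromEvent registered.
sub.unsubscribe();
const listenersAfter = emitter.listenerCount('data'); // 0

emitter.emit('data', 'c'); // no listener left, so this is never received

console.log(received); // [ 'a', 'b' ]
console.log(listenersBefore, listenersDuring, listenersAfter); // 0 1 0
```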
Choosing a creation function
| Function | Source | Emissions | Completes? |
|---|---|---|---|
| new Observable | custom logic | whatever you push | when you call complete |
| of(...values) | literal arguments | each argument, in order | yes |
| from(input) | array, iterable, promise | each element / resolved value | yes |
| interval(ms) | timer | 0, 1, 2, … every ms | no (infinite) |
| timer(delay, period?) | timer | after delay, then per period | only if no period |
| fromEvent(target, name) | events | one per event | no (until unsubscribed) |
Best Practices
- Prefer built-in creation functions over new Observable; only hand-roll a subscriber when wrapping an unsupported API.
- Always return teardown logic from a custom Observable constructor to release timers, listeners, and connections.
- Bound infinite streams (interval, timer, fromEvent) with take, takeUntilDestroyed, or explicit unsubscription inside components.
- Remember of([...]) emits the array as one value; use from([...]) to stream the elements individually.
- Use from(promise) to integrate promise-based libraries rather than mixing await into reactive pipelines.
- Keep creation at the edges of your pipeline and let operators handle transformation, so streams stay composable and testable.
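The last point can be sketched as follows. The toSearchTerms function is a hypothetical example of a pipeline that only transforms; because it takes its source as a parameter, production code could feed it from fromEvent while a test feeds it from of.

```typescript
import { Observable, of } from 'rxjs';
import { distinctUntilChanged, filter, map } from 'rxjs/operators';

// Pure pipeline: it knows nothing about where the raw strings come from.
function toSearchTerms(raw$: Observable<string>): Observable<string> {
  return raw$.pipe(
    map((text) => text.trim().toLowerCase()),
    filter((term) => term.length >= 2),
    distinctUntilChanged(),
  );
}

// Creation happens at the edge; here a fixed list stands in for user input.
const terms: string[] = [];
toSearchTerms(of(' Rx', 'rx ', 'a', 'RxJS')).subscribe((term) => terms.push(term));

console.log(terms); // [ 'rx', 'rxjs' ]
```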