Vesa Karvonen
Observables are a rather flexible mechanism and allow for many different styles of programming with very different properties.
It seems that often people learn only a style or two and have misconceptions about the properties of observables.
The intended audience is programmers who have recently been introduced to observables.
I assure you that it was not all immediately obvious to me when I first learned of observables.
Give a sense of the depth and breadth of programming with observables...
...by briefly discussing a few different ways to program with observables.
No claim is made regarding the completeness of this treatment!
interface Observable<T> {
  subscribe(observer: Observer<T>);
  unsubscribe(observer: Observer<T>);
}
interface Observer<T> {
  onValue(value: T): void;
  onError(error: any): void;
  onEnd(): void;
}
constant(value)
error(value)
o1.flatMap(v => o2)
combine([...os])
merge([...os])
o.scan((s, x) => s1, s0)
o.debounce(ms)
o.skipDuplicates(equals)
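To make the interface and a few of the combinators above concrete, here is a minimal sketch. It assumes a toy kernel where an observable is just an object with `subscribe(observer)`. The names `observable` and `fromArray` are hypothetical helpers invented for this illustration, unsubscription is left out, and no real library is implemented exactly this way.

```javascript
// Hypothetical minimal kernel: an observable is an object with subscribe(observer).
// Unsubscription is omitted to keep the sketch short.
const observable = subscribe => ({
  subscribe,
  flatMap(fn) { return flatMap(this, fn) },
  scan(fn, s0) { return scan(this, fn, s0) }
})

// Emits a single value and then ends.
const constant = value => observable(observer => {
  observer.onValue(value)
  observer.onEnd()
})

// Emits a fixed sequence of values and then ends (helper for the demo only).
const fromArray = values => observable(observer => {
  values.forEach(v => observer.onValue(v))
  observer.onEnd()
})

// Subscribes to the inner observable produced for each outer value and
// ends once the outer and all inner observables have ended.
const flatMap = (outer, fn) => observable(observer => {
  let active = 1 // the outer observable plus the inner ones still running
  const onEnd = () => { if (--active === 0) observer.onEnd() }
  const onError = error => observer.onError(error)
  outer.subscribe({
    onValue: v => {
      active += 1
      fn(v).subscribe({onValue: x => observer.onValue(x), onError, onEnd})
    },
    onError,
    onEnd
  })
})

// Emits the running state after each incoming value.
const scan = (source, fn, s0) => observable(observer => {
  let state = s0
  source.subscribe({
    onValue: x => observer.onValue(state = fn(state, x)),
    onError: e => observer.onError(e),
    onEnd: () => observer.onEnd()
  })
})

const collected = []
fromArray([1, 2, 3])
  .flatMap(x => constant(x * 10))
  .scan((sum, x) => sum + x, 0)
  .subscribe({
    onValue: v => collected.push(v),
    onError: e => { throw e },
    onEnd: () => collected.push("end")
  })
// collected is now [10, 30, 60, "end"]
```

Everything here runs synchronously inside `subscribe`, which is the simplest possible case; real sources usually emit later, from timers, events, or IO.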
Asynchronous operations, time, and state are very difficult to get right
Observables can make such code more declarative
Observables are highly composable (4+ monads)
Like what you do with async-await
Linear thread of control with asynchronous ops
const foo = async (...xs) => {
  const r = await bar(...xs)
  return baz(r)
}
const mapAsync = async (fnA, xs) => {
  const ys = []
  for (let i = 0; i < xs.length; ++i)
    ys.push(await fnA(xs[i]))
  return ys
}
o1.flatMap(v => o2) - to await
constant(v) - to introduce constants
subscribe(observer) - to start
fromNodeCallback(fn) - interop
lazy(() => o) - to avoid over eagerness
const lazy = toObservable =>
  constant(undefined).flatMap(() => toObservable())
const foo = (...xs) => lazy(() =>
  bar(...xs)
    .flatMap(r =>
      constant(baz(r))))
const mapObs = (fnO, xs) => lazy(() => {
  const ys = []
  const loop = i => lazy(() => {
    if (i < xs.length)
      return fnO(xs[i])
        .flatMap(y => {
          ys.push(y)
          return loop(i+1)
        })
    else
      return constant(ys)
  })
  return loop(0)
})
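The `lazy` and `mapObs` definitions above can be tried out on a toy kernel. The sketch below assumes a hypothetical `observable` helper with a synchronous `flatMap` and no unsubscription. The `double` function and the `log` array are invented for the demonstration, which shows that nothing runs until `subscribe` is called and that the elements are processed one after another.

```javascript
// Hypothetical minimal kernel (not any library's API); no unsubscription.
const observable = subscribe => ({
  subscribe,
  flatMap(fn) {
    const outer = this
    return observable(observer => {
      let active = 1 // the outer observable plus the inner ones still running
      const onEnd = () => { if (--active === 0) observer.onEnd() }
      const onError = e => observer.onError(e)
      outer.subscribe({
        onValue: v => {
          active += 1
          fn(v).subscribe({onValue: x => observer.onValue(x), onError, onEnd})
        },
        onError,
        onEnd
      })
    })
  }
})

const constant = value => observable(observer => {
  observer.onValue(value)
  observer.onEnd()
})

// Defers building the observable until somebody subscribes.
const lazy = toObservable =>
  constant(undefined).flatMap(() => toObservable())

const mapObs = (fnO, xs) => lazy(() => {
  const ys = []
  const loop = i => lazy(() => {
    if (i < xs.length)
      return fnO(xs[i]).flatMap(y => {
        ys.push(y)
        return loop(i + 1)
      })
    else
      return constant(ys)
  })
  return loop(0)
})

const log = []
const double = x => {
  log.push(`start ${x}`)
  return constant(x * 2)
}

const doubled = mapObs(double, [1, 2, 3])
const startedBeforeSubscribe = log.length // 0: nothing has run yet

let result
doubled.subscribe({
  onValue: ys => { result = ys },
  onError: e => { throw e },
  onEnd: () => log.push("end")
})
// result is [2, 4, 6]; log is ["start 1", "start 2", "start 3", "end"]
```

Because this kernel is synchronous, the recursion in `loop` consumes stack for each element; an asynchronous source would not nest the calls this way.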
Observables compose
Observables are cancellable
And observables can do other things beyond promises
Something has happened or something has changed
Examples
This is probably what comes to mind when you think of observables?
merge([...os]) - gather events
o.scan((s, v) => s1, s0) - state
update(...) - generalization of scan
o1.flatMap(v => o2) - do async after event
o1.flatMapConcat(v => o2) - don't drop
fromEvents(target, name) - interop
Much preferable to traditional use of callbacks
Powerful combinators for dealing with time
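As a rough illustration of the event style, the sketch below gathers two event sources with `merge` and folds them into state with `scan`. The `bus` source, the free-standing `map`, and the click names are assumptions made for this example. Only `onValue` is modelled, so errors, end, and unsubscription are left out.

```javascript
// Hypothetical push source: push(v) sends v to every subscriber.
// Only onValue is modelled; errors, end, and unsubscription are omitted.
const bus = () => {
  const observers = []
  return {
    push: v => observers.forEach(o => o.onValue(v)),
    subscribe: observer => { observers.push(observer) }
  }
}

const map = (source, fn) => ({
  subscribe: observer =>
    source.subscribe({onValue: v => observer.onValue(fn(v))})
})

// Gathers the events of several observables into one.
const merge = sources => ({
  subscribe: observer =>
    sources.forEach(s => s.subscribe({onValue: v => observer.onValue(v)}))
})

// Folds events into state and emits the state after each event.
const scan = (source, fn, s0) => ({
  subscribe: observer => {
    let state = s0
    source.subscribe({onValue: x => observer.onValue(state = fn(state, x))})
  }
})

const plusClicks = bus()
const minusClicks = bus()

const counter = scan(
  merge([map(plusClicks, () => +1), map(minusClicks, () => -1)]),
  (count, delta) => count + delta,
  0)

const states = []
counter.subscribe({onValue: s => states.push(s)})

plusClicks.push("click")
plusClicks.push("click")
minusClicks.push("click")
// states is now [1, 2, 1]
```

In a browser the two buses would typically be replaced by something like `fromEvents(target, name)` from the list above.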
What something is
Independence from time
Source of truth
o.map(i => o) - derived property
o1.flatMapLatest(v => o2) - async derived
combine([...os]) - derived from many props
export default U.withContext(({phase}, {user_state}) => {
  const done = US.doneCount(phase, user_state)
  const total = US.itemCount(phase, user_state)
  const percentage = U.round(U.multiply(U.divide(done, total), 100))
  const completed = U.ifte(U.equals(done, total), "completed", "")
  return <div className={U.string`progress ${completed}`}>
    <div className="progress__bar"/>
    <div className="progress__progressed"
         style={{width: U.string`${percentage}%`}}/>
    <div className="progress__label">
      <span>{done}/{total} valmis</span>
    </div>
  </div>
})
Simple — just map
Stateless properties are RT (referentially transparent) - can be reconstructed
Time is largely out of the equation
Robust — skip, dup, ... don't matter much
Often less boilerplate
Different mode of thinking
Direct-style
f(x1, x2)
x + y * z
R.add(x, R.multiply(y, z))
Monadic-style
x1.flatMapLatest(x1 =>
  x2.flatMapLatest(x2 =>
    constant(f(x1, x2))))
Cumbersome, isn't it?
combine([x1, x2], f)
As good as it gets?
const lift = f => (...xs) => combine(xs, f)
const fL = lift(f)
fL(x1, x2)
const ifte = (c, t, e) =>
  c.flatMapLatest(c => c ? t : e)
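To see how the `lift` helper above turns `x + y * z` into a direct-style expression over properties, here is a small sketch. The `variable` property (with a `set` method) and this particular `combine` are hypothetical stand-ins written for the example. Only `onValue` is modelled.

```javascript
// Hypothetical property: remembers its current value and pushes it
// to each new subscriber immediately. Only onValue is modelled.
const variable = initial => {
  let current = initial
  const observers = []
  return {
    set: v => { current = v; observers.forEach(o => o.onValue(v)) },
    subscribe: observer => { observers.push(observer); observer.onValue(current) }
  }
}

// Emits fn(latest values) whenever any input changes,
// once every input has produced at least one value.
const combine = (os, fn) => ({
  subscribe: observer => {
    const latest = new Array(os.length)
    const seen = os.map(() => false)
    let missing = os.length
    os.forEach((o, i) => o.subscribe({
      onValue: v => {
        latest[i] = v
        if (!seen[i]) { seen[i] = true; missing -= 1 }
        if (missing === 0) observer.onValue(fn(...latest))
      }
    }))
  }
})

const lift = f => (...xs) => combine(xs, f)

const addL = lift((a, b) => a + b)
const mulL = lift((a, b) => a * b)

const x = variable(1)
const y = variable(2)
const z = variable(3)

const result = addL(x, mulL(y, z)) // direct-style x + y * z

const values = []
result.subscribe({onValue: v => values.push(v)})
y.set(10)
x.set(5)
// values is now [7, 31, 35]
```

Note that a lifted function must be given all of its arguments as observables; passing a plain number to `addL` in this sketch would fail, which is one of the gotchas with lifting.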
Can be significantly more legible than monadic-style
Need to lift combinators for legibility
Gotchas with lifting techniques — need to fully apply
More limited — you'll occasionally need monadic-style
Side-effects
Asynchronous HTTP requests
In response to events or state changes
Also just async programming on the backend (skipped)
o1.flatMapLatest(v => o2) - do async IO
o.debounce(ms) - avoid too many requests
o.throttle(ms) - avoid too many requests
fromPromise( ??? )
fromBinder(subscribe) or stream(emitter => ...) - wrap XMLHttpRequest
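One reason `flatMapLatest` suits IO is that a newer request supersedes an older one, so a late response cannot overwrite a fresh result. The sketch below illustrates that idea under stated assumptions: `subscribe` returns an unsubscribe function (unlike the interface shown earlier), and `bus`, `fakeRequest`, and `pending` are hypothetical stand-ins for real HTTP plumbing.

```javascript
// Hypothetical kernel: subscribe(observer) returns an unsubscribe function.
const bus = () => {
  const observers = new Set()
  return {
    push: v => [...observers].forEach(o => o.onValue(v)),
    subscribe: observer => {
      observers.add(observer)
      return () => observers.delete(observer)
    }
  }
}

// Switches to the observable produced for the latest outer value and
// unsubscribes from the previous one, so stale results are dropped.
const flatMapLatest = (outer, fn) => ({
  subscribe: observer => {
    let unsubscribeInner = () => {}
    const unsubscribeOuter = outer.subscribe({
      onValue: v => {
        unsubscribeInner()
        unsubscribeInner = fn(v).subscribe({onValue: x => observer.onValue(x)})
      }
    })
    return () => { unsubscribeInner(); unsubscribeOuter() }
  }
})

// Stand-in for HTTP: each query gets a bus that is answered manually below.
const pending = new Map()
const fakeRequest = query => {
  const response = bus()
  pending.set(query, response)
  return response
}

const queries = bus()
const results = []
flatMapLatest(queries, fakeRequest).subscribe({onValue: r => results.push(r)})

queries.push("a")
queries.push("ab") // supersedes the request for "a"
pending.get("a").push("late response for a") // dropped: nobody is subscribed
pending.get("ab").push("response for ab")
// results is now ["response for ab"]
```

With a real `XMLHttpRequest`, the unsubscribe function would be a natural place to call `abort()`.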
Observables are (typically) push-based: An observable (source) synchronously calls the observer (sink)
What if observer is not ready? What about concurrency? Locking?
Back pressure - complex with pushy-style
Pull-style puts consumer in charge
Pull-style is fundamentally different, dual
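The difference between push and pull can be sketched with plain JavaScript. In the push version the source calls the observer; in the pull version, written here with a generator, the consumer asks for each value. The names `pushSource`, `pullSource`, and `produced` are invented for this illustration.

```javascript
// Push: the source decides when the observer runs.
const pushSource = values => ({
  subscribe: observer => values.forEach(v => observer.onValue(v))
})

// Pull: nothing is produced until the consumer asks for the next value.
const produced = []
function* pullSource(values) {
  for (const v of values) {
    produced.push(v) // records when a value is actually produced
    yield v
  }
}

const pushed = []
pushSource([1, 2, 3]).subscribe({onValue: v => pushed.push(v)})
// All three values were delivered during subscribe: pushed is [1, 2, 3]

const pulled = []
const iterator = pullSource([1, 2, 3])
pulled.push(iterator.next().value)
// The consumer may pause here for as long as it likes;
// the producer does not run in the meantime.
pulled.push(iterator.next().value)
// pulled is [1, 2] and produced is [1, 2]: the third value has not been made yet
```

The generator gives a simple form of back pressure for free, since the producer only advances when `next()` is called.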
Glitches
Recursion limitations
Observables are like asynchronous message passing
Rendezvous, like in CSP, is not directly supported
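A glitch is a transient, inconsistent value that appears when one change reaches a combined observable along two paths. The sketch below reproduces one in a naive push-based kernel; `variable`, `map`, and `combine` are hypothetical helpers written for this example, and libraries differ in whether and how they prevent this.

```javascript
// Hypothetical property: remembers its current value and pushes it
// to each new subscriber immediately. Only onValue is modelled.
const variable = initial => {
  let current = initial
  const observers = []
  return {
    set: v => { current = v; observers.forEach(o => o.onValue(v)) },
    subscribe: observer => { observers.push(observer); observer.onValue(current) }
  }
}

const map = (source, fn) => ({
  subscribe: observer =>
    source.subscribe({onValue: v => observer.onValue(fn(v))})
})

// Naive combine: emits as soon as any single input changes.
const combine = (os, fn) => ({
  subscribe: observer => {
    const latest = new Array(os.length)
    const seen = os.map(() => false)
    let missing = os.length
    os.forEach((o, i) => o.subscribe({
      onValue: v => {
        latest[i] = v
        if (!seen[i]) { seen[i] = true; missing -= 1 }
        if (missing === 0) observer.onValue(fn(...latest))
      }
    }))
  }
})

const a = variable(1)
const b = map(a, x => x + 1) // b is always a + 1
const difference = combine([a, b], (a, b) => b - a) // should always be 1

const values = []
difference.subscribe({onValue: v => values.push(v)})
a.set(5)
// values is now [1, -3, 1]: the -3 pairs the new a (5) with the stale b (2)
```

The intermediate `-3` is harmless for a derived display value that is immediately corrected, but it matters if the observer performs a side effect on every value.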