About two weeks ago I stumbled over minamorl's approach to implementing the Observable pattern, which they published as a package called rex.
I can't quite say what it was, but it inspired me to try my hand at implementing the ReactiveX standard. Not the entire thing, mind you, but enough of the fundamentals that the rest of the spec should be manageable to implement without coming into conflict with the existing codebase.
I have since contributed a reimplementation that does exactly that and should provide a solid starting point to build on towards a near-complete implementation of the spec. That is version 0.3 of the rex library. Here is a short code example of how it works:
import std/[os, sugar, times]  # sleep, `=>`, initDuration
import rex

# Given
let subject = newSubject[int]()
var receivedValues: seq[int] = @[]
# When
let throttleObservable = subject
  .throttle((value: int) => initDuration(milliseconds = 10))
  .subscribe((value: int) => receivedValues.add(value))
subject.nextBlock(1)
subject.nextBlock(2)  # dropped: arrives inside the 10 ms throttle window
sleep(10)
subject.nextBlock(3)
sleep(10)
subject.nextBlock(4)
# Then
assert receivedValues == @[1, 3, 4]
It contains:
How it works: Closures. So many closures. It's all closures everywhere. I see closures in my dreams now...
On a more serious note, closures are one of the key things making this possible. Observables and everything that inherits from them (like Subjects) have fields that store procs, and those procs define how values get emitted to whoever subscribed. That gives us flexibility, because Observables created in different circumstances (e.g. via the map operator) may need different behavior. Similarly, a Subscription stores a closure that knows how to unsubscribe that particular subscription.
So future Subject types like BehaviorSubject/ReplaySubject can simply inherit from Observable and define their own closures for how they should behave.
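To make the closure idea a bit more concrete, here is a stripped-down sketch of the pattern. This is not rex's actual code: the names `subscribeImpl`, `newIntSubject`, `emit` and `map` are made up for illustration, everything is fixed to `int` to keep it short, and it runs synchronously.

```nim
import std/tables

type
  Observer = proc(value: int) {.closure.}
  Subscription = object
    unsubscribe: proc() {.closure.}  # knows how to undo one subscription
  Observable = ref object of RootObj
    # The behavior lives in a field, so each Observable can bring its own.
    subscribeImpl: proc(observer: Observer): Subscription {.closure.}
  Subject = ref object of Observable
    observers: OrderedTable[int, Observer]
    nextId: int

proc subscribe(source: Observable, observer: Observer): Subscription =
  let impl = source.subscribeImpl
  result = impl(observer)

proc newIntSubject(): Subject =
  let subject = Subject()
  proc doSubscribe(observer: Observer): Subscription =
    let id = subject.nextId
    inc subject.nextId
    subject.observers[id] = observer
    proc removeObserver() =
      subject.observers.del(id)
    result = Subscription(unsubscribe: removeObserver)
  subject.subscribeImpl = doSubscribe
  result = subject

proc emit(subject: Subject, value: int) =
  for observer in subject.observers.values:
    observer(value)

# An operator returns a new Observable whose closure wraps the source.
proc map(source: Observable, transform: proc(value: int): int {.closure.}): Observable =
  proc doSubscribe(observer: Observer): Subscription =
    proc forward(value: int) =
      observer(transform(value))
    result = source.subscribe(forward)
  result = Observable(subscribeImpl: doSubscribe)

let subject = newIntSubject()
var doubled: seq[int] = @[]
let doubledSource = subject.map(proc(value: int): int = value * 2)
let subscription = doubledSource.subscribe(proc(value: int) = doubled.add(value))

subject.emit(1)
subject.emit(2)
subscription.unsubscribe()
subject.emit(3)  # nobody is listening any more
echo doubled     # @[2, 4]
```

In a design like this, a BehaviorSubject-style type would only need to install a different `subscribeImpl` closure, for example one that first replays the latest value to the new observer.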
In fact, the procs you pass in to subscribe etc. are also turned into async closures before they get stored in an Observer.
I'll likely keep contributing to the library as time goes on, since it represents an interesting problem. Maybe somebody else finds it interesting to use or contribute to as well.
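Roughly, that wrapping step could look like the sketch below. It assumes `std/asyncdispatch`, and the `SyncHandler`, `AsyncHandler` and `toAsync` names are hypothetical; how rex does it internally may well differ.

```nim
import std/asyncdispatch

type
  SyncHandler = proc(value: int) {.closure.}
  AsyncHandler = proc(value: int): Future[void] {.closure.}

proc toAsync(handler: SyncHandler): AsyncHandler =
  # Wrap the plain proc in an async closure that captures it.
  proc wrapped(value: int): Future[void] {.async.} =
    handler(value)
  result = wrapped

var seen: seq[int] = @[]
# What gets stored is the async version, not the proc the user passed in.
let stored = toAsync(proc(value: int) = seen.add(value))
waitFor stored(1)
waitFor stored(2)
echo seen  # @[1, 2]
```

The upside of storing only async closures is that everything downstream can treat handlers uniformly as things that return a Future.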
The idea is that variables which are derived from other variables that may change over time are automatically kept in sync.
This is widely used in Angular, where HTTP requests are represented as cold observables. It's also really useful in general when you want to react to events and apply extra behavior, like rate-limiting how often you react to them.
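As a tiny illustration of "derived values stay in sync", here is a sketch that does not use rex at all. The `Cell` type and the `update`/`derive` procs are invented for this example.

```nim
type
  Listener = proc(value: int) {.closure.}
  Cell = ref object
    value: int
    listeners: seq[Listener]

proc update(cell: Cell, value: int) =
  # Store the new value, then tell everyone who depends on it.
  cell.value = value
  for listener in cell.listeners:
    listener(value)

proc derive(source: Cell, compute: proc(value: int): int {.closure.}): Cell =
  let derived = Cell(value: compute(source.value))
  proc onSourceChange(value: int) =
    derived.update(compute(value))
  let listener: Listener = onSourceChange
  source.listeners.add(listener)
  result = derived

let celsius = Cell(value: 0)
let fahrenheit = celsius.derive(proc(value: int): int = value * 9 div 5 + 32)
echo fahrenheit.value  # 32

celsius.update(100)
echo fahrenheit.value  # 212, without anyone touching fahrenheit directly
```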
I found I really liked the pattern, thus the library. It shines a lot more if you have an async event loop, of course, since then you do not need things like completeBlock/nextBlock/doWork: you can rely on the loop to run the work on the futures eventually. But you can make it work like this as well.
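The difference between the two styles can be sketched with plain `std/asyncdispatch`, no rex involved. The `handle` and `emitBlocking` procs are hypothetical stand-ins; `emitBlocking` only mimics what I mean by the `...Block` variants, it is not their implementation.

```nim
import std/asyncdispatch

var handled: seq[int] = @[]

proc handle(value: int): Future[void] {.async.} =
  await sleepAsync(5)  # pretend the observer does some async work
  handled.add(value)

# Without a running event loop, the caller has to block until the work is done.
proc emitBlocking(value: int) =
  waitFor handle(value)

emitBlocking(1)

# With an event loop, you just fire the future and let the loop finish it.
asyncCheck handle(2)
while hasPendingOperations():  # stand-in for the application's own loop
  poll()

echo handled  # @[1, 2]
```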