The problem:
Messages
component for communicating with users.This post is not about the problem. The code uses RxJS, but it’s not about RxJS either. I’ll be comparing my initial solution, which should be familiar to anyone who understands reduce
, with a final solution that showcases the conciseness of reactive programming.
TL;DR: Here’s part of the final code that I’ll be explaining. It meets all the requirements. If it seems intriguing, read on. Reactive programming is fun and a good way to explore new ways to solve problems without learning a whole new ecosystem.
I’ve been working with Observables and reactive streams for ~8 years now and even so I didn’t arrive at the above solution immediately. Reaching for a more imperative solution is just so familiar and comfortable, but there was a specific moment that made me zoom out and consider the direction I was heading.
So my initial idea was that this could be solved with reduce
or its similar equivalent scan
in RxJS. I would convert my input streams into actions, and reduce those actions into a list of messages and the current message would be the first in the list.
Simple enough. Now we have an updates$
stream of new
actions and dismiss
actions.
I started off thinking it was trivial; simply slice off the first message for dismiss
, and handle a couple of cases for new
depending on the value of a mustDismiss
property on the first message.
This worked for simple scenarios, but as my test cases expanded to include scenarios with sequences of messages, some with and without mustDismiss
, it became clear that I would need more logic to prune the messages:
Suddenly it’s starting to look “ugly” to me. I’m looking for the next message with mustDismiss
, or skipping to the last message in the list if there isn’t one. I’m sure I could extract a function and express this more clearly, but the imperative processing of messages and all the index juggling sent me exploring for an alternative.
I’d still call this “reactive” code, and I’d still call it “declarative” code, at least from the perspective of the $pendingMessages
stream. Just some of the implementation details slip into imperative land and so it falls short of the “truly reactive” designation.
The thing that had bugged me about the initial solution was revisiting messages that had already been processed. Aside from the “ugly” code, to understand how a message would behave I’d need to understand all the possible actions of the reducer, as any of them could affect my message. In this case it was just two, but it’s not hard to imagine how this scales.
I wanted a solution where the behaviour of a message was declared upfront alongside it. The question to ask was “what is the behaviour of each message?”
I couldn’t have said it better myself. A message is itself, until something happens, at which point it’s discarded (becomes undefined
). Messages with mustDismiss
are only discarded by a dismiss$
update, but the rest by either a dismiss$
or a newMessage$
update.
So now we have a Higher-Order Observable, that is, a stream of streams. The inner streams are our messages and their behaviour. They emit the message, and at some point later they emit undefined, and then they complete.
So how do we consolidate this stream of streams into our currentMessage$
stream? We need to understand the concatAll operator.
Without going into too much detail, the concatAll
operator will ensure that all values from inner streams emit in order, and moves on to the next inner stream when the previous completes.
This flow diagram visualises the journey of messages with mustDismiss
.
The final detail is a subtle one that takes us back to the behaviour of each message. The issue is with the “until B” part in particular:
If we queue up a bunch of messages that discard themselves when dismiss$
emits, then a single dismiss will empty the whole queue, rather than just the first item. Fortunately this isn’t how concatAll
works. It will only set up subscriptions for the first inner stream, until that completes and it moves on to the next. All done then…
Except that for dismissOrNewMessage$
, or rather the OrNewMessage$
part, we want the opposite behaviour. A new message should discard any previous message even when it isn’t currently active; we just want to skip it. To achieve this we need to keep that stream “hot”, keep its subscriptions active.
Our hot
function configures our stream to immediately replay any previous value, and subscribes to that stream until it has a previous value to replay. That way, when concatAll
switches to an inner stream that should be discarded by a later message, dismissOrNewMessage$
will have a value waiting to emit immediately, discarding it.
We’ll also take a quick look at the takeAUntilBThenC
function so we’ve covered everything.
concat
works just like concatAll
but outside of a pipe
chain. In this case we are concat
ing two streams: NEVER.pipe(startWith(a), takeUntil(b$))
and of(c)
.
The latter will simply emit c
. As it is part of a concat
it will do so when the former completes.
NEVER
is a stream that never emits and never completes. We use startWith(a)
to have it emit a
immediately, and takeUntil(b$)
to force it to complete when b$
emits. So emit a
until b$
, then switch to the stream containing only c
.
Here’s the complete code in TypeScript as committed:
Finding the reactive solution was very satisfying; no loops, no indexes, and code that really shouts what it does, particularly for those familiar with reactive terminology. It wasn’t quite the smooth ride presented here though. RxJS makes precise control over observable streams possible, but in doing so has quite a learning curve.
Figuring out which operator to use, even the subtle differences like map(), concatAll()
vs concatMap()
, then which streams should be hot or cold, which streams need replays. tap()
debugging to figure out where the communication breakdown is. A relatively trivial feature became cognitively quite expensive.
I’d highly recommend for any programmer to explore reactive programming. It presents a big challenge to the way most of us learn to code. Solutions can be quite unusual and come together like some kind of puzzle game.