When to use RxJS
Use RxJS for orchestrating asynchronous and event-based computations
Code that deals with more than one event or asynchronous computation gets complicated quickly, because it needs to build a state machine to deal with ordering issues. In addition, the code needs to handle both the successful and the failed termination of each separate computation. This leads to code that doesn’t follow normal control flow and is hard to understand and maintain.
RxJS makes these computations first-class citizens. This provides a model that allows for readable and composable APIs to deal with these asynchronous computations.
Sample
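var input = document.getElementById('input');
var dictionarySuggest = Rx.Observable.fromEvent(input, 'keyup')
  .map(() => input.value)          // project each event to the current text
  .filter(text => !!text)          // skip empty input
  .distinctUntilChanged()          // skip events that did not change the text
  .throttle(250)
  .flatMapLatest(searchWikipedia)  // keep only the results of the latest lookup
  .subscribe(
    results => {
      // Array.prototype.concat returns a new array and does not modify
      // the original, so the new items are assigned directly.
      list = results.map(createItem);
    },
    err => logError(err)
  );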
This sample models a common UI paradigm of receiving completion suggestions while the user is typing input.
RxJS creates an observable sequence that models an existing keyup event on the input.
It then places several filters and projections on top of the event so that it only fires when the input text has actually changed. (The keyup event fires for every keystroke, including when the user presses the left or right arrow key, which moves the cursor but does not change the input text.)
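The effect of the distinctness filter can be sketched without RxJS. The helper below, makeDistinctUntilChanged, is a hypothetical name used only for illustration and is not part of the library; it forwards a value only when it differs from the previous one, which is roughly what happens to the arrow-key presses described above.

```javascript
// Hypothetical helper, not an RxJS API: wraps a handler so that it only
// runs when the incoming value differs from the previously seen value.
function makeDistinctUntilChanged(handler) {
  var hasPrevious = false;
  var previous;
  return function (value) {
    if (hasPrevious && value === previous) {
      return; // same text as last time (e.g. an arrow key was pressed)
    }
    hasPrevious = true;
    previous = value;
    handler(value);
  };
}

// Usage: simulate the input text observed after successive keyup events.
var seen = [];
var onText = makeDistinctUntilChanged(function (text) { seen.push(text); });
['r', 'rx', 'rx', 'rx', 'rxj'].forEach(function (text) { onText(text); });
// seen is now ['r', 'rx', 'rxj']
```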
Next it makes sure the event only fires after 250 milliseconds of inactivity by using the throttle operator. (If the user is still typing characters, this saves a potentially expensive lookup whose result would be discarded immediately.) In later RxJS releases this wait-for-a-pause behavior is provided by the operator named debounce.
In traditionally written programs, this throttling would introduce separate callbacks through a timer. This timer could potentially throw exceptions (certain timers have a maximum amount of operations in flight).
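For comparison, here is a minimal sketch of what that timer-based approach might look like when written by hand. The name debounceManually and the injectable timers parameter are assumptions made for this illustration; they are not RxJS APIs.

```javascript
// Hypothetical helper, not an RxJS API: delays calling `handler` until
// `delayMs` milliseconds have passed without a new call.
// `timers` defaults to the global timer functions and can be replaced in tests.
function debounceManually(handler, delayMs, timers) {
  timers = timers || {
    set: function (fn, ms) { return setTimeout(fn, ms); },
    clear: function (id) { clearTimeout(id); }
  };
  var pending = null;
  return function (value) {
    if (pending !== null) {
      timers.clear(pending); // a newer value arrived, drop the old timer
    }
    pending = timers.set(function () {
      pending = null;
      try {
        handler(value);
      } catch (err) {
        // Errors surface inside the timer callback, away from the caller,
        // so they need their own handling here.
        console.error('lookup failed:', err.message);
      }
    }, delayMs);
  };
}
```

A caller would wrap its lookup once, for example var onKeyUp = debounceManually(lookup, 250), and call onKeyUp with the current text on every keyup. Note the extra state (the pending timer) and the separate error path that the hand-written version has to manage.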
Once the user input has been filtered down it is time to perform the dictionary lookup. As this is usually an expensive operation (e.g. a request to a server on the other side of the world), this operation is itself asynchronous as well.
The flatMap/selectMany operator allows for easy combining of multiple asynchronous operations. It doesn’t only combine success values; it also tracks any exceptions that happen in each individual operation.
In traditionally written programs, this would introduce separate callbacks and a separate place to handle the exceptions that occur.
If the user types a new character while the dictionary operation is still in progress, we no longer want to see the results of that operation. The user has typed more characters, leading to a more specific word, so seeing old results would be confusing.
The flatMapLatest operator makes sure that the result of the dictionary operation is ignored once a new keyup has been detected.
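The "latest wins" idea can be sketched by hand with a request counter. The helper latestOnly and the callback-style lookup(term, callback) signature are assumptions made for this illustration. Unlike flatMapLatest, this sketch only ignores stale results; it does not cancel the lookup that is still in flight.

```javascript
// Hypothetical helper, not an RxJS API: starts `lookup(term, callback)` for
// every term but only forwards the results of the most recent lookup.
function latestOnly(lookup, onResults) {
  var latestId = 0;
  return function (term) {
    var id = ++latestId;
    lookup(term, function (results) {
      if (id === latestId) {
        onResults(results); // still the newest request
      }
      // Otherwise a newer term was typed, so these results are dropped.
    });
  };
}
```

If a lookup for "r" is followed by one for "rx" and the second finishes first, only the results for "rx" reach onResults; the late answer for "r" is discarded.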
Finally we subscribe to the resulting observable sequence. Only at this point is the execution plan actually run. We pass two functions to the subscribe call:
- Receives the result from our computation.
- Receives exceptions in case of a failure occurring anywhere along the computation.
When to ignore this guideline
If the application/library in question has very few asynchronous/event-based operations or has very few places where these operations need to be composed, the cost of depending on RxJS (redistributing the library as well as the learning curve) might outweigh the cost of manually coding these operations.
Use RxJS to deal with asynchronous sequences of data
Several other libraries exist to aid asynchronous operations in the JavaScript ecosystem. Even though these libraries are powerful, they usually work best on operations that return a single message. They usually do not support operations that produce multiple messages over the lifetime of the operation.
RxJS follows this grammar: onNext* (onCompleted | onError)?. This allows multiple messages to come in over time, which makes RxJS suitable both for operations that produce a single message and for operations that produce multiple messages.
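To make the grammar concrete, the sketch below wraps three plain callbacks so that zero or more onNext calls are followed by at most one terminal call. The name createSafeObserver is hypothetical, and this is not how RxJS itself is implemented; it only illustrates the contract.

```javascript
// Hypothetical wrapper, not the RxJS implementation: enforces the grammar
// onNext* (onCompleted | onError)? on top of three plain callbacks.
function createSafeObserver(onNext, onError, onCompleted) {
  var stopped = false;
  return {
    onNext: function (value) {
      if (!stopped) { onNext(value); }
    },
    onError: function (err) {
      if (!stopped) { stopped = true; onError(err); }
    },
    onCompleted: function () {
      if (!stopped) { stopped = true; onCompleted(); }
    }
  };
}

// Usage: messages after the terminal message are dropped.
var log = [];
var observer = createSafeObserver(
  function (value) { log.push('next ' + value); },
  function (err) { log.push('error ' + err.message); },
  function () { log.push('completed'); }
);
observer.onNext(1);
observer.onNext(2);
observer.onCompleted();
observer.onNext(3);                  // ignored: the sequence already terminated
observer.onError(new Error('late')); // ignored as well
// log is now ['next 1', 'next 2', 'completed']
```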
var fs = require('fs');
var Rx = require('rx');

// Read/write from stream implementation
function readAsync(fd, chunkSize) { /* impl */ }
function appendAsync(fd, buffer) { /* impl */ }
function encrypt(buffer) { /* impl */ }

// Open a 4 GB file for asynchronous reading in blocks of 64 KB
var inFile = fs.openSync('4GBfile.txt', 'r+');
var outFile = fs.openSync('Encrypted.txt', 'w+');

readAsync(inFile, 2 << 15) // 2 << 15 === 65536 bytes, i.e. 64 KB
  .map(encrypt)
  .flatMap(data => appendAsync(outFile, data))
  .subscribe(
    () => {},
    err => {
      console.log('An error occurred while encrypting the file: %s', err.message);
      fs.closeSync(inFile);
      fs.closeSync(outFile);
    },
    () => {
      console.log('Successfully encrypted the file.');
      fs.closeSync(inFile);
      fs.closeSync(outFile);
    }
  );
In this sample, a 4 GB file is read in its entirety, encrypted and saved out to another file.
Reading the whole file into memory, encrypting it and writing out the whole file would be an expensive operation.
Instead, we rely on the fact that RxJS can produce many messages.
We read the file asynchronously in blocks of 64 KB. This produces an observable sequence of byte arrays. We then encrypt each block separately (for this sample we assume the encryption operation can work on separate parts of the file). Once a block is encrypted, it is immediately sent further down the pipeline to be saved to the other file. The appendAsync operation is an asynchronous operation that can process multiple messages.
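The block-by-block idea can be sketched as follows. The helper readBlocks is hypothetical and is not the sample's readAsync, whose implementation is not shown; to stay short it reads synchronously with fs.readSync, whereas an asynchronous version would use fs.read. What it illustrates is that one read operation produces many onNext messages followed by a single terminal message.

```javascript
var fs = require('fs');

// Hypothetical helper, not the sample's readAsync: reads `fd` block by block
// and reports each block through onNext, then onCompleted or onError.
// It reads synchronously to stay short; a real version would use fs.read.
function readBlocks(fd, chunkSize, observer) {
  var buffer = Buffer.alloc(chunkSize);
  var bytesRead;
  try {
    while ((bytesRead = fs.readSync(fd, buffer, 0, chunkSize, null)) > 0) {
      // Copy the bytes because `buffer` is reused for the next block.
      observer.onNext(Buffer.from(buffer.subarray(0, bytesRead)));
    }
  } catch (err) {
    observer.onError(err);
    return;
  }
  observer.onCompleted();
}
```

Only one block of chunkSize bytes is held by the reader at any time, so memory use stays bounded regardless of the file size.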
When to ignore this guideline
If the application/library in question has very few operations with multiple messages, the cost of depending on RxJS (redistributing the library as well as the learning curve) might outweigh the cost of manually coding these operations.