The RxJS Contract

Assume the RxJS Grammar

Messages sent to instances of the Observer object follow this grammar: onNext* (onCompleted | onError)?

This grammar allows observable sequences to send any number (zero or more) of onNext messages to the subscribed observer instance, optionally followed by a single success (onCompleted) or failure (onError) message.

The single message indicating that an observable sequence has finished ensures that consumers of the observable sequence can deterministically establish that it is safe to perform cleanup operations.

A single failure further ensures that abort semantics can be maintained for operators that work on multiple observable sequences.
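To make the grammar concrete, the sketch below checks a recorded list of message kinds against onNext* (onCompleted | onError)?. The helper followsRxGrammar and the string labels are hypothetical names chosen for illustration; they are not part of RxJS.

```javascript
// Hypothetical helper (not an RxJS API): checks a recorded list of message
// kinds against the grammar onNext* (onCompleted | onError)?
function followsRxGrammar(kinds) {
  var terminated = false;
  for (var i = 0; i < kinds.length; i++) {
    var kind = kinds[i];
    if (terminated) return false; // nothing may follow a terminal message
    if (kind === 'onCompleted' || kind === 'onError') {
      terminated = true;
    } else if (kind !== 'onNext') {
      return false; // unknown message kind
    }
  }
  return true; // a terminal message is optional, so an open sequence is valid
}

console.log(followsRxGrammar(['onNext', 'onNext', 'onCompleted'])); // true
console.log(followsRxGrammar([]));                                   // true
console.log(followsRxGrammar(['onNext', 'onError', 'onNext']));      // false
console.log(followsRxGrammar(['onCompleted', 'onError']));           // false
```

The last two cases show why a single terminal message matters: once onError or onCompleted has been seen, any further message breaks the contract.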

Sample

// xs is assumed to be an existing observable sequence.
var count = 0;
xs.subscribe(
  () => count++,                                                // onNext
  err => console.log('Error: %s', err.message),                 // onError
  () => console.log('onNext has been called %d times', count)   // onCompleted
);

In this sample we can safely assume that the number of calls to onNext won’t change once onCompleted has been called, because the observable sequence follows the Rx grammar.

When to ignore this guideline

Ignore this guideline only when working with a non-conforming implementation of the Observable object.

Assume resources are cleaned up after an onError or onCompleted message

The grammar guideline above ("Assume the RxJS Grammar") states that no more messages should arrive after an onError or onCompleted message. This makes it possible to clean up any resource used by the subscription the moment an onError or onCompleted message arrives. Cleaning up resources immediately ensures that any side effect occurs in a predictable fashion. It also ensures that the runtime can reclaim these resources.

Sample

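var fs = require('fs');
var Rx = require('rx');

// Implementation elided; flatMap expects an observable or a promise back.
function appendAsync(fd, buffer) { /* impl */ }

// Opens the file and returns a disposable resource that closes it again.
function openFile(path, flags) {
  var fd = fs.openSync(path, flags);
  var resource = Rx.Disposable.create(() => fs.closeSync(fd));
  resource.fd = fd; // expose the descriptor to the observable factory
  return resource;
}

Rx.Observable
  .using(
    () => openFile('temp.txt', 'w+'),
    // using passes the resource itself (not the raw descriptor) to this factory
    resource => Rx.Observable.range(0, 10000)
      .map(v => Buffer.alloc(v))
      .flatMap(buffer => appendAsync(resource.fd, buffer))
  )
  .subscribe();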

In this sample the using operator creates a resource that is disposed upon unsubscription. The Rx contract for cleanup ensures that unsubscription happens automatically once an onError or onCompleted message is sent.
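The cleanup rule can also be sketched without the library. The wrapper below is a hypothetical helper, not an RxJS API: it forwards messages to an observer and runs a release callback exactly once, as soon as a terminal message arrives. It assumes the observer exposes onNext, onError and onCompleted methods.

```javascript
// Hypothetical helper (not an RxJS API): wraps an observer so that `release`
// runs exactly once, as soon as onError or onCompleted arrives.
function releaseOnTermination(observer, release) {
  var done = false;
  function finish(forward) {
    if (done) return; // ignore anything after the first terminal message
    done = true;
    try {
      forward();
    } finally {
      release(); // clean up even if the observer's handler throws
    }
  }
  return {
    onNext: value => { if (!done) observer.onNext(value); },
    onError: err => finish(() => observer.onError(err)),
    onCompleted: () => finish(() => observer.onCompleted())
  };
}

var log = [];
var wrapped = releaseOnTermination(
  {
    onNext: v => log.push('next ' + v),
    onError: e => log.push('error ' + e.message),
    onCompleted: () => log.push('completed')
  },
  () => log.push('released')
);

wrapped.onNext(1);
wrapped.onCompleted();
wrapped.onCompleted(); // ignored: the sequence has already terminated
console.log(log);      // [ 'next 1', 'completed', 'released' ]
```

Because the grammar guarantees at most one terminal message, the release callback has a single, predictable moment at which to run.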

When to ignore this guideline

There are currently no known cases in which to ignore this guideline.

Assume a best effort to stop all outstanding work on Unsubscribe

When unsubscribe is called on an observable subscription, the observable sequence makes a best-effort attempt to stop all outstanding work. This means that any queued work that has not yet started will not start.

Any work that is already in progress might still complete, as it is not always safe to abort work that is in progress. Results from this work will not be signaled to any previously subscribed observer instances.

Sample 1

Rx.Observable.timer(2000).subscribe(...).dispose();

In this sample, subscribing to the observable sequence generated by timer queues an action on the Scheduler.timeout scheduler to send out an onNext message in 2 seconds. The subscription is then canceled immediately. Because the scheduled action has not started yet, it is removed from the scheduler.
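The removal of queued work can be sketched with a tiny hand-written scheduler. Everything here (createScheduler, schedule, run, pending) is a hypothetical stand-in for illustration and does not model time or the real Rx scheduler API; it only shows that disposing before an action starts takes it out of the queue.

```javascript
// Hypothetical, minimal scheduler (not the Rx scheduler API): actions wait in
// a queue until run() is called.
function createScheduler() {
  var queue = [];
  return {
    schedule: action => {
      queue.push(action);
      // The returned disposable removes the action if it has not started yet.
      return {
        dispose: () => {
          var index = queue.indexOf(action);
          if (index !== -1) queue.splice(index, 1);
        }
      };
    },
    run: () => {
      while (queue.length > 0) queue.shift()();
    },
    pending: () => queue.length
  };
}

var scheduler = createScheduler();
var received = [];

scheduler.schedule(() => received.push('kept'));
var cancelled = scheduler.schedule(() => received.push('cancelled'));

cancelled.dispose(); // unsubscribed before the action started
scheduler.run();

console.log(received); // [ 'kept' ]
```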

Sample 2

Rx.Observable.startAsync(() => Q.delay(2000)).subscribe(...).dispose();

In this sample the startAsync operator immediately schedules the execution of the lambda provided as its argument. The subscription registers the observer instance as a listener to this execution. As the lambda is already running by the time the subscription is disposed, it keeps running and its return value is ignored.
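A library-free sketch of the same behavior follows. The helper startAndSubscribe is hypothetical and simplifies startAsync by starting the work and registering the listener in one call; the work reports its result through a callback rather than a promise so that the example stays synchronous.

```javascript
// Hypothetical helper (not an RxJS API): starts `work` immediately and hands
// its result to the listener, unless the subscription was disposed meanwhile.
function startAndSubscribe(work, onNext) {
  var disposed = false;
  work(result => {
    if (!disposed) onNext(result); // results after dispose() are dropped
  });
  return { dispose: () => { disposed = true; } };
}

var finishWork;           // captured so the example can finish the work later
var workStarted = false;
var results = [];

var subscription = startAndSubscribe(
  done => { workStarted = true; finishWork = done; },
  value => results.push(value)
);

subscription.dispose();   // the work is already in progress and is not aborted
finishWork(42);           // the work completes anyway

console.log(workStarted); // true
console.log(results);     // []
```

The work runs to completion, but its result never reaches the observer because the subscription was disposed first.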