Querying Observable Sequences
In Bridging to Events, we have converted existing DOM and Node.js events into observable sequences to subscribe to them. In this topic, we will look at the first-class nature of observable sequences as IObservable
Using Different Operators
We have already used the create
and range
operators in previous topics to create and return simple sequences. We have also used the fromEvent
and fromEventPattern
operators to convert existing events into observable sequences. In this topic, we will use other operators of the Observable
type so that you can filter, group and transform data. Such operators take observable sequence(s) as input, and produce observable sequence(s) as output.
Combining different sequences
In this section, we will examine some of the operators that combine various observable sequences into a single observable sequence. Notice that data is not transformed when we combine sequences.
In the following sample, we use the Concat operator to combine two sequences into a single sequence and subscribe to it. For illustration purpose, we will use the very simple range(x, y)
operator to create a sequence of integers that starts with x and produces y sequential numbers afterwards.
var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(1, 3);
source1.concat(source2)
.subscribe(console.log.bind(console));
// => 1
// => 2
// => 3
// => 1
// => 2
// => 3
Notice that the resultant sequence is 1,2,3,1,2,3. This is because when you use the concat
operator, the 2nd sequence (source2) will not be active until after the 1st sequence (source1) has finished pushing all its values. It is only after source1 has completed, then source2 will start to push values to the resultant sequence. The subscriber will then get all the values from the resultant sequence.
Compare this with the merge
operator. If you run the following sample code, you will get 1,1,2,2,3,3. This is because the two sequences are active at the same time and values are pushed out as they occur in the sources. The resultant sequence only completes when the last source sequence has finished pushing values.
var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(1, 3);
source1.merge(source2)
.subscribe(console.log.bind(console));
// => 1
// => 1
// => 2
// => 2
// => 3
// => 3
Another comparison can be done with the catch
operator. In this case, if source1 completes without any error, then source2 will not start. Therefore, if you run the following sample code, you will get 1,2,3 only since source2 (which produces 4,5,6) is ignored.
var source1 = Rx.Observable.range(1, 3);
var source2 = Rx.Observable.range(4, 3);
source1.catch(source2)
.subscribe(console.log.bind(console));
// => 1
// => 2
// => 3
Finally, let’s look at onErrorResumeNext
. This operator will move on to source2 even if source1 cannot be completed due to an error. In the following example, even though source1 represents a sequence that terminates with an exception by using the throw
operator, the subscriber will receive values (1,2,3) published by source2. Therefore, if you expect either source sequence to produce any error, it is a safer bet to use onErrorResumeNext
to guarantee that the subscriber will still receive some values.
var source1 = Rx.Observable.throw(new Error('An error has occurred.'));
var source2 = Rx.Observable.range(1, 3);
source1.onErrorResumeNext(source2)
.subscribe(console.log.bind(console));
// => 1
// => 2
// => 3
Projection
The select
or map
operator can translate each element of an observable sequence into another form.
In the following example, we project a sequence of strings into a series of integers representing the length.
var array = ['Reactive', 'Extensions', 'RxJS'];
var seqString = Rx.Observable.from(array);
var seqNum = seqString.map(x => x.length);
seqNum
.subscribe(console.log.bind(console));
// => 8
// => 10
// => 4
In the following sample, which is an extension of the event conversion example we saw in the Bridging with Existing Events topic, we use the select
or map
operator to project the event arguments to a point of x and y. This way, we are transforming a mouse move event sequence into a data type that can be parsed and manipulated further, as can be seen in the next "Filtering" section.
var move = Rx.Observable.fromEvent(document, 'mousemove');
var points = move.map(e => ({x: e.clientX, y: e.clientY }));
points.subscribe(
pos => console.log('Mouse at point ' + pos.x + ', ' + pos.y));
Finally, let’s look at the selectMany
or flatMap
operator. The selectMany
or flatMap
operator has many overloads, one of which takes a selector function argument. This selector function is invoked on every value pushed out by the source observable. For each of these values, the selector projects it into a mini observable sequence. At the end, the selectMany
or flatMap
operator flattens all of these mini sequences into a single resultant sequence, which is then pushed to the subscriber.
The observable returned from selectMany
or flatMap
publishes onCompleted
after the source sequence and all mini observable sequences produced by the selector have completed. It fires onError
when an error has occurred in the source stream, when an exception was thrown by the selector function, or when an error occurred in any of the mini observable sequences.
In the following example, we first create a source sequence which produces an integer every 5 seconds, and decide to just take the first 2 values produced (by using the take
operator). We then use selectMany
or flatMap
to project each of these integers using another sequence of {100, 101, 102}. By doing so, two mini observable sequences are produced, {100, 101, 102} and {100, 101, 102}. These are finally flattened into a single stream of integers of {100, 101, 102, 100, 101, 102} and pushed to the observer.
var source1 = Rx.Observable.interval(5000).take(2);
var proj = Rx.Observable.range(100, 3);
var resultSeq = source1.flatMap(proj);
var subscription = resultSeq.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e.message),
() => console.log('onCompleted'));
// => onNext: 100
// => onNext: 101
// => onNext: 102
// => onNext: 100
// => onNext: 101
// => onNext: 102
// => onCompleted
Filtering
In the following example, we use the generate
operator to create a simple observable sequence of numbers. The generate
operator has several versions including with relative and absolute time scheduling. In our example, it takes an initial state (0 in our example), a conditional function to terminate (fewer than 10 times), an iterator (+1), a result selector (a square function of the current value), and prints out only those smaller than 5 using the filter
or where
operators.
var seq = Rx.Observable.generate(
0,
i => i < 10,
i => i + 1,
i => i * i);
var source = seq.filter(n => n < 5);
var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e.message),
() => console.log('onCompleted'));
// => onNext: 0
// => onNext: 1
// => onNext: 4
// => onCompleted
The following example is an extension of the projection example you have seen earlier in this topic. In that sample, we have used the select
or map
operator to project the event arguments into a point with x and y. In the following example, we use the filter
or where
and select
or map
operators to pick only those mouse movements that we are interested in. In this case, we filter the mouse moves to those over the first bisector (where the x and y coordinates are equal).
var move = Rx.Observable.fromEvent(document, 'mousemove');
var points = move.map(e => ({ x: e.clientX, y: e.clientY }));
var overfirstbisector = points.filter(pos => pos.x === pos.y);
var movesub = overfirstbisector.subscribe(pos => console.log('mouse at ' + pos.x + ', ' pos.y));
Time-based Operation
You can use the Buffer operators to perform time-based operations.
Buffering an observable sequence means that an observable sequence’s values are put into a buffer based on either a specified timespan or by a count threshold. This is especially helpful in situations when you expect a tremendous amount of data to be pushed out by the sequence, and the subscriber does not have the resource to process these values. By buffering the results based on time or count, and only returning a sequence of values when the criteria is exceeded (or when the source sequence has completed), the subscriber can process OnNext calls at its own pace.
In the following example, we first create a simple sequence of integers for every second. We then use the bufferWithCount
operator and specify that each buffer will hold 5 items from the sequence. The onNext
is called when the buffer is full. We then tally the sum of the buffer using Array.reduce
. The buffer is automatically flushed and another cycle begins. The printout will be 10, 35, 60… in which 10=0+1+2+3+4, 35=5+6+7+8+9, and so on.
var seq = Rx.Observable.interval(1000);
var bufSeq = seq.bufferWithCount(5);
bufSeq
.map(arr => arr.reduce((acc, x) => acc + x, 0))
.subscribe(console.log.bind(console));
// => 10
// => 35
// => 60
...
We can also create a buffer with a specified time span in milliseconds. In the following example, the buffer will hold items that have accumulated for 3 seconds. The printout will be 3, 12, 21… in which 3=0+1+2, 12=3+4+5, and so on.
var seq = Rx.Observable.interval(1000);
var bufSeq = seq.bufferWithTime(3000);
bufSeq
.map(arr => arr.reduce((acc, x) => acc + x, 0))
.subscribe(console.log.bind(console));
Note that if you are using any of the buffer*
or window*
operators, you have to make sure that the sequence is not empty before filtering on it.
Operators by Categories
The Operators by Categories topic lists all of the major operators implemented by the Observable
type by their categories; specifically: creation, conversion, combine, functional, mathematical, time, exceptions, miscellaneous, selection and primitives.
See Also
Reference
Concepts