RxJS - make counter with reset stateless?

@Jrop I like this operator Rx.Observable.when. With this one you can reproduce Bacon.update very easy. This is my code and jsbin example:

const {when, fromEvent} = Rx.Observable;

const decObs = fromEvent(document.getElementById('dec'), 'click');
const incObs = fromEvent(document.getElementById('inc'), 'click');
const resetObs = fromEvent(document.getElementById('res'), 'click');

when(
    decObs.thenDo(_ => prev => prev - 1),
    incObs.thenDo(_ => prev => prev + 1),
    resetObs.thenDo(_ => prev => 0)
).startWith(0).scan((prev, f) => f(prev))
.subscribe(v => document.getElementById('out').innerHTML = v);

Also will be better if you look at this Join-calculus, New Release and Joins and this Combining sequences


Following @paulpdaniels answer, here is what i'm using with Ramda:

var hardSet  = Rx.Observable.fromEvent($('#set');
var decRes   = Rx.Observable.fromEvent($('#dec');
var incRes   = Rx.Observable.fromEvent($('#inc');

Rx.Observable.merge(
  incRes.map(function()  { return R.add(1); }),
  decRes.map(function()  { return R.add(-1); }),
  hardSet.map(function() { return R.always(0); })
).scan(function(prev, f) { 
  return f(prev); 
}, 0);

There are two ways of going about this that I can see.

First, there is nothing that says you can't augment your data in the pipeline:

Rx.Observable.merge(
  // decrement
  Rx.Observable.fromEvent($('#dec'), 'click')
    .map(function() { return {delta : -1}; }),

  // increment
  Rx.Observable.fromEvent($('#inc'), 'click')
    .map(function() { return {delta : +1}; }),

  // reset
  Rx.Observable.fromEvent($('#res'), 'click')
    .map(function() { return {reset : true}; })
) // merge
.scan(0, function(acc, value) {
  return value.reset ? 0 : acc + value.delta;
})
.forEach(function(delta) {
  $('#out').text(delta)
});

The above lets you signal downstream that the stream has been reset by adding in a field (note: I cheated, for readability you might want to add reset : false rather than rely on falsey-ness, but it's up to you).

Alternatively, if you think of the reset as actually resetting the stream then you could instead use flatMapLatest to wrap the incrementing and decrementing:

Rx.Observable.fromEvent($('#res'), 'click')
.startWith(null)
.flatMapLatest(function(e) {
  return Rx.Observable.merge(
    // decrement
    Rx.Observable.fromEvent($('#dec'), 'click')
      .map(function() { return -1 }),

    // increment
    Rx.Observable.fromEvent($('#inc'), 'click')
      .map(function() { return +1 })
  )
  .scan(0, function(acc, delta) { return acc + delta })
  .startWith(0);
})
.subscribe(function(value) {
   $('#out').text(value) 
});

This makes the stream a little bit messier than it has to be with the inclusion of two .startWiths to kick off the respective sequences, but if you were opposed to augmenting and wanted the state implicitly controlled by the stream then this would be an approach.