Implement a JavaScript reactive programming library - part 5

Part 5: Back pressure

Before we talk about back pressure, let's first review what our Node looks like:

Push Node

We call this a "push model": data is **push()**ed to the downstream node (the data consumer). If the consumer can't consume data as fast as the producer produces it, the consumer will be overloaded. To avoid this, we can adopt a different policy: the consumer requests data when it is ready, and the producer waits until it gets a request. This is how the web works: a server does not push data to the browser directly, it waits for the browser's requests (WebSocket is a different story). We call this the pull model.
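To make the overload concrete, here is a minimal sketch that is independent of the library's Node class. The names `simulatePush` and `simulatePull` and the rates are hypothetical, chosen only for illustration. It measures how many items pile up in a buffer when the producer is faster than the consumer, and how many are in flight when the producer only emits on request.

```javascript
// Hypothetical simulation, not part of the library.
// Push model: the producer emits `produceRate` items per tick,
// the consumer can only handle `consumeRate` items per tick.
function simulatePush(ticks, produceRate, consumeRate) {
  const buffer = [];
  let maxBacklog = 0;
  for (let t = 0; t < ticks; t++) {
    for (let i = 0; i < produceRate; i++) buffer.push(t * produceRate + i);
    buffer.splice(0, consumeRate); // the consumer takes what it can
    maxBacklog = Math.max(maxBacklog, buffer.length);
  }
  return maxBacklog;
}

// Pull model: the producer emits one item per request,
// and the consumer only requests what it can handle in a tick.
function simulatePull(ticks, consumeRate) {
  const buffer = [];
  let next = 0;
  let maxInFlight = 0;
  for (let t = 0; t < ticks; t++) {
    for (let i = 0; i < consumeRate; i++) {
      buffer.push(next++); // produced on request
      maxInFlight = Math.max(maxInFlight, buffer.length);
      buffer.shift(); // consumed before the next request
    }
  }
  return maxInFlight;
}

console.log(simulatePush(10, 3, 1)); // 20
console.log(simulatePull(10, 1)); // 1
```

In the push run the backlog grows by two items per tick and never recovers, while in the pull run there is never more than one item waiting.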

Pull model design

In the push model, the upstream node (on the left) pushes signals to the downstream node (on the right). In the pull model, control flows in the opposite direction, from downstream to upstream (right to left).

From the outside, we can pull() a signal from the node and observe() the requests made by the node.

From the node's internal point of view, it handles the pull() request with onRequest() and makes a request() to the upstream node. See the following diagram:

Push and Pull Node

The data signal still flows from left to right, but it is now driven by the control signal, which flows from right to left.

Rethinking the signal source

In the push model, the signal source generates and emits signals. Let's call it an active source: it drives the data flow. All downstream nodes are passive, waiting for data to process.

In the pull model, the signal source waits for data requests; we call it a passive source. This raises a problem: if the signal source passively waits for requests, who drives the stream?

The answer is: we need a sink node at the end of the stream (the last downstream node) to drive the stream. You can imagine what the sink will do:

// this is just a demo of what the sink does
// it simulates gravity, which pulls water down
let signal;
do {
  signal = request(); // request upstream
} while (signal !== END);

How does back pressure work?

When a sink is connected to a stream pipeline, it initiates a request immediately. The request propagates through the pipeline and finally reaches the passive source. The passive source send()s a data signal through the left-to-right flow described in the previous parts. The data signal eventually arrives at the sink, which makes another request() for the next data. Some nodes, such as errors or filter, may block the data signal; in that case, it is the node's duty to request() the next data to process.
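The rule for blocking nodes can be sketched in isolation. The snippet below is a simplified, synchronous stand-in, not the library's Node class: `makeSource`, `makeFilter`, and `makeSink` are hypothetical helpers, and requests are plain method calls instead of events. Unlike the sink described in this article, this one stops requesting once it sees END.

```javascript
const END = Symbol("END"); // stand-in for Node.END

// Passive source: emits one item per request, END when exhausted.
function makeSource(items) {
  let index = 0;
  return {
    downstream: null,
    to(node) { this.downstream = node; node.upstream = this; return node; },
    request() {
      this.downstream.push(index < items.length ? items[index++] : END);
    },
  };
}

// Filter: forwards matching signals; when it blocks one,
// it requests the next signal itself so the stream does not stall.
function makeFilter(predicate) {
  return {
    upstream: null,
    downstream: null,
    to(node) { this.downstream = node; node.upstream = this; return node; },
    request() { this.upstream.request(); },
    push(signal) {
      if (signal === END || predicate(signal)) {
        this.downstream.push(signal);
      } else {
        this.request(); // blocked: ask upstream for the next signal
      }
    },
  };
}

// Sink: drives the stream by requesting again after every signal.
function makeSink() {
  return {
    upstream: null,
    received: [],
    start() { this.upstream.request(); },
    push(signal) {
      if (signal === END) return; // nothing left to pull
      this.received.push(signal);
      this.upstream.request();
    },
  };
}

const source = makeSource([1, 2, 3, 4, 5]);
const sink = makeSink();
source.to(makeFilter((v) => v % 2 === 1)).to(sink);
sink.start();
console.log(sink.received); // [ 1, 3, 5 ]
```

Without the `this.request()` call in the filter, the stream would stop at the first even number, because the sink only requests again after it receives a signal.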

We add 4 methods to the Node class:

// pull from the node, passing it a command
function pull(cmd : any) : Node {
  setImmediate(() => {
    this.onRequest(cmd);
  });
  return this;
}
// handle request
function onRequest(cmd : any) : Node {
  this.request(cmd);
  return this;
}
// make a request to upstream
function request(cmd : any) : Node {
  setImmediate(() => {
    this.ee.emit("request-" + this.id, cmd);
  });
  return this;
}
// callback invoked when an upstream node connects to this node
function from(upstream : Node) : Node {
  return this;
}

We allow the request to carry a command to the source, which makes the model flexible enough to adapt to different flow control requirements. For example, you can pass a number to indicate how many items you are requesting, or pass a filter to ask for a certain type of data.
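As one illustration of such a command, the sketch below treats a numeric `cmd` as a batch size. This is an assumption about how a source could interpret the command, not something the tutorial's sources do; `BatchSource` and its `send` callback are hypothetical names.

```javascript
const END = Symbol("END"); // stand-in for Node.END

// Hypothetical passive source that reads cmd as "how many items".
class BatchSource {
  constructor(items, send) {
    this.items = items;
    this.index = 0;
    this.ended = false;
    this.send = send; // callback standing in for Node's send()
  }

  onRequest(cmd) {
    // Fall back to a single item when no valid count is given.
    const count = Number.isInteger(cmd) && cmd > 0 ? cmd : 1;
    for (let i = 0; i < count && !this.ended; i++) {
      if (this.index < this.items.length) {
        this.send(this.items[this.index++]);
      } else {
        this.ended = true;
        this.send(END); // END is emitted only once
      }
    }
  }
}

const out = [];
const source = new BatchSource(["a", "b", "c"], (signal) => out.push(signal));
source.onRequest(2); // sends "a" and "b"
source.onRequest();  // sends "c"
source.onRequest(5); // sends END, then stops
console.log(out.length); // 4
```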

The implementation above only gives a node the ability to propagate information back to the source. We still need to build the back link when calling the to() function.

to(downstream : Node) : Node {
  this.ee.on("outgoing-" + this.id, (signal) => {
    downstream.push(signal);
  });
  // for pull model
  downstream.ee.on("request-" + downstream.id, (cmd) => {
    this.pull(cmd);
  });
  downstream.from(this);
  return downstream;
}
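To see both links in action, here is a reduced, runnable sketch built on Node.js's built-in `events` module. `MiniNode` is a hypothetical stand-in for the tutorial's Node class: it keeps the same event names, but it drops `setImmediate` and `from()` so the calls run synchronously, and it assumes a single emitter shared by all nodes.

```javascript
const EventEmitter = require("events");

let nextId = 0;

// Hypothetical, stripped-down stand-in for the tutorial's Node class.
class MiniNode {
  constructor(name, ee, log) {
    this.id = nextId++;
    this.name = name;
    this.ee = ee; // emitter shared by every node in this sketch
    this.log = log;
  }

  // forward path: receive a signal and pass it on unchanged
  push(signal) {
    this.log.push(this.name + " got signal " + signal);
    this.ee.emit("outgoing-" + this.id, signal);
  }

  // backward path: receive a request and pass it upstream
  pull(cmd) {
    this.log.push(this.name + " got request " + cmd);
    this.request(cmd);
  }

  request(cmd) {
    this.ee.emit("request-" + this.id, cmd);
  }

  to(downstream) {
    // forward link: my outgoing signals are pushed to downstream
    this.ee.on("outgoing-" + this.id, (signal) => downstream.push(signal));
    // back link: downstream's requests are pulled into me
    downstream.ee.on("request-" + downstream.id, (cmd) => this.pull(cmd));
    return downstream;
  }
}

const ee = new EventEmitter();
const log = [];
const a = new MiniNode("a", ee, log);
const b = new MiniNode("b", ee, log);
const c = new MiniNode("c", ee, log);
a.to(b).to(c);

c.request("next"); // travels right to left: c -> b -> a
a.push(42);        // travels left to right: a -> b -> c
console.log(log);
```

The log shows the request reaching b and then a, followed by the signal passing through a, b, and c in order.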

Build a passive source and sink

The last step is to build a passive source and a sink to drive the stream. The code is easy to understand:

class PassiveArraySourceNode extends Node {
  constructor(options, eventemitter) {
    super(options, eventemitter);
    this.source = this.options;
    this.index = 0;
  }

  onRequest(cmd) {
    if (this.source.length === this.index) {
      this.send(Node.END);
      this.index++;
    } else if (this.source.length > this.index) {
      this.send(this.source[this.index]);
      this.index++;
    }
  }
}

RP.asList = (array) => {
  return new PassiveArraySourceNode(array);
}

RP.addOperator("sink", () => {
  class SinkNode extends RP.Node {
    onSignal(signal) {
      this.request();
    }

    from(node) {
      this.request();
    }
  }

  return new SinkNode();
});

The source send()s one data signal each time a request() is received by onRequest(). The sink makes a request() when an upstream node is connected via from(), and sends another request whenever a signal arrives.

Test

RP.asList([1, 2, 3])
  .map(v => {
    console.log(v)
    return v;
  })
  .sink();

Will back pressure work for all your streams?

The answer is no. The pull model requires that your source provide a way to control its production speed, and not all sources have this property. Take a stream of user mouse clicks: you can't ask the user to hold off clicking until your request arrives. However, you can reduce the pressure in a different way with a throttle, which allows only one signal to pass in a fixed time window. For example:

RP.fromDOMEvent("click", document.getElementById("btn"))
  .throttle(500)
  .map(v => 1)
  ... // other operators;

It allows at most one click event to pass every 500 ms.
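A throttle of this kind can be sketched without the library. The `makeThrottle` helper below is hypothetical and is not the library's implementation. It lets the first signal through and drops any signal that arrives less than `windowMs` after the last accepted one, so the window is measured from the last accepted signal rather than on fixed clock boundaries. The clock is passed in as a parameter, an assumption made here so the behavior can be checked without real timers.

```javascript
// Hypothetical leading-edge throttle, not the library's implementation.
function makeThrottle(windowMs, now = Date.now) {
  let lastAccepted = -Infinity;
  return function accept(signal, onPass) {
    const t = now();
    if (t - lastAccepted >= windowMs) {
      lastAccepted = t;
      onPass(signal); // signal passes downstream
      return true;
    }
    return false; // signal is dropped
  };
}

// Drive the throttle with a fake clock instead of real click events.
let fakeTime = 0;
const throttle = makeThrottle(500, () => fakeTime);
const passed = [];
for (const t of [0, 100, 499, 500, 900, 1000]) {
  fakeTime = t;
  throttle("click@" + t, (signal) => passed.push(signal));
}
console.log(passed); // [ 'click@0', 'click@500', 'click@1000' ]
```

Dropped signals are lost, which is acceptable for clicks but not for data that must all be processed; that case needs a source that can honor requests.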

Does push model still work?

Yes! As you can see, we didn't change the left-to-right flow. If the data source is an active source, it will drive the stream.

You can even attach a sink to an active source: the sink will send requests to the source, but the source will ignore them. The stream then effectively works in push mode (with some overhead, since a useless request is made for each signal).

What's next?

This is the last part of the tutorial. There are still lots of interesting things to do to make this reactive programming library more powerful and more mature. Feel free to customize and extend it to fit your needs.

If you want to use this library, you can find it on GitHub as bouton.js. It is based on this tutorial, but with more operators and features.