Implement a JavaScript reactive programming library - part 2

Part 2: Be reactive, don't block the I/O

You can download or clone the source code for this tutorial from GitHub.
A ready-to-use library is available at bouton.js.

In Part 1, we created a Node class to represent an operator in a data stream. However, the first version of the Node class has an issue: it is synchronous, so it blocks I/O operations.

Let's look at this example:

test/blockIO.js

const Node = require("../lib/Node");
const http = require("http");

class LongRunNode extends Node {
  onReceive(signal) {
    let last, cur, duration = 1000;
    for (let i = 0; i < 20; i++) {  // it takes 20 seconds
      last = new Date().getTime();
      cur = last;
      while (cur - last < duration) { // loop 1 second
        cur = new Date().getTime();
      }
      process.stdout.write(".");
    }
    console.log("");
    this.send(signal + 1);
  }
}
// create a server to handle http I/O
const server = http.createServer((req, res) => {
  console.log("http request");
  res.end('Hello World\n');
});

server.listen(3000, "127.0.0.1", () => {
  // expect a timeout at the 15th second
  setTimeout(()=>{
    console.log("time out");
  }, 15000);
  // run the long-running node
  const node1 = new LongRunNode();
  node1.push(1);
});

We create a LongRunNode that runs a time-consuming task for each signal. While processing, it prints a "." to the console every second, and the whole task takes 20 seconds. The test program runs an HTTP server listening on "http://localhost:3000". Before the node runs, we set a timer that prints "time out" after 15 seconds. To test the program, first run it in your terminal; you will see a "." printed every second. Then, after about 5 seconds, open "http://localhost:3000" in your browser to make an HTTP request.

We expect to see the following output:

.....http request
..........time out
.....

The HTTP request should be handled at second 5, and the timer should fire at second 15. However, the actual output is:

....................
time out
http request

Both the timer and the I/O request wait for our node to finish before they run. Why? Because JavaScript runs on a single thread: a time-consuming job occupies that thread and blocks all I/O and timer callbacks until it finishes.
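To see this blocking in isolation, here is a minimal sketch (not part of the tutorial's code; `busyWait` is a hypothetical helper) that asks for a 0 ms timer and then spins the thread for 200 ms. The timer cannot fire until the busy loop returns, so the measured delay is at least 200 ms.

```javascript
// Hypothetical helper: spin for `ms` milliseconds, holding the only JS thread.
function busyWait(ms) {
  const start = Date.now();
  while (Date.now() - start < ms) {
    // do nothing; just keep the thread busy
  }
}

const order = [];
const scheduledAt = Date.now();
let timerDelay = null;

// Ask for the callback "as soon as possible" (0 ms)...
setTimeout(() => {
  timerDelay = Date.now() - scheduledAt;
  order.push("timer");
  console.log(`timer fired after ${timerDelay} ms`);
}, 0);

// ...but occupy the thread for 200 ms first.
busyWait(200);
order.push("busy loop done");
console.log("busy loop done");
```

The same thing happens in blockIO.js, only with 20 seconds instead of 200 ms.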

A straightforward solution is to split the time-consuming job into a sequence of smaller jobs, say 5 seconds each, so that I/O and timers get a chance to run every 5 seconds.

We can do the following:

const Node = require("../lib/Node");
const http = require("http");

class LongRunNode extends Node {
  onReceive(signal) {
    let last, cur, duration = 1000;
    for (let i = 0; i < 5; i++) { // lasts 5 seconds
      last = new Date().getTime();
      cur = last;
      while (cur - last < duration) {
        cur = new Date().getTime();
      }
      process.stdout.write(".");
    }
    console.log("");
    this.send(signal + 1);
  }
}

const server = http.createServer((req, res) => {
  console.log("http request");
  res.statusCode = 200;
  res.setHeader('Content-Type', 'text/plain');
  res.end('Hello World\n');
});

server.listen(3000, "127.0.0.1", () => {
  setTimeout(()=>{
    console.log("time out");
  }, 15000);
  // chain 4 nodes to make a 20 seconds pipeline
  const node1 = new LongRunNode();
  const node2 = new LongRunNode();
  const node3 = new LongRunNode();
  const node4 = new LongRunNode();

  node1
    .to(node2)
    .to(node3)
    .to(node4);

  node1.push(1);
});

Run it again and open "http://localhost:3000" after 2 or 3 seconds. You will get:

.....
.....
.....
.....
time out
http request

The timer and the I/O request still wait for our nodes to finish. Why? Because send() passes the signal to the next node with a synchronous call, so the whole pipeline runs on one uninterrupted call stack. The following simplified call stack shows how nodeA sends a signal to nodeB:

nodeA.send(signal)
  |--> nodeA.ee.emit("outgoing-<id-nodeA>", signal)
         |--> nodeB.push(signal) 
                |--> nodeB.onReceive(signal)  

Note: a common misunderstanding of the Node.js EventEmitter is that it is asynchronous. It is not: when an EventEmitter emits an event, all listeners are called synchronously, in the order they were registered.
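A quick way to convince yourself is the following small sketch. The event name "outgoing-1" only mirrors the naming used above; both listeners run inside the emit() call, before the line after it executes.

```javascript
const EventEmitter = require("events");

const ee = new EventEmitter();
const order = [];

// Two listeners on the same event, registered in this order.
ee.on("outgoing-1", (signal) => order.push(`listener A got ${signal}`));
ee.on("outgoing-1", (signal) => order.push(`listener B got ${signal}`));

order.push("before emit");
ee.emit("outgoing-1", 42); // both listeners run here, inside emit()
order.push("after emit");

// order: before emit, listener A got 42, listener B got 42, after emit
console.log(order);
```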

Solution: instead of emitting the outgoing signal to nodeB directly, we create a delegation job that emits the signal, and push that job onto a queue to run later.

In Node.js, there are three ways to queue a job for later execution:

  1. process.nextTick()
  2. setImmediate()
  3. setTimeout()

For the differences between them, see my other post, Understanding JavaScript event loop without a Computer Science degree.

In short, process.nextTick(fn) runs fn later, but before any queued I/O callbacks; setImmediate(fn) runs fn later, after queued I/O callbacks but before queued timeout callbacks.

process.nextTick -> I/O callback -> setImmediate() -> setTimeout()
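The relative order of the three can be checked with a small sketch. It schedules all three callbacks from inside an I/O callback (here `fs.stat`), because only there is setImmediate() guaranteed to run before a 0 ms setTimeout(); when both are called from the main module, Node.js does not guarantee their relative order.

```javascript
const fs = require("fs");

const order = [];

// Schedule all three from inside an I/O callback, where the order is deterministic.
fs.stat(".", () => {
  setTimeout(() => order.push("setTimeout"), 0);
  setImmediate(() => order.push("setImmediate"));
  process.nextTick(() => order.push("process.nextTick"));
});

process.on("exit", () => {
  // Expected: process.nextTick -> setImmediate -> setTimeout
  console.log(order.join(" -> "));
});
```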

We don't want process.nextTick, because it always runs before I/O callbacks. So we modify the send() method of the Node class to use setImmediate():

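  send(signal : any) : Node {
    // observers are still notified synchronously
    this.observers.forEach(fn => {
      fn(signal);
    });
    // defer the emit so the event loop can handle I/O before downstream nodes run
    setImmediate(() => {
      this.ee.emit("outgoing-" + this.id, signal);
    });
    return this;
  }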

Run blockIO.js again and make an HTTP request at the 3rd second. We get the following result:

.....
.....
http request
.....
time out
.....

Much better: the timeout fires at second 15. But wait, why is the HTTP request handled at second 10 instead of second 5?

The reason is that the JavaScript VM also needs the single thread to queue the I/O callback. When the first node calls send() at the end of onReceive(), it queues an "event-emit" task and then releases the call stack. With the thread now idle, the VM gets a chance to check for I/O and timers. It finds an I/O request and pushes its handler onto the queue. The queue now holds two tasks:

["event-emit", "http handler"]

The JavaScript VM takes the first job in the queue and executes it, which runs the second node before the HTTP request is handled.
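The problem can be reproduced in miniature. In this sketch, a plain object stands in for the second node (its push() is still synchronous), and a setImmediate() callback stands in for the real HTTP handler; both are simplifications for illustration, not the tutorial's Node class. Because emit() runs its listener synchronously, node2 finishes before the "http handler" job gets its turn.

```javascript
const EventEmitter = require("events");

const ee = new EventEmitter();
const log = [];

// Hypothetical downstream node whose push() still calls onReceive()
// synchronously, i.e. only send() has been changed to use setImmediate().
const node2 = {
  push(signal) {
    this.onReceive(signal);
  },
  onReceive(signal) {
    log.push(`node2 got ${signal}`);
  },
};
ee.on("outgoing-node1", (signal) => node2.push(signal));

// node1.send(1) with the setImmediate() change queues the "event-emit" job...
setImmediate(() => {
  log.push("event-emit");
  ee.emit("outgoing-node1", 1);
});
// ...and the "http handler" job lands in the queue behind it.
// (A setImmediate() callback stands in for the real I/O callback.)
setImmediate(() => log.push("http handler"));

process.on("exit", () => console.log(log));
```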

To solve this, we modify the push() method to call onReceive() via setImmediate(). This way, the second node does not run directly when the upstream node's send() delivers a signal; instead, it queues another delegation job, "run node", to run itself. Compare the queue during this process:

queue: ["event-emit", "http handler"]
// ==> run "event-emit": it no longer runs node 2 directly, but queues a "run node" job
queue: ["http handler", "run node"]
// ==> run "http handler"
queue: ["run node"]

We change the push() method of the Node class to:

  push(signal : any) : Node {
    // defer onReceive() so that jobs already queued (such as I/O handlers) run first
    setImmediate(() => {
      this.onReceive(signal);
    });
    return this;
  }

Rerun the test; everything works as expected:

.....
http request
.....
.....
time out
.....

Bravo! Now your pipeline is reactive. You no longer need to worry about a long chained pipeline blocking I/O: the nodes in your pipeline are scheduled by the JavaScript event loop.

What's next?

In the next part, Part 3: Use operator to reduce boilerplate code, we will implement an operator system that cuts down on boilerplate.