Implement a JavaScript reactive programming library - part 1

Since 2014, reactive programming has been the new hotness. I use Highland.js for my Node.js projects and RxJava for my Java projects. They have really eased my coding work and made the code much easier to read. However, every developer and team has their own taste, and I want a library that can be customized to my needs and my coding style. Hence this series of posts, which implements a customizable reactive programming library. It is implemented in JavaScript, but it is easy to port to Java.

You can download or clone the source code of this tutorial on GitHub.
A ready-to-use library can be found at bouton.js.

What is reactive programming

For those who do not know reactive programming yet, here is an easy-to-understand definition from "The introduction to Reactive Programming you've been missing":

Reactive programming is programming with asynchronous data streams

Data streams can be UI events (e.g. a series of mouse clicks), variables (a series of values), timers, etc. Basically, you can treat anything as a stream in reactive programming.

Several libraries implement this idea. For JavaScript developers, there are RxJS, Bacon.js, and Highland.js. In the Java world, there is RxJava. You can search for "reactive programming" to find more libraries.

A Highland.js example, which doubles the elements of an array:

const _ = require("highland");

_([1, 2, 3])
  .map((v) => v * 2)
  .doto(console.log)
  // done() invokes its callback when the stream ends, so pass one
  .done(() => {});

// outputs:
// 2
// 4
// 6

Brief introduction

Most reactive programming libraries are based on a stream-like structure. A stream is a sequence of data; the data flows through a series of operators, which modify the data and optionally produce side effects. Take the Highland.js example above: you use the "_()" function to create a stream, a "map" operator to process the data (double the value), and a "doto" operator to produce side effects (print the value).

More generally, I'd like to see the program as a graph: each operator is a node that accepts a signal, processes it, and finally emits a signal. (I use "signal" here to distinguish it from the events emitted by the Node.js event emitter.)

From a node's internal point of view, it needs to handle each received signal (with the onReceive() function) and send signals to all connected down-stream nodes (with the send() function).

From an external point of view, you can push a signal to the node and observe its outgoing signals.

Figure: Node

You can chain your nodes as a stream pipeline or even a graph:

Figure: Nodes chained as a graph

The up-stream node calls send() to pass a signal to the edge that connects the two nodes, and the edge calls push() to deliver the signal to the down-stream node.

A basic Node implementation

In JavaScript, we create a Node class (using ES6 classes and Flow type annotations):

class Node {
  constructor() {}
  push(signal : any) : Node {}
  onReceive(signal : any) : Node {}
  send(signal : any) : Node {}
  observe(observer : (signal : any) => void) : Node {}
}

All methods in the Node class return the current node instance to support a fluent-style API. For example, you can chain multiple push() calls to push a series of inputs to the node:

node
  .push(1)
  .push(2)
  .push(3);

You may have noticed that a node is essentially an event emitter. Instead of making our Node class extend EventEmitter, we have Node delegate to an event emitter instance. This makes it possible to reuse or share an event emitter among nodes in order to save memory. The code looks like this:

const EventEmitter = require("eventemitter3");
const uuid = require("node-uuid");

class Node {
  id : string;
  options: any;
  ee: EventEmitter;
  constructor(options : any, eventemitter : ?EventEmitter) {
    this.id = uuid.v1();
    this.options = options;
    this.ee = eventemitter ? eventemitter : new EventEmitter();
  }

  // other functions
  // ...
}

The npm package "eventemitter3" is used rather than Node.js' built-in event emitter because it is faster and has better browser support.

When the send() function is called, an outgoing event (named "outgoing-<id>") is emitted. We include the node id in the event name so that an event emitter can be shared among nodes.

push(signal : any) : Node {
  this.onReceive(signal);
  return this;
}

send(signal : any) : Node {
  this.ee.emit("outgoing-" + this.id, signal);
  return this;
}
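
To illustrate why the node id is part of the event name, here is a minimal sketch of two nodes sharing one emitter. It is not the tutorial's code: to run without dependencies it assumes Node.js' built-in "events" module in place of eventemitter3 and a simple counter (the hypothetical nextId) in place of node-uuid.

```javascript
// Sketch only: built-in "events" stands in for eventemitter3,
// and a counter stands in for uuid.v1().
const EventEmitter = require("events");

let nextId = 0; // hypothetical id source for this sketch

class Node {
  constructor(options, eventemitter) {
    this.id = String(nextId++);
    this.options = options;
    this.ee = eventemitter ? eventemitter : new EventEmitter();
  }

  send(signal) {
    // The node id in the event name keeps nodes apart on a shared emitter.
    this.ee.emit("outgoing-" + this.id, signal);
    return this;
  }
}

// Two nodes delegate to the same emitter instance.
const shared = new EventEmitter();
const a = new Node({}, shared);
const b = new Node({}, shared);

const fromA = [];
const fromB = [];
shared.on("outgoing-" + a.id, (signal) => fromA.push(signal));
shared.on("outgoing-" + b.id, (signal) => fromB.push(signal));

a.send("hello from a");
b.send("hello from b");

console.log(fromA); // [ 'hello from a' ]
console.log(fromB); // [ 'hello from b' ]
```

Without the id suffix, both listeners would fire for every signal sent by either node.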

The push() function does not do anything special; it simply passes the signal to the onReceive() function.

The base class only needs a no-op onReceive(), as it is each subclass' duty to override it to process the incoming signal.

The implementation of the observe() function is straightforward: we simply add the observer function to the observers pool, and send() calls these observer functions before it emits the signal.

Our first version of the Node class looks like the following:

Node.js

const EventEmitter = require("eventemitter3");
const uuid = require("node-uuid");

class Node {
  id : string;
  options: any;
  ee: EventEmitter;
  observers : Array<(signal : any) => void>;
  constructor(options : any, eventemitter : ?EventEmitter) {
    this.id = uuid.v1();
    this.options = options;
    this.ee = eventemitter ? eventemitter : new EventEmitter();
    this.observers = [];
  }

  push(signal : any) : Node {
    this.onReceive(signal);
    return this;
  }

  onReceive(signal : any) : Node {
    return this;
  }

  send(signal : any) : Node {
    this.observers.forEach(fn => {
      fn(signal);
    });
    this.ee.emit("outgoing-" + this.id, signal);
    return this;
  }

  observe(observer : (signal : any) => void) : Node {
    this.observers.push(observer);
    return this;
  }
}

module.exports = Node;

Test

We test with nodeunit. We create a new Node subclass that doubles the signal. At the outgoing end, we observe the output signal; finally, we push an input signal to trigger the processing.

exports["test Node"] = {
  "test single node" : (test) => {
    const Node = require("../lib/Node");
    
    // Create a new Node class
    class MyNode extends Node {
      // override onReceive function
      onReceive(signal) {
        console.log(signal);
        this.send(signal * 2);
      }
    }

    // create a node
    var node = new MyNode();
    // observe it
    node.observe((signal) => {
      test.equal(signal, 2);
      test.done();
    });
    // push a value to it
    node.push(1);
  }
}

Wire up the nodes

Until now, our node has been just a function (onReceive()) wrapped in a class. To make things interesting, we need to connect nodes together. We add a new function, to(), to the Node class. It accepts a down-stream node as its argument and builds a connection between the current node and the down-stream node (by listening to the outgoing-<id> event):

  to(node : Node) : Node {
    this.ee.on("outgoing-" + this.id, (signal) => {
      node.push(signal);
    });

    return node;
  }

The to() function returns the down-stream node to help us build our pipeline in a fluent way.
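
Because to() returns its argument, several calls can be chained into a longer pipeline. The following is a rough sketch of that idea, not the tutorial's code: it assumes Node.js' built-in "events" module instead of eventemitter3 and a counter instead of node-uuid, and AddOneNode and CollectNode are hypothetical helper classes made up for the illustration.

```javascript
const EventEmitter = require("events"); // stand-in for eventemitter3

let nextId = 0; // stand-in for uuid.v1()

class Node {
  constructor() {
    this.id = String(nextId++);
    this.ee = new EventEmitter();
  }
  push(signal) { this.onReceive(signal); return this; }
  onReceive(signal) { return this; }
  send(signal) { this.ee.emit("outgoing-" + this.id, signal); return this; }
  to(node) {
    this.ee.on("outgoing-" + this.id, (signal) => node.push(signal));
    return node; // returning the down-stream node allows a.to(b).to(c)
  }
}

// Hypothetical nodes for this sketch
class AddOneNode extends Node {
  onReceive(signal) { return this.send(signal + 1); }
}
class DoubleNode extends Node {
  onReceive(signal) { return this.send(signal * 2); }
}
class CollectNode extends Node {
  constructor() { super(); this.received = []; }
  onReceive(signal) { this.received.push(signal); return this; }
}

const source = new AddOneNode();
const sink = new CollectNode();

// The whole chain evaluates to the last node passed to to().
const last = source.to(new DoubleNode()).to(sink);

source.push(1).push(2).push(3);

console.log(last === sink); // true
console.log(sink.received); // [ 4, 6, 8 ]
```

Note that the signals must be pushed into the first node (source here), since the chained expression itself evaluates to the last node.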

The Highland.js example from the beginning of this post can be implemented with our Node system as follows:

class DoubleNode extends Node {
  onReceive(signal) {
    this.send(signal * 2);
  }
}

class PrintNode extends Node {
  onReceive(signal) {
    console.log(signal);
  }
}

var double = new DoubleNode();
var print = new PrintNode();

double
  .to(print);

double
  .push(1)
  .push(2)
  .push(3);
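
The same to() function can also wire nodes into a graph rather than a straight line. Below is a small sketch, under the same assumptions as the rest of this post's Node class but using Node.js' built-in "events" module and a counter id so that it runs without dependencies; ForwardNode, NegateNode and CollectNode are hypothetical names used only here. One source fans out to two branches, and both branches feed a single sink.

```javascript
const EventEmitter = require("events"); // stand-in for eventemitter3

let nextId = 0; // stand-in for uuid.v1()

class Node {
  constructor() {
    this.id = String(nextId++);
    this.ee = new EventEmitter();
  }
  push(signal) { this.onReceive(signal); return this; }
  onReceive(signal) { return this; }
  send(signal) { this.ee.emit("outgoing-" + this.id, signal); return this; }
  to(node) {
    this.ee.on("outgoing-" + this.id, (signal) => node.push(signal));
    return node;
  }
}

// Hypothetical nodes for this sketch
class ForwardNode extends Node {
  onReceive(signal) { return this.send(signal); }
}
class DoubleNode extends Node {
  onReceive(signal) { return this.send(signal * 2); }
}
class NegateNode extends Node {
  onReceive(signal) { return this.send(-signal); }
}
class CollectNode extends Node {
  constructor() { super(); this.received = []; }
  onReceive(signal) { this.received.push(signal); return this; }
}

const source = new ForwardNode();
const sink = new CollectNode();

// Fan out: the source feeds two branches. Fan in: both end in one sink.
source.to(new DoubleNode()).to(sink);
source.to(new NegateNode()).to(sink);

source.push(1).push(2);

// Each signal passes through both branches, in the order they were wired.
console.log(sink.received); // [ 2, -1, 4, -2 ]
```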

What's next?

This first version of the Node implementation is far from perfect: all the code runs synchronously. It will block all I/O in your browser or on your server when running a time-consuming pipeline/graph. We handle this problem in Part 2: Be reactive, don't block the I/O.
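
The synchronous behaviour can be made visible with a small sketch that records the order of events around a push() call. As before, this is only an illustration: it assumes Node.js' built-in "events" module instead of eventemitter3 and a counter instead of node-uuid, and ForwardNode and LogNode are hypothetical names.

```javascript
const EventEmitter = require("events"); // stand-in for eventemitter3

let nextId = 0; // stand-in for uuid.v1()

class Node {
  constructor() {
    this.id = String(nextId++);
    this.ee = new EventEmitter();
  }
  push(signal) { this.onReceive(signal); return this; }
  onReceive(signal) { return this; }
  send(signal) { this.ee.emit("outgoing-" + this.id, signal); return this; }
  to(node) {
    this.ee.on("outgoing-" + this.id, (signal) => node.push(signal));
    return node;
  }
}

const log = [];

// Hypothetical nodes for this sketch
class ForwardNode extends Node {
  onReceive(signal) { return this.send(signal); }
}
class LogNode extends Node {
  onReceive(signal) { log.push("sink received " + signal); return this; }
}

const source = new ForwardNode();
source.to(new LogNode());

log.push("before push");
source.push(1); // does not return until the whole pipeline has run
log.push("after push");

console.log(log);
// [ 'before push', 'sink received 1', 'after push' ]
```

The sink runs inside the push() call, so any slow node delays everything that follows the push.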

Another problem is that there is a lot of boilerplate code to write for such a simple example. In Part 3: Use operator to reduce boilerplate code, we will build our operator system to reduce this boilerplate.