Implement a JavaScript reactive programming library - part 3

Part 3: Use operator to reduce boilerplate code

You can download or clone the source code of this tutorial on GitHub.
A ready-to-use library can be found at bouton.js.

In part 1, we implemented the Node class to process signals. You can push() a signal to a node and observe() its output. A node handles the signal in its onReceive() function and send()s the output to the next node, which is connected with the to() function.

At the end of part 1, we wrote a lot of boilerplate code to demo a simple example that doubles each element of an array.

Double Array example

class DoubleNode extends Node {  
  onReceive(signal) {
    this.send(signal * 2);
  }
}

class PrintNode extends Node {  
  onReceive(signal) {
    console.log(signal);
  }
}

var double = new DoubleNode();  
var print = new PrintNode();

double  
  .to(print);

double  
  .push(1)
  .push(2)
  .push(3);

In this part, we build an operator system to reduce the boilerplate code.

Before we step into the implementation details, let's first preview what our code will look like at the end of this part:

RP.asList([1, 2, 3])
  .times(2)
  .print();

Interesting, right? We will turn 20+ lines of code into 3 lines.

Let's do it!

Time to build a module

Until now, we haven't talked about modules. Our "library" only has one Node class. We create an index.js file in our project to represent our reactive programming library:

index.js

const Node = require("./Node");

module.exports = {
  Node : Node
}

We export useful classes and functions in the index.js file. The Node class we created in part 1 is exposed as Node.

Create your stream/signal source

In our Double Array example, we used 3 chained push() calls to feed our pipeline. These push() calls played the role of a signal source. An alternative is to create a Node that sends signals on its own, without stimulation from outside:

class ArraySourceNode extends Node {
  constructor(options, eventemitter) {
    super(options, eventemitter);
    this.source = this.options;
    // send array element as signal
    this.source.forEach(v => this.send(v));
  }
}

// pass the array as options; without it, this.options would be undefined
var source = new ArraySourceNode([1, 2, 3]);

// class for DoubleNode and PrintNode are not shown here
// ...

source
  .to(double)
  .to(print);

As this ArraySourceNode is a commonly used Node, we export an asList() function in our index.js file.

index.js

const Node = require("./Node");

class ArraySourceNode extends Node {
  constructor(options, eventemitter) {
    super(options, eventemitter);
    this.source = this.options;

    this.source.forEach(v => this.send(v));
  }
}

module.exports = {
  Node : Node,

  asList : (array) => {
    return new ArraySourceNode(array);
  }
}

This is a general way to create your own signal source: you create a function that returns a Node instance, and that Node send()s its signals in its constructor.

You can register your source to the module in this way:

const RP = require("your RP module path");

class YourSourceNode extends RP.Node {
  constructor(options, eventemitter) {
    super(options, eventemitter);
    // send signal here
  }
}

RP["your source name"] = (...args) => {
  // forward the arguments to the node, where they are available as this.options
  return new YourSourceNode(args);
};
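As an illustration of this pattern, here is a minimal sketch of a hypothetical range source. The Node class below is a simplified stand-in, not the class from part 1: it assumes that signals sent before a downstream node is connected are buffered and flushed by to(), which may differ from how the real Node behaves. The names RangeSourceNode, range and CollectNode are made up for this example.

```javascript
// Simplified stand-in for the Node class from part 1 (an assumption).
// It buffers signals sent before a downstream node is connected.
class Node {
  constructor(options) {
    this.options = options;
    this.next = null;
    this.pending = [];
  }
  to(node) {
    this.next = node;
    // flush everything that was sent before the connection existed
    this.pending.splice(0).forEach(signal => node.onReceive(signal));
    return node;
  }
  send(signal) {
    if (this.next) this.next.onReceive(signal);
    else this.pending.push(signal);
  }
  onReceive(signal) {
    this.send(signal);
  }
}

const RP = { Node };

// Hypothetical source: emits every integer from start to end (inclusive).
class RangeSourceNode extends RP.Node {
  constructor(options) {
    super(options);
    const { start, end } = this.options;
    for (let i = start; i <= end; i++) this.send(i);
  }
}

// Register the source on the module object, like asList().
RP.range = (start, end) => new RangeSourceNode({ start, end });

// Hypothetical sink that stores what it receives.
const received = [];
class CollectNode extends RP.Node {
  onReceive(signal) {
    received.push(signal);
  }
}

RP.range(1, 4).to(new CollectNode());
console.log(received); // [ 1, 2, 3, 4 ]
```

Because the source emits in its constructor, the signals exist before to() is called. Check how your own Node class delivers such early signals before relying on this pattern.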

Now you can use the asList() function in the Double Array example:

const RP = require("your RP module path");

// class for DoubleNode and PrintNode are not shown here
// ...

RP.asList([1, 2, 3])  
  .to(double)
  .to(print);

Create operator

Like a signal source, an operator is just a function that returns a Node instance.

operator definition

operator : (...args : any[]) => Node;

Our goal is to replace .to(double) with a special operator .times(2). Remember that to() is a method of the Node class. We need to add a times() method to the Node class after the class definition. This is quite easy with the prototype:

// define your operator, it is a function that returns a node instance
function timesOperator(n) {
  class TimesNode extends Node {
    constructor(options, eventemitter) {
      super(options, eventemitter);
      this.n = this.options;
    }

    onReceive(signal) {
      this.send(signal * this.n);
    }
  }
  return new TimesNode(n);
}
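// add it to Node class
// use a regular function, not an arrow function, so that `this`
// refers to the node on which times() is called
Node.prototype["times"] = function (n) {
  let timesNode = timesOperator(n);
  return this.to(timesNode);
};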

When we add the times method to the Node class, we call this.to(timesNode) to connect the operator node, and we return it for further chaining.
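The chaining can be tried in isolation with the sketch below. The Node class here is a simplified, synchronous stand-in for the one from part 1 (an assumption), and CollectNode is a hypothetical sink used only to capture the output. The prototype method is written as a regular function because an arrow function would not bind this to the node that times() is called on.

```javascript
// Simplified stand-in for the Node class from part 1 (an assumption).
class Node {
  constructor(options) {
    this.options = options;
    this.next = null;
  }
  to(node) {
    this.next = node;
    return node; // return the downstream node so calls can be chained
  }
  send(signal) {
    if (this.next) this.next.onReceive(signal);
  }
  onReceive(signal) {
    this.send(signal);
  }
  push(signal) {
    this.onReceive(signal);
    return this;
  }
}

function timesOperator(n) {
  class TimesNode extends Node {
    onReceive(signal) {
      this.send(signal * this.options);
    }
  }
  return new TimesNode(n);
}

// Regular function: `this` is the node that times() is called on.
Node.prototype.times = function (n) {
  return this.to(timesOperator(n));
};

// Hypothetical sink that stores what it receives.
const results = [];
class CollectNode extends Node {
  onReceive(signal) {
    results.push(signal);
  }
}

const head = new Node();
head.times(2).times(5).to(new CollectNode());
head.push(1).push(2).push(3);
console.log(results); // [ 10, 20, 30 ]
```

Each times() call returns the newly connected node, which is why the second times(5) attaches after the first one rather than to head.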

More generally, we export a new function addOperator() to our module:

module.exports = {
  Node : Node,

  addOperator : (name, operator) => {
    // a regular function, so that `this` is the node the operator is called on
    function fn(...args) {
      let node = operator(...args);
      return this.to(node);
    }

    Node.prototype[name] = fn;
  },

  asList : (array) => {
    return new ArraySourceNode(array);
  },
}
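To see how addOperator() forwards its arguments, here is a self-contained sketch. The Node class is a simplified stand-in for the one from part 1 (an assumption), and the clamp and collect operators are hypothetical, introduced only for this illustration.

```javascript
// Simplified stand-in for the Node class from part 1 (an assumption).
class Node {
  constructor(options) {
    this.options = options;
    this.next = null;
  }
  to(node) {
    this.next = node;
    return node;
  }
  send(signal) {
    if (this.next) this.next.onReceive(signal);
  }
  onReceive(signal) {
    this.send(signal);
  }
  push(signal) {
    this.onReceive(signal);
    return this;
  }
}

// Same idea as addOperator() in index.js: wrap the operator in a
// prototype method that connects the new node and returns it.
function addOperator(name, operator) {
  Node.prototype[name] = function (...args) {
    const node = operator(...args);
    return this.to(node);
  };
}

// Hypothetical operator with two arguments: limits signals to [min, max].
addOperator("clamp", (min, max) => {
  class ClampNode extends Node {
    onReceive(signal) {
      this.send(Math.min(max, Math.max(min, signal)));
    }
  }
  return new ClampNode();
});

// Hypothetical operator that stores every signal in the given array.
addOperator("collect", (target) => {
  class CollectNode extends Node {
    onReceive(signal) {
      target.push(signal);
      this.send(signal);
    }
  }
  return new CollectNode();
});

const seen = [];
const head = new Node();
head.clamp(0, 10).collect(seen);
head.push(-5).push(3).push(42);
console.log(seen); // [ 0, 3, 10 ]
```

Whatever is passed to the generated method (here 0 and 10) reaches the operator function unchanged, so an operator can take any number of arguments.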

Now it is time to register the times and print operators. Create a new file Operators.js, and implement and register all your operators there. To use them, simply require this file at the top of your JavaScript file:

Operators.js

const RP = require("./index");
const Node = RP.Node;

RP.addOperator("times", (n) => {
  class TimesNode extends Node {
    constructor(options, eventemitter) {
      super(options, eventemitter);
      this.n = this.options;
    }

    onReceive(signal) {
      this.send(signal * this.n);
    }
  }
  return new TimesNode(n);
});

RP.addOperator("print", () => {
  class PrintNode extends Node {
    onReceive(signal) {
      console.log(signal);
      this.send(signal);
    }
  }
  return new PrintNode();
});

Test

const RP = require("../lib/index");
// require the Operators file to register them
const Operators = require("../lib/Operators");

RP.asList([1, 2, 3])
  .times(2)
  .print();

Output:

2
4
6

Build your first general purpose operator : map

The times and print operators are good, but they still require us to write boilerplate code to create each operator. If you know any reactive programming library, you are probably familiar with the general-purpose operator map(fn : (signal : any) => any). It accepts a function that processes the signal; the return value of that function is emitted to the next operator.

Let's implement the map() operator. The operator has one argument fn, which will be passed to the MapNode as options.
In the onReceive() function, we call fn and send() its return value:

Operators.js

RP.addOperator("map", (fn) => {
  class MapNode extends Node {
    constructor(options, eventemitter) {
      super(options, eventemitter);
      this.fn = options;
    }

    onReceive(signal) {
      let out = this.fn(signal);
      this.send(out);
    }
  }
  return new MapNode(fn);
});

With our new map operator, we can rewrite our Double Array example with the following code:

const RP = require("../lib/index");
const Operators = require("../lib/Operators");

RP.asList([1, 2, 3])
  .map(signal => signal * 2)
  .map(console.log);

With addOperator(), you can add other operators like throttle, merge, filter, etc. We will not cover all these operators in this article. A complete version of the library will be available later on GitHub.
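As one example, a filter operator could be sketched as follows. This is not the library's implementation: the Node class is a simplified stand-in for the one from part 1, addOperator() is redefined locally to keep the example self-contained, and the filter semantics (forward a signal only when the predicate returns a truthy value) are an assumption.

```javascript
// Simplified stand-in for the Node class from part 1 (an assumption).
class Node {
  constructor(options) {
    this.options = options;
    this.next = null;
  }
  to(node) {
    this.next = node;
    return node;
  }
  send(signal) {
    if (this.next) this.next.onReceive(signal);
  }
  onReceive(signal) {
    this.send(signal);
  }
  push(signal) {
    this.onReceive(signal);
    return this;
  }
}

// Local copy of the addOperator() idea from index.js.
function addOperator(name, operator) {
  Node.prototype[name] = function (...args) {
    return this.to(operator(...args));
  };
}

// Hypothetical filter: drops signals for which predicate() is falsy.
addOperator("filter", (predicate) => {
  class FilterNode extends Node {
    onReceive(signal) {
      if (predicate(signal)) this.send(signal);
    }
  }
  return new FilterNode();
});

// map, as defined in Operators.js, in a shorter form.
addOperator("map", (fn) => {
  class MapNode extends Node {
    onReceive(signal) {
      this.send(fn(signal));
    }
  }
  return new MapNode();
});

const kept = [];
const head = new Node();
head
  .filter(n => n % 2 === 1) // keep odd numbers only
  .map(n => n * 10)
  .map(n => { kept.push(n); return n; }); // record the output

[1, 2, 3, 4, 5].forEach(n => head.push(n));
console.log(kept); // [ 10, 30, 50 ]
```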

What's next?

In the next part, we focus on two special signals: error and end. See: Part 4: Handle error and end signal