Implement a JavaScript reactive programming library - part 4

Part 4: Handle error and end signal

You can download/clone the source code of the tutorial in github.
A ready to use library can be found at bouton.js.

We can't avoid errors, so does program. A system without error handling can do nothing serious. What happens if an error is threw when node is processing a signal?

We don't want to stop the stream immediately, instead, we expect the error to be handled by a special error handler.

I like the way, in which highlandjs handles the error: errors are handled by a special operator named "errors". Any error occurred before the errors operator will be caught by it, it also provide a way to resend the error back to the stream.

Example: error handling in highlandjs

_([1, 2, 3])
 .map((v) => {
   // throw an error if value is even
   if (v % 2 == 0) throw new Error("Expect an Odd, but get an Even");
   // for odd value, turn number to a string
   return "value is " + v;  
 })
 .map((v) => {
   return v + "!"
 })
 .errors((err, rethrow) => {
   // print the error message
   console.error(err.message);
 })
 .doto(console.log)
 .done();

// output
// value is 1!
// Expected an Odd, but get an Even
// value is 3!

When an error occurs in a node, it will propagate through the pipeline/graph, until it reaches an error handler. During the propagation, the error will not be seen by the nodes.

In this part, we will implement the same error handling system for our reactive programming library.

Step 1: catch the error

If you go back to our Node class implementation, we never catch the error in onReceive() method. If any error occurs there, your program stops. To handle error, we need to catch it first.

To do that, we delegate the onReceive() function to a new function: onSignal(), and we catch all the errors threw by onSignal(), and propagate it to down-stream node by calling send().

in Node class

onReceive(signal : any) : Node {
  try {
    // delegate to onSignal, but catch the errors
    this.onSignal(signal);
  } catch (error) {
    this.send(error); // propagate to down-stream node
  }
  return this;
}

onSignal(signal : any) : Node {
  // your signal processing code goes here
  this.send(signal); // by default, pass to next node
  return this;
}

Note: By using onSignal() delegation, we turned onReceive() to be a "private" method. You don't override onReceive() method anymore. Instead, move all your signal processing code to onSignal(). They two have the same signature.

Step 2: Propagate and Handle the error

Now we catch the error, and propagate it as a normal signal to the down-stream node. The down-stream node could be an Error Handler, which handles the error, or it could be a normal node, which propagate the error to next node.

Actually, "propagate the error" itself is a kind of error handling, right? So we can use a unified interface to handle error: the onError() function in Node class.

onReceive(signal : any) : Node {
  if (signal instanceof Error) {
    this.onError(signal);
  } else {
    try {
      this.onSignal(signal);
    } catch (error) {
      this.send(error);
    }
  }
  return this;
}

onError(error : Error) : Node {
  // by default propagate the error
  this.send(error);
  return this;
}

We modified onReceive() function to delegate error handling to onError() function. By default, it just propagates the error to next node. You don't need to override this method if your node/operator is not an error handler.

We don't catch any error in onError(), if an error is threw from your error handler, it is better to stop the program and a bug is worth to be recorded.

Step 3: Create "errors" operator

The last step is to create an "errors" operator. It accepts two arguments: error and a rethrow function, which allows the error handler to put the error or signal back to stream

errors operator

function errors(error : Error, rethrow (signal: any) => void) : Node {}

Operators.js

RP.addOperator("errors", (fn) => {
  class ErrorsNode extends Node {
    constructor(options, eventemitter) {
      super(options, eventemitter);
      this.fn = options;
    }

    onError(error) {
      this.fn(error, (signal) => {
        this.send(signal);
      });
    }
  }
  return new ErrorsNode(fn);
});

Test

RP.asList([1, 2, 3])
  .map((v) => {
    // throw an error if value is even
    if (v % 2 == 0) throw new Error("Expect an Odd, but get an Even");
    // for odd value, turn number to a string
    return "value is " + v;
  })
  .map((v) => {
    return v + "!"
  })
  .errors((err, rethrow) => {
    // print the error message
    console.error(err.message);
  })
  .map(v => console.log(v));

What about "END" signal?

End signal is a special signal to inform the end of the data stream. The END handling is similar to Error handling. We first define a special "END" signal in Node class:

Node.END = "__NODE_END__";

Then, we delegate the END signal to onEnd() method. Finally we create a done(fn) operator to handle it.

in Node class

onReceive(signal : any) : Node {
  if (signal instanceof Error) {
    this.onError(signal);
  } else if (signal === Node.END) {
    try {
      this.onEnd(signal);
    } catch (error) {
      this.send(error);
    }
  } else {
    try {
      this.onSignal(signal);
    } catch (error) {
      this.send(error);
    }
  }
  return this;
}

onEnd(signal) : Node {
  // override this function to handle end signal
  this.send(signal);
  return this;
}

Operator.js

RP.addOperator("done", (fn) => {
  class DoneNode extends Node {
    constructor(options, eventemitter) {
      super(options, eventemitter);
      this.fn = options;
    }

    onEnd(signal) {
      this.fn(signal);
    }
  }
  return new DoneNode(fn);
});

Don't forget to modify your asList() signal source to send an END signal to terminate the stream.

What's next?

With the first 4 parts of the tutorial, we have a simple reactive programming library. It is simple but powerful, you can use addOperator() api to extend it with your needs and your coding style.

Of course, there are many things to do to make this library mature, but I can't cover all of them in this tutorial. A more complete/mature version could be found in github later.

To complete the tutorial, I will introduce another very important feature in reactive programming: back pressure in the last part of the tutorial.