Concepts
If you prefer to get into coding quickly, read Cloud Assistant first, and then Install and First App.
To see the big picture first, keep on reading, or watch a video.
And if you are into videos, Programming with Caf.js - Part 1 and Part 2 cover most of what we describe in here.
Components
In Caf.js we build everything with hierarchies of components.
Our component model was inspired by the SmartFrog Framework and the Erlang/OTP libraries.
Unless you are writing plugins, or reusing the component framework for something else, it is unlikely that you will write components from scratch. But if you are curious, you can see how here.
This section is all about using existing ones.
Asynchronous Creation
A component is created with an asynchronous factory method. During component construction we can, for example, look up properties in Redis, and never block the main loop. With factory methods we can swap implementations without modifying the code.
Even though component construction is asynchronous, Caf.js builds a hierarchy of components in a deterministic order. A parent component is only inserted in its parent context after its children have been created, in left-to-right order. When the hierarchy is destroyed, the order is reversed. The topmost component is special: it is always there.
Dependencies between components are respected by controlling creation order.
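The ordering rules above can be illustrated with a minimal sketch in plain JavaScript. It is not the Caf.js implementation: createComponent(), destroyComponent(), delayedFactory() and the { name, factory, children } description shape are hypothetical. Factories wait a random time to mimic asynchronous lookups, yet children are still created left to right, the parent is registered last, and destruction runs in exactly the reverse order.

```javascript
// Hypothetical sketch, not the Caf.js API: createComponent, destroyComponent,
// delayedFactory and the { name, factory, children } shape are illustrative.

// Simulates an asynchronous lookup (e.g., properties in Redis) with a random delay.
async function delayedFactory(spec) {
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 10));
  return { name: spec.name };
}

// Creates children one at a time, left to right, and only then inserts
// the new component in the context it shares with its siblings.
async function createComponent(spec, siblings, log) {
  const children = {}; // context shared by this component's children
  for (const childSpec of spec.children || []) {
    await createComponent(childSpec, children, log);
  }
  const comp = await spec.factory(spec);
  comp.$ = siblings; // plays the role of this.$
  comp.children = children;
  siblings[spec.name] = comp;
  log.push(`create ${spec.name}`);
  return comp;
}

// Reverse order: the component leaves its context first, then its
// children are destroyed right to left.
async function destroyComponent(comp, log) {
  delete comp.$[comp.name];
  log.push(`destroy ${comp.name}`);
  for (const name of Object.keys(comp.children).reverse()) {
    await destroyComponent(comp.children[name], log);
  }
}
```

For a description with top and its children log and foo, creation logs create log, create foo, create top, and destruction logs the same three steps reversed, regardless of the random delays.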
Supervision Trees
A parent component periodically checks the health of its children. If one is missing, or in a failed state, it takes one of these three recovery actions:
- Shutdown remaining children in reverse order, and then restart them all.
- Restart only the missing child.
- Ignore the failure.
What if the component keeps failing? Its parent component eventually fails, and the fault bubbles up. If it reaches the top component, the process exits, and Kubernetes restarts the container somewhere else.
No more zombie containers due to shallow health checking!
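The following is a minimal sketch of one health check under a simplified model. checkChildren(), the strategy names 'restartAll', 'restartOne' and 'ignore', and the parent fields (names, children, create, maxRetries, failures) are hypothetical and are not the Caf.js supervisor API. A counter of consecutive failed checks decides when the parent gives up and marks itself as failed, so that its own parent can react.

```javascript
// Hypothetical sketch, not the Caf.js supervisor: field and strategy names
// are illustrative. `parent.children` maps child name -> child object.
function checkChildren(parent) {
  const isBroken = (name) =>
    !parent.children[name] || parent.children[name].failed;
  const broken = parent.names.filter(isBroken);
  if (broken.length === 0) {
    parent.failures = 0; // healthy again: reset the failure count
    return 'ok';
  }
  if (parent.strategy === 'ignore') {
    return 'ignored';
  }
  parent.failures += 1;
  if (parent.failures > parent.maxRetries) {
    parent.failed = true; // the fault bubbles up to our own parent
    return 'failed';
  }
  if (parent.strategy === 'restartAll') {
    // Remove (shut down) the remaining children in reverse order...
    for (const name of [...parent.names].reverse()) {
      delete parent.children[name];
    }
    // ...and then restart all of them in creation order.
    for (const name of parent.names) {
      parent.children[name] = parent.create(name);
    }
  } else {
    // 'restartOne': only replace the missing or failed children.
    for (const name of broken) {
      parent.children[name] = parent.create(name);
    }
  }
  return 'restarted';
}
```

If a child keeps failing after every restart, the check returns 'failed' once maxRetries is exceeded. This is the point where, in Caf.js, the fault would move one level up the tree.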
How to Navigate the Hierarchy?
Every component has a reference, this.$, to a context shared with its siblings. Components register with a name, which is unique in that context. For example, if component foo is created before sibling component bar, bar can access foo with this.$.foo.
What about components that are not siblings? Every context has a reference, _, to the topmost component that it is allowed to see. Children of this component can be accessed with this.$._.$. For example, the logger component is the first child of the topmost component, visible to others as this.$._.$.log.
Application code is not allowed to navigate the internal hierarchy. Instead, security proxies to internal components populate this.$, and you never use _. For example, this.$.log is a reference to the logger security proxy, which internally finds the real logger component in $._.$.log.
Caf.js internals are written in a functional style, hiding these private references in closures.
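The closure-based hiding can be sketched as follows. makeLogProxy() and the shapes of the internal components are hypothetical. The point is that the proxy captures the private reference _ in a closure, so the object handed to application code has no property that leads back into the hierarchy.

```javascript
// Hypothetical sketch, not Caf.js internals: names are illustrative.

// Internal hierarchy: the real logger is a child of the topmost component.
const topComponent = { $: {} };
topComponent.$.log = {
  lines: [],
  write(level, msg) {
    this.lines.push(`${level} ${msg}`);
  },
};

// `_` only exists inside this closure; the returned object exposes
// logging methods and nothing else.
function makeLogProxy(_) {
  return Object.freeze({
    debug(msg) {
      _.$.log.write('DEBUG', msg);
    },
    warn(msg) {
      _.$.log.write('WARN', msg);
    },
  });
}

// What a CA sees as this.$: proxies only, no `_`.
const caContext = Object.freeze({ log: makeLogProxy(topComponent) });
caContext.log.debug('getCounter()');
```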
Describing Hierarchies with JSON
{
"name" : "top",
"module" : "caf_components#supervisor",
"description" : "Supervisor for this app.",
"env": {
"logLevel": "process.env.LOG_LEVEL||DEBUG",
"somethingElse": "process.env.SOMETHING_ELSE||{\"goo\":2}",
"maxRetries" : "process.env.MAX_RETRIES||10",
"retryDelay" : "process.env.RETRY_DELAY||1000"
},
"components": [
{
"name" : "log",
"module": "caf_components#plug_log",
"description": "Logger service",
"env" : {
"logLevel" : "$._.env.logLevel"
}
},
{
"name" : "foo",
"module": "./myApp",
"description": "My app component",
"env" : {
"somethingElse" : "$._.env.somethingElse",
"message" : "Hello"
}
}
]
}
The JSON file above, components.json, describes a parent component top with two child components, log and foo. Since foo is created after its sibling log, it can safely find the logger with this.$.log.
The attribute module identifies the factory method that instantiates the component. For example, for caf_components#supervisor the factory method will be found with module.require("caf_components").supervisor.
The attribute env specifies initialization properties for the component. Three examples of property value types:
- Hello: a JSON serialized type. If JSON parsing fails, we assume it was a string that was not quoted.
- process.env.LOG_LEVEL||DEBUG: a system environment property. If LOG_LEVEL is not set, we use DEBUG as default. Resolved values are always JSON serialized types.
- $._.env.logLevel: a reference to the topmost env. In this case it resolves to process.env.LOG_LEVEL||DEBUG.
To create an instance of the hierarchy, Caf.js merges the instantiation properties with the topmost env, and this happens before resolving any other properties.
Combined with linking, we can configure internal components without knowing the internal structure of the description.
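Here is a minimal sketch of how the three value types and the merge step could fit together. It is written from the description above, not from the Caf.js source. resolveValue(), resolveEnv(), instantiateTopEnv() and parseJSONOrString() are hypothetical names, and the regular expressions only cover the two formats shown in components.json.

```javascript
// Hypothetical sketch: function names and parsing rules are assumptions
// based on the description, not the Caf.js implementation.

// JSON-parse a string, falling back to the raw string (an unquoted string).
function parseJSONOrString(str) {
  try {
    return JSON.parse(str);
  } catch (err) {
    return str;
  }
}

function resolveValue(value, topEnv, processEnv) {
  if (typeof value !== 'string') return value; // already a JSON type
  const envMatch = /^process\.env\.([A-Za-z0-9_]+)\|\|(.*)$/.exec(value);
  if (envMatch) {
    const [, name, defaultValue] = envMatch;
    const raw = processEnv[name] !== undefined ? processEnv[name] : defaultValue;
    return parseJSONOrString(raw);
  }
  const linkMatch = /^\$\._\.env\.(.+)$/.exec(value);
  if (linkMatch) {
    // A link to the topmost env, resolved with the same rules.
    return resolveValue(topEnv[linkMatch[1]], topEnv, processEnv);
  }
  return parseJSONOrString(value);
}

// Instantiation properties are merged into the topmost env *before*
// anything is resolved, so they also flow through every link.
function instantiateTopEnv(descriptionEnv, instantiationProps) {
  return Object.assign({}, descriptionEnv, instantiationProps);
}

function resolveEnv(env, topEnv, processEnv) {
  const result = {};
  for (const [key, value] of Object.entries(env)) {
    result[key] = resolveValue(value, topEnv, processEnv);
  }
  return result;
}
```

For example, suppose maxRetries: 3 is passed at instantiation. A child whose env is { retries: '$._.env.maxRetries' } then resolves to { retries: 3 }, even though the description's default is 10.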
Patching JSON Descriptions
In most cases we do not create descriptions from scratch. We use a base description as a template, and change a few attributes, or add/delete some components.
To change components.json, create a new description called components++.json that will be merged with components.json.
For example, to modify the message property:
{
"name" : "top",
"components": [
{
"name": "foo",
"env": {
"message": "Goodbye" }
}
]
}
or set module to null to delete the foo component:
{
"name" : "top",
"components": [
{
"name": "foo",
"module": null }
]
}
or insert a new component bar before foo:
{
"name" : "top",
"components": [
{ "name": "log" }, {
"name" : "bar",
"module": "./myApp",
"description": "My other app component",
"env" : {
"somethingElse" : "$._.env.somethingElse",
"message" : "Whatever"
}
}
]
}
Note the "touch" operation, { "name": "log" }, which marks the insertion position in the array. A component with a new key is inserted right after the last array operation.
When the first array operation inserts a component with a new key, it becomes the first element of the array.
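The patching rules can be sketched as a recursive merge. This illustrates the behavior described above and is not the Caf.js merge code. mergeDescription() and mergeComponents() are hypothetical names. After a deletion, this sketch places the next new component where the deleted one was; that position is an assumption.

```javascript
// Hypothetical sketch of the "++" patch semantics; names are illustrative.

function isObject(x) {
  return x !== null && typeof x === 'object' && !Array.isArray(x);
}

// Recursively merges `patch` into a copy of `base`.
function mergeDescription(base, patch) {
  const result = { ...base };
  for (const [key, value] of Object.entries(patch)) {
    if (key === 'components' && Array.isArray(base.components)) {
      result.components = mergeComponents(base.components, value);
    } else if (isObject(value) && isObject(base[key])) {
      result[key] = mergeDescription(base[key], value);
    } else {
      result[key] = value;
    }
  }
  return result;
}

// Components are matched by name. `last` tracks the position of the
// last array operation, which decides where new components go.
function mergeComponents(baseList, patchList) {
  const result = [...baseList];
  let last = -1;
  for (const patch of patchList) {
    const i = result.findIndex((c) => c.name === patch.name);
    if (i >= 0 && patch.module === null) {
      result.splice(i, 1); // delete
      last = i - 1;
    } else if (i >= 0) {
      result[i] = mergeDescription(result[i], patch); // modify, or just touch
      last = i;
    } else {
      result.splice(last + 1, 0, patch); // insert after the last operation
      last += 1;
    }
  }
  return result;
}
```

Applied to components.json, the three patches shown above yield the orders [log, foo] with a new message, [log], and [log, bar, foo]. Without the touch of log, bar would become the first component.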
Cloud Assistant
The Cloud Assistant (CA) is the main abstraction provided by the Caf.js framework. The foundation of a CA is the Actor Model. Caf.js makes actors transactional, and mediates access to the external world with transactional plugins.
The internal state of a CA is always consistent with its external commitments, even after server failures.
The Reliable Service Orchestration section described the Actor Model, our transactional plugins, the CA checkpointing implementation with Redis, and the recovery strategies after a failure.
In this section we focus on what all this means for your code.
Hello World
const caf = require('caf_core');
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
return [];
},
async getCounter() {
this.$.log && this.$.log.debug('getCounter()')
return [null, this.state.counter];
}
};
caf.init(module);
The file ca_methods.js is the entry point for your application. You can change the name of this file by changing configuration properties, but doing so can confuse others reading your code.
This file exports a mixin of methods for your CA using the property methods.
There are two types of methods:
- Internal methods called by the framework. Their names always have the prefix __ca_. For example, __ca_init__() will be called by Caf.js the first time the CA is initialized.
- External methods called by remote clients. They are all the others. For example, getCounter() can be invoked by an authorized client to know the current counter value.
Methods are async methods, and always return an array. This array represents an error-value tuple: an optional application-level error is the first element, and an optional second element is the value returned to the client when there are no errors.
A Caf.js application loads with caf.init(). We pass the current module object as an argument, to help find descriptions and plugins that share a directory with the file ca_methods.js. The Caf.js loader maintains a list of module objects, and tries them in sequence until it finds the resource.
A CA keeps internal state in two properties:
- this.state: checkpointed state. It needs to be JSON-serializable, and it is always modified within a transaction.
- this.scratch: non-transactional state, not checkpointed, but it can be anything. Typically used for caching data, or debugging.
Think of a CA as a sealed object. Never add state outside these two properties.
Access plugins with this.$. For example, this.$.log returns a proxy to the logger service, and this.$.log.debug() adds a debug message to the log.
To configure plugins Caf.js uses JSON descriptions, as discussed in First App.
Request Serialization
const caf = require('caf_core');
const util = require('util');
const setTimeoutPromise = util.promisify(setTimeout);
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
return [];
},
async increment() {
this.state.counter = this.state.counter + 1;
await setTimeoutPromise(1000);
return this.getCounter();
},
async getCounter() {
return [null, this.state.counter];
}
};
caf.init(module);
A CA serializes requests with a queue. Before processing the next request in the queue, it ensures that the current one has finished, even if that requires waiting for the completion of several asynchronous steps.
Serialization does not reduce performance. There could be thousands of CAs in a single process, and blocking one of them does not affect the others, since each CA has its own queue, and its state is private.
And serialization in Caf.js is a cluster-wide property. Servers can fail, with hundreds of processes spread across many servers running your app, but CA requests will always be processed one after another.
What is the benefit of serialization?
Serialization enables a much simpler programming abstraction.
Let's assume that increment() is expected to return a unique value each time it is called.
The innocent-looking increment() method would have a subtle bug if it were implemented in the same way with existing web application frameworks, such as Express.
If one request is waiting for the timeout, and another one starts executing, the second one will also update this.state.counter. When the first one finishes waiting, it returns the current value of this.state.counter, which was updated by the second request. Now both return the same value.
And races tend to break things when the system is under heavy load, in production, not during simple testing. With the convenience of async/await primitives in modern JavaScript, introducing races is so much easier...
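The race, and how a per-CA queue removes it, can be reproduced in plain Node.js. SerialQueue, makeCounter(), unserialized() and serialized() are hypothetical names. The queue is a promise chain standing in for the CA request queue, and unlike the real one it is neither cluster-wide nor persistent.

```javascript
// Hypothetical sketch: SerialQueue is an illustrative stand-in for the
// per-CA request queue, not the Caf.js implementation.

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Runs one task at a time: the next task starts only after the previous
// one, including all of its awaits, has finished.
class SerialQueue {
  constructor() {
    this.tail = Promise.resolve();
  }
  push(task) {
    const result = this.tail.then(() => task());
    this.tail = result.catch(() => {}); // keep the queue alive after errors
    return result;
  }
}

// The increment() method from the example, as a plain closure.
function makeCounter() {
  const state = { counter: 0 };
  return async function increment() {
    state.counter = state.counter + 1;
    await sleep(10);
    return [null, state.counter];
  };
}

// Without a queue, the two calls interleave.
async function unserialized() {
  const increment = makeCounter();
  return Promise.all([increment(), increment()]);
}

// With a queue, each call sees only its own update.
async function serialized() {
  const increment = makeCounter();
  const queue = new SerialQueue();
  return Promise.all([queue.push(increment), queue.push(increment)]);
}
```

unserialized() resolves to [[null, 2], [null, 2]], which is the bug described above. serialized() resolves to [[null, 1], [null, 2]].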
Error Handling
const caf = require('caf_core');
const assert = require('assert');
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
return [];
},
async increment(n) {
if (n < 0) {
return [new Error('Only increment!')];
} else {
this.state.counter = this.state.counter + n;
assert(!isNaN(this.state.counter));
return this.getCounter();
}
},
async getCounter() {
return [null, this.state.counter];
}
};
caf.init(module);
There are two kinds of errors:
- Application-level errors that your code knows best how to handle. These are errors returned in the first element of the array.
- Unrecoverable errors, mostly bugs, which Caf.js does not expect your code to handle gracefully. These are uncaught exceptions thrown by your code.
The client library handles these two types of errors very differently, as we will see soon.
In both cases, the CA aborts the transaction associated with the request. This means that any changes to this.state are reversed, and all the transactional plugins also abort, avoiding any external side effects.
Again, this enables a much simpler programming abstraction.
In the example above, a negative increment is handled as an application-level error, because we assume the client code could do something about it, like suggesting a different service that can decrement.
What happens if the input n is the string oops? The comparison with 0 is false, and the addition concatenates strings, so this.state.counter becomes a string such as "0oops", which isNaN() coerces to NaN. The assertion throws, creating an unrecoverable error.
In this case a traditional web application framework would have corrupted the state of this CA forever. With millions of cheap CAs, expected to run for years with minimal supervision, manual recovery is not on the menu. This is a disaster.
Note that we are not encouraging you to delay input validation, or any other bad programming habit, but mistakes happen, and it is a good idea to have a safety net.
External Consistency
const caf = require('caf_core');
const assert = require('assert');
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
return [];
},
async increment() {
this.state.counter = this.state.counter + 1;
this.$.session.notify([this.state.counter]);
if (this.state.counter <= 100) {
return this.getCounter();
} else {
return [new Error('Limit exceeded')];
}
},
async getCounter() {
return [null, this.state.counter];
}
};
caf.init(module);
The internal state of a CA is always consistent with its external commitments.
This sounds a bit abstract, so let's clarify it with an example.
Caf.js uses a transactional plugin, session, to send notifications to clients. We will talk about sessions in the next section but, for this discussion, it is just a plugin with external side effects.
Calling increment() adds one to this.state.counter, and sends a notification with the new counter.
What happens when the counter exceeds 100? It returns an error, aborting the transaction, and this.state.counter rolls back to 100.
But what about the notification? The external world saw that the internal state is 101, but that is inconsistent with the current value of 100.
And this is also a problem for smaller values. If we send the notification, and then the server crashes before checkpointing, the state update is lost forever. And the world saw the new value.
In most frameworks the missile has already launched, and there is not much we can do about it.
But in Caf.js we use transactional plugins to externalize state changes. For example, when we call this.$.session.notify(), the plugin just appends our intent to send the notification to a log, but it does not send it. The notification is sent only when the request commits, after the log of actions and the state changes have been checkpointed.
Transactional plugins compose using a two-phase commit protocol. We can mix plugins from different providers, and commit them atomically, or abort all of them if one cannot commit. This happens behind the scenes; application code does not see this complexity.
And yet again, this enables a much simpler programming abstraction.
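The deferred side effects can be sketched with a plugin that records intents and only acts on commit. SessionPlugin, its prepare()/commit()/abort() methods, runRequest(), and the outbox and checkpoints arrays are hypothetical stand-ins for the session plugin, the network and Redis. The two-phase commit is reduced to asking every plugin to prepare before committing any of them. increment() is a variant of the example above that returns its value directly.

```javascript
// Hypothetical sketch, not the Caf.js plugin API: names are illustrative.

// A transactional plugin: notify() only records an intent.
class SessionPlugin {
  constructor(outbox) {
    this.outbox = outbox; // stands in for the network
    this.pending = [];
  }
  notify(msg) {
    this.pending.push(msg);
  }
  prepare() {
    return true; // phase 1: vote to commit
  }
  commit() {
    this.outbox.push(...this.pending); // phase 2: side effects happen here
    this.pending = [];
  }
  abort() {
    this.pending = []; // intents are dropped, nothing reaches the world
  }
}

// Runs one request as a transaction.
async function runRequest(ca, plugins, method, args, checkpoints) {
  const snapshot = JSON.stringify(ca.state);
  let result;
  try {
    result = await method.apply(ca, args);
  } catch (exception) {
    result = [exception];
  }
  const canCommit = !result[0] && plugins.every((p) => p.prepare());
  if (!canCommit) {
    ca.state = JSON.parse(snapshot);
    plugins.forEach((p) => p.abort());
    return result[0] ? result : [new Error('A plugin refused to commit')];
  }
  checkpoints.push(JSON.stringify(ca.state)); // checkpoint first...
  plugins.forEach((p) => p.commit()); // ...then externalize
  return result;
}

// A variant of increment() from the example above.
async function increment() {
  this.state.counter = this.state.counter + 1;
  this.$.session.notify([this.state.counter]);
  if (this.state.counter <= 100) {
    return [null, this.state.counter];
  } else {
    return [new Error('Limit exceeded')];
  }
}
```

Starting from 99, the first request commits and the outbox receives [100]. The second request fails with 'Limit exceeded', the counter stays at 100, and the notification for 101 never leaves the plugin.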
Learn More
A CA can do much more. Update the schema of its internal state safely. Read application properties from the environment. Customize actions after hibernation.
See the jsdoc documentation here.
Autonomous Computation
In the introduction to Autonomous Computation we described a few use cases, and gave an overview of notifications in Caf.js.
This section is all about implementing these ideas in your applications.
Scheduled Methods
The delay plugin schedules a CA method to be invoked once, or multiple times, sometime in the future. The first invocation time can be relative to the current time, or absolute, using UTC time. Repetitions also specify an interval time in seconds and, optionally, a maximum number of invocations. We can also inspect, and cancel, pending invocations at any time.
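Plain timers are enough to sketch these scheduling options: an offset, a repeat interval in seconds, an optional maximum number of invocations, cancellation, and inspection of pending work. Delayer and its schedule(), cancel() and getPending() methods are hypothetical. Unlike the delay plugin, this sketch is not transactional, does not support absolute UTC times, and its pending calls do not survive a restart. As in the example below, the scheduled function receives its arguments followed by the schedule id.

```javascript
// Hypothetical sketch, not the delay plugin API: names are illustrative.
class Delayer {
  constructor() {
    this.pending = new Map(); // id -> timer handle
    this.nextId = 1;
  }

  // Calls fn(...args, id) after offsetSec, then every intervalSec seconds
  // (if given), at most maxInvocations times (if given).
  schedule(offsetSec, fn, args, { intervalSec, maxInvocations } = {}) {
    const id = this.nextId++;
    let count = 0;
    const run = () => {
      count += 1;
      const done =
        !intervalSec || (maxInvocations !== undefined && count >= maxInvocations);
      if (done) {
        this.pending.delete(id);
      } else {
        this.pending.set(id, setTimeout(run, intervalSec * 1000));
      }
      fn(...args, id);
    };
    this.pending.set(id, setTimeout(run, offsetSec * 1000));
    return id;
  }

  cancel(id) {
    clearTimeout(this.pending.get(id));
    this.pending.delete(id);
  }

  getPending() {
    return [...this.pending.keys()];
  }
}
```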
By regularly scheduling methods we make CAs autonomous. For example, let's keep calling __ca_increment__() every interval seconds until we cancel:
exports.methods = {
...
async scheduleRepeat(delay, delta, interval) {
const repeater = this.$.delay.newRepeater(interval);
const id = this.$.delay.scheduleWithOffset(
delay, '__ca_increment__', [delta], repeater, true
);
return [null, id];
},
async cancel(id) {
this.$.delay.cancel(id);
return this.getState();
},
async getPending() {
return [null, this.$.delay.getPending()];
},
async __ca_increment__(delta, id) {
this.$.log.debug(`inc delta:${delta} id:${id}`);
this.state.counter = this.state.counter + delta;
return [];
}
};
A shortcut to make your CA autonomous is to implement the __ca_pulse__() method, which the runtime then calls periodically.
How often? The default is every 2 seconds, but you can change that in framework++.json. Just modify the interval property of the cron_pulser component.
Sessions
When spawning a long-term activity in the Cloud, it is crucial to manage notifications for offline clients. This means queueing them in the Cloud until clients are ready to read them. But it also means ensuring that Caf.js does not run out of memory with a leaky queue. And when the client is back online, low latency interaction is a must.
What is the best strategy to manage notification queues?
It depends. Limiting queue size is a given, but deciding what to evict is not. Shall we stop queueing new notifications, or remove the old ones, or just keep the last one, or delete redundant ones, or ...
Caf.js sets some sensible limits, but it also makes the notification queues visible to application code. Your autonomous code can supervise the queues.
And a notification queue has a name. A simple name that matches the session name chosen by the client. Swap client devices, type this name, and your notifications will follow you. Or use a different name per device to keep them separately.
Let's show an example
const caf = require('caf_core');
const SESSION = 'admin';
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
this.$.session.limitQueue(1, SESSION);
return [];
},
async __ca_pulse__() {
this.state.counter = this.state.counter + 1;
if (this.state.counter % this.$.props.divisor === 0) {
this.$.session.notify([this.state.counter], SESSION);
}
return [];
},
async increment() {
this.state.counter = this.state.counter + 1;
return this.getCounter();
},
async getCounter() {
return [null, this.state.counter];
}
};
caf.init(module);
In this case __ca_pulse__() increments a counter, and whenever the counter is a multiple of divisor, it sends a notification to a queue named admin. Note that this.$.props.divisor is a property configured in ca++.json.
Calling limitQueue() overrides the default configuration, and the queue just keeps the last notification.
When a client connects with a session named admin, it first receives this last notification. Then it keeps receiving new notifications in real time until it disconnects.
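Here is a minimal sketch of named queues with a per-queue limit. Full queues evict their oldest notifications, which is only one of the eviction policies mentioned above. NotificationQueues, drain() and the default limit are hypothetical. limitQueue() and notify() borrow the argument order of the session plugin calls in the example, but this is not that plugin.

```javascript
// Hypothetical sketch, not the session plugin: names are illustrative.
class NotificationQueues {
  constructor(defaultLimit) {
    this.defaultLimit = defaultLimit;
    this.queues = new Map(); // session name -> { limit, messages }
  }

  getQueue(name) {
    if (!this.queues.has(name)) {
      this.queues.set(name, { limit: this.defaultLimit, messages: [] });
    }
    return this.queues.get(name);
  }

  // Overrides the default limit, keeping only the newest notifications.
  limitQueue(limit, name) {
    const queue = this.getQueue(name);
    queue.limit = limit;
    queue.messages = queue.messages.slice(Math.max(0, queue.messages.length - limit));
  }

  notify(msg, name) {
    const queue = this.getQueue(name);
    queue.messages.push(msg);
    while (queue.messages.length > queue.limit) {
      queue.messages.shift(); // evict the oldest notification
    }
  }

  // Delivering notifications removes them from the queue.
  drain(name) {
    const queue = this.getQueue(name);
    const delivered = queue.messages;
    queue.messages = [];
    return delivered;
  }
}
```

With limitQueue(1, 'admin'), three notifications [3], [6] and [9] leave only [9] for the next client that connects as admin.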
Notification Groups
Delivering a notification removes it from the queue. When several clients need to see the same notification, use multiple queues and duplicate notifications.
What if the number of clients is not known a priori? Queues are created on-demand, and we address them with regular expressions.
Let's see an example.
const caf = require('caf_core');
const SESSION = /^admin/;
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
return [];
},
async __ca_pulse__() {
this.state.counter = this.state.counter + 1;
if (this.state.counter % this.$.props.divisor === 0) {
this.$.session.notify([this.state.counter], SESSION);
}
return [];
},
async getCounter() {
return [null, this.state.counter];
}
};
caf.init(module);
When a client calls getCounter(), its session name will be added to the notification group if it starts with admin. A name suffix could be random, or based on the username, or chosen by the user. The goal is to avoid name collisions with other clients. After being offline, the client could reuse the session name to receive the pending notifications.
Caf.js could run out of memory if it cannot control the number of sessions in the notification group. A system-wide property, maxSessions, limits that number, and it can be changed in ca++.json, or through environment properties. Similarly, maxMessages limits the maximum number of messages in any queue.
When the maxSessions limit has been reached, any session that is offline, and has not set a custom limit of messages, can be garbage collected. This means that your pending notifications may not be there when you reconnect.
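The sketch below shows how regular-expression groups and the two limits could interact. SessionGroups, register() and collect() are hypothetical. The garbage-collection rule is a simplified reading of the paragraph above: once maxSessions sessions exist, creating a new one first removes every offline session without a custom message limit.

```javascript
// Hypothetical sketch, not the session plugin: names and the GC rule are
// illustrative readings of the text.
class SessionGroups {
  constructor(maxSessions, maxMessages) {
    this.maxSessions = maxSessions;
    this.maxMessages = maxMessages;
    this.sessions = new Map(); // name -> { online, limit, messages }
  }

  // Sessions are created on demand, e.g., when a client calls the CA.
  register(name, online = true) {
    if (!this.sessions.has(name)) {
      this.collect();
      this.sessions.set(name, { online, limit: null, messages: [] });
    }
    this.sessions.get(name).online = online;
  }

  // A custom limit also protects the session from garbage collection.
  limitQueue(limit, name) {
    const session = this.sessions.get(name);
    if (session) session.limit = limit;
  }

  // Only runs once the maxSessions limit has been reached.
  collect() {
    if (this.sessions.size < this.maxSessions) return;
    for (const [name, session] of this.sessions) {
      if (!session.online && session.limit === null) {
        this.sessions.delete(name);
      }
    }
  }

  // Every session whose name matches the pattern receives the notification.
  notify(msg, pattern) {
    for (const [name, session] of this.sessions) {
      if (pattern.test(name)) {
        session.messages.push(msg);
        const limit = session.limit === null ? this.maxMessages : session.limit;
        while (session.messages.length > limit) session.messages.shift();
      }
    }
  }
}
```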
Learn More
A persistent session can guarantee that a stateless client recovers gracefully after a crash, avoiding duplicated requests. See Persistent Session for details, and the app HelloFail for an example.
Client Library
The Caf.js client library is a tiny library that just needs web socket support. It works in the browser, in the Cloud, with your scripts, and with embedded devices. Currently JavaScript-only, but more languages will be added soon.
The goal of this library is to create an authenticated session with a CA, and then provide both a request-response and a notification model, using a common abstraction.
Authentication
JSON Web Tokens (JWTs) embedded in URLs is the main mechanism for client authentication. Tokens are self-describing, typically short-lived, and signed with the private key of the Accounts service. Its corresponding public key is always available, and apps validate tokens locally.
Tokens form a semi-lattice, making it easier to weaken them on-demand. By scoping a token to a particular app, owner, or CA name, we eliminate man-in-the-middle attacks, and reduce the damage of a compromised token.
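The idea of weakening can be illustrated by modeling a token as a set of optional constraints, where a missing claim means "any". This is a conceptual sketch only. The claim names appName, caOwner and caLocalName, and the functions weaken(), isWeakerOrEqual() and allows(), are hypothetical. There is no signing, expiry, or JWT encoding. Adding a constraint always yields a token that is weaker or equal, which is what makes on-demand weakening safe.

```javascript
// Hypothetical sketch: claim names are illustrative, not the Caf.js token
// format. A missing claim means "any", so more claims means less power.
const CLAIMS = ['appName', 'caOwner', 'caLocalName'];

// `a` is weaker than or equal to `b` if it keeps every constraint of `b`.
function isWeakerOrEqual(a, b) {
  return CLAIMS.every((k) => b[k] === undefined || a[k] === b[k]);
}

// Adds constraints; it never removes or changes an existing one.
function weaken(token, constraints) {
  const result = { ...token };
  for (const k of CLAIMS) {
    const value = constraints[k];
    if (value === undefined) continue;
    if (result[k] !== undefined && result[k] !== value) {
      throw new Error(`Cannot change claim ${k}`);
    }
    result[k] = value;
  }
  return result;
}

// A request names a concrete app, owner and CA.
function allows(token, request) {
  return CLAIMS.every((k) => token[k] === undefined || token[k] === request[k]);
}
```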
The Launcher app manages your tokens in the background, with CAs, renewing and weakening them as needed. It provides Single Sign-On (SSO) for all your CAs. Before redirecting to another app, it just adds the correct token to the URL.
This means that the client library has very little work to do. In most cases, it just extracts the token from a fragment in the URL, and sends it to the CA for validation using TLS.
What happens if there is no valid token in the URL?
The client library calls a TokenFactory hook method that will create the token based on other credentials, for example, your username and password for the Accounts service.
The client script in our first app shows how to use the SRP TokenFactory.
In the following examples the security plugin has been disabled.
Request-Response
const caf_cli = require('caf_core').caf_cli;
const URL = 'http://root-hello.localtest.me:3000/#from=foo-ca1&ca=foo-ca1';
const s = new caf_cli.Session(URL);
s.onopen = async function() {
try {
let counter = await s.increment(7).getPromise();
console.log(counter);
counter = await s.getCounter().getPromise();
console.log(counter);
s.close();
} catch (err) {
// Application-level error
s.close(err);
}
};
s.onclose = function(err) {
if (err) {
console.log(`Got exception ${err}`);
process.exit(1);
}
console.log('Done OK');
process.exit(0);
};
The script above first creates a session with CA foo-ca1. There are two fields in the URL fragment, from and ca, and they are the same. This means that the client is acting as the owner of this CA, and the CA will be created if needed. Most clients act as owners, and they always have full access. However, in order to claim a from origin, a matching authentication token (not shown) needs to be part of the URL.
After creating the session the onopen handler gets called, and we can invoke remote methods on this CA. But there is some magic behind the scenes.
CA methods increment() and getCounter() become methods of session s. The client library has downloaded some metadata from the server, and it has created local methods matching the CA remote methods. These local methods do sanity checks on the arguments before forwarding requests.
What about error handling?
As we discussed in the Cloud Assistant section, there are two types of errors:
- An application-level error, i.e., the first element of the returned array, is used by the client library to reject the returned Promise. Therefore, await will throw, and the error can be managed inline.
- An unrecoverable error, i.e., an exception thrown by your CA code, will instead close the session, returning the error in the onclose handler.
Just to confuse you, this example handles an application-level error by closing the session with it, a common pattern in simple scripts. But there is no need to close the session; we could just continue.
What happens if we call multiple methods on the session without await? The session has a local queue, and serializes requests. Some recoverable errors are also handled transparently by retrying the last request. If you want multiple concurrent requests, create multiple sessions. But remember that the CA will serialize them anyway...
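Here is a minimal sketch of the client-side behavior described in this section. It covers a local queue that sends one request at a time, application-level errors that reject the returned promise, and exceptions that close the session. LocalSession, call() and the transport function are hypothetical and are not the caf_cli API. The sketch also assumes that the pending call rejects when the session closes. Retries are not modeled.

```javascript
// Hypothetical sketch, not the caf_cli API. The transport is an async
// function (method, args) => [err, value] that throws on unrecoverable errors.
class LocalSession {
  constructor(transport) {
    this.transport = transport;
    this.tail = Promise.resolve(); // end of the local request queue
    this.closed = false;
    this.onclose = () => {};
  }

  // Queues a request; it is sent only after the previous one settles.
  call(method, ...args) {
    const result = this.tail.then(async () => {
      if (this.closed) {
        throw new Error('Session closed');
      }
      let reply;
      try {
        reply = await this.transport(method, args);
      } catch (exception) {
        this.close(exception); // unrecoverable: report it in onclose
        throw exception; // assumption: the pending call also rejects
      }
      const [err, value] = reply;
      if (err) {
        throw err; // application-level: reject, so `await` throws
      }
      return value;
    });
    this.tail = result.catch(() => {}); // keep the queue going
    return result;
  }

  close(err) {
    if (!this.closed) {
      this.closed = true;
      this.onclose(err);
    }
  }
}
```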
Notifications
const caf_cli = require('caf_core').caf_cli;
const util = require('util');
const setTimeoutPromise = util.promisify(setTimeout);
const URL = 'http://root-hello.localtest.me:3000/#session=admin&from=foo-ca1&ca=foo-ca1';
const s = new caf_cli.Session(URL);
s.onopen = async function() {
try {
let counter = await s.increment(7).getPromise();
console.log(counter);
await setTimeoutPromise(10000)
s.close();
} catch (err) {
// Application-level error
s.close(err);
}
};
s.onmessage = function(msg) {
const counter = caf_cli.getMethodArgs(msg)[0];
console.log(`Got notification ${counter}`);
};
s.onclose = function(err) {
if (err) {
console.log(`Got exception ${err}`);
process.exit(1);
}
console.log('Done OK');
process.exit(0);
};
Let's add notifications to our previous example. A couple of changes:
- Provide a session name in the URL. We use the name admin to match the queue name in the previous CA example. If missing, the default session name is default.
- Create an onmessage handler that will receive each notification.
Using getMethodArgs() we can extract the counter value from the notification.
Learn More
Why do we need getPromise()? To support multi-method requests, which extend the scope of a transaction to a sequence of CA methods.
And more... Encrypt messages end-to-end with Diffie-Hellman keys. Synchronize local time with the Cloud.
See the jsdoc for details.
Trusted Bus
In the Collaborative Multi-tenancy overview, we talked about the importance of bootstrapping trust between users of one app, and how a Trusted Bus abstraction helped with that goal.
A Trusted Bus mediates internal interactions between users, authenticating requests, and enforcing access control policy at the target endpoints.
The emphasis is on internal. Within one app.
The client library is not designed for internal interactions between untrusted peers. Authentication tokens will be compromised. This could be fixed with public-key cryptography, but the performance hit, and the complexity of key management, make it impractical.
Can we authenticate requests without tokens, or other crypto?
Sure, if every possible request source is trusted, and requests in transit cannot be modified.
The Caf.js Cloud configures Kubernetes network policy to do just that. The Node.js processes of your application live in their own network bubble, isolated from other apps.
And application code does not directly access the network. Instead, it uses plugins that have been wrapped by security proxies. These proxies do not trust the application code, adding to each request the name of the CA that sends it.
This makes it less likely that a bug in your application will compromise authentication. After all, your code does not do anything, and authentication is transparently handled by proxies.
What about authorization?
Caf.js always performs an access control check at the destination, before calling a CA method. It also makes the caller CA name visible, while the method executes, with this.$.security.getCallerFrom().
But your code configures policy. And the rest of this section explains how.
Defaults
The default policy is trust no one. The owner is the only one that can invoke methods on a CA.
And allowed interactions are external-only, using the client library, with owner requests where the from and ca properties match, as described in the Client Library section.
No Trusted Bus interaction.
If nothing works, check the security policy first...
Simple Rules
Naming conventions simplify policy.
In Caf.js naming is based on local namespaces. The name of a CA is always relative to its owner, for example, foo-ca1 means the user foo owns a CA named ca1. The name of a SharedMap is always relative to the CA that owns it, for example, foo-ca1-friends. And so on...
It is easy to enforce and describe policies based on ownership.
For example, to enable CAs with the same owner to call the method getCounter() on each other, using the Trusted Bus:
const caf = require('caf_core');
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
const rule = this.$.security.newSimpleRule(
'getCounter', this.$.security.SELF
);
this.$.security.addRule(rule);
return [];
},
async getCounter() {
this.$.log && this.$.log.debug('getCounter()')
return [null, this.state.counter];
}
};
caf.init(module);
More examples.
To open all the CA methods:
const rule = this.$.security.newSimpleRule(null, this.$.security.SELF);
or to enable the CA with local name ca3 of my friend joe, i.e., joe-ca3:
const rule = this.$.security.newSimpleRule(null, 'joe', 'ca3');
Note that to allow external interactions with the client library, a "secret" method, __external_ca_touch__, also needs to be enabled.
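The sketch below shows how simple rules could be matched against the caller name. splitCAName(), ruleMatches(), isAllowed(), the SELF marker and the { method, owner, caLocalName } rule shape are hypothetical and are not the newSimpleRule() implementation. A null method stands for "all methods", as in the examples above. Following the defaults, a caller whose name equals the target CA is treated as the owner.

```javascript
// Hypothetical sketch, not the caf_security implementation.
const SELF = '!SELF'; // marker for "the owner of the target CA"

// 'foo-ca1' -> ['foo', 'ca1']: the owner comes before the first '-'.
function splitCAName(name) {
  const i = name.indexOf('-');
  return [name.slice(0, i), name.slice(i + 1)];
}

function ruleMatches(rule, callerFrom, targetCA, method) {
  const [callerOwner, callerLocal] = splitCAName(callerFrom);
  const [targetOwner] = splitCAName(targetCA);
  if (rule.method !== null && rule.method !== method) return false;
  const owner = rule.owner === SELF ? targetOwner : rule.owner;
  if (owner !== callerOwner) return false;
  return rule.caLocalName === undefined || rule.caLocalName === callerLocal;
}

// Default policy: trust no one but the owner; rules open extra access.
function isAllowed(rules, callerFrom, targetCA, method) {
  return (
    callerFrom === targetCA ||
    rules.some((rule) => ruleMatches(rule, callerFrom, targetCA, method))
  );
}
```

With the rules { method: 'getCounter', owner: SELF } and { method: null, owner: 'joe', caLocalName: 'ca3' }, the CA foo-ca1 accepts getCounter() from foo-ca2, and any method from joe-ca3, but nothing from joe-ca4.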
Prefer the Trusted Bus over external interaction. It is no longer your code making requests. There are dragons out there...
Aggregate Rules
To describe a dynamic policy, potentially shared by many CAs, Caf.js uses a SharedMap.
A SharedMap is a replicated Distributed Data Structure (DDS) that can only be written by its owner. We describe SharedMap here.
For example, assuming that each owner has a CA named admin that maintains policy for all its CAs:
const caf = require('caf_core');
const ADMIN_CA = 'admin';
const ADMIN_MAP = 'primaryACL';
const isAdmin = function(self) {
const name = self.__ca_getName__();
return (caf.splitName(name)[1] === ADMIN_CA);
};
const primaryMap = function(self) {
const name = self.__ca_getName__();
return caf.joinName(caf.splitName(name)[0], ADMIN_CA, ADMIN_MAP);
};
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
isAdmin(this) && this.$.sharing.addWritableMap('acl', ADMIN_MAP);
this.$.sharing.addReadOnlyMap(
'aclAgg', primaryMap(this), {isAggregate: true}
);
const rule = this.$.security.newAggregateRule(
'getCounter', 'aclAgg'
);
this.$.security.addRule(rule);
return [];
},
async changePolicy(principal, isAllowed) {
if (isAdmin(this)) {
const $$ = this.$.sharing.$;
if (isAllowed) {
$$.acl.set(principal, true);
} else {
$$.acl.delete(principal);
}
return [];
} else {
return [new Error('Not an admin')];
}
},
async getCounter() {
this.$.log && this.$.log.debug('getCounter()')
return [null, this.state.counter];
}
};
caf.init(module);
Each CA:
- Finds the name of the SharedMap managed by the owner's admin CA.
- Creates a read-only replica.
- Sets an access control rule based on that replica.
In addition, each admin CA:
- Creates a writable SharedMap to maintain the set of principals allowed to access the method getCounter().
- Provides a changePolicy() method to update this SharedMap. Only the owner of the admin CA can invoke this method.
Principals can be a CA name, like foo-ca1, but also the name of a CA owner, such as foo. In the latter case, all the CAs owned by foo will be allowed.
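The membership test behind an aggregate rule can be sketched with a plain Map standing in for the read-only SharedMap replica. isPrincipalAllowed() and changePolicy() are hypothetical helpers. Taking the owner as the text before the first '-' is an assumption based on the naming conventions above.

```javascript
// Hypothetical sketch: a Map stands in for the SharedMap replica.

// Mirrors the admin CA's changePolicy(), without transactions or replication.
function changePolicy(aclMap, principal, isAllowed) {
  if (isAllowed) {
    aclMap.set(principal, true);
  } else {
    aclMap.delete(principal);
  }
}

// A caller passes if either its CA name or its owner name is in the map.
function isPrincipalAllowed(aclMap, callerFrom) {
  const owner = callerFrom.split('-')[0]; // 'foo-ca1' -> 'foo'
  return aclMap.get(callerFrom) === true || aclMap.get(owner) === true;
}
```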
Delegation
Maintaining the quickly changing membership of a large group is a team effort.
With Caf.js, it is very easy to get help from others by linking SharedMaps.
The security library will traverse the resulting graph, and a principal is in the allowed set if it is in any of the reachable SharedMaps. The graph can have cycles too.
And what's cool is that a SharedMap is a scalable distributed data structure, updated with transactional plugins, with clean semantics that do not break the Actor model. Supporting thousands of processes across many servers...
This means that access control policy will scale with your app. And policy changes should be predictable, and propagate quickly.
Let's show how linking works:
const LINK_KEY = '__link_key__';
exports.methods = {
// after changePolicy()
async linkNamespace(ns, isAllowed) {
if (isAdmin(this)) {
const $$ = this.$.sharing.$;
const all = new Set($$.acl.get(LINK_KEY) || []);
isAllowed ? all.add(ns) : all.delete(ns);
$$.acl.set(LINK_KEY, [...all]);
return [];
} else {
return [new Error('Not an admin')];
}
},
\\ before getCounter()
}
The default key for linking is __link_key__. Its value is an array with the names of other SharedMaps, for example, ["antonio-admin-friends", "joe-admin-acl"]. Only the owner of the admin CA can change it, using the linkNamespace() method.
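The graph traversal can be illustrated with a small model. This is a hypothetical sketch, not the Caf.js security library: each SharedMap is a plain Map, `maps` resolves a SharedMap name to its contents, and only exact principal names are checked (owner matching is left out). A visited set is what makes cycles harmless.

```javascript
// Hypothetical model of linked SharedMaps, not the Caf.js implementation.
const LINK_KEY = '__link_key__';

// Returns true if `principal` is in any SharedMap reachable from `rootName`.
function isAllowedInGraph(principal, rootName, maps) {
  const visited = new Set();
  const pending = [rootName];
  while (pending.length > 0) {
    const name = pending.pop();
    if (visited.has(name)) continue; // already seen: this breaks cycles
    visited.add(name);
    const map = maps.get(name);
    if (!map) continue; // unknown map: nothing to check
    if (map.has(principal)) return true;
    for (const linked of map.get(LINK_KEY) || []) {
      pending.push(linked);
    }
  }
  return false;
}

const maps = new Map([
  ['antonio-admin-acl',
    new Map([['foo', true], [LINK_KEY, ['antonio-admin-friends']]])],
  ['antonio-admin-friends',
    // links back to the root, creating a cycle
    new Map([['joe', true], [LINK_KEY, ['antonio-admin-acl']]])]
]);
console.log(isAllowedInGraph('joe', 'antonio-admin-acl', maps)); // true
console.log(isAllowedInGraph('mary', 'antonio-admin-acl', maps)); // false
```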
Learn More
Decentralized authorization systems, such as SDSI, inspired our approach to access control policy. More details here.
Publish-Subscribe
Caf.js supports a traditional publish-subscribe communication pattern. There are two kinds of topics:
- Private topics where only one CA can publish, but anybody can subscribe to receive them. The name of the topic is prefixed by the CA name. For example, if the owner CA name is foo-ca1, a valid private topic is foo-ca1-mynews.
- Forum topics that anybody can publish or subscribe to. They always have the prefix forum-, for example, forum-othernews.
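The two naming rules can be expressed as a small check. This `canPublish` helper is hypothetical, not part of the pubsub plugin, which enforces its own rules; it only shows why the separator matters: foo-ca1 may publish to foo-ca1-mynews but not to foo-ca10-news.

```javascript
// Hypothetical check mirroring the topic naming rules above.
const FORUM_PREFIX = 'forum-';

function canPublish(caName, topic) {
  if (topic.startsWith(FORUM_PREFIX)) {
    return true; // forum topics: anybody can publish
  }
  // Private topics: only the CA whose name (plus '-') prefixes the topic.
  return topic.startsWith(caName + '-');
}

console.log(canPublish('foo-ca1', 'foo-ca1-mynews'));  // true
console.log(canPublish('foo-ca2', 'foo-ca1-mynews'));  // false
console.log(canPublish('foo-ca2', 'forum-othernews')); // true
```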
Subscribers dedicate a method for handling messages. It is recommended that this method is internal, i.e., prefixed by __ca_, so that it is always invoked by the Trusted Bus.
The signature for this method is async function(topic, msg, from), with arguments:
- topic: string, needed to multiplex a method for several topics.
- msg: string, the received message, typically JSON-serialized.
- from: string, the authenticated publisher name. Needed because the method this.$.security.getCallerFrom() does not provide the real caller for internal methods.
Filtering of publishers is based on setting Trusted Bus policies for this method, or on using the authenticated from argument to change behavior.
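A handler can filter on the authenticated from argument directly. The following minimal sketch is not taken from the Caf.js examples: it assumes publishers send JSON strings and that only CAs with the same owner (taken here as the text before the first `-` in a CA name) should be trusted. The `ownerOf` helper and the fake CA context in the usage lines are hypothetical stand-ins for a real CA.

```javascript
// Hypothetical helper: the owner is assumed to be the text before the first '-'.
const ownerOf = (caName) => caName.split('-')[0];

const methods = {
  // Internal handler (__ca_ prefix), so it is invoked by the Trusted Bus.
  async __ca_handleMessage__(topic, msg, from) {
    // `from` is authenticated, so it is safe to filter on it.
    if (ownerOf(from) !== ownerOf(this.__ca_getName__())) {
      return []; // ignore publishers with a different owner
    }
    let data;
    try {
      data = JSON.parse(msg);
    } catch (err) {
      return []; // drop malformed messages instead of failing the request
    }
    this.$.session.notify([topic, data]);
    return [];
  }
};

// Usage with a fake CA context (hypothetical, for illustration only).
const notified = [];
const fakeCA = {
  __ca_getName__: () => 'foo-ca1',
  $: { session: { notify: (args) => notified.push(args) } }
};
methods.__ca_handleMessage__
  .call(fakeCA, 'foo-admin-myNews', '{"n":1}', 'foo-admin')
  .then(() => methods.__ca_handleMessage__
    .call(fakeCA, 'foo-admin-myNews', '{"n":2}', 'bar-admin'))
  .then(() => console.log(notified.length)); // 1: only foo-admin's message
```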
Publish-subscribe and SharedMap are typically used together. Publish is best effort, and SharedMaps can be used to acknowledge important messages. Or a SharedMap can update the state of many CAs first, including code, and a later published message triggers actions using the new state.
Point-to-point messaging can be emulated with private topics. Point-to-point is the root of all evil regarding scalability. At the network level, it always creates an all-to-all communication pattern in actor systems. Try to abstract communication with higher-level services, such as Publish-Subscribe or SharedMap, and limit point-to-point to where it is strictly necessary.
Let's show an example:
const caf = require('caf_core');
const ADMIN_CA = 'admin';
const ADMIN_CHANNEL = 'myNews';
const isAdmin = function(self) {
const name = self.__ca_getName__();
return (caf.splitName(name)[1] === ADMIN_CA);
};
const primaryChannel = function(self) {
const name = self.__ca_getName__();
return caf.joinName(caf.splitName(name)[0], ADMIN_CA, ADMIN_CHANNEL);
};
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
this.$.pubsub.subscribe(primaryChannel(this), '__ca_handleMessage__');
return [];
},
async __ca_pulse__() {
if (isAdmin(this)) {
this.state.counter = this.state.counter + 1;
this.$.pubsub.publish(primaryChannel(this),
`Counter: ${this.state.counter}`);
}
return [];
},
async __ca_handleMessage__(topic, msg, from) {
this.$.log && this.$.log.debug(`Got ${msg} from ${from}`);
this.$.session.notify([msg]);
return [];
}
};
caf.init(module);
The admin CA periodically publishes a counter on a private topic, and every CA with the same owner, the admin CA included, subscribes to it. The response to a received message is to queue a client notification.
Note that the pubsub plugin is transactional, and respects external consistency. This means that when __ca_pulse__ publishes a new counter, the checkpointed state of this CA already reflects the new counter.
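External consistency can be pictured as deferring side effects until the state is safely stored. The class below is a hypothetical model of that idea, not the pubsub plugin's code: publish() only buffers, commit() checkpoints first and then releases the messages, and abort() drops them, so an observer never sees a counter that the checkpointed state does not reflect.

```javascript
// Hypothetical model of transactional publishing, not Caf.js internals.
class TransactionalPublisher {
  constructor(send) {
    this.send = send; // function(topic, msg) that really publishes
    this.pending = [];
  }
  begin() {
    this.pending = [];
  }
  publish(topic, msg) {
    this.pending.push([topic, msg]); // buffered, not visible yet
  }
  abort() {
    this.pending = []; // a failed request publishes nothing
  }
  async commit(checkpoint) {
    await checkpoint(); // if this rejects, nothing is sent
    const out = this.pending;
    this.pending = [];
    out.forEach(([topic, msg]) => this.send(topic, msg));
  }
}

const log = [];
const pub = new TransactionalPublisher((t, m) => log.push(`send ${t}: ${m}`));
pub.begin();
pub.publish('foo-admin-myNews', 'Counter: 1');
pub.commit(async () => log.push('checkpoint'))
  .then(() => console.log(log)); // 'checkpoint' appears before the send
```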
IoT Device
In this section we cover the code that you write under the ./iot directory. This is the bridging code that runs in the browser, on a Raspberry Pi, or in an ARM container during development. See Simulated Device for an introduction.
Caf.js Lite
The platform running your IoT device code is a simplified version of the one that runs CAs. We call it Caf.js Lite.
There is at most one CA Lite instance per Caf.js Lite platform. The long-term state of this instance is managed by a Companion CA in the Cloud.
Caf.js Lite does not checkpoint. When a device restarts, it first resets the CA Lite state, and then it contacts its Companion CA for instructions and data.
Without local long-term state, consistency is less of a concern. And some plugins are not transactional, interacting directly with the device to improve latency.
We kept some CA features:
- Method execution serialized by a queue.
- An error rolls back the changes to this.state.
- Hierarchies of components configured with JSON descriptions.
- Plugins accessed with this.$.
- Support for SharedMaps...
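The first two features, a serialized queue and rollback on error, can be sketched together. This SerialQueue class is a hypothetical model, not Caf.js Lite internals: requests run one at a time, each on a deep copy of the state (assuming JSON-serializable state), and the copy replaces the real state only if the method neither throws nor returns an error.

```javascript
// Hypothetical model of serialized execution with rollback.
class SerialQueue {
  constructor(methods, state) {
    this.methods = methods;
    this.state = state;
    this.tail = Promise.resolve();
  }
  process(name, args) {
    const run = async () => {
      // Work on a copy, assuming the state is JSON-serializable.
      const ctx = { state: JSON.parse(JSON.stringify(this.state)) };
      try {
        const [err] = await this.methods[name].apply(ctx, args);
        if (err) return [err]; // returned error: discard changes
        this.state = ctx.state; // success: commit changes
        return [];
      } catch (err) {
        return [err]; // thrown error: discard changes too
      }
    };
    const result = this.tail.then(run);
    this.tail = result; // run() never rejects, so the chain keeps going
    return result;
  }
}

const q = new SerialQueue({
  async inc() { this.state.counter += 1; return []; },
  async bad() { this.state.counter = -1; throw new Error('Oops'); }
}, { counter: 0 });
q.process('inc', []);
q.process('bad', []); // its change to counter is rolled back
q.process('inc', []).then(() => console.log(q.state.counter)); // 2
```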
In Caf.js Lite, configuration is simpler. There is only one JSON file, iot.json, which merges the functionality of framework.json and ca.json.
We also made it explicit that Caf.js Lite is not Caf.js by renaming some methods. For example, paying homage to Arduino, __ca_init__ becomes __iot_setup__, and __ca_pulse__ is now __iot_loop__.
Complete examples are in GitHub.
Let's look at a "Hello World" example first.
The CA Lite code in iot/lib/iot_methods.js
const caf_iot = require('caf_iot');
exports.methods = {
async __iot_setup__() {
this.state.counter = this.toCloud.get('counter') || 0;
return [];
},
async __iot_loop__() {
const msg = this.fromCloud.get('msg') || 'Counter:';
this.$.log && this.$.log.debug(msg + this.state.counter);
this.state.counter = this.state.counter + 1;
this.toCloud.set('counter', this.state.counter);
return [];
}
};
caf_iot.init(module);
and the Companion CA implementation in lib/ca_methods.js:
const caf = require('caf_core');
exports.methods = {
async __ca_init__() {
return [];
},
async setMessage(newMsg) {
const $$ = this.$.sharing.$;
$$.fromCloud.set('msg', newMsg);
return this.getCounter();
},
async getCounter() {
const $$ = this.$.sharing.$;
return [null, $$.toCloud.get('counter')];
}
};
caf.init(module);
What are this.toCloud and this.fromCloud? Two SharedMaps that every CA Lite has. Both are managed in the Cloud by the Companion CA, but by convention:
- this.toCloud can only be written by the CA Lite, and is read by both the CA Lite and its Companion CA.
- this.fromCloud is written by the Companion CA, and read by the CA Lite.
How does it work? After processing a request, the CA Lite syncs its state with the Companion CA. Local changes to this.toCloud are uploaded, and then applied to its primary SharedMap in the Cloud. The local replica of this.fromCloud gets updated too.
Because both primaries live in the Cloud, they are always checkpointed, and managed with transactions. And sensor data from one device can be easily shared by creating more replicas of this.toCloud.
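One sync round can be modeled with plain Maps. This syncOnce function is a hypothetical illustration of the data flow described above, not the real Caf.js Lite protocol: the device tracks which toCloud keys it changed since the last sync (the `dirty` set is an assumption of this sketch), uploads only those, and then replaces its local fromCloud replica with a copy of the Cloud primary. Deletions are ignored to keep it short.

```javascript
// Hypothetical model of one sync round, not the real protocol.
function syncOnce(device, cloud) {
  // Upload only the toCloud keys written since the last sync.
  for (const key of device.dirty) {
    cloud.toCloud.set(key, device.toCloud.get(key));
  }
  device.dirty.clear();
  // Refresh the local fromCloud replica with a copy of the primary.
  device.fromCloud = new Map(cloud.fromCloud);
}

const device = {
  toCloud: new Map([['counter', 7]]),
  dirty: new Set(['counter']), // keys written since the last sync
  fromCloud: new Map()
};
const cloud = { toCloud: new Map(), fromCloud: new Map([['msg', 'Hi:']]) };
syncOnce(device, cloud);
console.log(cloud.toCloud.get('counter'), device.fromCloud.get('msg')); // 7 Hi:
```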
Syncing every request with the Cloud is sometimes too costly. Skip the sync with the option {noSync: true} for cron tasks, or for requests explicitly created with the this.$.queue plugin.
Going back to the example: during initialization, the last counter value is read from this.toCloud. Authorized clients can also get it with the getCounter() method, which reads from the same this.toCloud.
In the other direction, the value of msg is set by the Companion CA in setMessage(), which writes to this.fromCloud. After sync, the device changes the log message in __iot_loop__, also using msg.
Companion CA Hooks
const caf = require('caf_core');
exports.methods = {
async __ca_init__() {
// methods called by the iot device
this.state.trace__iot_sync__ = '__ca_traceSync__';
this.state.trace__iot_resume__ = '__ca_traceResume__';
return [];
},
// called when the device syncs state
async __ca_traceSync__() {
const now = (new Date()).getTime();
this.$.log.debug('Syncing!!:' + now);
return [];
},
// called when the device reconnects
async __ca_traceResume__() {
const now = (new Date()).getTime();
this.$.log.debug('Resuming!!:' + now);
return [];
}
};
caf.init(module);
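The Companion CA can schedule an action when the CA Lite connects, or when it syncs state. Just define two hook methods, typically called __ca_traceSync__ and __ca_traceResume__, and register their names in this.state.trace__iot_sync__ and this.state.trace__iot_resume__, as __ca_init__ does above.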
Cron Tasks
const caf_iot = require('caf_iot');
const util = require('util');
const setTimeoutPromise = util.promisify(setTimeout);
exports.methods = {
async __iot_setup__() {
const options = {noSync: true};
this.state.counter = this.toCloud.get('counter') || 0;
this.$.cron.addCron('helloCron', 'greetings', ['Hello:'], 2000, options);
this.$.cron.addCron('byeCron', 'greetings', ['Bye:'], 3000, options);
return [];
},
async __iot_loop__() {
const msg = this.fromCloud.get('msg') || 'Counter:';
this.$.log && this.$.log.debug(msg + this.state.counter);
this.state.counter = this.state.counter + 1;
this.toCloud.set('counter', this.state.counter);
return [];
},
async greetings(greet) {
const now = (new Date()).getTime();
this.$.log && this.$.log.debug(greet + now);
return [];
}
};
caf_iot.init(module);
It is easy to create periodic tasks with the cron plugin. This example creates two sequences of log entries, using two different prefixes. The first argument to addCron() is the name of the cron task, needed to delete it. Since both tasks use {noSync: true}, only __iot_loop__() syncs with the Cloud.
Note that JavaScript is a garbage-collected language, and it cannot provide hard real-time guarantees. However, on a lightly loaded Raspberry Pi 4, it is reasonable to expect that, most of the time, the interval varies by less than 10 msec, for intervals longer than 100 msec.
If you need better timing, you could use the cron task to configure a microcontroller, well in advance of triggering the action.
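To check timing on your own hardware, you can measure how far each timer tick deviates from the nominal interval. The following sketch uses plain Node.js setInterval, not the cron plugin, so it only approximates what a cron task sees; measureJitter and jitterStats are hypothetical helpers, and the percentile is a simple approximation.

```javascript
// Sketch for measuring timer jitter with plain Node.js timers.
const { performance } = require('perf_hooks');

// Deviation of each measured interval from the nominal one, in msec.
function jitterStats(timestamps, intervalMs) {
  const deviations = [];
  for (let i = 1; i < timestamps.length; i++) {
    deviations.push(Math.abs(timestamps[i] - timestamps[i - 1] - intervalMs));
  }
  const sorted = [...deviations].sort((a, b) => a - b);
  return {
    max: sorted[sorted.length - 1],
    // approximate 90th percentile
    p90: sorted[Math.min(sorted.length - 1, Math.floor(0.9 * sorted.length))]
  };
}

// Collects `samples` tick timestamps, then resolves with the stats.
function measureJitter(intervalMs, samples) {
  return new Promise((resolve) => {
    const timestamps = [];
    const timer = setInterval(() => {
      timestamps.push(performance.now());
      if (timestamps.length === samples) {
        clearInterval(timer);
        resolve(jitterStats(timestamps, intervalMs));
      }
    }, intervalMs);
  });
}

// Roughly two seconds of 100 msec ticks.
measureJitter(100, 20).then((stats) => console.log(stats));
```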
Error Handling
const caf_iot = require('caf_iot');
const myUtils = caf_iot.caf_components.myUtils;
const util = require('util');
const setTimeoutPromise = util.promisify(setTimeout);
exports.methods = {
async __iot_setup__() {
this.state.counter = this.toCloud.get('counter') || 0;
this.$.cron.addCron('crashCron', 'crash', [], 5000);
return [];
},
async __iot_loop__() {
const msg = this.fromCloud.get('msg') || 'Counter:';
this.$.log && this.$.log.debug(msg + this.state.counter);
this.state.counter = this.state.counter + 1;
this.toCloud.set('counter', this.state.counter);
return [];
},
async crash() {
await setTimeoutPromise(100);
throw new Error('Oops');
},
// delete this method and see how error handling changes
async __iot_error__(error) {
this.$.log && this.$.log.warn('Got error ' + myUtils.errToPrettyStr(error));
// try `return [error]` and see what happens
return [];
}
};
caf_iot.init(module);
The default error handling strategy in Caf.js Lite is to log the error, and then do a full reset.
We can change this behavior by adding __iot_error__(), an error handler method.
If __iot_error__() propagates the error in the returned array tuple, Caf.js Lite does a full reset, as before. But when the error is not returned, the runtime ignores it, giving you full control.
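The decision the runtime makes can be summarized as a small function. This onMethodError helper is a hypothetical model of the policy described above, not Caf.js Lite code; it returns 'reset' or 'ignore' instead of acting, and errors thrown by the handler itself are not handled in this sketch.

```javascript
// Hypothetical model of the error policy, not Caf.js Lite code.
async function onMethodError(methods, ctx, error) {
  if (typeof methods.__iot_error__ !== 'function') {
    return 'reset'; // default: log the error and do a full reset
  }
  const [propagated] = await methods.__iot_error__.call(ctx, error);
  return propagated ? 'reset' : 'ignore';
}

const err = new Error('Oops');
Promise.all([
  onMethodError({}, {}, err), // no handler
  onMethodError({ async __iot_error__(e) { return [e]; } }, {}, err),
  onMethodError({ async __iot_error__() { return []; } }, {}, err)
]).then((r) => console.log(r)); // [ 'reset', 'reset', 'ignore' ]
```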
Cloud Client
A device plugin called cloud wraps the standard Client Library.
A CA Lite can call external methods of the Companion CA using this plugin.
It can also receive notifications, using the default session name iot, which can be changed in iot++.json.
The default behavior for handling a notification is to sync state. However, using the registerHandler() method, we can add our own handler. This handler typically creates, using the queue plugin, a local request to process the notification.
Let's look at an example:
The CA Lite code in iot/lib/iot_methods.js
const caf_iot = require('caf_iot');
exports.methods = {
async __iot_setup__() {
this.$.cloud.registerHandler((msg) => {
const args = this.$.cloud.getMethodArgs(msg);
this.$.queue.process('greetings', args);
});
return [];
},
async greetings(msg) {
const now = (new Date()).getTime();
this.$.log && this.$.log.debug(msg + now);
try {
const value = await this.$.cloud.cli.getCounter().getPromise();
this.$.log && this.$.log.debug('Got ' + value);
return [];
} catch (err) {
return [err];
}
},
async __iot_loop__() {
const now = (new Date()).getTime();
this.$.log && this.$.log.debug('loop:' + now);
return [];
}
};
caf_iot.init(module);
and the Companion CA implementation in lib/ca_methods.js:
const caf = require('caf_core');
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
this.state.msg = 'foo:';
return [];
},
async __ca_pulse__() {
this.state.counter = this.state.counter + 1;
this.$.session.notify([this.state.msg], 'iot');
return [];
},
async setMessage(newMsg) {
this.state.msg = newMsg;
return this.getCounter();
},
async getCounter() {
return [null, this.state.msg + this.state.counter];
}
};
caf.init(module);
The Companion CA periodically creates a notification with session name iot.
The CA Lite handles this notification by calling greetings(). This method then calls back the Companion CA to read the current counter.
Timed Bundles
Warning: do not use Caf.js for safety critical applications.
Caf.js IoT Device was originally created to control drones using the Cloud. It was fun, and we learned a few things the hard way:
- Losing connectivity, or an increase in latency, meant that a crash was likely.
- Accurate coordination of more than a few drones was difficult.
To improve reliability and scalability, we made four changes:
- Modify the drone firmware to run Caf.js Lite locally.
- Group actions, with relative timing between them, into bundles.
- Start executing a bundle at a given Coordinated Universal Time (UTC), not just when it arrives. Keep the drone clock synchronized with the Cloud.
- Execute actions from at most one bundle at any time.
By bundling actions, once the first action starts, the others will follow, regardless of connectivity.
Therefore, a bundle could always leave the drone in a safe state by adding late recovery actions. When the next bundle executes on time, the previous recovery actions are ignored. But, when it does not, the custom recovery actions will take over.
And firing bundles with UTC time meant that the Companion CA could pipeline them, caching them in the drone before they were needed, providing smooth movement. But, at the same time, a new bundle could quickly change drone direction, invalidating the previously cached bundle.
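The execution rules above can be modeled with timers. BundleRunner is a hypothetical sketch, not the caf_iot implementation, and it assumes each action's `at` offset (msec) is measured from the bundle's UTC start time. A late bundle is ignored, so the previous bundle, and its recovery actions, keep running. An on-time bundle replaces any cached bundle that has not started yet, and when it starts it cancels what is left of the previous bundle, so actions from at most one bundle are pending at any time.

```javascript
// Hypothetical model of timed bundle execution, not the caf_iot code.
// A bundle is {start, actions: [{at, name, args}]}; `at` is assumed to be
// relative to `start`.
class BundleRunner {
  constructor(invoke) {
    this.invoke = invoke; // function(name, args) that runs an action
    this.pendingStart = null; // timer of a cached bundle not yet started
    this.active = []; // timers of the bundle currently executing
  }
  accept(bundle, now = Date.now()) {
    if (bundle.start < now) return false; // late: keep the current plan
    clearTimeout(this.pendingStart); // a newer bundle replaces a cached one
    this.pendingStart = setTimeout(() => this.start(bundle),
      bundle.start - now);
    return true;
  }
  start(bundle) {
    this.pendingStart = null;
    // Drop what is left of the previous bundle, including recovery actions.
    this.active.forEach(clearTimeout);
    this.active = bundle.actions.map(({ at, name, args }) =>
      setTimeout(() => this.invoke(name, args), at));
  }
}
```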
Moreover, using UTC time meant that we could now coordinate drone actions across the world, and scale to millions. We just needed millions of Companion CAs, using SharedMaps to share state, and Publish-Subscribe to coordinate, and a second or two of early warning to customize and propagate the bundles.
You can read all about our world domination ambitions here. The drones are coming.
On a more serious note, let's show how to code world domination.
The CA Lite code in iot/lib/iot_methods.js
const caf_iot = require('caf_iot');
exports.methods = {
async __iot_setup__() {
return [];
},
async down(speed) {
const now = (new Date()).getTime();
this.$.log && this.$.log.debug(`vvvvvvvvDown: ${now} speed: ${speed}`);
return [];
},
async up(speed) {
const now = (new Date()).getTime();
this.$.log && this.$.log.debug(`^^^^^^^^Up: ${now} speed: ${speed}`);
return [];
},
async recover(msg) {
const now = (new Date()).getTime();
this.$.log && this.$.log.debug(`RECOVERING: ${now} msg: ${msg}`);
return [];
},
async __iot_loop__() {
const now = (new Date()).getTime();
this.$.log && this.$.log.debug(`loop: ${now}`);
return [];
}
};
caf_iot.init(module);
and the Companion CA implementation in lib/ca_methods.js:
const caf = require('caf_core');
// TRY: reduce the margin to <10msec and see how bundles arrive late
const MARGIN = 100;
exports.methods = {
async __ca_init__() {
this.state.counter = 0;
this.state.msg = 'foo:';
this.state.maxAcks = 1;
return [];
},
async __ca_pulse__() {
if (this.state.acks && (this.state.acks.length > 0) &&
!this.state.acks[0].result) {
this.$.log && this.$.log.debug('Last bundle was late');
}
this.state.counter = this.state.counter + 1;
const bundle = this.$.iot.newBundle(MARGIN);
// TRY: kill the server, and the device eventually executes `recover`
bundle.down(0, [1]).up(300, [1]).recover(5000, ['go home']);
this.$.iot.sendBundle(bundle);
// `notify` improves responsiveness.
// TRY: comment the following line, and see how bundles arrive late
this.$.session.notify([this.state.counter], 'iot');
return [];
},
async setMessage(newMsg) {
this.state.msg = newMsg;
return this.getCounter();
},
async getCounter() {
return [null, this.state.msg + this.state.counter];
}
};
caf.init(module);
How do we create a Timed Bundle? The iot plugin provides a newBundle() method, with an argument to customize the expected propagation time for the bundle. Similar to the construction of a session with the Client Library, the plugin introspects the code in iot_methods.js, and adds methods for creating individual actions. Relative timing is always in milliseconds.
The sendBundle() method freezes the starting UTC time of the bundle, and makes it available to the CA Lite. It can also take a starting time offset, to be added to the propagation time.
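The introspection and the frozen start time can be illustrated with a small builder. This newBundle function is a hypothetical sketch, not the iot plugin's code: it adds one chainable method per public method name (skipping __iot_ hooks is an assumption), records each action with its relative time and arguments, and its freeze() stands in for what sendBundle() does, computing the start as now plus the propagation margin plus an optional offset.

```javascript
// Hypothetical sketch of an introspecting bundle builder.
function newBundle(methodNames, margin) {
  const actions = [];
  const bundle = {
    actions,
    // Fix the UTC start time: now + propagation margin + optional offset.
    freeze(offset = 0, now = Date.now()) {
      return { start: now + margin + offset, actions: [...actions] };
    }
  };
  for (const name of methodNames) {
    // `at` is the relative time in msec; `args` are the method arguments.
    bundle[name] = (at, args) => {
      actions.push({ at, name, args });
      return bundle; // chainable, as in bundle.down(0, [1]).up(300, [1])
    };
  }
  return bundle;
}

// Method names from the CA Lite example, skipping internal __iot_ hooks.
const iotMethods = {
  async __iot_setup__() {}, async down() {}, async up() {},
  async recover() {}, async __iot_loop__() {}
};
const names = Object.keys(iotMethods).filter((n) => !n.startsWith('__iot_'));
const b = newBundle(names, 100)
  .down(0, [1]).up(300, [1]).recover(5000, ['go home']);
console.log(b.freeze(0, 1000000).start); // 1000100
```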
The CA Lite will get the bundle the next time it syncs. It is wise to force it to sync as soon as possible, by sending a notification to the iot session. See the previous Cloud Client section for details.
Late bundles are ignored. The Companion CA learns that the last bundle was late by setting this.state.maxAcks to one, and periodically reading this.state.acks[0].result. In robust applications, it will dynamically adjust the propagation margin when that happens.
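One possible way to adjust the margin is an additive-style policy: grow quickly after a late bundle, shrink slowly while bundles arrive on time. The nextMargin function and its bounds are hypothetical choices for illustration, not part of Caf.js; the commented lines suggest how it could be wired into __ca_pulse__, with a this.state.margin field that you would initialize in __ca_init__.

```javascript
// Hypothetical margin policy (msec), not part of Caf.js.
const MIN_MARGIN = 50;
const MAX_MARGIN = 2000;

function nextMargin(margin, lastBundleLate) {
  if (lastBundleLate) {
    return Math.min(MAX_MARGIN, margin * 2); // back off quickly
  }
  return Math.max(MIN_MARGIN, Math.round(margin * 0.95)); // recover slowly
}

// Possible use in __ca_pulse__ (this.state.margin is a hypothetical field):
//   const acks = this.state.acks;
//   const late = Boolean(acks && acks.length > 0 && !acks[0].result);
//   this.state.margin = nextMargin(this.state.margin, late);
//   const bundle = this.$.iot.newBundle(this.state.margin);

console.log(nextMargin(100, true), nextMargin(100, false)); // 200 95
```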
In some cases we want to execute a bundle as soon as it arrives, for example, in an emergency. We can force this behavior by adding the argument this.$.iot.NOW to sendBundle().
The problem with this.$.iot.NOW is that a month-old bundle could still trigger an action when we turn on the device. It is recommended to use this.$.iot.NOW_SAFE instead, which expires the bundle after a reasonable time limit. The default is ten minutes.
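The difference between the two options boils down to an age check. The function below is a hypothetical model, not the plugin's code: the strings 'NOW' and 'NOW_SAFE' stand in for the this.$.iot.NOW and this.$.iot.NOW_SAFE constants, and createdAt is assumed to be the time the bundle was sent.

```javascript
// Hypothetical model of run-on-arrival modes, not the iot plugin's code.
const TEN_MINUTES_MS = 10 * 60 * 1000;

function acceptOnArrival(mode, createdAt, now = Date.now(),
  maxAgeMs = TEN_MINUTES_MS) {
  if (mode === 'NOW') return true; // no expiry: even old bundles run
  if (mode === 'NOW_SAFE') return now - createdAt <= maxAgeMs;
  throw new Error(`Unknown mode: ${mode}`);
}

const MONTH_MS = 30 * 24 * 60 * 60 * 1000;
console.log(acceptOnArrival('NOW', 0, MONTH_MS)); // true
console.log(acceptOnArrival('NOW_SAFE', 0, MONTH_MS)); // false
```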
Learn More
Connect to Bluetooth devices using the gatt plugin. Switch GPIO pins with gpio. Explore Caf.js Lite APIs here.