Skip to content

Building Controllers & Creating Entities

Preliminary/Draft

This documentation is in very preliminary draft form. Aside from the obvious formatting and consistency issues, it is likely to contain errors.

A "controller" in Reactor parlance is an object that acts as a conduit and translator for data from a remote API or system, and publishes Reactor entities containing the data for use throughout the system.

This section describes the Controller class and provides rules and reference material for it and its supporting classes that you may also need.

Quite literally, a controller is a subclass of the Controller class. This abstract base class defines interfaces and behaviors, and implements default behaviors that would be expected of most subclasses. To create a new controller subclass, one must merely extend the base class and provide the necessary implementation methods.

The following are the expected behaviors and rules for Controller subclasses:

  • It must implement the start() method, which must return a Promise that resolves when the controller successfully starts. "Starting" does not necessarily mean having successfully published all entities that may be available, but rather that the controller is sufficiently initialized and has start the work of publishing those entities. The start() Promise may resolve before publishing any entities, or wait until all have been published.
  • It must implement the stop() method, and stop any and all processes, disconnect from any persistent connections, and dispose of all acquired objects and data.
  • It may implement the run() method, if periodic calls to this method are convenient for the implementation of the controller. The run() method is the executive of a timer tick that can be controlled by the subclass. If not needed, the default (empty) implementation can be left for use.
  • It must call the base class online() method when it is functioning normally. This call must be made at least once, but may be called as often as needed, even if redundant.
  • It must call the base class offline() method when it can cannot publish updates for its entities for whatever reason. It may briefly delay calling offline(), to allow an opportunity to quickly reconnect a lost connection. It should not allow more than 60 seconds by default, however.
  • It must use the getEntity() method of its base class to create a new entity (i.e. an instance of the Entity class). This will ensure that the Entity object is properly tracked in the system.
  • It must apply only known, declared capabilities and attributes (i.e. those than can be successfully queried from the Capabilities class/object), or create extension capabilities in the x_ namespace (more on that below).
  • It should, as much as possible, use configuration to drive the translation of source objects to Reactor entities, so that user configuration of new or unexpected source objects might be possible.
  • It must follow further implementation rules for individual class methods and objects as further described in the reference section.

Packaging Controllers

The best way to package your controller is as a standard npm package. As a convention, "official" Reactor controllers use reactor- as the prefix for the package name; for community-developed/contributed controllers, please use reactor-contrib- as a prefix. For example, reactor-contrib-modbuscontroller.

There are multiple tutorials online on how to build an NPM package, so those details won't be repeated here. But there are a couple of specifics you need to know:

  • The main key in your package.json file must point to the controller class file.
  • The controller class file should be named using the Reactor Controller convention: Controller (e.g. VeraController, HubitatController, or building on the example above, ModbusController.
  • Your package must specify all dependencies not otherwise available in the default Reactor execution environment (basically express, node-fetch, and ws).

Please publish your package to npmjs, so that customers can install it by running npm i <package-name> (ideally, in their home directory, rather than in the Reactor install directory, so that the package install survives Reactor upgrades).

Class Reference

NOTE: Any class not described in this section is considered a private system implementation class and must not be used. The behaviors, APIs, names, and future existence of these private classes is not guaranteed. Use only the classes documented below, in the manner prescribed.

class Controller

Controller is the base class for all controllers in Reactor.

Controller implements the Observer and Observable interfaces (see below).

constructor()

The subclass constructor must call the superclass constructor, passing all arguments received in the same order received. Thereafter, it may declare and initialize any data it requires for its implementation. It should not, however, initiate any communication or begin its entities (except that the superclass constructor will create some system-defined entities by default, and this is normal/allowed).

The constructor is not called directly. It must only be called by the Controller.getInstancePromise() method, which itself must only be called by the system's Structure object.

async start()

Arguments: None

Returns: Promise

The start() method is called by the Structure when a configured Controller subclass object is required to start. This may be at any time after its creation by getInstancePromise().

  • Subclasses must override this method.
  • Subclasses do not need to call super.start().
  • Subclass overrides must return a Promise.

This method is expected to take all necessary steps to initiate communications with remote APIs, or do any other work needed to begin the process of publishing entities. It must return a promise that resolves when successful. It may define success as completing publishing all known entities, or it may at any other time, such as following an asynchronous HTTP request but before the response is received and parsed.

It is recommended that all expected entities be published if possible and the time required to do so is brief (a few seconds). If the process may be protracted and lengthy, the Promise should resolve earlier and the entities published as discovered.

async stop()

Arguments: None

Returns: None

The stop() method will be called by the Structure when the application is shutting down.

Subclasses may override this method. If overridden, the override:

  • must return a Promise;
  • must properly clean up and release/destroy all objects, private data, etc.
  • must call the superclass method, and should either await its completion, or if final in the method, return the Promise returned by the superclass method.

init()

Arguments: None

Returns: None

The init() method will be called before a scheduled timing delay period begins. The default implementation does nothing, but subclasses may override this method to perform any tasks required prior to going into a period of "sleep", effectively.

run()

Arguments: None

Returns: None

The run() method is called when a placed time delay (see startDelay(), startInterval(), and startDaily()) is met. The default implementation is empty. Subclasses are only required to provide an implementation if the Controller (base class) delay mechanism is used.

NOTE: I'm considering deprecating the Controller delay/timer mechanism, which is a vestige of the foundation layer's history as part of my dashboard. I'm thinking it's better to use Timer objects, and if necessary/convenient, the TimerBroker.

notify( event )

Arguments: event - an classless object containing the event data. ???Needs more description at some point.

Returns: None

The notify() method satisfies the implementation of the Observer interface (see below). This method is called when a subscribed Observable object publishes a message.

The default implementation simply propagates the event to any observers of the controller (usually the Structure, but also other system objects such as the WebSocket API). Since the controller is by definition subscribed to all of its managed entities, any update to an entity results in a notification to the controller, which is then passed up by this method. Thus a subsystem may watch any or all entities of a controller without having to know they exist and subscribe to each individually.

This method is not normally overridden. It's default implementation is usually sufficient for all purposes. If, however, a subclass finds reason to override, the override must call the superclass method. This is vital to proper event processing for the controller's entities; any override should take special care that the superclass method is called no matter what kind of error occurs otherwise in the subclass override (e.g. in finally of an all-enclosing try block).

getSystemEntity()

Returns the Controller's system entity.

findEntity( id )

Arguments: id - (string) the ID of the entity to be found.

Returns: the located Entity object, or undefined

This method finds and returns the Entity with the given ID. The Entity must be owned by the controller (this). The argument is not a canonical ID, but rather the controller-local ID. If no entity matching is found, undefined is returned.

This method must not be overridden.

This method must be called in preference to accessing this.entities from the base class directly.

getEntity( className, id )

Arguments:
className - (string) name of Entity class to use if the object needs to be created;
id - (string) local ID of the Entity class to be found, or assigned if created.

Returns: the found/created Entity object.

This method is the only valid way to create an Entity object, and must be used exclusively for that purpose. Direct construction of Entity objects is not permitted.

If the entity is created, the new object is automatically added to the controller's list of managed entities, and the controller is subscribed to the entity. The new entity also has change events deferred, so your implementation must at some point call entity.deferNotifies( false ) to reverse this. See Entity below.

removeEntity( entity )

Arguments:
entity - (Entity) the Entity object to be removed.

Removes the specified entity and destroys it. The entity will be unsubscribed from all Observers in the process.

performOnEntity( entity, actionName, parameters )

Arguments:

entity - (Entity object) the entity on which to perform the action;

actionName - (string) the action name, in "capability.action" form (e.g. "power_switch.on")

parameters - (classless object, optional) the parameters required by the action

Returns: Promise

This action, which may be overridden by subclasses, is expected to do what is necessary to perform the requested action on the specified entity. This method must return a Promise that resolves when the action request has been sent to the remote system. It may defer settling the Promise until the request is acknowledged or not by the remote system (recommended); it further may defer settling until a remote service completes the request and returns its status or update (not recommended).

Subsystems outside of the Controller branch must not call this method. The correct way to perform an action on an entity is to call perform() on the entity itself (see Entity below).

By default, the base class implementation will look for a method named action_actionname( entity, parameters ), where actionname is in the form "capability_action" (the usual dot replaced with underscore). If found, this method will be called with only entity and parameters (the action name is implied by the method name).

NOTE: The default implementation is vestige of the foundation of the old dashboard. In practice, I think most subclasses will override this method and do configurable mapping implementations, such as those done for VeraController, HassController, and HubitatController, rather than hard-coding methods. While I could remove it and require subclass override, the default implementation is actually really handy for debugging when one first starts writing a Controller subclass, I think, so I'm leaving it for now.

getID()

Arguments: None

Returns: (string) the ID of the Controller, as given in the controllers configuration section.

This method must not be overridden.

toString()

Arguments: None

Returns: (string) returns a string with the subclass name and controller ID (e.g. "HassController#home")

This method must not be overridden.

getConfig()

Arguments: None

Returns: (classless object) the configuration object passed to the Controller when it was created. This must be used in preference to accessing this.config (Controller private data) directly.

This method must not be overridden.

getLogger()

Arguments: None

Returns: (Logger) the logger established for the controller.

This method must not be overridden.

getStructure()

Arguments: None

Returns: (Structure) the structure object that created the controller.

This method must not be overridden.

isReady()

Arguments: None

Returns: (boolean) whether the controller is currently online.

This method must not be overridden.

extract()

Arguments: None

Returns: (classless object) an object representation of the class data

This method is used by the various APIs to transmit the controller data to a client. It creates a classless-object representation of the controller's data.

This method must not be overridden.

serialize()

Argument: None

Returns: (string) JSON string representing the controller data.

This method is used to create a JSON representation of the controller. It is effectively implemented by returning JSON.stringify( this.extract() ).

This method must not be overridden.

isRunning()

Arguments: None

Returns: (boolean) true if a timer is in effect; false otherwise;

This method must not be overridden.

online()

Arguments: None

Returns: Nothing

This method is used by Controller subclasses to signal the base class that the subclass is operational, or not.

This method must be called by a subclass at least once. This method must not be overridden.

offline()

Arguments: None

Returns: Nothing

This method is used by Controller subclasses to signal the base class that the subclass is not operational (e.g. it has lost a connection to a remote data source, etc.).

Requirements:

  • This method may be called when communications or other source strategies fail;
  • This method may be called after a reasonable number of retries/small delay during retries;
  • This method must not be overridden.

startDelay( milliseconds )

This method starts a delay of the specified number of milliseconds. After at least that many milliseconds (i.e. timing not guaranteed), the run() method will be invoked. Thereafter, no timer will be in effect. The implementation of the run() method may call startDelay() again, or another timer function, to reinstate timing.

The Controller timing mechanism supports only one timer at a time. For this reason, this mechanism is deprecated. Use Timer objects, and optionally TimerBroker, instead.

This method must not be overridden.

startInterval( milliseconds )

Starts a timer that calls run() every milliseconds, repeatedly. The interval can be cancelled by calling stopTimer().

The Controller timing mechanism supports only one timer at a time. For this reason, this mechanism is deprecated. Use Timer objects, and optionally TimerBroker, instead.

This method must not be overridden.

startDaily()

DEPRECATED -- Do not use.

This method must not be overridden.

stopTimer()

This method stops any running timer. If not timer is running, it returns without action or error.

This method must not be overridden.

tick()

This is an internal private implementation method.

This method must not be overridden.

async fetchText( url [, options] )

Arguments: url - URL to be fetched; options - (optional) object containing option key/value pairs.

Returns: Promise that resolves to response text.

This is a convenience method to help streamline subclass implemenations. It returns a Promise that resolves when the specified url has been fetched. The resolution object is a text string containing the entire response from the server.

async fetchJSON( url, [, options] )

Arguments: url - URL to be fetched; options - (optional) object containing option key/value pairs.

Returns: Promise that resolves to an object containing the parsed JSON response.

This is a convenience method to help streamline subclass implemenations. It returns a Promise that resolves when the specified url has been fetched. The resolution object is the parsed JSON data of the response.

getNotificationURL()

Arguments: None

Returns: (string) URL at which the application can receive HTTP requests from a remote system and pass them to the controller instance.

The system offers a method by which, if necessary, a remote system may send a message to a Controller instance asynchronously via HTTP. This method returns the constructed URL at which the system is prepared to receive such requests. All request methods are allowed and passed through. See getNotificationMiddleware(), below.

This method must not be overridden.

getNotificationMiddleware()

Arguments: None

Returns: (array) An array of Express-compatible middleware functions to be used in handling the request.

The HTTP API and Controller subsystems provide a mechanism by which a remote endpoint may initiate a connection to the Reactor HTTP API and send a message to a specified controller. The URL at which this is done is created dynamically and must be fetched by calling getNotificationURL(). When the HTTP API receives a request at this URL (any method), it determines which controller the message is for, and then calls that controller's getNotificationMiddleware() method. The default (superclass) implementation simple returns boolean false, indicating that the request should not be processed (a 404 is returned to the requestor). Otherwise, the return value is expected to be an array of Express-compatible middleware used for processing and handling the request.

Without going into too much detail about the operation of Express (refer to its documentation separately), the array of middleware that is returned is run in sequence until one of the middleware functions handles the request. Typically, a subclass method is the last element in the array, in the form this.subclass_method_name.bind(this). It may be preceded by one or more other middleware methods, in particular, any of the Express-included middleware. For example, a common set for processing a JSON payload posted from a remote system might be [ express.json(), this.subclass_method_name.bind(this) ].

Since the subclass method follows the design pattern of Express middleware, it receives three arguments: request, response, and next. The request argument is the Express Request object for the request. The body of the request can be accessed using request.body(). The response argument is the Express Response object. It must be used, at a minimum, to define the response to the remote endpoint: response.status(200).send("OK"). The next argument is a function that can be called if the current middleware function wishes to allow any subsequent middleware in the array to be run (i.e. the request may have been modified, but is not completely handled). Usually, Controller subclass implementation middleware functions will be last in the array, and thus may ignore and not call next(), which indicates that it has handled the request and no further work other than sending the required response is to be done.

For reference, look at the implementation of HubitatController, which is the reason this mechanism exists.

This method must not be overridden.

getInstancePromise()

This method is used to create an instance of a Controller subclass. It is used only by the Structure object, and must not be used otherwise.

This method must not be overridden.

fetchText( url, options )

This method is used to fetch data from a remote URL. It returns a Promise that resolves when the query succeeds (or rejects if it fails). When successful, the Promise resolves to a String or Buffer containing the response data.

This convenience method should be used in preference to calling node-fetch or similar libararies directly.

fetchJSON( url, options )

This method is used to fetch JSON from a remote URL. It returns a Promise that resolves when the query succeeds, as for fetchText(). The data passed to the then() handler is the parsed JSON data received.

This convenience method should be used in preference to calling node-fetch or similar libararies directly.

WebSocket Subsystem

The WebSocket subsystem provides additional methods on Controller to manage a single connection to an endpoint. It simplifies the process of creating and managing WebSockets in Controller implementations, handling connection and upgrade, ping/pong, errors, and closure. Received messages are passed to a subclass method for handling.

The async ws_open( url, options ) method is used to open a WebSocket connection to a given URL. It returns a Promise that resolves when the connection succeeds, or rejects if the connection fails. The options is an object (dictionary) that currently has only two keys available: connectTimeout — the maximum time (ms) to attempt connection; and pingInterval — the maximum interval between pings on the connection. This method must not be overridden.

The async ws_close( code, reason ) method can be called by the implementation to close the WebSocket. This would normally only be done as part of a stop() implementation. Otherwise, the Controller base class manages the connection. This method returns a Promise that resolves when the connection is fully closed. This method must not be overridden (see ws_closing() below if you need to do things when the socket closes).

The more urgent ws_terminate() is also available, which summarily slams the connection shut with no fanfare. This method must not be overridden.

The ws_send( payload ) method of the base class can be called to send payload data (String or Buffer) to the endpoint. This method must not be overridden.

The ws_connected() method returns true if the WebSocket is currently connected; false otherwise.

Your Controller subclass must implement a ws_message( data ) method, which is called when data is received on the socket. No interpretation or transformation of the data is performed; it is handed to your implementation exactly as it is received.

You may also implement ws_closing( code, reason ), which is an advisory method to notify your subclass that the connection is being closed. The connection can close at any time for any reason. The most typical use of this method is to start a delay timer that, when expired, attempts reconnection to the endpoint, which is not automatic. The state of the connection when called is undefined, so implementations must not send any final messages or perform any other functions (e.g. ping) on the WebSocket connection.

Note

It is very common for this.offline() to be called in response to a close, so that other parts of the system know that the controller is offline. Often, however, some hysteresis is desirable, and the call to offline() is made elsewhere when several consecutive attempts to reconnect have failed.

Entity

Entity implements Observable and publishes entity-changed events when its properties or attributes are modified. The entity-delete event is sent before an entity is destroyed.

getCanonicalID()

deferNotifies( state )

Every change to an Entity causes an event to be published, by default. But typically, during inventory of a hub by a Controller subclass, for example, an entity will need to have many attributes and properties set, and it would be undesirable to have a large number of events sent. When deferNotifies() is called with true, publication of change events is suspended until deferNotifies() is called with false later (at which point, a single change event is sent, if changes were made).

deferNotifies() returns the current state as the value of the function. This can be used to pass to a later deferNotifies() call in situations where there may be nest functions, each of which may or may not defer notifications based on their implementations.

getName()

setName( name )

extendCapability( capability_name )

hasCapability( capability_name )

setAttribute( attribute_name, new_value )

getAttribute( attribute_name )

hasAttribute( attribute_name )

setPrimaryAttribute( attribute_name )

setPrimaryValue( new_value )

enumAttributes()

registerAction( name, func )

hasAction( name )

perform( actionName, parameters )

Performs the named action on this Entity with the given parameters. Returns a Promise. Subsystems outside of Controller implementations must call this method and must not call Controller.performOnEntity().

TaskQueue

TaskQueue implements a queue of actions to performed at a limited pace. It is often used with Controllers when the hub is incapable of managing a large number of requests in a short period of time.

constructor()

Returns a new TaskQueue.

start()

Start the TaskQueue. This must be called at least once to get things moving. Once started, the TaskQueue will run enqueued tasks as they come, subject to pacing restrictions (see setPace() below).

stop()

Stop the TaskQueue. This may be called, and when used, is often done in the stop() implementation of a parent Controller. Any queued tasks not yet running fail immediately.

finish()

Returns a Promise that resolves when all queued tasks complete.

setPace( milliseconds )

Set the pace at which tasks may execute (i.e. not less than milliseconds apart). If the queue contains multiple tasks, the next task may be delayed.

enqueueTask( func [ , opts [ , ...args ] ] )

Enqueues the given function as a task to be run. The function will be called with the listed args (if any). Returns a Promise that resolves when the task completes, or rejects if the task throws an error.

The opts object currently only has one key, name, which may be set to a name identifying the task (helps debugging). If not given, the name is the task ID, which is internally generated.

Observable

Observable is an interface (mixin) class that can be applied to an object, indicating it sends events.

Use util.mixin() to apply Observable. Note that it is not necesary to apply Observable to your Controller subclass because it is inherited from Controller.

publish( data, eventType, ...args )

Publishes an event with the given eventType and data. Most objects will simply publish this as the data (i.e. they publish themselves to announce they have changed), but the data can be anything, or nothing.

disconnect()

Removes all observers for this object. Usually called when the object is about to be destroyed.

Observer

Observer is an interface (mixin) class applied to an object to indicate it can receive events from Observable objects.

Use util.mixin() to apply Observer. Note that it is not necessary to apply Observer to your Controller subclass because it is inherited from Controller.

notify( event )

Called by the events subsystem to pass an event from an Observable to which this Observer has previously subscribed. The event is a structure/object containing keys type, data, and sender.

propagate( event )

Typically used in an Observer's notify() implementation when the Observer itself is Observable, propagate() passes an event up the event hierarchy.

subscribe( observable [, ident ] )

Subscribes this Observer to an Observable. The optional ident may be used to manage multiple subscriptions to the same Observable for different purposes (it is little-used).

unsubscribe( [observed] )

Unsubscribe from the specified Observable, or if not provided, all objects.

TimerBroker

TimerBroker is a singleton that manages multiple timers for a subsystem.

getTimer( id, resolution, ...args )

releaseTimer( id_or_timer )

Timer

A Timer is an Observable object that will send a timer-trigger event to its observers when the time is met.

constructor( ident [, callback ] )

If the callback is given, it is called in addition to sending a timer-trigger event to observers.

delayms( milliseconds )

at( absolute_time )

cancel()

destroy()

Logger

static getInstance( id [, parent ] )

info( msg, ... )

notice( msg, ... )

warn( msg, ... )

err( msg, ... )

exception( error_object )

debug( level, msg, ... )

always( msg, ... )

Semaphore

constructor( initialValue )

Constructs a new counting semaphore1 with the given initial value (integer >= 0).

acquire( [block] )

Each call to acquire() decrements the semaphore's counter and returns a Promise. If the counter is greater than 0 prior to decrementing, the Promise resolves; otherwise, the Promise waits, effectively blocking the calling process, until another process that has previosly acquired the semaphore releases it. The data passed to the resolve function is a release function that must be called to advise the semaphore that you are done with it. Great care must be taken all acquired semaphores are released, or deadlocks may result.

If block is given and false, the request will not block, but rather reject the returned Promise.

isLocked()

Returns true if the semaphore is currently locked (has count <= 0).

count()

Returns the semaphore's current count.

wait( timeout )

Like acquire(), but will time out after timeout milliseconds if the semaphore cannot be acquired (the returned Promise is rejected in this case).

drain()

"Drains" the semaphore by settling all waiting Promises and restoring the semaphore's count to its initial value.

Mutex

Implements a mutual-exclusion lock. This is a special-case Semaphore that prevents execution of a protected code block by more than one thread of execution simultaneously.

constructor()

Makes a new lock.

acquire()

Acquires the lock. A promise is returned that resolves when the lock is acquired. The data passed to the resolver is a release function that must be called when the code if finished with the lock. Care must be taken that the lock is released under any and all conditions, or deadlocks will result.

isLocked()

Returns true if the lock is currently locked (i.e. calling acquire() would block).

release()

Releases the lock unconditionally. It is not recommended that this method ever be called; it is provided only for exigent circumstances (that I have not yet identified concretely, so this method may be removed in future).

Configuration

Capabilities

util

getUID( [ prefix ] )

Returns a (string) unique identifier with the given prefix (if any). The string is guaranteed to be unique as long as the system time does not slew backwards significantly (and even if it does, a restart of Reactor would have to occur during the overlap period for any risk of duplication -- not likely).

shallowCopy( object )

Creates a shallow copy of the passed object.

clone( object )

Creates a deep copy of the passed object.

hasAnyProperty( object )

Returns true if the given object has any (enumerable) properties (e.g. hasAnyProperty( {} ) returns false ).

deepCompare( val1, val2 )

Compares val1 and val2 for semantic equality. Both must have the same type, and may be objects.

mixin( baseClass, mixinClass )

Applies mixinClass (which must be a class) to baseClass: all methods of mixinClass except its constructor are applied to baseClass, if they are not already defined.

coalesce( ... )

Returns the first non-undefined, non-null, non-NaN argument, or null.

isEmpty( string )

Returns true if the given string is undefined, null, or the empty string ("").

loadData( path, fileroot, sysroot )

async asyncForEach( array, callback )

TimedPromise( implementation, timeout )

sequencePromises( implementationArray )

Given an array of functions that take resolve, reject arguments (i.e. Promise implementations), runs the implementations (as Promises) sequentially. Returns a Promise that resolves when all are complete, or rejects if any fails.

promiseWrapper( function, ...args )

promiseWrapperAsync( function, ...args )

http_fetch( url, req )

A wrapper for node-fetch to handle Digest authentication. Parameters are identical to that of node-fetch, with the following optional additions to req:

  • reactor_ignore_certs: boolean (default: false)
  • reactor_username: string
  • reactor_password: string
  • reactor_basic_auth: boolean (default: false)

Updated: 2022-Jan-13