Building Controllers & Creating Entities
Preliminary/Draft
This documentation is in very preliminary draft form. Aside from the obvious formatting and consistency issues, it is likely to contain errors.
A "controller" in Reactor parlance is an object that acts as a conduit and translator for data from a remote API or system, and publishes Reactor entities containing the data for use throughout the system.
This section describes the Controller
class and provides rules and reference material for it and its supporting classes that you may also need.
Quite literally, a controller is a subclass of the Controller
class. This abstract base class defines interfaces and behaviors, and implements default behaviors that would be expected of most subclasses. To create a new controller subclass, one must merely extend the base class and provide the necessary implementation methods.
The following are the expected behaviors and rules for Controller
subclasses:
- It must implement the start() method, which must return a Promise that resolves when the controller successfully starts. "Starting" does not necessarily mean having successfully published all entities that may be available, but rather that the controller is sufficiently initialized and has started the work of publishing those entities. The start() Promise may resolve before publishing any entities, or wait until all have been published.
- It must implement the stop() method, and stop any and all processes, disconnect from any persistent connections, and dispose of all acquired objects and data.
- It may implement the run() method, if periodic calls to this method are convenient for the implementation of the controller. The run() method is the executive of a timer tick that can be controlled by the subclass. If not needed, the default (empty) implementation can be left in place.
- It must call the base class online() method when it is functioning normally. This call must be made at least once, but may be made as often as needed, even if redundant.
- It must call the base class offline() method when it cannot publish updates for its entities for whatever reason. It may briefly delay calling offline() to allow an opportunity to quickly reconnect a lost connection. By default, however, it should not delay more than 60 seconds.
- It must use the getEntity() method of its base class to create a new entity (i.e. an instance of the Entity class). This ensures that the Entity object is properly tracked in the system.
- It must apply only known, declared capabilities and attributes (i.e. those that can be successfully queried from the Capabilities class/object), or create extension capabilities in the x_ namespace (more on that below).
- It should, as much as possible, use configuration to drive the translation of source objects to Reactor entities, so that user configuration of new or unexpected source objects might be possible.
- It must follow further implementation rules for individual class methods and objects as further described in the reference section.
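The rules above can be sketched as a minimal subclass. This is an illustrative outline only, not a reference implementation: the stand-in Controller class exists solely so the sketch runs on its own (a real controller extends Reactor's Controller, and how that class is loaded is not shown here), and ExampleController, the devices configuration key, and the power_switch.state attribute name are hypothetical.

```javascript
// Stand-in for Reactor's Controller base class, present ONLY so this sketch runs by itself.
// A real controller extends the actual Controller class; this stub's constructor
// signature and internals are assumptions.
class Controller {
    constructor( config ) {
        this.config = config || {};
        this.entities = {};
        this.ready = false;
    }
    getConfig() { return this.config; }
    getEntity( className, id ) {
        // Stub: the real method finds or creates a tracked Entity object.
        if ( ! this.entities[ id ] ) {
            this.entities[ id ] = {
                id: id,
                attributes: {},
                setAttribute( name, value ) { this.attributes[ name ] = value; },
                deferNotifies( state ) { return false; }
            };
        }
        return this.entities[ id ];
    }
    online() { this.ready = true; }
    offline() { this.ready = false; }
    isReady() { return this.ready; }
    async stop() { /* base-class cleanup would happen here */ }
}

class ExampleController extends Controller {
    constructor( ...args ) {
        super( ...args );       // pass all arguments through, in the order received
        this.stopped = false;   // initialize private data only; no communication here
    }

    async start() {
        // Hypothetical inventory: a real controller would query its remote system here.
        const devices = this.getConfig().devices || [];
        for ( const dev of devices ) {
            // Entities are obtained through getEntity(), never constructed directly.
            const entity = this.getEntity( "Entity", dev.id );
            entity.setAttribute( "power_switch.state", dev.on );
            entity.deferNotifies( false );  // new entities start with change events deferred
        }
        this.online();  // must be called at least once when functioning normally
    }

    async stop() {
        this.stopped = true;    // release subclass resources first
        return super.stop();    // final statement: return the superclass Promise
    }
}
```

In this sketch start() resolves only after all (hypothetical) devices are published, which suits a short inventory; a slow inventory would resolve earlier and publish as it goes.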
Packaging Controllers
The best way to package your controller is as a standard npm
package. As a convention, "official" Reactor controllers use reactor-
as the prefix for the package name; for community-developed/contributed controllers, please use reactor-contrib-
as a prefix. For example, reactor-contrib-modbuscontroller
.
There are multiple tutorials online on how to build an NPM package, so those details won't be repeated here. But there are a couple of specifics you need to know:
- The main key in your package.json file must point to the controller class file.
- The controller class file should be named using the Reactor Controller convention, that is, a name ending in Controller (e.g. VeraController, HubitatController, or building on the example above, ModbusController).
- Your package must specify all dependencies not otherwise available in the default Reactor execution environment (which provides, basically, express, node-fetch, and ws).
Please publish your package to npmjs, so that customers can install it by running npm i <package-name>
(ideally, in their home directory, rather than in the Reactor install directory, so that the package install survives Reactor upgrades).
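As a rough illustration of the points above, a package.json for the example package might look like the following. The version numbers, the .js file extension, and the dependency name some-modbus-library are placeholders chosen for this sketch, not values required by Reactor.

```json
{
  "name": "reactor-contrib-modbuscontroller",
  "version": "0.1.0",
  "description": "Hypothetical Modbus controller for Reactor",
  "main": "ModbusController.js",
  "dependencies": {
    "some-modbus-library": "^1.0.0"
  }
}
```

Note that express, node-fetch, and ws do not appear under dependencies here, since the default Reactor environment is described as already providing them.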
Class Reference
NOTE: Any class not described in this section is considered a private system implementation class and must not be used. The behaviors, APIs, names, and future existence of these private classes is not guaranteed. Use only the classes documented below, in the manner prescribed.
class Controller
Controller is the base class for all controllers in Reactor.
Controller implements the Observer and Observable interfaces (see below).
constructor()
The subclass constructor must call the superclass constructor, passing all arguments received in the same order received. Thereafter, it may declare and initialize any data it requires for its implementation. It should not, however, initiate any communication or begin publishing its entities (except that the superclass constructor will create some system-defined entities by default, and this is normal/allowed).
The constructor is not called directly. It must only be called by the Controller.getInstancePromise()
method, which itself must only be called by the system's Structure
object.
async start()
Arguments: None
Returns: Promise
The start()
method is called by the Structure
when a configured Controller subclass object is required to start. This may be at any time after its creation by getInstancePromise()
.
- Subclasses must override this method.
- Subclasses do not need to call
super.start()
. - Subclass overrides must return a Promise.
This method is expected to take all necessary steps to initiate communications with remote APIs, or do any other work needed to begin the process of publishing entities. It must return a promise that resolves when successful. It may define success as having published all known entities, or it may resolve at any other time, such as following an asynchronous HTTP request but before the response is received and parsed.
It is recommended that all expected entities be published if possible and the time required to do so is brief (a few seconds). If the process may be protracted and lengthy, the Promise
should resolve earlier and the entities published as discovered.
async stop()
Arguments: None
Returns: Promise
The stop()
method will be called by the Structure
when the application is shutting down.
Subclasses may override this method. If overridden, the override:
- must return a Promise;
- must properly clean up and release/destroy all objects, private data, etc.
- must call the superclass method, and should either
await
its completion, or if final in the method, return the Promise returned by the superclass method.
init()
Arguments: None
Returns: None
The init()
method will be called before a scheduled timing delay period begins. The default implementation does nothing, but subclasses may override this method to perform any tasks required prior to going into a period of "sleep", effectively.
run()
Arguments: None
Returns: None
The run()
method is called when a placed time delay (see startDelay()
, startInterval()
, and startDaily()
) is met. The default implementation is empty. Subclasses are only required to provide an implementation if the Controller (base class) delay mechanism is used.
NOTE: I'm considering deprecating the Controller delay/timer mechanism, which is a vestige of the foundation layer's history as part of my dashboard. I'm thinking it's better to use Timer objects, and if necessary/convenient, the TimerBroker.
notify( event )
Arguments: event - a classless object containing the event data, with keys type, data, and sender (see Observer below).
Returns: None
The notify()
method satisfies the implementation of the Observer interface (see below). This method is called when a subscribed Observable object publishes a message.
The default implementation simply propagates the event to any observers of the controller (usually the Structure
, but also other system objects such as the WebSocket API). Since the controller is by definition subscribed to all of its managed entities, any update to an entity results in a notification to the controller, which is then passed up by this method. Thus a subsystem may watch any or all entities of a controller without having to know they exist and subscribe to each individually.
This method is not normally overridden. Its default implementation is usually sufficient for all purposes. If, however, a subclass finds reason to override, the override must call the superclass method. This is vital to proper event processing for the controller's entities; any override should take special care that the superclass method is called no matter what kind of error occurs otherwise in the subclass override (e.g. in finally
of an all-enclosing try
block).
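A minimal sketch of such an override follows. The stand-in Controller class only records events so the example can run by itself (the real superclass method propagates to observers), and handleEvent() is a hypothetical subclass method invented for illustration.

```javascript
// Stand-in base class so the sketch is runnable on its own.
// The real Controller.notify() propagates the event to the controller's observers.
class Controller {
    constructor() { this.propagated = []; }
    notify( event ) { this.propagated.push( event ); }
}

class ExampleController extends Controller {
    notify( event ) {
        try {
            // Hypothetical subclass-specific handling; this may throw.
            this.handleEvent( event );
        } catch ( err ) {
            console.error( "notify() handling failed:", err.message );
        } finally {
            // Always runs, so the superclass sees the event even if the handling above failed.
            super.notify( event );
        }
    }

    handleEvent( event ) {
        if ( ! event || ! event.type ) {
            throw new Error( "malformed event" );
        }
    }
}
```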
getSystemEntity()
Returns the Controller's system entity.
findEntity( id )
Arguments: id - (string) the ID of the entity to be found.
Returns: the located Entity object, or undefined
This method finds and returns the Entity with the given ID. The Entity must be owned by the controller (this). The argument is not a canonical ID, but rather the controller-local ID. If no entity matching is found, undefined
is returned.
This method must not be overridden.
This method must be called in preference to accessing this.entities
from the base class directly.
getEntity( className, id )
- Arguments:
  - className - (string) name of Entity class to use if the object needs to be created;
  - id - (string) local ID of the Entity to be found, or assigned if created.
Returns: the found/created Entity
object.
This method is the only valid way to create an Entity
object, and must be used exclusively for that purpose. Direct construction of Entity
objects is not permitted.
If the entity is created, the new object is automatically added to the controller's list of managed entities, and the controller is subscribed to the entity. The new entity also has change events deferred, so your implementation must at some point call entity.deferNotifies( false )
to reverse this. See Entity
below.
removeEntity( entity )
- Arguments:
entity
- (Entity) the Entity object to be removed.
Removes the specified entity and destroys it. The entity will be unsubscribed from all Observers in the process.
performOnEntity( entity, actionName, parameters )
- Arguments:
  - entity - (Entity object) the entity on which to perform the action;
  - actionName - (string) the action name, in "capability.action" form (e.g. "power_switch.on");
  - parameters - (classless object, optional) the parameters required by the action.
Returns: Promise
This method, which may be overridden by subclasses, is expected to do what is necessary to perform the requested action on the specified entity. It must return a Promise that resolves when the action request has been sent to the remote system. It may defer settling the Promise until the request is acknowledged (or not) by the remote system (recommended); it may further defer settling until a remote service completes the request and returns its status or update (not recommended).
Subsystems outside of the Controller branch must not call this method. The correct way to perform an action on an entity is to call perform()
on the entity itself (see Entity
below).
By default, the base class implementation will look for a method named action_actionname( entity, parameters )
, where actionname
is in the form "capability_action" (the usual dot replaced with underscore). If found, this method will be called with only entity
and parameters
(the action name is implied by the method name).
NOTE: The default implementation is a vestige of the foundation of the old dashboard. In practice, I think most subclasses will override this method and do configurable mapping implementations, such as those done for VeraController, HassController, and HubitatController, rather than hard-coding methods. While I could remove it and require subclass override, the default implementation is actually really handy for debugging when one first starts writing a Controller subclass, I think, so I'm leaving it for now.
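The naming convention used by the default lookup can be sketched as below. This is an illustration of the convention as described, not the base class's actual code: actionMethodName(), performByConvention(), and exampleController are hypothetical names, and throwing when no method is found is an assumption about the failure behavior.

```javascript
// Convert "capability.action" to the method name the default lookup is described as using.
function actionMethodName( actionName ) {
    return "action_" + actionName.replace( /\./g, "_" );
}

// Hypothetical dispatcher following the described convention.
async function performByConvention( controller, entity, actionName, parameters ) {
    const fn = controller[ actionMethodName( actionName ) ];
    if ( typeof fn !== "function" ) {
        // Assumption: what the real base class does here is not documented above.
        throw new Error( `Action ${actionName} is not implemented` );
    }
    // Only entity and parameters are passed; the action name is implied by the method name.
    return fn.call( controller, entity, parameters );
}

// Example controller-like object with one hard-coded action method.
const exampleController = {
    sent: [],
    async action_power_switch_on( entity, parameters ) {
        // A real controller would send the request to the remote system here.
        this.sent.push( { id: entity.id, command: "on" } );
    }
};
```

With this convention, the action "power_switch.on" is handled by a method named action_power_switch_on.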
getID()
Arguments: None
Returns: (string) the ID of the Controller, as given in the controllers
configuration section.
This method must not be overridden.
toString()
Arguments: None
Returns: (string) returns a string with the subclass name and controller ID (e.g. "HassController#home")
This method must not be overridden.
getConfig()
Arguments: None
Returns: (classless object) the configuration object passed to the Controller
when it was created. This must be used in preference to accessing this.config
(Controller
private data) directly.
This method must not be overridden.
getLogger()
Arguments: None
Returns: (Logger
) the logger established for the controller.
This method must not be overridden.
getStructure()
Arguments: None
Returns: (Structure
) the structure object that created the controller.
This method must not be overridden.
isReady()
Arguments: None
Returns: (boolean) whether the controller is currently online.
This method must not be overridden.
extract()
Arguments: None
Returns: (classless object) an object representation of the class data
This method is used by the various APIs to transmit the controller data to a client. It creates a classless-object representation of the controller's data.
This method must not be overridden.
serialize()
Arguments: None
Returns: (string) JSON string representing the controller data.
This method is used to create a JSON representation of the controller. It is effectively implemented by returning JSON.stringify( this.extract() )
.
This method must not be overridden.
isRunning()
Arguments: None
Returns: (boolean) true
if a timer is in effect; false
otherwise.
This method must not be overridden.
online()
Arguments: None
Returns: Nothing
This method is used by Controller
subclasses to signal the base class that the subclass is operational.
This method must be called by a subclass at least once. This method must not be overridden.
offline()
Arguments: None
Returns: Nothing
This method is used by Controller
subclasses to signal the base class that the subclass is not operational (e.g. it has lost a connection to a remote data source, etc.).
Requirements:
- This method may be called when communications or other source strategies fail;
- This method may be called after a reasonable number of retries/small delay during retries;
- This method must not be overridden.
startDelay( milliseconds )
This method starts a delay of the specified number of milliseconds. After at least that many milliseconds (i.e. timing not guaranteed), the run()
method will be invoked. Thereafter, no timer will be in effect. The implementation of the run()
method may call startDelay()
again, or another timer function, to reinstate timing.
The Controller
timing mechanism supports only one timer at a time. For this reason, this mechanism is deprecated. Use Timer
objects, and optionally TimerBroker
, instead.
This method must not be overridden.
startInterval( milliseconds )
Starts a timer that calls run()
every milliseconds
, repeatedly. The interval can be cancelled by calling stopTimer()
.
The Controller
timing mechanism supports only one timer at a time. For this reason, this mechanism is deprecated. Use Timer
objects, and optionally TimerBroker
, instead.
This method must not be overridden.
startDaily()
DEPRECATED -- Do not use.
This method must not be overridden.
stopTimer()
This method stops any running timer. If no timer is running, it returns without action or error.
This method must not be overridden.
tick()
This is an internal private implementation method.
This method must not be overridden.
async fetchText( url [, options] )
Arguments: url - URL to be fetched; options - (optional) object containing option key/value pairs.
Returns: Promise
that resolves to response text.
This is a convenience method to help streamline subclass implementations. It returns a Promise
that resolves when the specified url
has been fetched. The resolution object is a text string containing the entire response from the server.
async fetchJSON( url [, options] )
Arguments: url - URL to be fetched; options - (optional) object containing option key/value pairs.
Returns: Promise
that resolves to an object containing the parsed JSON response.
This is a convenience method to help streamline subclass implementations. It returns a Promise
that resolves when the specified url
has been fetched. The resolution object is the parsed JSON data of the response.
getNotificationURL()
Arguments: None
Returns: (string) URL at which the application can receive HTTP requests from a remote system and pass them to the controller instance.
The system offers a method by which, if necessary, a remote system may send a message to a Controller
instance asynchronously via HTTP. This method returns the constructed URL at which the system is prepared to receive such requests. All request methods are allowed and passed through. See getNotificationMiddleware()
, below.
This method must not be overridden.
getNotificationMiddleware()
Arguments: None
Returns: (array) An array of Express-compatible middleware functions to be used in handling the request.
The HTTP API and Controller
subsystems provide a mechanism by which a remote endpoint may initiate a connection to the Reactor HTTP API and send a message to a specified controller. The URL at which this is done is created dynamically and must be fetched by calling getNotificationURL()
. When the HTTP API receives a request at this URL (any method), it determines which controller the message is for, and then calls that controller's getNotificationMiddleware()
method. The default (superclass) implementation simply returns boolean false
, indicating that the request should not be processed (a 404 is returned to the requestor). Otherwise, the return value is expected to be an array of Express-compatible middleware used for processing and handling the request.
Without going into too much detail about the operation of Express (refer to its documentation separately), the array of middleware that is returned is run in sequence until one of the middleware functions handles the request. Typically, a subclass method is the last element in the array, in the form this.subclass_method_name.bind(this)
. It may be preceded by one or more other middleware methods, in particular, any of the Express-included middleware. For example, a common set for processing a JSON payload posted from a remote system might be [ express.json(), this.subclass_method_name.bind(this) ]
.
Since the subclass method follows the design pattern of Express middleware, it receives three arguments: request
, response
, and next
. The request
argument is the Express Request
object for the request. The body of the request can be accessed using request.body
. The response
argument is the Express Response
object. It must be used, at a minimum, to define the response to the remote endpoint: response.status(200).send("OK")
. The next
argument is a function that can be called if the current middleware function wishes to allow any subsequent middleware in the array to be run (i.e. the request may have been modified, but is not completely handled). Usually, Controller
subclass implementation middleware functions will be last in the array, and thus may ignore and not call next()
, which indicates that it has handled the request and no further work other than sending the required response is to be done.
For reference, look at the implementation of HubitatController
, which is the reason this mechanism exists.
This method must not be overridden.
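The handler that sits last in the middleware array might be sketched as follows. The class name ExampleNotificationHandler, the method name handleNotification(), and the 400 response for a missing body are assumptions made for illustration; in a real controller the method would live on the Controller subclass itself.

```javascript
class ExampleNotificationHandler {
    constructor() {
        this.received = [];
    }

    // Express-style middleware signature: (request, response, next).
    handleNotification( request, response, next ) {
        // request.body is populated by an earlier body-parsing middleware such as express.json().
        const payload = request.body;
        if ( payload === undefined || payload === null ) {
            response.status( 400 ).send( "Missing body" );
            return;
        }
        this.received.push( payload );
        // Last in the chain: send the response and do not call next().
        response.status( 200 ).send( "OK" );
    }

    // The array described above would then take the form:
    //   [ express.json(), this.handleNotification.bind( this ) ]
}
```

Binding the method keeps this pointing at the handler's owner when Express later invokes it as a plain function.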
getInstancePromise()
This method is used to create an instance of a Controller
subclass. It is used only by the Structure
object, and must not be used otherwise.
This method must not be overridden.
fetchText( url, options )
This method is used to fetch data from a remote URL. It returns a Promise that resolves when the query succeeds (or rejects if it fails). When successful, the Promise resolves to a String or Buffer containing the response data.
This convenience method should be used in preference to calling node-fetch or similar libraries directly.
fetchJSON( url, options )
This method is used to fetch JSON from a remote URL. It returns a Promise that resolves when the query succeeds, as for fetchText(). The data passed to the then() handler is the parsed JSON data received.
This convenience method should be used in preference to calling node-fetch or similar libraries directly.
WebSocket Subsystem
The WebSocket subsystem provides additional methods on Controller to manage a single connection to an endpoint. It simplifies the process of creating and managing WebSockets in Controller implementations, handling connection and upgrade, ping/pong, errors, and closure. Received messages are passed to a subclass method for handling.
The async ws_open( url, options )
method is used to open a WebSocket connection to a given URL. It returns a Promise that resolves when the connection succeeds, or rejects if the connection fails. The options argument is an object (dictionary) that currently has only two keys available: connectTimeout
— the maximum time (ms) to attempt connection; and pingInterval
— the maximum interval between pings on the connection. This method must not be overridden.
The async ws_close( code, reason )
method can be called by the implementation to close the WebSocket. This would normally only be done as part of a stop()
implementation. Otherwise, the Controller base class manages the connection. This method returns a Promise that resolves when the connection is fully closed. This method must not be overridden (see ws_closing()
below if you need to do things when the socket closes).
The more urgent ws_terminate()
is also available, which summarily slams the connection shut with no fanfare. This method must not be overridden.
The ws_send( payload )
method of the base class can be called to send payload data (String or Buffer) to the endpoint. This method must not be overridden.
The ws_connected()
method returns true
if the WebSocket is currently connected; false
otherwise.
Your Controller subclass must implement a ws_message( data )
method, which is called when data is received on the socket. No interpretation or transformation of the data is performed; it is handed to your implementation exactly as it is received.
You may also implement ws_closing( code, reason )
, which is an advisory method to notify your subclass that the connection is being closed. The connection can close at any time for any reason. The most typical use of this method is to start a delay timer that, when expired, attempts reconnection to the endpoint, which is not automatic. The state of the connection when called is undefined, so implementations must not send any final messages or perform any other functions (e.g. ping) on the WebSocket connection.
Note
It is very common for this.offline()
to be called in response to a close, so that other parts of the system know that the controller is offline. Often, however, some hysteresis is desirable, and the call to offline()
is made elsewhere when several consecutive attempts to reconnect have failed.
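The pieces of the WebSocket subsystem might fit together roughly as sketched below. The stand-in Controller class exists only so the example runs by itself; the endpoint URL, the option values (pingInterval is assumed to be in milliseconds), the retry delay, the three-failure threshold, and the helper names connect() and connectFailed() are all hypothetical.

```javascript
// Stand-in base class so the sketch runs alone; the real methods are provided by Controller.
class Controller {
    constructor() { this.isOnline = false; }
    online() { this.isOnline = true; }
    offline() { this.isOnline = false; }
    async ws_open( url, options ) { /* stub: the real method opens the socket */ }
}

class ExampleWSController extends Controller {
    constructor( ...args ) {
        super( ...args );
        this.url = "ws://192.0.2.10:8080/events";  // hypothetical endpoint
        this.failures = 0;
        this.retryTimer = null;
        this.lastMessage = null;
    }

    async connect() {
        try {
            await this.ws_open( this.url, { connectTimeout: 15000, pingInterval: 60000 } );
            this.failures = 0;
            this.online();
        } catch ( err ) {
            this.connectFailed();
        }
    }

    connectFailed() {
        // Hysteresis: only report offline after several consecutive failures.
        if ( ++this.failures >= 3 ) {
            this.offline();
        }
        // Reconnection is not automatic, so schedule another attempt.
        this.retryTimer = setTimeout( () => this.connect(), 5000 );
    }

    ws_message( data ) {
        // Data arrives exactly as received; this endpoint is assumed to send JSON text.
        this.lastMessage = JSON.parse( data );
    }

    ws_closing( code, reason ) {
        // The socket state is undefined here, so do not send anything; just plan a retry.
        this.connectFailed();
    }
}
```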
Entity
Entity implements Observable and publishes entity-changed
events when its properties or attributes are modified. The entity-delete
event is sent before an entity is destroyed.
getCanonicalID()
deferNotifies( state )
Every change to an Entity causes an event to be published, by default. But typically, during inventory of a hub by a Controller subclass, for example, an entity will need to have many attributes and properties set, and it would be undesirable to have a large number of events sent. When deferNotifies()
is called with true
, publication of change events is suspended until deferNotifies()
is called with false
later (at which point, a single change event is sent, if changes were made).
deferNotifies()
returns the current state as the value of the function. This can be used to pass to a later deferNotifies()
call in situations where there may be nested functions, each of which may or may not defer notifications based on their implementations.
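The nesting pattern can be sketched as follows. StubEntity is a stand-in written only to make the pattern runnable, and it assumes that deferNotifies() returns the state that was in effect before the call; the attribute names and the functions updateLevel() and updateAll() are hypothetical.

```javascript
// Stand-in for an Entity, following the deferral behavior described above.
class StubEntity {
    constructor() {
        this.deferred = false;
        this.dirty = false;
        this.eventsSent = 0;
        this.attributes = {};
    }
    deferNotifies( state ) {
        const previous = this.deferred;  // assumption: the prior state is what gets returned
        this.deferred = !!state;
        if ( ! this.deferred && this.dirty ) {
            this.dirty = false;
            this.eventsSent++;           // a single change event for all deferred changes
        }
        return previous;
    }
    setAttribute( name, value ) {
        this.attributes[ name ] = value;
        if ( this.deferred ) { this.dirty = true; } else { this.eventsSent++; }
    }
}

function updateLevel( entity, level ) {
    const wasDeferred = entity.deferNotifies( true );
    entity.setAttribute( "dimming.level", level );
    entity.deferNotifies( wasDeferred );  // restore the caller's state rather than forcing false
}

function updateAll( entity, on, level ) {
    const wasDeferred = entity.deferNotifies( true );
    entity.setAttribute( "power_switch.state", on );
    updateLevel( entity, level );         // nested call does not end the outer deferral
    entity.deferNotifies( wasDeferred );
}
```

Because each function restores the state it found, only the outermost call releases the deferral, and the two attribute changes produce one event in this stub.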
getName()
setName( name )
extendCapability( capability_name )
hasCapability( capability_name )
setAttribute( attribute_name, new_value )
getAttribute( attribute_name )
hasAttribute( attribute_name )
setPrimaryAttribute( attribute_name )
setPrimaryValue( new_value )
enumAttributes()
registerAction( name, func )
hasAction( name )
perform( actionName, parameters )
Performs the named action on this Entity with the given parameters. Returns a Promise. Subsystems outside of Controller implementations must call this method and must not call Controller.performOnEntity()
.
TaskQueue
TaskQueue implements a queue of actions to be performed at a limited pace. It is often used with Controllers when the hub is incapable of managing a large number of requests in a short period of time.
constructor()
Returns a new TaskQueue.
start()
Start the TaskQueue. This must be called at least once to get things moving. Once started, the TaskQueue will run enqueued tasks as they come, subject to pacing restrictions (see setPace()
below).
stop()
Stop the TaskQueue. This may be called, and when used, is often done in the stop()
implementation of a parent Controller. Any queued tasks not yet running fail immediately.
finish()
Returns a Promise that resolves when all queued tasks complete.
setPace( milliseconds )
Set the pace at which tasks may execute (i.e. not less than milliseconds
apart). If the queue contains multiple tasks, the next task may be delayed.
enqueueTask( func [ , opts [ , ...args ] ] )
Enqueues the given function as a task to be run. The function will be called with the listed args
(if any). Returns a Promise that resolves when the task completes, or rejects if the task throws an error.
The opts
object currently only has one key, name
, which may be set to a name identifying the task (helps debugging). If not given, the name is the task ID, which is internally generated.
Observable
Observable is an interface (mixin) class that can be applied to an object, indicating it sends events.
Use util.mixin()
to apply Observable. Note that it is not necessary to apply Observable to your Controller subclass because it is inherited from Controller.
publish( data, eventType, ...args )
Publishes an event with the given eventType
and data
. Most objects will simply publish this
as the data (i.e. they publish themselves to announce they have changed), but the data can be anything, or nothing.
disconnect()
Removes all observers for this object. Usually called when the object is about to be destroyed.
Observer
Observer is an interface (mixin) class applied to an object to indicate it can receive events from Observable objects.
Use util.mixin()
to apply Observer. Note that it is not necessary to apply Observer to your Controller subclass because it is inherited from Controller.
notify( event )
Called by the events subsystem to pass an event from an Observable to which this Observer has previously subscribed. The event is a structure/object containing keys type
, data
, and sender
.
propagate( event )
Typically used in an Observer's notify()
implementation when the Observer itself is Observable, propagate()
passes an event up the event hierarchy.
subscribe( observable [, ident ] )
Subscribes this Observer to an Observable. The optional ident
may be used to manage multiple subscriptions to the same Observable for different purposes (it is little-used).
unsubscribe( [observed] )
Unsubscribe from the specified Observable, or if not provided, all objects.
TimerBroker
TimerBroker is a singleton that manages multiple timers for a subsystem.
getTimer( id, resolution, ...args )
releaseTimer( id_or_timer )
Timer
A Timer is an Observable object that will send a timer-trigger
event to its observers when the time is met.
constructor( ident [, callback ] )
If the callback
is given, it is called in addition to sending a timer-trigger
event to observers.
delayms( milliseconds )
at( absolute_time )
cancel()
destroy()
Logger
static getInstance( id [, parent ] )
info( msg, ... )
notice( msg, ... )
warn( msg, ... )
err( msg, ... )
exception( error_object )
debug( level, msg, ... )
always( msg, ... )
Semaphore
constructor( initialValue )
Constructs a new counting semaphore with the given initial value (integer >= 0).
acquire( [block] )
Each call to acquire() decrements the semaphore's counter and returns a Promise. If the counter is greater than 0 prior to decrementing, the Promise resolves; otherwise, the Promise waits, effectively blocking the calling process, until another process that has previously acquired the semaphore releases it. The data passed to the resolve function is a release function that must be called to advise the semaphore that you are done with it. Great care must be taken that all acquired semaphores are released, or deadlocks may result.
If block
is given and false
, the request will not block, but rather reject the returned Promise.
isLocked()
Returns true
if the semaphore is currently locked (has count <= 0).
count()
Returns the semaphore's current count.
wait( timeout )
Like acquire()
, but will time out after timeout
milliseconds if the semaphore cannot be acquired (the returned Promise is rejected in this case).
drain()
"Drains" the semaphore by settling all waiting Promises and restoring the semaphore's count to its initial value.
Mutex
Implements a mutual-exclusion lock. This is a special-case Semaphore that prevents execution of a protected code block by more than one thread of execution simultaneously.
constructor()
Makes a new lock.
acquire()
Acquires the lock. A promise is returned that resolves when the lock is acquired. The data passed to the resolver is a release function that must be called when the code is finished with the lock. Care must be taken that the lock is released under any and all conditions, or deadlocks will result.
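One way to guarantee release under all conditions is a try/finally block, sketched below. The Mutex class here is a stand-in that follows the contract described above (acquire() resolves to a release function) so the example can run alone; it is not Reactor's implementation, and criticalSection() is a hypothetical name.

```javascript
// Stand-in with the documented contract: acquire() resolves to a release function.
class Mutex {
    constructor() { this.locked = false; this.waiters = []; }
    isLocked() { return this.locked; }
    acquire() {
        return new Promise( ( resolve ) => {
            const grant = () => {
                this.locked = true;
                let released = false;
                resolve( () => {
                    if ( released ) { return; }  // ignore repeated release calls
                    released = true;
                    const next = this.waiters.shift();
                    if ( next ) { next(); } else { this.locked = false; }
                } );
            };
            if ( this.locked ) { this.waiters.push( grant ); } else { grant(); }
        } );
    }
}

const lock = new Mutex();
const log = [];

async function criticalSection( name ) {
    const release = await lock.acquire();
    try {
        log.push( name + ":start" );
        await new Promise( ( r ) => setTimeout( r, 10 ) );  // simulated asynchronous work
        log.push( name + ":end" );
    } finally {
        release();  // always release, even if the work above throws
    }
}
```

Two overlapping calls to criticalSection() run one after the other, because the second waits until the first calls its release function.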
isLocked()
Returns true
if the lock is currently locked (i.e. calling acquire()
would block).
release()
Releases the lock unconditionally. It is not recommended that this method ever be called; it is provided only for exigent circumstances (that I have not yet identified concretely, so this method may be removed in future).
Configuration
Capabilities
util
getUID( [ prefix ] )
Returns a (string) unique identifier with the given prefix (if any). The string is guaranteed to be unique as long as the system time does not slew backwards significantly (and even if it does, a restart of Reactor would have to occur during the overlap period for any risk of duplication -- not likely).
shallowCopy( object )
Creates a shallow copy of the passed object.
clone( object )
Creates a deep copy of the passed object.
hasAnyProperty( object )
Returns true
if the given object has any (enumerable) properties (e.g. hasAnyProperty( {} )
returns false
).
deepCompare( val1, val2 )
Compares val1
and val2
for semantic equality. Both must have the same type, and may be objects.
mixin( baseClass, mixinClass )
Applies mixinClass
(which must be a class
) to baseClass
: all methods of mixinClass
except its constructor are applied to baseClass
, if they are not already defined.
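The described behavior can be approximated by the sketch below. It is an illustration of the semantics only, not Reactor's util.mixin() code; in particular, treating inherited methods as "already defined" is an assumption, and Greeter and Widget are hypothetical classes.

```javascript
// Sketch of the described behavior: copy methods from mixinClass to baseClass,
// skipping the constructor and anything baseClass already has.
function mixin( baseClass, mixinClass ) {
    for ( const name of Object.getOwnPropertyNames( mixinClass.prototype ) ) {
        if ( name === "constructor" ) {
            continue;
        }
        // Assumption: inherited methods also count as "already defined".
        if ( ! ( name in baseClass.prototype ) ) {
            Object.defineProperty( baseClass.prototype, name,
                Object.getOwnPropertyDescriptor( mixinClass.prototype, name ) );
        }
    }
    return baseClass;
}

class Greeter {
    hello() { return "hello"; }
    name() { return "greeter"; }
}

class Widget {
    name() { return "widget"; }
}

mixin( Widget, Greeter );
const widget = new Widget();  // gains hello(); keeps its own name()
```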
coalesce( ... )
Returns the first non-undefined
, non-null
, non-NaN
argument, or null
.
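A function with the described behavior could be written as in this sketch, which is an illustration of the semantics and not the actual util.coalesce() source.

```javascript
// Returns the first argument that is not undefined, not null, and not NaN; otherwise null.
function coalesce( ...args ) {
    for ( const value of args ) {
        if ( value !== undefined && value !== null && ! Number.isNaN( value ) ) {
            return value;
        }
    }
    return null;
}
```

Under these rules, falsy but valid values such as 0 and the empty string are returned rather than skipped.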
isEmpty( string )
Returns true
if the given string is undefined
, null
, or the empty string (""
).
loadData( path, fileroot, sysroot )
async asyncForEach( array, callback )
TimedPromise( implementation, timeout )
sequencePromises( implementationArray )
Given an array of functions that take resolve, reject
arguments (i.e. Promise implementations), runs the implementations (as Promises) sequentially. Returns a Promise that resolves when all are complete, or rejects if any fails.
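The sequencing described here can be sketched with a reduce over the array. This is an illustration rather than Reactor's implementation; what the real function resolves to is not stated above, and in this sketch the final Promise resolves to the last implementation's result (or undefined for an empty array).

```javascript
// Run Promise implementations one at a time, each starting only after the previous resolves.
function sequencePromises( implementationArray ) {
    return implementationArray.reduce(
        ( chain, impl ) => chain.then( () => new Promise( impl ) ),
        Promise.resolve()
    );
}

// Example: three timed steps that would finish out of order if run concurrently.
const order = [];
const steps = [ 30, 10, 20 ].map( ( ms ) => ( resolve, reject ) => {
    setTimeout( () => { order.push( ms ); resolve( ms ); }, ms );
} );
const done = sequencePromises( steps );
```

Because each implementation is only wrapped in a Promise when its turn comes, the steps record their completion in array order, and a rejection stops the chain at that point.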
promiseWrapper( function, ...args )
promiseWrapperAsync( function, ...args )
http_fetch( url, req )
A wrapper for node-fetch
to handle Digest authentication. Parameters are identical to those of node-fetch
, with the following optional additions to req
:
- reactor_ignore_certs: boolean (default: false)
- reactor_username: string
- reactor_password: string
- reactor_basic_auth: boolean (default: false)
Updated: 2022-Jan-13