/*
* Copyright 2017-2019 Tom Swindell
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
const EventEmitter = require('events')
const { Frame, Message } = require('./codec.js')
// CONSTANTS

// Frame type identifiers, passed as the `type` argument when packing frames.
const ARNETWORK_FRAME_TYPE_ACKNOWLEDGE = 0x01
const ARNETWORK_FRAME_TYPE_DATA = 0x02
const ARNETWORK_FRAME_TYPE_LOW_LATENCY_DATA = 0x03
const ARNETWORK_FRAME_TYPE_ACKNOWLEDGE_DATA = 0x04

// Buffer ids matched against the `id` of incoming frames.
// NOTE(review): "D2C" presumably means device-to-controller - confirm.
const ARNET_D2C_PING_ID = 0x00
const ARNET_D2C_VIDEO_DATA_ID = 0x7d
const ARNET_D2C_EVENT_ID = 0x7e
const ARNET_D2C_NAVDATA_ID = 0x7f

// Buffer ids used as the `id` of outgoing frames.
// NOTE(review): "C2D" presumably means controller-to-device - confirm.
const ARNET_C2D_PONG_ID = 0x01
const ARNET_C2D_NONACK_ID = 0x0a
const ARNET_C2D_ACK_ID = 0x0b
const ARNET_C2D_EMERG_ID = 0x0c
const ARNET_C2D_VIDEO_ACK_ID = 0x0d
const ARNET_C2D_NAVDATA_ACK_ID = 0xfe
/**
 * Formats a buffer as hexadecimal text with a space after every byte.
 *
 * Each two-digit pair is followed by one space, so non-empty output always
 * ends with a trailing space.
 *
 * @private
 * @param {Buffer} buffer Bytes to format.
 * @returns {string} Space-separated hex pairs, e.g. "01 ab ff ".
 */
const HEXDUMP = (buffer) => {
  const digits = buffer.toString('hex')
  // `$&` re-inserts the matched pair of hex digits ahead of the added space.
  return digits.replace(/[0-9a-f]{2}/gi, '$& ')
}
/**
* @event MessageChannel#close
*
* @public
*/
/**
* @event MessageChannel#navdata
*
* @property {Message} message
* @property {RemoteInfo} rinfo
*
* @public
*/
/**
* @event MessageChannel#navdata-event
*
* @property {Message} message
* @property {RemoteInfo} rinfo
*
* @public
*/
/**
* @event MessageChannel#timeout
*
* @public
*/
/**
 * ARNet device control channel.
 *
 * Wraps a UDP socket and exchanges ARNet frames with one remote endpoint:
 * outgoing commands are packed into data frames, incoming frames are
 * dispatched on their buffer id, and a keep-alive timer closes the channel
 * when no message arrives in time.
 *
 * Besides the documented events, the channel emits `ping` after answering a
 * ping frame and re-emits the socket's `error` events.
 *
 * NOTE(review): the documented `navdata-event` event is never emitted here;
 * event frames are reported through `navdata` - confirm which is intended.
 *
 * @class
 * @public
 * @hideconstructor
 * @implements EventEmitter
 *
 * @fires MessageChannel#close
 * @fires MessageChannel#navdata
 * @fires MessageChannel#timeout
 *
 * @param {string} remoteAddr Remote device IPv4 address.
 * @param {Object} parameters Connection parameters, returned unchanged by
 *                            `getParameters()`.
 * @param {uint16} parameters.c2d_port Remote port that frames are sent to.
 * @param {dgram.Socket} socket Underlying UDP transport to use.
 * @param {Object} [opts]
 * @param {boolean} [opts.debug=false] Enable debug logging.
 * @param {number} [opts.keepalive] Keep-alive timeout in milliseconds.
 *                                  Defaults to 10 seconds until the first
 *                                  message arrives and 5 seconds afterwards.
 */
function MessageChannel (remoteAddr, parameters, socket, opts) {
  EventEmitter.call(this)

  const __self = this
  const options = Object.assign({ debug: false }, opts || {})
  const remotePort = parameters.c2d_port

  // Next sequence number per buffer id. Created without a prototype so that
  // inherited keys can never be mistaken for an existing counter.
  const __sequence = Object.create(null)
  // Queued requests awaiting a response: [mesginfo, args, resolve, reject].
  const __pending = []
  // Event messages collected while waiting for the head request's terminator.
  const __responses = []

  // Set to null once the channel is closed.
  let __socket = socket
  let __keepalive

  /**
   * Return the next sequence number for a buffer, restarting at 0 once the
   * counter has moved past 255.
   *
   * @private
   * @param {uint8} bufferId
   * @returns {number} Sequence number in the range 0-255.
   */
  const nextSequence = (bufferId) => {
    if (!(bufferId in __sequence) || __sequence[bufferId] > 255) {
      __sequence[bufferId] = 0
    }
    return __sequence[bufferId]++
  }

  /**
   * Send ARNet network frame via transport to remote endpoint.
   *
   * @public
   *
   * @param {uint8} type
   * @param {uint8} id
   * @param {uint8} seq
   * @param {Buffer} payload
   * @throws {Error} If the channel has already been closed.
   */
  const sendFrame = (type, id, seq, payload) => {
    if (!__socket) throw new Error('MessageChannel is closed')

    const frame = Frame.pack(type, id, seq, payload)
    __socket.send(frame, remotePort, remoteAddr)

    if (options.debug) {
      console.info(`TX: ${remoteAddr}:${remotePort} --> `, HEXDUMP(frame))
    }
  }

  /**
   * Encode a message and send it as a data frame on its own buffer.
   *
   * @private
   * @param {Object} mesginfo ARNet message header attributes
   * @param {*} args Arguments handed to `mesginfo.encode`
   */
  const transmitMessage = (mesginfo, args) => {
    const { featureId, classId, messageId, bufferId } = mesginfo
    sendFrame(
      ARNETWORK_FRAME_TYPE_DATA, bufferId, nextSequence(bufferId),
      Message.pack(featureId, classId, messageId, mesginfo.encode(args))
    )
  }

  /**
   * Send the request at the head of the pending queue. A request that cannot
   * be encoded or sent is rejected and dropped so it cannot stall the
   * requests queued behind it.
   *
   * @private
   */
  const sendNextPending = () => {
    while (__pending.length > 0) {
      const [mesginfo, args, , reject] = __pending[0]
      try {
        transmitMessage(mesginfo, args)
        return
      } catch (e) {
        __pending.shift()
        reject(e)
      }
    }
  }

  /**
   * Send ARNet command via transport to remote endpoint.
   *
   * Messages with `expects.immediate` are queued and sent one at a time; each
   * resolves with the event messages received before its expected response.
   * Messages without `expects` resolve as soon as the frame has been sent.
   *
   * NOTE(review): a message with `expects` but no `expects.immediate` is sent
   * but its promise is never settled here - confirm whether that is intended.
   *
   * @public
   *
   * @param {Object} mesginfo ARNet message header attributes
   * @param {*} [args] ARNet message arguments, handed to `mesginfo.encode`
   * @param {boolean} [dequeue] Send immediately without queueing.
   * @returns {Promise} Rejects if the message cannot be encoded or sent.
   */
  const sendMessage = (mesginfo, args, dequeue) => {
    return new Promise((resolve, reject) => {
      const { expects } = mesginfo
      const entry = [mesginfo, args, resolve, reject]
      const queued = !dequeue && Boolean(expects && expects.immediate)

      // Queue requests (unless dequeueing) whilst waiting for responses.
      if (queued) {
        __pending.push(entry)
        if (__pending.length > 1) return
      }

      try {
        transmitMessage(mesginfo, args)
      } catch (e) {
        // Drop the failed request from the queue, otherwise no response could
        // ever remove it and later requests would wait forever.
        const index = __pending.indexOf(entry)
        if (index !== -1) __pending.splice(index, 1)
        reject(e)
        return
      }

      // Resolve now if we're not expecting a response.
      if (!expects) resolve()
    })
  }

  /**
   * Close underlying transport and stop the keep-alive timer. Calling it
   * again after the channel is closed does nothing.
   *
   * @public
   */
  const close = () => {
    clearTimeout(__keepalive)
    if (!__socket) return

    const closing = __socket
    __socket = null
    closing.removeListener('message', onIncomingMessage)
    closing.close()
  }

  /**
   * (Re)start the keep-alive timer. When it expires the channel emits
   * `timeout` and closes itself.
   *
   * @private
   * @param {number} defaultMs Timeout used when `opts.keepalive` is not set.
   */
  const armKeepalive = (defaultMs) => {
    clearTimeout(__keepalive)
    __keepalive = setTimeout(() => {
      console.info('Keep-alive timeout, closing socket.')
      __self.emit('timeout')
      try {
        close()
      } catch (e) {
        // Closing is best effort here; report the failure and carry on.
        console.error(e)
      }
    }, options.keepalive || defaultMs)
  }

  /**
   * Answer a ping by echoing its type, sequence and payload on the pong
   * buffer, then emit `ping`.
   *
   * @private
   * @param {*} frame
   */
  const onPing = (frame, rinfo) => {
    sendFrame(frame.type, ARNET_C2D_PONG_ID, frame.seq, frame.payload)
    __self.emit('ping')
  }

  /**
   * Unpack a navdata frame and emit it as `navdata`.
   *
   * @private
   * @param {*} frame
   */
  const onNavdata = (frame, rinfo) => {
    const message = Message.unpack(frame.payload)
    __self.emit('navdata', message, rinfo)
  }

  /**
   * Handle an event frame: emit it as `navdata`, match it against the request
   * at the head of the pending queue, and acknowledge the frame.
   *
   * @private
   * @param {*} frame
   */
  const onNavdataEvent = (frame, rinfo) => {
    const mesg = Message.unpack(frame.payload)
    __self.emit('navdata', mesg, rinfo)

    // If we're processing responses
    if (__pending.length > 0) {
      const [minfo, , resolve] = __pending[0]
      const [fId, cId, mId] = minfo.expects.immediate

      // If this message is what we're expecting, then dequeue and resolve.
      if (fId === mesg.featureId &&
          cId === mesg.classId &&
          mId === mesg.messageId) {
        __pending.shift()
        resolve(__responses.splice(0, __responses.length))
        // Dequeue and send next message.
        sendNextPending()
      } else {
        // Otherwise push this event to response buffer, and continue
        // waiting for expected terminator message.
        __responses.push(mesg)
      }
    }

    // Generate and send acknowledgement message; the payload is the sequence
    // number of the frame being acknowledged.
    sendFrame(
      ARNETWORK_FRAME_TYPE_ACKNOWLEDGE,
      ARNET_C2D_NAVDATA_ACK_ID,
      nextSequence(ARNET_C2D_NAVDATA_ACK_ID),
      Buffer.alloc(1, frame.seq)
    )
  }

  /**
   * @private
   * @param {*} frame
   */
  const onVideoData = (frame, rinfo) => { /* ... */ }

  /**
   * Handle a datagram from the socket: restart the keep-alive timer, unpack
   * the frame and dispatch it on its buffer id. Errors raised while handling
   * the frame are logged rather than thrown.
   *
   * @private
   * @param {*} message
   * @param {*} rinfo
   */
  const onIncomingMessage = (message, rinfo) => {
    // Ignore anything delivered after the channel was closed.
    if (!__socket) return

    // Reset communications keep-alive timeout.
    armKeepalive(1000 * 5)

    if (options.debug) {
      console.info(`RX: ${rinfo.address}:${rinfo.port} <-- `, HEXDUMP(message))
    }

    try {
      const frame = Frame.unpack(message)
      switch (frame.id) {
        case ARNET_D2C_PING_ID:
          onPing(frame, rinfo)
          break
        case ARNET_D2C_NAVDATA_ID:
          onNavdata(frame, rinfo)
          break
        case ARNET_D2C_EVENT_ID:
          onNavdataEvent(frame, rinfo)
          break
        case ARNET_D2C_VIDEO_DATA_ID:
          onVideoData(frame, rinfo)
          break
        default:
          console.warn(`Unrecognised frame id (${frame.id}) --> `, HEXDUMP(message))
      }
    } catch (e) {
      console.error(e)
    }
  }

  // Initialize instance
  __socket.on('message', onIncomingMessage)
  __socket.on('error', e => __self.emit('error', e))
  __socket.on('close', () => {
    // The transport is gone (possibly closed by someone else): stop the
    // keep-alive timer so it cannot try to close the socket again.
    clearTimeout(__keepalive)
    __socket = null
    __self.emit('close')
  })
  armKeepalive(1000 * 10)

  // Export Public Interface
  this.getParameters = () => parameters
  this.sendFrame = sendFrame
  this.sendMessage = sendMessage
  this.close = close
}
MessageChannel.prototype = Object.create(EventEmitter.prototype)
module.exports = MessageChannel