// channel.js

/*
 * Copyright 2017-2019 Tom Swindell
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * 
 */
const EventEmitter = require('events')

const { Frame, Message } = require('./codec.js')

// CONSTANTS

// Frame type codes, passed as the `type` argument when packing a frame.
const ARNETWORK_FRAME_TYPE_ACKNOWLEDGE = 0x01
const ARNETWORK_FRAME_TYPE_DATA = 0x02
const ARNETWORK_FRAME_TYPE_LOW_LATENCY_DATA = 0x03
const ARNETWORK_FRAME_TYPE_ACKNOWLEDGE_DATA = 0x04

// Frame ids prefixed D2C: the ids incoming frames are dispatched on.
// (presumably "device to controller" -- confirm against the protocol spec)
const ARNET_D2C_PING_ID = 0x00
const ARNET_D2C_VIDEO_DATA_ID = 0x7d
const ARNET_D2C_EVENT_ID = 0x7e
const ARNET_D2C_NAVDATA_ID = 0x7f

// Frame ids prefixed C2D: ids for frames sent to the remote endpoint.
// (presumably "controller to device" -- confirm against the protocol spec)
const ARNET_C2D_PONG_ID = 0x01
const ARNET_C2D_NONACK_ID = 0x0a
const ARNET_C2D_ACK_ID = 0x0b
const ARNET_C2D_EMERG_ID = 0x0c
const ARNET_C2D_VIDEO_ACK_ID = 0x0d
const ARNET_C2D_NAVDATA_ACK_ID = 0xfe

/**
 * Renders a buffer as hexadecimal text, inserting one space after every
 * byte (two hex digits) -- including the final byte, so non-empty output
 * always ends with a space.
 * @private
 * @param {Buffer} buffer
 * @returns {string}
 */
const HEXDUMP = (buffer) => {
  const hexText = buffer.toString('hex')
  return hexText.replace(/[0-9a-fA-F]{2}/g, (bytePair) => `${bytePair} `)
}

/**
 * @event MessageChannel#close
 * 
 * @public
 */

/**
 * @event MessageChannel#navdata
 * 
 * @property {Message} message
 * @property {RemoteInfo} rinfo
 * 
 * @public
 */

/**
 * @event MessageChannel#navdata-event
 * 
 * @property {Message} message
 * @property {RemoteInfo} rinfo
 * 
 * @public
 */

/**
 * @event MessageChannel#timeout
 * 
 * @public
 */

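/**
 * ARNet device control channel
 *
 * @class
 * @public
 * @hideconstructor
 * @implements EventEmitter
 *
 * @param {string} remoteAddr Remote device IPv4 address.
 * @param {Object} parameters Connection parameters; returned as-is by getParameters().
 * @param {uint16} parameters.c2d_port Remote UDP port that outgoing frames are sent to.
 * @param {dgram.Socket} socket Underlying UDP transport to use.
 * @param {Object} [opts]
 * @param {boolean} [opts.debug=false] Enable debug logging.
 * @param {number} [opts.keepalive] Keep-alive timeout in milliseconds
 *   (defaults to 10000 before the first incoming message and 5000 after it).
 */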
function MessageChannel (remoteAddr, parameters, socket, opts) {
  // Channel over a datagram socket: frames outgoing messages, dispatches
  // incoming frames by id, serialises requests that wait for a response
  // event, and closes itself when no traffic arrives within the keep-alive
  // window.
  opts = Object.assign({ debug: false }, opts || {})

  EventEmitter.call(this)

  const __self = this

  // Destination port for every outgoing frame.
  const remotePort = parameters.c2d_port

  // Next sequence number to use, keyed by buffer id.
  const __sequence = new Map()

  // FIFO of [mesginfo, args, resolve, reject] for requests that wait for a
  // terminating event; only the head entry is in flight at any time.
  const __pending = []
  // Events received while waiting for the head request's terminator.
  const __responses = []

  let __keepalive
  // Set once close() ran or the socket reported 'close'.
  let __closed = false

  /**
   * Returns the sequence number to use for the next frame on `bufferId` and
   * advances the counter. Counters start at 0 and restart at 0 after 255.
   *
   * @private
   * @param {uint8} bufferId
   * @returns {uint8}
   */
  const nextSequence = (bufferId) => {
    const stored = __sequence.get(bufferId)
    const seq = (stored === undefined || stored > 255) ? 0 : stored
    __sequence.set(bufferId, seq + 1)
    return seq
  }

  /**
   * Send ARNet network frame via transport to remote endpoint.
   *
   * @public
   *
   * @param {uint8} type
   * @param {uint8} id
   * @param {uint8} seq
   * @param {Buffer} payload
   * @throws {Error} If the channel has already been closed.
   */
  const sendFrame = (type, id, seq, payload) => {
    if (__closed) throw new Error('MessageChannel is closed')

    const frame = Frame.pack(type, id, seq, payload)
    socket.send(frame, remotePort, remoteAddr)

    if (opts.debug) {
      console.info(`TX: ${remoteAddr}:${remotePort} --> `, HEXDUMP(frame))
    }
  }

  /**
   * Encode a message and send it as a data frame on its own buffer id.
   * The payload is built before a sequence number is taken, so a failing
   * encode does not consume one.
   *
   * @private
   * @param {Object} mesginfo
   * @param {*} args
   */
  const transmit = (mesginfo, args) => {
    const { featureId, classId, messageId, bufferId } = mesginfo
    const payload = Message.pack(featureId, classId, messageId, mesginfo.encode(args))
    sendFrame(ARNETWORK_FRAME_TYPE_DATA, bufferId, nextSequence(bufferId), payload)
  }

  /**
   * Transmit the request at the head of the pending queue. A request that
   * cannot be encoded or sent is rejected and dropped so that it cannot
   * stall the requests queued behind it; the next one is then tried.
   *
   * @private
   */
  const flushPending = () => {
    while (__pending.length > 0) {
      const [headInfo, headArgs, , headReject] = __pending[0]
      try {
        transmit(headInfo, headArgs)
        return
      } catch (e) {
        __pending.shift()
        headReject(e)
      }
    }
  }

  /**
   * Send ARNet command via transport to remote endpoint.
   *
   * Messages whose `mesginfo.expects.immediate` is set are queued and sent
   * one at a time; their promise resolves with the array of events received
   * before the expected terminating event arrived. All other messages
   * resolve (with no value) as soon as the frame has been handed to the
   * socket.
   *
   * @public
   *
   * @param {Object} mesginfo ARNet message header attributes
   * @param {...*} [args] ARNet message arguments
   * @param {boolean} [dequeue] Bypass the response queue: send immediately
   *   and resolve once sent.
   * @returns {Promise} Rejects if the message cannot be encoded or sent.
   */
  const sendMessage = (mesginfo, args, dequeue) => {
    return new Promise((resolve, reject) => {
      const { expects } = mesginfo
      const tracked = Boolean(expects && expects.immediate)

      // Queue requests whilst waiting for responses; only transmit straight
      // away when nothing else is in flight.
      if (tracked && !dequeue) {
        __pending.push([mesginfo, args, resolve, reject])
        if (__pending.length === 1) flushPending()
        return
      }

      // Nothing in this channel could settle the promise later, so settle
      // it now. A throw here rejects the promise.
      transmit(mesginfo, args)
      resolve()
    })
  }

  /**
   * Stop the keep-alive timer and close the underlying transport.
   * Calling it again after the channel is closed is a no-op.
   *
   * @public
   */
  const close = () => {
    if (__closed) return
    __closed = true

    clearTimeout(__keepalive)
    // Close exactly once: dgram sockets throw when closed a second time.
    socket.close()
  }

  /**
   * Keep-alive expiry: announce the timeout and close the channel.
   *
   * @private
   */
  const onKeepaliveTimeout = () => {
    console.info("Keep-alive timeout, closing socket.")
    __self.emit('timeout')
    try {
      close()
    } catch (e) {
      // Runs inside a timer callback: log rather than crash the process.
      console.error(e)
    }
  }

  /**
   * (Re)start the keep-alive timer.
   *
   * @private
   * @param {number} fallbackMs Timeout used when opts.keepalive is not set.
   */
  const armKeepalive = (fallbackMs) => {
    clearTimeout(__keepalive)
    __keepalive = setTimeout(onKeepaliveTimeout, opts.keepalive || fallbackMs)
  }

  /**
   * Answer a ping by echoing its type, sequence and payload on the pong id.
   *
   * @private
   * @param {*} frame
   */
  const onPing = (frame, rinfo) => {
    sendFrame(frame.type, ARNET_C2D_PONG_ID, frame.seq, frame.payload)
    __self.emit('ping')
  }

  /**
   * Decode the frame payload and emit it as a 'navdata' event.
   *
   * @private
   * @param {*} frame
   */
  const onNavdata = (frame, rinfo) => {
    const message = Message.unpack(frame.payload)
    __self.emit('navdata', message, rinfo)
  }

  /**
   * Handle an event frame: emit it as 'navdata', feed it to the request at
   * the head of the pending queue, then acknowledge the frame.
   *
   * NOTE(review): these frames are emitted as 'navdata'; no 'navdata-event'
   * event is emitted anywhere in this function -- confirm which is intended.
   *
   * @private
   * @param {*} frame
   */
  const onNavdataEvent = (frame, rinfo) => {
    const mesg = Message.unpack(frame.payload)
    __self.emit('navdata', mesg, rinfo)

    // If we're processing responses
    if (__pending.length > 0) {
      const [headInfo, , headResolve] = __pending[0]
      const [fId, cId, mId] = headInfo.expects.immediate

      const isTerminator = fId === mesg.featureId &&
        cId === mesg.classId &&
        mId === mesg.messageId

      if (isTerminator) {
        // Expected message arrived: hand over everything buffered so far,
        // then put the next queued request on the wire.
        __pending.shift()
        headResolve(__responses.splice(0, __responses.length))
        flushPending()
      } else {
        // Otherwise buffer this event and keep waiting for the terminator.
        __responses.push(mesg)
      }
    }

    // Generate and send acknowledgement message carrying the frame's seq.
    sendFrame(
      ARNETWORK_FRAME_TYPE_ACKNOWLEDGE,
      ARNET_C2D_NAVDATA_ACK_ID,
      nextSequence(ARNET_C2D_NAVDATA_ACK_ID),
      Buffer.alloc(1, frame.seq)
    )
  }

  /**
   * @private
   * @param {*} frame
   */
  const onVideoData = (frame, rinfo) => { /* ... */ }

  /**
   * Socket 'message' handler: restarts the keep-alive timer, decodes the
   * frame and dispatches it on its id. Errors raised while decoding or
   * handling are logged, not rethrown.
   *
   * @private
   * @param {*} message
   * @param {*} rinfo
   */
  const onIncomingMessage = (message, rinfo) => {
    // Ignore stragglers so a closed channel never re-arms its timer.
    if (__closed) return

    // Reset communications keep-alive timeout.
    armKeepalive(1000 * 5)

    if (opts.debug) {
      console.info(`RX: ${rinfo.address}:${rinfo.port} <-- `, HEXDUMP(message))
    }

    try {
      const frame = Frame.unpack(message)
      switch (frame.id) {
        case ARNET_D2C_PING_ID:
          onPing(frame, rinfo)
          break

        case ARNET_D2C_NAVDATA_ID:
          onNavdata(frame, rinfo)
          break

        case ARNET_D2C_EVENT_ID:
          onNavdataEvent(frame, rinfo)
          break

        case ARNET_D2C_VIDEO_DATA_ID:
          onVideoData(frame, rinfo)
          break

        default:
          console.warn(`Unrecognised frame id (${frame.id}) --> `, HEXDUMP(message))
      }
    } catch (e) {
      console.error(e)
    }
  }

  // Initialize instance
  // XXX - Abstract error and close handlers and deregister handlers in
  // close.
  socket.on('message', onIncomingMessage)
  socket.on('error', e => __self.emit('error', e))
  socket.on('close', () => {
    // The transport may also be closed from outside this channel; make sure
    // the keep-alive timer cannot fire against a dead socket.
    __closed = true
    clearTimeout(__keepalive)
    __self.emit('close')
  })

  // Allow a longer window for the first message than between messages.
  armKeepalive(1000 * 10)

  // Export Public Interface
  this.getParameters = () => parameters
  this.sendFrame = sendFrame
  this.sendMessage = sendMessage
  this.close = close
}

// Inherit from EventEmitter. The `constructor` property is restored
// explicitly: a bare Object.create() prototype would make instances report
// EventEmitter as their constructor.
MessageChannel.prototype = Object.create(EventEmitter.prototype, {
  constructor: { value: MessageChannel, writable: true, configurable: true }
})

module.exports = MessageChannel