Source: index.js

const errors = require('./errors')
const crypto = require('crypto')
const EventEmitter = require('events').EventEmitter
const request = require('request-promise')
const url = require('url')
const http = require('http')
const https = require('https')
const qs = require('querystring')
const Promise = require('bluebird')
const isAbsoluteUrl = require('is-absolute-url')
const parseLinkHeader = require('parse-link-header')

/**
 * Twitch Webhook API
 *
 * Owns an HTTP(S) server that receives hub callbacks and re-emits them as
 * events on this emitter:
 *  - `error` / `listening`: forwarded from the underlying server
 *  - `subscribe` / `unsubscribe` / `denied`: hub verification requests (GET)
 *  - `<topic>` and `*`: accepted notifications (POST)
 *  - `webhook-error`: notifications that were rejected
 */
class TwitchWebhook extends EventEmitter {
  /**
   * Constructs an instance of TwitchWebHookAPI
   *
   * The passed `options` object is copied; it is not modified.
   *
   * @param {Object} options - Options
   * @param {string} options.client_id - Client ID required for Twitch API calls
   * @param {string} options.callback - URL where notifications
   * will be delivered. A trailing slash is appended when missing.
   * @param {string} [options.secret=false] - Secret used to sign
   * notification payloads.
   * @param {number} [options.lease_seconds=864000] - Number of seconds until
   * the subscription expires.
   * @param {boolean|Object} [options.listen] - Listen options. A boolean
   * carries no settings and is treated like an empty object.
   * @param {boolean} [options.listen.autoStart=true] - Should automatically start listening
   * @param {string} [options.listen.host="0.0.0.0"] - Host to bind to
   * @param {number} [options.listen.port=8443] - Port to bind to
   * @param {boolean|Object} [options.https=false] - Should use https connection.
   * If it is a non-empty object, it is passed to `https.createServer()`.
   * @param {string} [options.baseApiUrl="https://api.twitch.tv/helix/"] - Base
   * Twitch API URL. Needed for proxying and testing
   * @throws {FatalError} If `client_id` or `callback` is not provided
   */
  constructor (options = {}) {
    if (!options.client_id) {
      throw new errors.FatalError('Twitch Client ID not provided!')
    }

    if (!options.callback) {
      throw new errors.FatalError('Callback URL not provided!')
    }

    super()

    // Shallow copies keep the defaults applied below out of the caller's objects.
    this._options = Object.assign({}, options)
    if (!this._options.callback.endsWith('/')) {
      this._options.callback += '/'
    }

    if (this._options.lease_seconds === undefined) {
      this._options.lease_seconds = 864000
    }

    // `listen` may be a boolean; only an object can carry host/port/autoStart.
    // (`Object.assign` ignores a null/undefined source.)
    const listenOptions = Object.assign(
      {},
      typeof options.listen === 'object' ? options.listen : undefined
    )
    listenOptions.host = listenOptions.host || '0.0.0.0'
    listenOptions.port = listenOptions.port || 8443
    this._options.listen = listenOptions

    const httpsOptions = options.https || {}
    this._options.https = httpsOptions

    let apiUrl = options.baseApiUrl || 'https://api.twitch.tv/helix/'
    if (!apiUrl.endsWith('/')) {
      apiUrl += '/'
    }
    this._apiUrl = apiUrl

    this._hubUrl = this._apiUrl + 'webhooks/hub'
    // Stripped from notification URLs to recover the bare topic name
    this._apiPathname = url.parse(this._apiUrl).pathname

    // Keyed by full topic URL (including query string)
    this._subscriptions = {}

    const listener = this._requestListener.bind(this)
    // Only a non-empty https options object switches the server to HTTPS
    this._server = Object.keys(httpsOptions).length
      ? https.createServer(httpsOptions, listener)
      : http.createServer(listener)

    this._server.on('error', this.emit.bind(this, 'error'))
    this._server.on('listening', this.emit.bind(this, 'listening'))

    if (listenOptions.autoStart === undefined || listenOptions.autoStart) {
      this.listen()
    }
  }

  /**
   * Start listening for connections
   *
   * Without arguments the configured host and port are used; otherwise the
   * arguments are forwarded to the server's `listen()`.
   * The returned promise only settles once the server is listening; server
   * errors are reported through the `error` event.
   *
   * @param {...any} [args] - Arguments
   * @return {Promise} Rejected with `FatalError` if listening is already started
   */
  listen (...args) {
    if (this.isListening()) {
      return Promise.reject(new errors.FatalError('Listening is already started'))
    }

    return new Promise(resolve => {
      if (!args.length) {
        this._server.listen(
          this._options.listen.port,
          this._options.listen.host,
          () => resolve()
        )
      } else {
        this._server.listen(...args, () => resolve())
      }
    })
  }

  /**
   * Stop listening for connections
   *
   * Resolves immediately when the server is not listening.
   *
   * @return {Promise}
   */
  close () {
    if (!this.isListening()) {
      return Promise.resolve()
    }

    return new Promise(resolve => {
      this._server.close(() => resolve())
    })
  }

  /**
   * Check if server is listening for connections.
   *
   * @return {boolean}
   */
  isListening () {
    return this._server.listening
  }

  /**
   * Make request
   *
   * Sends a subscribe/unsubscribe request to the hub. After a successful
   * `subscribe` the topic (and its derived secret, if any) is remembered so
   * that incoming notifications can be verified.
   *
   * @private
   * @param {string} mode - Hub mode: "subscribe" or "unsubscribe"
   * @param {string} topic - Topic name, or an absolute topic URL
   * @param {Object} [options={}] - Topic options, appended as a query string
   * @return {Promise} Rejected with `RequestDenied` if the request fails
   */
  _request (mode, topic, options = {}) {
    let topicUrl = isAbsoluteUrl(topic) ? topic : this._apiUrl + topic
    if (Object.keys(options).length) {
      topicUrl += '?' + qs.stringify(options)
    }

    const requestOptions = {
      url: this._hubUrl,
      headers: {
        'Client-ID': this._options.client_id
      },
      qs: {
        'hub.callback': this._options.callback,
        'hub.mode': mode,
        'hub.topic': topicUrl,
        'hub.lease_seconds': this._options.lease_seconds
      },
      resolveWithFullResponse: true
    }

    let topicSecret
    if (this._options.secret) {
      // Each topic gets its own secret derived from the configured one
      topicSecret = crypto
        .createHmac('sha256', this._options.secret)
        .update(topicUrl)
        .digest('hex')

      requestOptions.qs['hub.secret'] = topicSecret
    }

    return request
      .post(requestOptions)
      .catch(err => {
        throw new errors.RequestDenied(err)
      })
      .then(() => {
        // An unsubscribe must not (re-)register the topic
        if (mode !== 'subscribe') {
          return
        }

        const subscription = {}
        if (topicSecret) {
          subscription.secret = topicSecret
        }
        this._subscriptions[topicUrl] = subscription
      })
  }

  /**
   * Subscribe to specific topic
   *
   * @param {string} topic - Topic name
   * @param {Object} [options={}] - Topic options
   * @return {Promise} Rejected with `RequestDenied` if the request fails
   */
  subscribe (topic, options = {}) {
    return this._request('subscribe', topic, options)
  }

  /**
   * Unsubscribe from specific topic.
   * "*" will unsubscribe from all topics that were subscribed on this session
   *
   * @param {string} topic - Topic name
   * @param {Object} [options={}] - Topic options (ignored for "*")
   * @return {Promise} Rejected with `RequestDenied` if a request fails
   */
  unsubscribe (topic, options = {}) {
    if (topic !== '*') {
      return this._request('unsubscribe', topic, options)
    }

    // Stored keys are already full topic URLs, so no options are needed
    const pending = Object.keys(this._subscriptions).map(topicUrl =>
      this._request('unsubscribe', topicUrl, {})
    )
    return Promise.all(pending)
  }

  /**
   * Return errors
   */
  get errors () {
    return errors
  }

  /**
   * Process connection updates
   *
   * Handles hub verification requests. `subscribe` and `unsubscribe` are
   * confirmed by echoing `hub.challenge`; an unknown `hub.mode` gets a 400.
   *
   * @private
   * @param {Object} request - Request
   * @param {Object} response - Response
   */
  _processConnection (request, response) {
    const queries = url.parse(request.url, true).query

    switch (queries['hub.mode']) {
      case 'denied':
        response.writeHead(200, { 'Content-Type': 'text/plain' })
        response.end()

        this.emit('denied', queries)
        break
      case 'unsubscribe':
        delete this._subscriptions[queries['hub.topic']]

        response.writeHead(200, { 'Content-Type': 'text/plain' })
        response.end(queries['hub.challenge'])

        this.emit('unsubscribe', queries)
        break
      case 'subscribe':
        response.writeHead(200, { 'Content-Type': 'text/plain' })
        response.end(queries['hub.challenge'])

        this.emit('subscribe', queries)
        break
      default:
        response.writeHead(400, { 'Content-Type': 'text/plain' })
        response.end()
    }
  }

  /**
   * Fix fields with date
   *
   * Converts known date fields of a notification into `Date` objects, in
   * place. Payloads that do not have the expected shape are returned
   * untouched instead of throwing.
   *
   * @private
   * @param {string} topic - Topic name
   * @param {Object} data - Request data
   * @return {Object}
   */
  _fixDate (topic, data) {
    if (data === null || typeof data !== 'object') {
      return data
    }

    // Skip anything that cannot carry a date property
    const entries = (Array.isArray(data.data) ? data.data : []).filter(
      entry => entry !== null && typeof entry === 'object'
    )

    switch (topic) {
      case 'users/follows':
        if (data.timestamp) { // for compatibility with old payloads
          data.timestamp = new Date(data.timestamp)
        } else {
          for (const follow of entries) {
            follow.followed_at = new Date(follow.followed_at)
          }
        }
        break
      case 'streams':
        for (const stream of entries) {
          stream.started_at = new Date(stream.started_at)
        }
        break
    }

    return data
  }

  /**
   * Report a rejected notification
   *
   * Emits `webhook-error` and answers the request with 202.
   *
   * @private
   * @param {Object} response - Response
   * @param {string} message - Error message
   */
  _rejectUpdate (response, message) {
    this.emit('webhook-error', new errors.WebhookError(message))
    response.writeHead(202, { 'Content-Type': 'text/plain' })
    response.end()
  }

  /**
   * Process updates
   *
   * Handles a notification: resolves the topic from the `Link` header,
   * checks the signature when a secret is configured, then emits the payload
   * under the topic name and under `*`.
   *
   * @private
   * @param {Object} request - Request
   * @param {Object} response - Response
   */
  _processUpdates (request, response) {
    const links = parseLinkHeader(request.headers.link)
    const endpoint = links && links.self && links.self.url
    const params = endpoint && url.parse(endpoint, true)
    // `pathname` is null for URLs without a path (e.g. fragment-only)
    const topic =
      params &&
      params.pathname &&
      params.pathname.replace(this._apiPathname, '')
    const options = params && params.query

    if (!endpoint || !topic) {
      this._rejectUpdate(response, 'Topic is missing or incorrect')
      return
    }

    let signature
    let secret
    if (this._options.secret) {
      const header = request.headers['x-hub-signature']
      signature = header && header.split('=')[1]

      // Captured now: the subscription may disappear while the body streams in
      const subscription = this._subscriptions[endpoint]
      secret = subscription && subscription.secret

      if (!signature || !secret) {
        this._rejectUpdate(response, '"x-hub-signature" is missing')
        return
      }
    }

    // Raw bytes are kept so that multi-byte characters split across chunks
    // survive and the HMAC covers exactly what was sent.
    const chunks = []
    let received = 0
    let settled = false

    request.on('data', chunk => {
      if (settled) {
        return
      }

      const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)
      received += bytes.length

      // Too much data, destroy the connection
      if (received > 1e6) {
        settled = true
        chunks.length = 0
        this._rejectUpdate(response, 'Request is very large')
        request.connection.destroy()
        return
      }

      chunks.push(bytes)
    })

    request.on('end', () => {
      if (settled) {
        return
      }
      settled = true

      const raw = Buffer.concat(chunks)

      let data
      try {
        data = JSON.parse(raw.toString('utf8'))
      } catch (err) {
        this._rejectUpdate(response, 'JSON is malformed')
        return
      }

      if (secret) {
        const expected = Buffer.from(
          crypto.createHmac('sha256', secret).update(raw).digest('hex'),
          'utf8'
        )
        const provided = Buffer.from(signature, 'utf8')

        // timingSafeEqual requires equal lengths
        const matches =
          expected.length === provided.length &&
          crypto.timingSafeEqual(expected, provided)

        if (!matches) {
          this._rejectUpdate(response, '"x-hub-signature" is incorrect')
          return
        }
      }

      response.writeHead(200, { 'Content-Type': 'text/plain' })
      response.end()

      const payload = {
        topic,
        options,
        endpoint,
        event: this._fixDate(topic, data)
      }

      this.emit(topic, payload)
      this.emit('*', payload)
    })
  }

  /**
   * Request listener
   *
   * GET requests are hub verifications, POST requests are notifications;
   * any other method is answered with 405.
   *
   * @private
   * @param {Object} request - Request
   * @param {Object} response - Response
   */
  _requestListener (request, response) {
    switch (request.method) {
      case 'GET':
        this._processConnection(request, response)
        break
      case 'POST':
        this._processUpdates(request, response)
        break
      default:
        response.writeHead(405, { 'Content-Type': 'text/plain' })
        response.end()
    }
  }
}

module.exports = TwitchWebhook