buddy

node MVC discord bot
Log | Files | Refs | README

StreamDispatcher.js (10418B)


      1 'use strict';
      2 
      3 const { Writable } = require('stream');
      4 const secretbox = require('../util/Secretbox');
      5 const Silence = require('../util/Silence');
      6 const VolumeInterface = require('../util/VolumeInterface');
      7 
      8 const FRAME_LENGTH = 20;
      9 const CHANNELS = 2;
     10 const TIMESTAMP_INC = (48000 / 100) * CHANNELS;
     11 
     12 const MAX_NONCE_SIZE = 2 ** 32 - 1;
     13 const nonce = Buffer.alloc(24);
     14 
     15 /**
     16  * @external WritableStream
     17  * @see {@link https://nodejs.org/api/stream.html#stream_class_stream_writable}
     18  */
     19 
     20 /**
     21  * The class that sends voice packet data to the voice connection.
     22  * ```js
     23  * // Obtained using:
     24  * voiceChannel.join().then(connection => {
     25  *   // You can play a file or a stream here:
     26  *   const dispatcher = connection.play('/home/hydrabolt/audio.mp3');
     27  * });
     28  * ```
     29  * @implements {VolumeInterface}
     30  * @extends {WritableStream}
     31  */
     32 class StreamDispatcher extends Writable {
     33   constructor(player, { seek = 0, volume = 1, fec, plp, bitrate = 96, highWaterMark = 12 } = {}, streams) {
     34     const streamOptions = { seek, volume, fec, plp, bitrate, highWaterMark };
     35     super(streamOptions);
     36     /**
     37      * The Audio Player that controls this dispatcher
     38      * @type {AudioPlayer}
     39      */
     40     this.player = player;
     41     this.streamOptions = streamOptions;
     42     this.streams = streams;
     43     this.streams.silence = new Silence();
     44 
     45     this._nonce = 0;
     46     this._nonceBuffer = Buffer.alloc(24);
     47 
     48     /**
     49      * The time that the stream was paused at (null if not paused)
     50      * @type {?number}
     51      */
     52     this.pausedSince = null;
     53     this._writeCallback = null;
     54 
     55     /**
     56      * The broadcast controlling this dispatcher, if any
     57      * @type {?VoiceBroadcast}
     58      */
     59     this.broadcast = this.streams.broadcast;
     60 
     61     this._pausedTime = 0;
     62     this._silentPausedTime = 0;
     63     this.count = 0;
     64 
     65     this.on('finish', () => {
     66       this._cleanup();
     67       this._setSpeaking(0);
     68     });
     69 
     70     this.setVolume(volume);
     71     this.setBitrate(bitrate);
     72     if (typeof fec !== 'undefined') this.setFEC(fec);
     73     if (typeof plp !== 'undefined') this.setPLP(plp);
     74 
     75     const streamError = (type, err) => {
     76       /**
     77        * Emitted when the dispatcher encounters an error.
     78        * @event StreamDispatcher#error
     79        */
     80       if (type && err) {
     81         err.message = `${type} stream: ${err.message}`;
     82         this.emit(this.player.dispatcher === this ? 'error' : 'debug', err);
     83       }
     84       this.destroy();
     85     };
     86 
     87     this.on('error', () => streamError());
     88     if (this.streams.input) this.streams.input.on('error', err => streamError('input', err));
     89     if (this.streams.ffmpeg) this.streams.ffmpeg.on('error', err => streamError('ffmpeg', err));
     90     if (this.streams.opus) this.streams.opus.on('error', err => streamError('opus', err));
     91     if (this.streams.volume) this.streams.volume.on('error', err => streamError('volume', err));
     92   }
     93 
     94   get _sdata() {
     95     return this.player.streamingData;
     96   }
     97 
     98   _write(chunk, enc, done) {
     99     if (!this.startTime) {
    100       /**
    101        * Emitted once the stream has started to play.
    102        * @event StreamDispatcher#start
    103        */
    104       this.emit('start');
    105       this.startTime = Date.now();
    106     }
    107     this._playChunk(chunk);
    108     this._step(done);
    109   }
    110 
    111   _destroy(err, cb) {
    112     this._cleanup();
    113     super._destroy(err, cb);
    114   }
    115 
    116   _cleanup() {
    117     if (this.player.dispatcher === this) this.player.dispatcher = null;
    118     const { streams } = this;
    119     if (streams.broadcast) streams.broadcast.delete(this);
    120     if (streams.opus) streams.opus.destroy();
    121     if (streams.ffmpeg) streams.ffmpeg.destroy();
    122   }
    123 
    124   /**
    125    * Pauses playback
    126    * @param {boolean} [silence=false] Whether to play silence while paused to prevent audio glitches
    127    */
    128   pause(silence = false) {
    129     if (this.paused) return;
    130     if (this.streams.opus) this.streams.opus.unpipe(this);
    131     if (silence) {
    132       this.streams.silence.pipe(this);
    133       this._silence = true;
    134     } else {
    135       this._setSpeaking(0);
    136     }
    137     this.pausedSince = Date.now();
    138   }
    139 
    140   /**
    141    * Whether or not playback is paused
    142    * @type {boolean}
    143    * @readonly
    144    */
    145   get paused() {
    146     return Boolean(this.pausedSince);
    147   }
    148 
    149   /**
    150    * Total time that this dispatcher has been paused in milliseconds
    151    * @type {number}
    152    * @readonly
    153    */
    154   get pausedTime() {
    155     return this._silentPausedTime + this._pausedTime + (this.paused ? Date.now() - this.pausedSince : 0);
    156   }
    157 
    158   /**
    159    * Resumes playback
    160    */
    161   resume() {
    162     if (!this.pausedSince) return;
    163     this.streams.silence.unpipe(this);
    164     if (this.streams.opus) this.streams.opus.pipe(this);
    165     if (this._silence) {
    166       this._silentPausedTime += Date.now() - this.pausedSince;
    167       this._silence = false;
    168     } else {
    169       this._pausedTime += Date.now() - this.pausedSince;
    170     }
    171     this.pausedSince = null;
    172     if (typeof this._writeCallback === 'function') this._writeCallback();
    173   }
    174 
    175   /**
    176    * The time (in milliseconds) that the dispatcher has actually been playing audio for
    177    * @type {number}
    178    * @readonly
    179    */
    180   get streamTime() {
    181     return this.count * FRAME_LENGTH;
    182   }
    183 
    184   /**
    185    * The time (in milliseconds) that the dispatcher has been playing audio for, taking into account skips and pauses
    186    * @type {number}
    187    * @readonly
    188    */
    189   get totalStreamTime() {
    190     return Date.now() - this.startTime;
    191   }
    192 
    193   /**
    194    * Set the bitrate of the current Opus encoder if using a compatible Opus stream.
    195    * @param {number} value New bitrate, in kbps
    196    * If set to 'auto', the voice channel's bitrate will be used
    197    * @returns {boolean} true if the bitrate has been successfully changed.
    198    */
    199   setBitrate(value) {
    200     if (!value || !this.bitrateEditable) return false;
    201     const bitrate = value === 'auto' ? this.player.voiceConnection.channel.bitrate : value;
    202     this.streams.opus.setBitrate(bitrate * 1000);
    203     return true;
    204   }
    205 
    206   /**
    207    * Sets the expected packet loss percentage if using a compatible Opus stream.
    208    * @param {number} value between 0 and 1
    209    * @returns {boolean} Returns true if it was successfully set.
    210    */
    211   setPLP(value) {
    212     if (!this.bitrateEditable) return false;
    213     this.streams.opus.setPLP(value);
    214     return true;
    215   }
    216 
    217   /**
    218    * Enables or disables forward error correction if using a compatible Opus stream.
    219    * @param {boolean} enabled true to enable
    220    * @returns {boolean} Returns true if it was successfully set.
    221    */
    222   setFEC(enabled) {
    223     if (!this.bitrateEditable) return false;
    224     this.streams.opus.setFEC(enabled);
    225     return true;
    226   }
    227 
    228   _step(done) {
    229     this._writeCallback = () => {
    230       this._writeCallback = null;
    231       done();
    232     };
    233     if (!this.streams.broadcast) {
    234       const next = FRAME_LENGTH + this.count * FRAME_LENGTH - (Date.now() - this.startTime - this._pausedTime);
    235       setTimeout(() => {
    236         if ((!this.pausedSince || this._silence) && this._writeCallback) this._writeCallback();
    237       }, next);
    238     }
    239     this._sdata.sequence++;
    240     this._sdata.timestamp += TIMESTAMP_INC;
    241     if (this._sdata.sequence >= 2 ** 16) this._sdata.sequence = 0;
    242     if (this._sdata.timestamp >= 2 ** 32) this._sdata.timestamp = 0;
    243     this.count++;
    244   }
    245 
    246   _final(callback) {
    247     this._writeCallback = null;
    248     callback();
    249   }
    250 
    251   _playChunk(chunk) {
    252     if (this.player.dispatcher !== this || !this.player.voiceConnection.authentication.secret_key) return;
    253     this._sendPacket(this._createPacket(this._sdata.sequence, this._sdata.timestamp, chunk));
    254   }
    255 
    256   _encrypt(buffer) {
    257     const { secret_key, mode } = this.player.voiceConnection.authentication;
    258     if (mode === 'xsalsa20_poly1305_lite') {
    259       this._nonce++;
    260       if (this._nonce > MAX_NONCE_SIZE) this._nonce = 0;
    261       this._nonceBuffer.writeUInt32BE(this._nonce, 0);
    262       return [secretbox.methods.close(buffer, this._nonceBuffer, secret_key), this._nonceBuffer.slice(0, 4)];
    263     } else if (mode === 'xsalsa20_poly1305_suffix') {
    264       const random = secretbox.methods.random(24);
    265       return [secretbox.methods.close(buffer, random, secret_key), random];
    266     } else {
    267       return [secretbox.methods.close(buffer, nonce, secret_key)];
    268     }
    269   }
    270 
    271   _createPacket(sequence, timestamp, buffer) {
    272     const packetBuffer = Buffer.alloc(12);
    273     packetBuffer[0] = 0x80;
    274     packetBuffer[1] = 0x78;
    275 
    276     packetBuffer.writeUIntBE(sequence, 2, 2);
    277     packetBuffer.writeUIntBE(timestamp, 4, 4);
    278     packetBuffer.writeUIntBE(this.player.voiceConnection.authentication.ssrc, 8, 4);
    279 
    280     packetBuffer.copy(nonce, 0, 0, 12);
    281     return Buffer.concat([packetBuffer, ...this._encrypt(buffer)]);
    282   }
    283 
    284   _sendPacket(packet) {
    285     /**
    286      * Emitted whenever the dispatcher has debug information.
    287      * @event StreamDispatcher#debug
    288      * @param {string} info The debug info
    289      */
    290     this._setSpeaking(1);
    291     if (!this.player.voiceConnection.sockets.udp) {
    292       this.emit('debug', 'Failed to send a packet - no UDP socket');
    293       return;
    294     }
    295     this.player.voiceConnection.sockets.udp.send(packet).catch(e => {
    296       this._setSpeaking(0);
    297       this.emit('debug', `Failed to send a packet - ${e}`);
    298     });
    299   }
    300 
    301   _setSpeaking(value) {
    302     if (typeof this.player.voiceConnection !== 'undefined') {
    303       this.player.voiceConnection.setSpeaking(value);
    304     }
    305     /**
    306      * Emitted when the dispatcher starts/stops speaking.
    307      * @event StreamDispatcher#speaking
    308      * @param {boolean} value Whether or not the dispatcher is speaking
    309      */
    310     this.emit('speaking', value);
    311   }
    312 
    313   get volumeEditable() {
    314     return Boolean(this.streams.volume);
    315   }
    316 
    317   /**
    318    * Whether or not the Opus bitrate of this stream is editable
    319    * @type {boolean}
    320    * @readonly
    321    */
    322   get bitrateEditable() {
    323     return this.streams.opus && this.streams.opus.setBitrate;
    324   }
    325 
    326   // Volume
    327   get volume() {
    328     return this.streams.volume ? this.streams.volume.volume : 1;
    329   }
    330 
    331   setVolume(value) {
    332     if (!this.streams.volume) return false;
    333     /**
    334      * Emitted when the volume of this dispatcher changes.
    335      * @event StreamDispatcher#volumeChange
    336      * @param {number} oldVolume The old volume of this dispatcher
    337      * @param {number} newVolume The new volume of this dispatcher
    338      */
    339     this.emit('volumeChange', this.volume, value);
    340     this.streams.volume.setVolume(value);
    341     return true;
    342   }
    343 
    344   // Volume stubs for docs
    345   /* eslint-disable no-empty-function*/
    346   get volumeDecibels() {}
    347   get volumeLogarithmic() {}
    348   setVolumeDecibels() {}
    349   setVolumeLogarithmic() {}
    350 }
    351 
    352 VolumeInterface.applyToClass(StreamDispatcher);
    353 
    354 module.exports = StreamDispatcher;