StreamDispatcher.js (10418B)
1 'use strict'; 2 3 const { Writable } = require('stream'); 4 const secretbox = require('../util/Secretbox'); 5 const Silence = require('../util/Silence'); 6 const VolumeInterface = require('../util/VolumeInterface'); 7 8 const FRAME_LENGTH = 20; 9 const CHANNELS = 2; 10 const TIMESTAMP_INC = (48000 / 100) * CHANNELS; 11 12 const MAX_NONCE_SIZE = 2 ** 32 - 1; 13 const nonce = Buffer.alloc(24); 14 15 /** 16 * @external WritableStream 17 * @see {@link https://nodejs.org/api/stream.html#stream_class_stream_writable} 18 */ 19 20 /** 21 * The class that sends voice packet data to the voice connection. 22 * ```js 23 * // Obtained using: 24 * voiceChannel.join().then(connection => { 25 * // You can play a file or a stream here: 26 * const dispatcher = connection.play('/home/hydrabolt/audio.mp3'); 27 * }); 28 * ``` 29 * @implements {VolumeInterface} 30 * @extends {WritableStream} 31 */ 32 class StreamDispatcher extends Writable { 33 constructor(player, { seek = 0, volume = 1, fec, plp, bitrate = 96, highWaterMark = 12 } = {}, streams) { 34 const streamOptions = { seek, volume, fec, plp, bitrate, highWaterMark }; 35 super(streamOptions); 36 /** 37 * The Audio Player that controls this dispatcher 38 * @type {AudioPlayer} 39 */ 40 this.player = player; 41 this.streamOptions = streamOptions; 42 this.streams = streams; 43 this.streams.silence = new Silence(); 44 45 this._nonce = 0; 46 this._nonceBuffer = Buffer.alloc(24); 47 48 /** 49 * The time that the stream was paused at (null if not paused) 50 * @type {?number} 51 */ 52 this.pausedSince = null; 53 this._writeCallback = null; 54 55 /** 56 * The broadcast controlling this dispatcher, if any 57 * @type {?VoiceBroadcast} 58 */ 59 this.broadcast = this.streams.broadcast; 60 61 this._pausedTime = 0; 62 this._silentPausedTime = 0; 63 this.count = 0; 64 65 this.on('finish', () => { 66 this._cleanup(); 67 this._setSpeaking(0); 68 }); 69 70 this.setVolume(volume); 71 this.setBitrate(bitrate); 72 if (typeof fec !== 'undefined') this.setFEC(fec); 73 if (typeof plp !== 'undefined') this.setPLP(plp); 74 75 const streamError = (type, err) => { 76 /** 77 * Emitted when the dispatcher encounters an error. 78 * @event StreamDispatcher#error 79 */ 80 if (type && err) { 81 err.message = `${type} stream: ${err.message}`; 82 this.emit(this.player.dispatcher === this ? 'error' : 'debug', err); 83 } 84 this.destroy(); 85 }; 86 87 this.on('error', () => streamError()); 88 if (this.streams.input) this.streams.input.on('error', err => streamError('input', err)); 89 if (this.streams.ffmpeg) this.streams.ffmpeg.on('error', err => streamError('ffmpeg', err)); 90 if (this.streams.opus) this.streams.opus.on('error', err => streamError('opus', err)); 91 if (this.streams.volume) this.streams.volume.on('error', err => streamError('volume', err)); 92 } 93 94 get _sdata() { 95 return this.player.streamingData; 96 } 97 98 _write(chunk, enc, done) { 99 if (!this.startTime) { 100 /** 101 * Emitted once the stream has started to play. 102 * @event StreamDispatcher#start 103 */ 104 this.emit('start'); 105 this.startTime = Date.now(); 106 } 107 this._playChunk(chunk); 108 this._step(done); 109 } 110 111 _destroy(err, cb) { 112 this._cleanup(); 113 super._destroy(err, cb); 114 } 115 116 _cleanup() { 117 if (this.player.dispatcher === this) this.player.dispatcher = null; 118 const { streams } = this; 119 if (streams.broadcast) streams.broadcast.delete(this); 120 if (streams.opus) streams.opus.destroy(); 121 if (streams.ffmpeg) streams.ffmpeg.destroy(); 122 } 123 124 /** 125 * Pauses playback 126 * @param {boolean} [silence=false] Whether to play silence while paused to prevent audio glitches 127 */ 128 pause(silence = false) { 129 if (this.paused) return; 130 if (this.streams.opus) this.streams.opus.unpipe(this); 131 if (silence) { 132 this.streams.silence.pipe(this); 133 this._silence = true; 134 } else { 135 this._setSpeaking(0); 136 } 137 this.pausedSince = Date.now(); 138 } 139 140 /** 141 * Whether or not playback is paused 142 * @type {boolean} 143 * @readonly 144 */ 145 get paused() { 146 return Boolean(this.pausedSince); 147 } 148 149 /** 150 * Total time that this dispatcher has been paused in milliseconds 151 * @type {number} 152 * @readonly 153 */ 154 get pausedTime() { 155 return this._silentPausedTime + this._pausedTime + (this.paused ? Date.now() - this.pausedSince : 0); 156 } 157 158 /** 159 * Resumes playback 160 */ 161 resume() { 162 if (!this.pausedSince) return; 163 this.streams.silence.unpipe(this); 164 if (this.streams.opus) this.streams.opus.pipe(this); 165 if (this._silence) { 166 this._silentPausedTime += Date.now() - this.pausedSince; 167 this._silence = false; 168 } else { 169 this._pausedTime += Date.now() - this.pausedSince; 170 } 171 this.pausedSince = null; 172 if (typeof this._writeCallback === 'function') this._writeCallback(); 173 } 174 175 /** 176 * The time (in milliseconds) that the dispatcher has actually been playing audio for 177 * @type {number} 178 * @readonly 179 */ 180 get streamTime() { 181 return this.count * FRAME_LENGTH; 182 } 183 184 /** 185 * The time (in milliseconds) that the dispatcher has been playing audio for, taking into account skips and pauses 186 * @type {number} 187 * @readonly 188 */ 189 get totalStreamTime() { 190 return Date.now() - this.startTime; 191 } 192 193 /** 194 * Set the bitrate of the current Opus encoder if using a compatible Opus stream. 195 * @param {number} value New bitrate, in kbps 196 * If set to 'auto', the voice channel's bitrate will be used 197 * @returns {boolean} true if the bitrate has been successfully changed. 198 */ 199 setBitrate(value) { 200 if (!value || !this.bitrateEditable) return false; 201 const bitrate = value === 'auto' ? this.player.voiceConnection.channel.bitrate : value; 202 this.streams.opus.setBitrate(bitrate * 1000); 203 return true; 204 } 205 206 /** 207 * Sets the expected packet loss percentage if using a compatible Opus stream. 208 * @param {number} value between 0 and 1 209 * @returns {boolean} Returns true if it was successfully set. 210 */ 211 setPLP(value) { 212 if (!this.bitrateEditable) return false; 213 this.streams.opus.setPLP(value); 214 return true; 215 } 216 217 /** 218 * Enables or disables forward error correction if using a compatible Opus stream. 219 * @param {boolean} enabled true to enable 220 * @returns {boolean} Returns true if it was successfully set. 221 */ 222 setFEC(enabled) { 223 if (!this.bitrateEditable) return false; 224 this.streams.opus.setFEC(enabled); 225 return true; 226 } 227 228 _step(done) { 229 this._writeCallback = () => { 230 this._writeCallback = null; 231 done(); 232 }; 233 if (!this.streams.broadcast) { 234 const next = FRAME_LENGTH + this.count * FRAME_LENGTH - (Date.now() - this.startTime - this._pausedTime); 235 setTimeout(() => { 236 if ((!this.pausedSince || this._silence) && this._writeCallback) this._writeCallback(); 237 }, next); 238 } 239 this._sdata.sequence++; 240 this._sdata.timestamp += TIMESTAMP_INC; 241 if (this._sdata.sequence >= 2 ** 16) this._sdata.sequence = 0; 242 if (this._sdata.timestamp >= 2 ** 32) this._sdata.timestamp = 0; 243 this.count++; 244 } 245 246 _final(callback) { 247 this._writeCallback = null; 248 callback(); 249 } 250 251 _playChunk(chunk) { 252 if (this.player.dispatcher !== this || !this.player.voiceConnection.authentication.secret_key) return; 253 this._sendPacket(this._createPacket(this._sdata.sequence, this._sdata.timestamp, chunk)); 254 } 255 256 _encrypt(buffer) { 257 const { secret_key, mode } = this.player.voiceConnection.authentication; 258 if (mode === 'xsalsa20_poly1305_lite') { 259 this._nonce++; 260 if (this._nonce > MAX_NONCE_SIZE) this._nonce = 0; 261 this._nonceBuffer.writeUInt32BE(this._nonce, 0); 262 return [secretbox.methods.close(buffer, this._nonceBuffer, secret_key), this._nonceBuffer.slice(0, 4)]; 263 } else if (mode === 'xsalsa20_poly1305_suffix') { 264 const random = secretbox.methods.random(24); 265 return [secretbox.methods.close(buffer, random, secret_key), random]; 266 } else { 267 return [secretbox.methods.close(buffer, nonce, secret_key)]; 268 } 269 } 270 271 _createPacket(sequence, timestamp, buffer) { 272 const packetBuffer = Buffer.alloc(12); 273 packetBuffer[0] = 0x80; 274 packetBuffer[1] = 0x78; 275 276 packetBuffer.writeUIntBE(sequence, 2, 2); 277 packetBuffer.writeUIntBE(timestamp, 4, 4); 278 packetBuffer.writeUIntBE(this.player.voiceConnection.authentication.ssrc, 8, 4); 279 280 packetBuffer.copy(nonce, 0, 0, 12); 281 return Buffer.concat([packetBuffer, ...this._encrypt(buffer)]); 282 } 283 284 _sendPacket(packet) { 285 /** 286 * Emitted whenever the dispatcher has debug information. 287 * @event StreamDispatcher#debug 288 * @param {string} info The debug info 289 */ 290 this._setSpeaking(1); 291 if (!this.player.voiceConnection.sockets.udp) { 292 this.emit('debug', 'Failed to send a packet - no UDP socket'); 293 return; 294 } 295 this.player.voiceConnection.sockets.udp.send(packet).catch(e => { 296 this._setSpeaking(0); 297 this.emit('debug', `Failed to send a packet - ${e}`); 298 }); 299 } 300 301 _setSpeaking(value) { 302 if (typeof this.player.voiceConnection !== 'undefined') { 303 this.player.voiceConnection.setSpeaking(value); 304 } 305 /** 306 * Emitted when the dispatcher starts/stops speaking. 307 * @event StreamDispatcher#speaking 308 * @param {boolean} value Whether or not the dispatcher is speaking 309 */ 310 this.emit('speaking', value); 311 } 312 313 get volumeEditable() { 314 return Boolean(this.streams.volume); 315 } 316 317 /** 318 * Whether or not the Opus bitrate of this stream is editable 319 * @type {boolean} 320 * @readonly 321 */ 322 get bitrateEditable() { 323 return this.streams.opus && this.streams.opus.setBitrate; 324 } 325 326 // Volume 327 get volume() { 328 return this.streams.volume ? this.streams.volume.volume : 1; 329 } 330 331 setVolume(value) { 332 if (!this.streams.volume) return false; 333 /** 334 * Emitted when the volume of this dispatcher changes. 335 * @event StreamDispatcher#volumeChange 336 * @param {number} oldVolume The old volume of this dispatcher 337 * @param {number} newVolume The new volume of this dispatcher 338 */ 339 this.emit('volumeChange', this.volume, value); 340 this.streams.volume.setVolume(value); 341 return true; 342 } 343 344 // Volume stubs for docs 345 /* eslint-disable no-empty-function*/ 346 get volumeDecibels() {} 347 get volumeLogarithmic() {} 348 setVolumeDecibels() {} 349 setVolumeLogarithmic() {} 350 } 351 352 VolumeInterface.applyToClass(StreamDispatcher); 353 354 module.exports = StreamDispatcher;