import {VoiceChannel, Snowflake} from 'discord.js'; import {Readable} from 'stream'; import hasha from 'hasha'; import ytdl from 'ytdl-core'; import {WriteStream} from 'fs-capacitor'; import ffmpeg from 'fluent-ffmpeg'; import shuffle from 'array-shuffle'; import { AudioPlayer, AudioPlayerState, AudioPlayerStatus, createAudioPlayer, createAudioResource, joinVoiceChannel, StreamType, VoiceConnection, VoiceConnectionStatus, } from '@discordjs/voice'; import FileCacheProvider from './file-cache.js'; import debug from '../utils/debug.js'; import {prisma} from '../utils/db.js'; export enum MediaSource { Youtube, HLS, } export interface QueuedPlaylist { title: string; source: string; } export interface SongMetadata { title: string; artist: string; url: string; length: number; offset: number; playlist: QueuedPlaylist | null; isLive: boolean; thumbnailUrl: string | null; source: MediaSource; } export interface QueuedSong extends SongMetadata { addedInChannelId: Snowflake; requestedBy: string; } export enum STATUS { PLAYING, PAUSED, IDLE, } export interface PlayerEvents { statusChange: (oldStatus: STATUS, newStatus: STATUS) => void; } export default class { public voiceConnection: VoiceConnection | null = null; public status = STATUS.PAUSED; public guildId: string; private queue: QueuedSong[] = []; private queuePosition = 0; private audioPlayer: AudioPlayer | null = null; private nowPlaying: QueuedSong | null = null; private playPositionInterval: NodeJS.Timeout | undefined; private lastSongURL = ''; private positionInSeconds = 0; private readonly fileCache: FileCacheProvider; private disconnectTimer: NodeJS.Timeout | null = null; constructor(fileCache: FileCacheProvider, guildId: string) { this.fileCache = fileCache; this.guildId = guildId; } async connect(channel: VoiceChannel): Promise { this.voiceConnection = joinVoiceChannel({ channelId: channel.id, guildId: channel.guild.id, adapterCreator: channel.guild.voiceAdapterCreator, }); } disconnect(): void { if (this.voiceConnection) { if (this.status === STATUS.PLAYING) { this.pause(); } this.voiceConnection.destroy(); this.audioPlayer?.stop(); this.voiceConnection = null; this.audioPlayer = null; } } async seek(positionSeconds: number): Promise { this.status = STATUS.PAUSED; if (this.voiceConnection === null) { throw new Error('Not connected to a voice channel.'); } const currentSong = this.getCurrent(); if (!currentSong) { throw new Error('No song currently playing'); } if (positionSeconds > currentSong.length) { throw new Error('Seek position is outside the range of the song.'); } let realPositionSeconds = positionSeconds; let to: number | undefined; if (currentSong.offset !== undefined) { realPositionSeconds += currentSong.offset; to = currentSong.length + currentSong.offset; } const stream = await this.getStream(currentSong, {seek: realPositionSeconds, to}); this.audioPlayer = createAudioPlayer({ behaviors: { // Needs to be somewhat high for livestreams maxMissedFrames: 50, }, }); this.voiceConnection.subscribe(this.audioPlayer); this.audioPlayer.play(createAudioResource(stream, { inputType: StreamType.WebmOpus, })); this.attachListeners(); this.startTrackingPosition(positionSeconds); this.status = STATUS.PLAYING; } async forwardSeek(positionSeconds: number): Promise { return this.seek(this.positionInSeconds + positionSeconds); } getPosition(): number { return this.positionInSeconds; } async play(): Promise { if (this.voiceConnection === null) { throw new Error('Not connected to a voice channel.'); } const currentSong = this.getCurrent(); if (!currentSong) { throw new Error('Queue empty.'); } // Cancel any pending idle disconnection if (this.disconnectTimer) { clearInterval(this.disconnectTimer); this.disconnectTimer = null; } // Resume from paused state if (this.status === STATUS.PAUSED && currentSong.url === this.nowPlaying?.url) { if (this.audioPlayer) { this.audioPlayer.unpause(); this.status = STATUS.PLAYING; this.startTrackingPosition(); return; } // Was disconnected, need to recreate stream if (!currentSong.isLive) { return this.seek(this.getPosition()); } } try { let positionSeconds: number | undefined; let to: number | undefined; if (currentSong.offset !== undefined) { positionSeconds = currentSong.offset; to = currentSong.length + currentSong.offset; } const stream = await this.getStream(currentSong, {seek: positionSeconds, to}); this.audioPlayer = createAudioPlayer({ behaviors: { // Needs to be somewhat high for livestreams maxMissedFrames: 50, }, }); this.voiceConnection.subscribe(this.audioPlayer); const resource = createAudioResource(stream, { inputType: StreamType.WebmOpus, }); this.audioPlayer.play(resource); this.attachListeners(); this.status = STATUS.PLAYING; this.nowPlaying = currentSong; if (currentSong.url === this.lastSongURL) { this.startTrackingPosition(); } else { // Reset position counter this.startTrackingPosition(0); this.lastSongURL = currentSong.url; } } catch (error: unknown) { await this.forward(1); if ((error as {statusCode: number}).statusCode === 410 && currentSong) { const channelId = currentSong.addedInChannelId; if (channelId) { debug(`${currentSong.title} is unavailable`); return; } } throw error; } } pause(): void { if (this.status !== STATUS.PLAYING) { throw new Error('Not currently playing.'); } this.status = STATUS.PAUSED; if (this.audioPlayer) { this.audioPlayer.pause(); } this.stopTrackingPosition(); } async forward(skip: number): Promise { this.manualForward(skip); try { if (this.getCurrent() && this.status !== STATUS.PAUSED) { await this.play(); } else { this.audioPlayer?.stop(); this.status = STATUS.IDLE; const settings = await prisma.setting.findUnique({where: {guildId: this.guildId}}); if (!settings) { throw new Error('Could not find settings for guild'); } const {secondsToWaitAfterQueueEmpties} = settings; if (secondsToWaitAfterQueueEmpties !== 0) { this.disconnectTimer = setTimeout(() => { // Make sure we are not accidentally playing // when disconnecting if (this.status === STATUS.IDLE) { this.disconnect(); } }, secondsToWaitAfterQueueEmpties * 1000); } } } catch (error: unknown) { this.queuePosition--; throw error; } } canGoForward(skip: number) { return (this.queuePosition + skip - 1) < this.queue.length; } manualForward(skip: number): void { if (this.canGoForward(skip)) { this.queuePosition += skip; this.positionInSeconds = 0; this.stopTrackingPosition(); } else { throw new Error('No songs in queue to forward to.'); } } canGoBack() { return this.queuePosition - 1 >= 0; } async back(): Promise { if (this.canGoBack()) { this.queuePosition--; this.positionInSeconds = 0; this.stopTrackingPosition(); if (this.status !== STATUS.PAUSED) { await this.play(); } } else { throw new Error('No songs in queue to go back to.'); } } getCurrent(): QueuedSong | null { if (this.queue[this.queuePosition]) { return this.queue[this.queuePosition]; } return null; } /** * Returns queue, not including the current song. * @returns {QueuedSong[]} */ getQueue(): QueuedSong[] { return this.queue.slice(this.queuePosition + 1); } add(song: QueuedSong, {immediate = false} = {}): void { if (song.playlist || !immediate) { // Add to end of queue this.queue.push(song); } else { // Add as the next song to be played const insertAt = this.queuePosition + 1; this.queue = [...this.queue.slice(0, insertAt), song, ...this.queue.slice(insertAt)]; } } shuffle(): void { const shuffledSongs = shuffle(this.queue.slice(this.queuePosition + 1)); this.queue = [...this.queue.slice(0, this.queuePosition + 1), ...shuffledSongs]; } clear(): void { const newQueue = []; // Don't clear curently playing song const current = this.getCurrent(); if (current) { newQueue.push(current); } this.queuePosition = 0; this.queue = newQueue; } removeFromQueue(index: number, amount = 1): void { this.queue.splice(this.queuePosition + index, amount); } removeCurrent(): void { this.queue = [...this.queue.slice(0, this.queuePosition), ...this.queue.slice(this.queuePosition + 1)]; } queueSize(): number { return this.getQueue().length; } isQueueEmpty(): boolean { return this.queueSize() === 0; } stop(): void { this.disconnect(); this.queuePosition = 0; this.queue = []; } private getHashForCache(url: string): string { return hasha(url); } private async getStream(song: QueuedSong, options: {seek?: number; to?: number} = {}): Promise { if (song.source === MediaSource.HLS) { return this.createReadStream(song.url); } let ffmpegInput = ''; const ffmpegInputOptions: string[] = []; let shouldCacheVideo = false; let format: ytdl.videoFormat | undefined; try { ffmpegInput = await this.fileCache.getPathFor(this.getHashForCache(song.url)); if (options.seek) { ffmpegInputOptions.push('-ss', options.seek.toString()); } if (options.to) { ffmpegInputOptions.push('-to', options.to.toString()); } } catch { // Not yet cached, must download const info = await ytdl.getInfo(song.url); const {formats} = info; const filter = (format: ytdl.videoFormat): boolean => format.codecs === 'opus' && format.container === 'webm' && format.audioSampleRate !== undefined && parseInt(format.audioSampleRate, 10) === 48000; format = formats.find(filter); const nextBestFormat = (formats: ytdl.videoFormat[]): ytdl.videoFormat | undefined => { if (formats[0].isLive) { formats = formats.sort((a, b) => (b as unknown as {audioBitrate: number}).audioBitrate - (a as unknown as {audioBitrate: number}).audioBitrate); // Bad typings return formats.find(format => [128, 127, 120, 96, 95, 94, 93].includes(parseInt(format.itag as unknown as string, 10))); // Bad typings } formats = formats .filter(format => format.averageBitrate) .sort((a, b) => { if (a && b) { return b.averageBitrate! - a.averageBitrate!; } return 0; }); return formats.find(format => !format.bitrate) ?? formats[0]; }; if (!format) { format = nextBestFormat(info.formats); if (!format) { // If still no format is found, throw throw new Error('Can\'t find suitable format.'); } } ffmpegInput = format.url; // Don't cache livestreams or long videos const MAX_CACHE_LENGTH_SECONDS = 30 * 60; // 30 minutes shouldCacheVideo = !info.player_response.videoDetails.isLiveContent && parseInt(info.videoDetails.lengthSeconds, 10) < MAX_CACHE_LENGTH_SECONDS && !options.seek && !options.to; ffmpegInputOptions.push(...[ '-reconnect', '1', '-reconnect_streamed', '1', '-reconnect_delay_max', '5', ]); if (options.seek) { ffmpegInputOptions.push('-ss', options.seek.toString()); } if (options.to) { ffmpegInputOptions.push('-to', options.to.toString()); } } return this.createReadStream(ffmpegInput, {ffmpegInputOptions, cache: shouldCacheVideo}); } private startTrackingPosition(initalPosition?: number): void { if (initalPosition !== undefined) { this.positionInSeconds = initalPosition; } if (this.playPositionInterval) { clearInterval(this.playPositionInterval); } this.playPositionInterval = setInterval(() => { this.positionInSeconds++; }, 1000); } private stopTrackingPosition(): void { if (this.playPositionInterval) { clearInterval(this.playPositionInterval); } } private attachListeners(): void { if (!this.voiceConnection) { return; } if (this.voiceConnection.listeners(VoiceConnectionStatus.Disconnected).length === 0) { this.voiceConnection.on(VoiceConnectionStatus.Disconnected, this.onVoiceConnectionDisconnect.bind(this)); } if (!this.audioPlayer) { return; } if (this.audioPlayer.listeners('stateChange').length === 0) { this.audioPlayer.on(AudioPlayerStatus.Idle, this.onAudioPlayerIdle.bind(this)); } } private onVoiceConnectionDisconnect(): void { this.disconnect(); } private async onAudioPlayerIdle(_oldState: AudioPlayerState, newState: AudioPlayerState): Promise { // Automatically advance queued song at end if (newState.status === AudioPlayerStatus.Idle && this.status === STATUS.PLAYING) { await this.forward(1); } } private async createReadStream(url: string, options: {ffmpegInputOptions?: string[]; cache?: boolean} = {}): Promise { return new Promise((resolve, reject) => { const capacitor = new WriteStream(); if (options?.cache) { const cacheStream = this.fileCache.createWriteStream(this.getHashForCache(url)); capacitor.createReadStream().pipe(cacheStream); } const returnedStream = capacitor.createReadStream(); let hasReturnedStreamClosed = false; const stream = ffmpeg(url) .inputOptions(options?.ffmpegInputOptions ?? ['-re']) .noVideo() .audioCodec('libopus') .outputFormat('webm') .on('error', error => { if (!hasReturnedStreamClosed) { reject(error); } }) .on('start', command => { debug(`Spawned ffmpeg with ${command as string}`); }); stream.pipe(capacitor); returnedStream.on('close', () => { stream.kill('SIGKILL'); hasReturnedStreamClosed = true; }); resolve(returnedStream); }); } move(from: number, to: number): void { if (from > this.queueSize() || to > this.queueSize()){ throw new Error('Move index is outside the range of the queue.' ) } this.queue.splice( this.queuePosition + to ,0, this.queue.splice(this.queuePosition + from, 1)[0]); } }