mirror of
https://github.com/BluemediaDev/muse.git
synced 2025-05-08 11:21:37 +02:00
Add better caching, seek command
This commit is contained in:
parent
c446e0fd57
commit
fb91c8e89c
6 changed files with 280 additions and 91 deletions
33
src/commands/seek.ts
Normal file
33
src/commands/seek.ts
Normal file
|
@ -0,0 +1,33 @@
|
|||
import {Message, TextChannel} from 'discord.js';
|
||||
import {TYPES} from '../types';
|
||||
import {inject, injectable} from 'inversify';
|
||||
import Player from '../services/player';
|
||||
import LoadingMessage from '../utils/loading-message';
|
||||
import Command from '.';
|
||||
|
||||
@injectable()
|
||||
export default class implements Command {
|
||||
public name = 'seek';
|
||||
public description = 'seeks position in currently playing song';
|
||||
private readonly player: Player;
|
||||
|
||||
constructor(@inject(TYPES.Services.Player) player: Player) {
|
||||
this.player = player;
|
||||
}
|
||||
|
||||
public async execute(msg: Message, args: string []): Promise<void> {
|
||||
const seekTime = parseInt(args[0], 10);
|
||||
|
||||
const loading = new LoadingMessage(msg.channel as TextChannel, 'hold on a sec');
|
||||
|
||||
await loading.start();
|
||||
|
||||
try {
|
||||
await this.player.seek(msg.guild!.id, seekTime);
|
||||
|
||||
await loading.stop('seeked');
|
||||
} catch (_) {
|
||||
await loading.stop('error somewhere');
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import Command from './commands';
|
|||
import Config from './commands/config';
|
||||
import Play from './commands/play';
|
||||
import QueueCommad from './commands/queue';
|
||||
import Seek from './commands/seek';
|
||||
|
||||
let container = new Container();
|
||||
|
||||
|
@ -39,6 +40,7 @@ container.bind<Queue>(TYPES.Services.Queue).to(Queue).inSingletonScope();
|
|||
container.bind<Command>(TYPES.Command).to(Config).inSingletonScope();
|
||||
container.bind<Command>(TYPES.Command).to(Play).inSingletonScope();
|
||||
container.bind<Command>(TYPES.Command).to(QueueCommad).inSingletonScope();
|
||||
container.bind<Command>(TYPES.Command).to(Seek).inSingletonScope();
|
||||
|
||||
// Config values
|
||||
container.bind<string>(TYPES.Config.DISCORD_TOKEN).toConstantValue(DISCORD_TOKEN);
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
import {inject, injectable} from 'inversify';
|
||||
import {VoiceConnection, VoiceChannel} from 'discord.js';
|
||||
import {promises as fs, createWriteStream} from 'fs';
|
||||
import {Readable} from 'stream';
|
||||
import path from 'path';
|
||||
import hasha from 'hasha';
|
||||
import ytdl from 'ytdl-core';
|
||||
import {WriteStream} from 'fs-capacitor';
|
||||
import prism from 'prism-media';
|
||||
import {TYPES} from '../types';
|
||||
import Queue from './queue';
|
||||
import getYouTubeStream from '../utils/get-youtube-stream';
|
||||
import Queue, {QueuedSong} from './queue';
|
||||
|
||||
export enum Status {
|
||||
Playing,
|
||||
|
@ -19,9 +25,11 @@ export interface GuildPlayer {
|
|||
export default class {
|
||||
private readonly guildPlayers = new Map<string, GuildPlayer>();
|
||||
private readonly queue: Queue;
|
||||
private readonly cacheDir: string;
|
||||
|
||||
constructor(@inject(TYPES.Services.Queue) queue: Queue) {
|
||||
constructor(@inject(TYPES.Services.Queue) queue: Queue, @inject(TYPES.Config.CACHE_DIR) cacheDir: string) {
|
||||
this.queue = queue;
|
||||
this.cacheDir = cacheDir;
|
||||
}
|
||||
|
||||
async connect(guildId: string, channel: VoiceChannel): Promise<void> {
|
||||
|
@ -46,6 +54,23 @@ export default class {
|
|||
}
|
||||
}
|
||||
|
||||
async seek(guildId: string, positionSeconds: number): Promise<void> {
|
||||
const guildPlayer = this.get(guildId);
|
||||
if (guildPlayer.voiceConnection === null) {
|
||||
throw new Error('Not connected to a voice channel.');
|
||||
}
|
||||
|
||||
const currentSong = this.getCurrentSong(guildId);
|
||||
|
||||
if (!currentSong) {
|
||||
throw new Error('No song currently playing');
|
||||
}
|
||||
|
||||
await this.waitForCache(currentSong.url);
|
||||
|
||||
guildPlayer.voiceConnection.play(this.getCachedPath(currentSong.url), {seek: positionSeconds});
|
||||
}
|
||||
|
||||
async play(guildId: string): Promise<void> {
|
||||
const guildPlayer = this.get(guildId);
|
||||
if (guildPlayer.voiceConnection === null) {
|
||||
|
@ -57,17 +82,18 @@ export default class {
|
|||
return;
|
||||
}
|
||||
|
||||
const songs = this.queue.get(guildId);
|
||||
const currentSong = this.getCurrentSong(guildId);
|
||||
|
||||
if (songs.length === 0) {
|
||||
if (!currentSong) {
|
||||
throw new Error('Queue empty.');
|
||||
}
|
||||
|
||||
const song = songs[0];
|
||||
|
||||
const stream = await getYouTubeStream(song.url);
|
||||
|
||||
this.get(guildId).voiceConnection!.play(stream, {type: 'webm/opus'});
|
||||
if (await this.isCached(currentSong.url)) {
|
||||
this.get(guildId).voiceConnection!.play(this.getCachedPath(currentSong.url));
|
||||
} else {
|
||||
const stream = await this.getStream(currentSong.url);
|
||||
this.get(guildId).voiceConnection!.play(stream, {type: 'webm/opus'});
|
||||
}
|
||||
|
||||
guildPlayer.status = Status.Playing;
|
||||
|
||||
|
@ -80,9 +106,136 @@ export default class {
|
|||
return this.guildPlayers.get(guildId) as GuildPlayer;
|
||||
}
|
||||
|
||||
private getCurrentSong(guildId: string): QueuedSong|null {
|
||||
const songs = this.queue.get(guildId);
|
||||
|
||||
if (songs.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return songs[0];
|
||||
}
|
||||
|
||||
private initGuild(guildId: string): void {
|
||||
if (!this.guildPlayers.get(guildId)) {
|
||||
this.guildPlayers.set(guildId, {status: Status.Disconnected, voiceConnection: null});
|
||||
}
|
||||
}
|
||||
|
||||
private getCachedPath(url: string): string {
|
||||
const hash = hasha(url);
|
||||
return path.join(this.cacheDir, `${hash}.webm`);
|
||||
}
|
||||
|
||||
private getCachedPathTemp(url: string): string {
|
||||
const hash = hasha(url);
|
||||
|
||||
return path.join('/tmp', `${hash}.webm`);
|
||||
}
|
||||
|
||||
private async isCached(url: string): Promise<boolean> {
|
||||
try {
|
||||
await fs.access(this.getCachedPath(url));
|
||||
|
||||
return true;
|
||||
} catch (_) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private async waitForCache(url: string, maxRetries = 50, retryDelay = 500): Promise<void> {
|
||||
// eslint-disable-next-line no-async-promise-executor
|
||||
return new Promise(async (resolve, reject) => {
|
||||
if (await this.isCached(url)) {
|
||||
resolve();
|
||||
} else {
|
||||
let nOfChecks = 0;
|
||||
|
||||
const cachedCheck = setInterval(async () => {
|
||||
if (await this.isCached(url)) {
|
||||
clearInterval(cachedCheck);
|
||||
resolve();
|
||||
} else {
|
||||
nOfChecks++;
|
||||
|
||||
if (nOfChecks > maxRetries) {
|
||||
clearInterval(cachedCheck);
|
||||
reject(new Error('Timed out waiting for file to become cached.'));
|
||||
}
|
||||
}
|
||||
}, retryDelay);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async getStream(url: string): Promise<Readable|string> {
|
||||
const cachedPath = this.getCachedPath(url);
|
||||
|
||||
if (await this.isCached(url)) {
|
||||
return cachedPath;
|
||||
}
|
||||
|
||||
// Not yet cached, must download
|
||||
const info = await ytdl.getInfo(url);
|
||||
|
||||
const {formats} = info;
|
||||
|
||||
const filter = (format: ytdl.videoFormat): boolean => format.codecs === 'opus' && format.container === 'webm' && format.audioSampleRate !== undefined && parseInt(format.audioSampleRate, 10) === 48000;
|
||||
|
||||
let format = formats.find(filter);
|
||||
let canDirectPlay = true;
|
||||
|
||||
const nextBestFormat = (formats: ytdl.videoFormat[]): ytdl.videoFormat => {
|
||||
formats = formats
|
||||
.filter(format => format.averageBitrate)
|
||||
.sort((a, b) => b.averageBitrate - a.averageBitrate);
|
||||
return formats.find(format => !format.bitrate) ?? formats[0];
|
||||
};
|
||||
|
||||
if (!format) {
|
||||
format = nextBestFormat(info.formats);
|
||||
canDirectPlay = false;
|
||||
}
|
||||
|
||||
const cacheTempPath = this.getCachedPathTemp(url);
|
||||
const cacheStream = createWriteStream(cacheTempPath);
|
||||
|
||||
cacheStream.on('finish', async () => {
|
||||
await fs.rename(cacheTempPath, cachedPath);
|
||||
});
|
||||
|
||||
let youtubeStream: Readable;
|
||||
|
||||
if (canDirectPlay) {
|
||||
youtubeStream = ytdl.downloadFromInfo(info, {format});
|
||||
} else {
|
||||
youtubeStream = new prism.FFmpeg({
|
||||
args: [
|
||||
'-reconnect',
|
||||
'1',
|
||||
'-reconnect_streamed',
|
||||
'1',
|
||||
'-reconnect_delay_max',
|
||||
'5',
|
||||
'-i',
|
||||
format.url,
|
||||
'-loglevel',
|
||||
'verbose',
|
||||
'-vn',
|
||||
'-acodec',
|
||||
'libopus',
|
||||
'-f',
|
||||
'webm'
|
||||
]
|
||||
});
|
||||
}
|
||||
|
||||
const capacitor = new WriteStream();
|
||||
|
||||
youtubeStream.pipe(capacitor);
|
||||
|
||||
capacitor.createReadStream().pipe(cacheStream);
|
||||
|
||||
return capacitor.createReadStream();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,78 +0,0 @@
|
|||
import {promises as fs, createReadStream, createWriteStream} from 'fs';
|
||||
import {Readable, PassThrough} from 'stream';
|
||||
import path from 'path';
|
||||
import hasha from 'hasha';
|
||||
import ytdl from 'ytdl-core';
|
||||
import prism from 'prism-media';
|
||||
import {CACHE_DIR} from './config';
|
||||
|
||||
const nextBestFormat = (formats: ytdl.videoFormat[]): ytdl.videoFormat => {
|
||||
formats = formats
|
||||
.filter(format => format.averageBitrate)
|
||||
.sort((a, b) => b.averageBitrate - a.averageBitrate);
|
||||
return formats.find(format => !format.bitrate) ?? formats[0];
|
||||
};
|
||||
|
||||
// TODO: are some videos not available in webm/opus?
|
||||
export default async (url: string): Promise<Readable> => {
|
||||
const hash = hasha(url);
|
||||
const cachedPath = path.join(CACHE_DIR, `${hash}.webm`);
|
||||
|
||||
const info = await ytdl.getInfo(url);
|
||||
|
||||
const {formats} = info;
|
||||
|
||||
const filter = (format: ytdl.videoFormat): boolean => format.codecs === 'opus' && format.container === 'webm' && format.audioSampleRate !== undefined && parseInt(format.audioSampleRate, 10) === 48000;
|
||||
|
||||
let format = formats.find(filter);
|
||||
let canDirectPlay = true;
|
||||
|
||||
if (!format) {
|
||||
format = nextBestFormat(info.formats);
|
||||
canDirectPlay = false;
|
||||
}
|
||||
|
||||
try {
|
||||
// Test if file exists
|
||||
await fs.access(cachedPath);
|
||||
|
||||
// If so, return cached stream
|
||||
return createReadStream(cachedPath);
|
||||
} catch (_) {
|
||||
// Not yet cached, must download
|
||||
const cacheTempPath = path.join('/tmp', `${hash}.webm`);
|
||||
const cacheStream = createWriteStream(cacheTempPath);
|
||||
|
||||
const pass = new PassThrough();
|
||||
|
||||
pass.pipe(cacheStream).on('finish', async () => {
|
||||
await fs.rename(cacheTempPath, cachedPath);
|
||||
});
|
||||
|
||||
if (canDirectPlay) {
|
||||
return ytdl.downloadFromInfo(info, {format}).pipe(pass);
|
||||
}
|
||||
|
||||
const transcoder = new prism.FFmpeg({
|
||||
args: [
|
||||
'-reconnect',
|
||||
'1',
|
||||
'-reconnect_streamed',
|
||||
'1',
|
||||
'-reconnect_delay_max',
|
||||
'5',
|
||||
'-i',
|
||||
format.url,
|
||||
'-loglevel',
|
||||
'verbose',
|
||||
'-vn',
|
||||
'-acodec',
|
||||
'libopus',
|
||||
'-f',
|
||||
'webm'
|
||||
]
|
||||
});
|
||||
|
||||
return transcoder.pipe(pass);
|
||||
}
|
||||
};
|
Loading…
Add table
Add a link
Reference in a new issue