Merge branch 'master' into playlist-limit-config

This commit is contained in:
Thongrapee Panyapatiphan 2021-12-09 17:55:29 +07:00
commit d8086be5cf
No known key found for this signature in database
GPG key ID: 4B08AEC7F50F1967
31 changed files with 1214 additions and 753 deletions

View file

@ -1,6 +1,8 @@
import dotenv from 'dotenv';
import {injectable} from 'inversify';
import path from 'path';
import xbytes from 'xbytes';
import {ConditionalKeys} from 'type-fest';
dotenv.config();
export const DATA_DIR = path.resolve(process.env.DATA_DIR ? process.env.DATA_DIR : './data');
@ -12,6 +14,7 @@ const CONFIG_MAP = {
SPOTIFY_CLIENT_SECRET: process.env.SPOTIFY_CLIENT_SECRET,
DATA_DIR,
CACHE_DIR: path.join(DATA_DIR, 'cache'),
CACHE_LIMIT_IN_BYTES: xbytes.parseSize(process.env.CACHE_LIMIT ?? '2GB'),
} as const;
@injectable()
@ -22,6 +25,7 @@ export default class Config {
readonly SPOTIFY_CLIENT_SECRET!: string;
readonly DATA_DIR!: string;
readonly CACHE_DIR!: string;
readonly CACHE_LIMIT_IN_BYTES!: number;
constructor() {
for (const [key, value] of Object.entries(CONFIG_MAP)) {
@ -30,7 +34,13 @@ export default class Config {
process.exit(1);
}
this[key as keyof typeof CONFIG_MAP] = value;
if (typeof value === 'number') {
this[key as ConditionalKeys<typeof CONFIG_MAP, number>] = value;
} else if (typeof value === 'string') {
this[key as ConditionalKeys<typeof CONFIG_MAP, string>] = value;
} else {
throw new Error(`Unsupported type for ${key}`);
}
}
}
}

117
src/services/file-cache.ts Normal file
View file

@ -0,0 +1,117 @@
import {promises as fs, createWriteStream} from 'fs';
import path from 'path';
import {inject, injectable} from 'inversify';
import sequelize from 'sequelize';
import {FileCache} from '../models/index.js';
import {TYPES} from '../types.js';
import Config from './config.js';
@injectable()
export default class FileCacheProvider {
private readonly config: Config;
constructor(@inject(TYPES.Config) config: Config) {
this.config = config;
}
/**
* Returns path to cached file if it exists, otherwise throws an error.
* Updates the `accessedAt` property of the cached file.
* @param hash lookup key
*/
async getPathFor(hash: string): Promise<string> {
const model = await FileCache.findByPk(hash);
if (!model) {
throw new Error('File is not cached');
}
const resolvedPath = path.join(this.config.CACHE_DIR, hash);
try {
await fs.access(resolvedPath);
} catch (_: unknown) {
await FileCache.destroy({where: {hash}});
throw new Error('File is not cached');
}
await model.update({accessedAt: new Date()});
return resolvedPath;
}
/**
* Returns a write stream for the given hash key.
* The stream handles saving a new file and will
* update the database after the stream is closed.
* @param hash lookup key
*/
createWriteStream(hash: string) {
const tmpPath = path.join(this.config.CACHE_DIR, 'tmp', hash);
const finalPath = path.join(this.config.CACHE_DIR, hash);
const stream = createWriteStream(tmpPath);
stream.on('close', async () => {
// Only move if size is non-zero (may have errored out)
const stats = await fs.stat(tmpPath);
if (stats.size !== 0) {
await fs.rename(tmpPath, finalPath);
}
await FileCache.create({hash, bytes: stats.size, accessedAt: new Date()});
await this.evictOldestIfNecessary();
});
return stream;
}
/**
* Deletes orphaned cache files and evicts files if
* necessary. Should be run on program startup so files
* will be evicted if the cache limit has changed.
*/
async cleanup() {
await this.removeOrphans();
await this.evictOldestIfNecessary();
}
private async evictOldestIfNecessary() {
const [{dataValues: {totalSizeBytes}}] = await FileCache.findAll({
attributes: [
[sequelize.fn('sum', sequelize.col('bytes')), 'totalSizeBytes'],
],
}) as unknown as [{dataValues: {totalSizeBytes: number}}];
if (totalSizeBytes > this.config.CACHE_LIMIT_IN_BYTES) {
const oldest = await FileCache.findOne({
order: [
['accessedAt', 'ASC'],
],
});
if (oldest) {
await oldest.destroy();
await fs.unlink(path.join(this.config.CACHE_DIR, oldest.hash));
}
// Continue to evict until we're under the limit
await this.evictOldestIfNecessary();
}
}
private async removeOrphans() {
for await (const dirent of await fs.opendir(this.config.CACHE_DIR)) {
if (dirent.isFile()) {
const model = await FileCache.findByPk(dirent.name);
if (!model) {
await fs.unlink(path.join(this.config.CACHE_DIR, dirent.name));
}
}
}
}
}

View file

@ -14,7 +14,7 @@ import {TYPES} from '../types.js';
import {cleanUrl} from '../utils/url.js';
import ThirdParty from './third-party.js';
import Config from './config.js';
import CacheProvider from './cache.js';
import KeyValueCacheProvider from './key-value-cache.js';
type QueuedSongWithoutChannel = Except<QueuedSong, 'addedInChannelId'>;
@ -26,14 +26,14 @@ export default class {
private readonly youtube: YouTube;
private readonly youtubeKey: string;
private readonly spotify: Spotify;
private readonly cache: CacheProvider;
private readonly cache: KeyValueCacheProvider;
private readonly ytsrQueue: PQueue;
constructor(
@inject(TYPES.ThirdParty) thirdParty: ThirdParty,
@inject(TYPES.Config) config: Config,
@inject(TYPES.Cache) cache: CacheProvider) {
@inject(TYPES.KeyValueCache) cache: KeyValueCacheProvider) {
this.youtube = thirdParty.youtube;
this.youtubeKey = config.YOUTUBE_API_KEY;
this.spotify = thirdParty.spotify;

View file

@ -1,5 +1,5 @@
import {injectable} from 'inversify';
import {Cache} from '../models/index.js';
import {KeyValueCache} from '../models/index.js';
import debug from '../utils/debug.js';
type Seconds = number;
@ -12,7 +12,7 @@ type Options = {
const futureTimeToDate = (time: Seconds) => new Date(new Date().getTime() + (time * 1000));
@injectable()
export default class CacheProvider {
export default class KeyValueCacheProvider {
async wrap<T extends [...any[], Options], F>(func: (...options: any) => Promise<F>, ...options: T): Promise<F> {
if (options.length === 0) {
throw new Error('Missing cache options');
@ -29,7 +29,7 @@ export default class CacheProvider {
throw new Error(`Cache key ${key} is too short.`);
}
const cachedResult = await Cache.findByPk(key);
const cachedResult = await KeyValueCache.findByPk(key);
if (cachedResult) {
if (new Date() < cachedResult.expiresAt) {
@ -45,7 +45,7 @@ export default class CacheProvider {
const result = await func(...options as any[]);
// Save result
await Cache.upsert({
await KeyValueCache.upsert({
key,
value: JSON.stringify(result),
expiresAt: futureTimeToDate(expiresIn),

View file

@ -1,13 +1,13 @@
import {VoiceConnection, VoiceChannel, StreamDispatcher, Snowflake, Client, TextChannel} from 'discord.js';
import {promises as fs, createWriteStream} from 'fs';
import {Readable, PassThrough} from 'stream';
import path from 'path';
import {VoiceChannel, Snowflake, Client, TextChannel} from 'discord.js';
import {Readable} from 'stream';
import hasha from 'hasha';
import ytdl from 'ytdl-core';
import {WriteStream} from 'fs-capacitor';
import ffmpeg from 'fluent-ffmpeg';
import shuffle from 'array-shuffle';
import errorMsg from '../utils/error-msg.js';
import {AudioPlayer, AudioPlayerStatus, createAudioPlayer, createAudioResource, joinVoiceChannel, StreamType, VoiceConnection, VoiceConnectionStatus} from '@discordjs/voice';
import FileCacheProvider from './file-cache.js';
export interface QueuedPlaylist {
title: string;
@ -34,8 +34,7 @@ export default class {
public voiceConnection: VoiceConnection | null = null;
private queue: QueuedSong[] = [];
private queuePosition = 0;
private readonly cacheDir: string;
private dispatcher: StreamDispatcher | null = null;
private audioPlayer: AudioPlayer | null = null;
private nowPlaying: QueuedSong | null = null;
private playPositionInterval: NodeJS.Timeout | undefined;
private lastSongURL = '';
@ -43,30 +42,34 @@ export default class {
private positionInSeconds = 0;
private readonly discordClient: Client;
private readonly fileCache: FileCacheProvider;
constructor(cacheDir: string, client: Client) {
this.cacheDir = cacheDir;
constructor(client: Client, fileCache: FileCacheProvider) {
this.discordClient = client;
this.fileCache = fileCache;
}
async connect(channel: VoiceChannel): Promise<void> {
const conn = await channel.join();
const conn = joinVoiceChannel({
channelId: channel.id,
guildId: channel.guild.id,
adapterCreator: channel.guild.voiceAdapterCreator,
});
this.voiceConnection = conn;
}
disconnect(breakConnection = true): void {
disconnect(): void {
if (this.voiceConnection) {
if (this.status === STATUS.PLAYING) {
this.pause();
}
if (breakConnection) {
this.voiceConnection.disconnect();
}
this.voiceConnection.destroy();
this.audioPlayer?.stop();
this.voiceConnection = null;
this.dispatcher = null;
this.audioPlayer = null;
}
}
@ -88,8 +91,11 @@ export default class {
}
const stream = await this.getStream(currentSong.url, {seek: positionSeconds});
this.dispatcher = this.voiceConnection.play(stream, {type: 'webm/opus', bitrate: 'auto'});
this.audioPlayer = createAudioPlayer();
this.voiceConnection.subscribe(this.audioPlayer);
this.audioPlayer.play(createAudioResource(stream, {
inputType: StreamType.WebmOpus,
}));
this.attachListeners();
this.startTrackingPosition(positionSeconds);
@ -117,8 +123,8 @@ export default class {
// Resume from paused state
if (this.status === STATUS.PAUSED && currentSong.url === this.nowPlaying?.url) {
if (this.dispatcher) {
this.dispatcher.resume();
if (this.audioPlayer) {
this.audioPlayer.unpause();
this.status = STATUS.PLAYING;
this.startTrackingPosition();
return;
@ -132,7 +138,11 @@ export default class {
try {
const stream = await this.getStream(currentSong.url);
this.dispatcher = this.voiceConnection.play(stream, {type: 'webm/opus'});
this.audioPlayer = createAudioPlayer();
this.voiceConnection.subscribe(this.audioPlayer);
this.audioPlayer.play(createAudioResource(stream, {
inputType: StreamType.WebmOpus,
}));
this.attachListeners();
@ -170,8 +180,8 @@ export default class {
this.status = STATUS.PAUSED;
if (this.dispatcher) {
this.dispatcher.pause();
if (this.audioPlayer) {
this.audioPlayer.pause();
}
this.stopTrackingPosition();
@ -230,25 +240,12 @@ export default class {
}
add(song: QueuedSong, {immediate = false} = {}): void {
if (song.playlist) {
if (song.playlist || !immediate) {
// Add to end of queue
this.queue.push(song);
} else {
// Not from playlist, add immediately
let insertAt = this.queuePosition + 1;
if (!immediate) {
// Loop until playlist song
this.queue.some(song => {
if (song.playlist) {
return true;
}
insertAt++;
return false;
});
}
// Add as the next song to be played
const insertAt = this.queuePosition + 1;
this.queue = [...this.queue.slice(0, insertAt), song, ...this.queue.slice(insertAt)];
}
}
@ -273,6 +270,10 @@ export default class {
this.queue = newQueue;
}
removeFromQueue(index: number, amount = 1): void {
this.queue.splice(this.queuePosition + index, amount);
}
removeCurrent(): void {
this.queue = [...this.queue.slice(0, this.queuePosition), ...this.queue.slice(this.queuePosition + 1)];
}
@ -285,40 +286,24 @@ export default class {
return this.queueSize() === 0;
}
private getCachedPath(url: string): string {
return path.join(this.cacheDir, hasha(url));
}
private getCachedPathTemp(url: string): string {
return path.join(this.cacheDir, 'tmp', hasha(url));
}
private async isCached(url: string): Promise<boolean> {
try {
await fs.access(this.getCachedPath(url));
return true;
} catch (_: unknown) {
return false;
}
private getHashForCache(url: string): string {
return hasha(url);
}
private async getStream(url: string, options: {seek?: number} = {}): Promise<Readable> {
const cachedPath = this.getCachedPath(url);
let ffmpegInput = '';
const ffmpegInputOptions: string[] = [];
let shouldCacheVideo = false;
let format: ytdl.videoFormat | undefined;
if (await this.isCached(url)) {
ffmpegInput = cachedPath;
try {
ffmpegInput = await this.fileCache.getPathFor(this.getHashForCache(url));
if (options.seek) {
ffmpegInputOptions.push('-ss', options.seek.toString());
}
} else {
} catch {
// Not yet cached, must download
const info = await ytdl.getInfo(url);
@ -379,6 +364,17 @@ export default class {
// Create stream and pipe to capacitor
return new Promise((resolve, reject) => {
const capacitor = new WriteStream();
// Cache video if necessary
if (shouldCacheVideo) {
const cacheStream = this.fileCache.createWriteStream(this.getHashForCache(url));
capacitor.createReadStream().pipe(cacheStream);
} else {
ffmpegInputOptions.push('-re');
}
const youtubeStream = ffmpeg(ffmpegInput)
.inputOptions(ffmpegInputOptions)
.noVideo()
@ -387,29 +383,9 @@ export default class {
.on('error', error => {
console.error(error);
reject(error);
})
.pipe() as PassThrough;
const capacitor = new WriteStream();
youtubeStream.pipe(capacitor);
// Cache video if necessary
if (shouldCacheVideo) {
const cacheTempPath = this.getCachedPathTemp(url);
const cacheStream = createWriteStream(cacheTempPath);
cacheStream.on('finish', async () => {
// Only move if size is non-zero (may have errored out)
const stats = await fs.stat(cacheTempPath);
if (stats.size !== 0) {
await fs.rename(cacheTempPath, cachedPath);
}
});
capacitor.createReadStream().pipe(cacheStream);
}
youtubeStream.pipe(capacitor);
resolve(capacitor.createReadStream());
});
@ -440,22 +416,26 @@ export default class {
return;
}
this.voiceConnection.on('disconnect', this.onVoiceConnectionDisconnect.bind(this));
if (this.voiceConnection.listeners(VoiceConnectionStatus.Disconnected).length === 0) {
this.voiceConnection.on(VoiceConnectionStatus.Disconnected, this.onVoiceConnectionDisconnect.bind(this));
}
if (!this.dispatcher) {
if (!this.audioPlayer) {
return;
}
this.dispatcher.on('speaking', this.onVoiceConnectionSpeaking.bind(this));
if (this.audioPlayer.listeners('stateChange').length === 0) {
this.audioPlayer.on('stateChange', this.onAudioPlayerStateChange.bind(this));
}
}
private onVoiceConnectionDisconnect(): void {
this.disconnect(false);
this.disconnect();
}
private async onVoiceConnectionSpeaking(isSpeaking: boolean): Promise<void> {
private async onAudioPlayerStateChange(_oldState: {status: AudioPlayerStatus}, newState: {status: AudioPlayerStatus}): Promise<void> {
// Automatically advance queued song at end
if (!isSpeaking && this.status === STATUS.PLAYING) {
if (newState.status === AudioPlayerStatus.Idle && this.status === STATUS.PLAYING) {
await this.forward(1);
}
}