From f5149dfaba64c62f0a9ea6deab600b3d4d9b0f39 Mon Sep 17 00:00:00 2001 From: Max Isom Date: Fri, 19 Nov 2021 12:13:45 -0500 Subject: [PATCH] Move file caching logic to new FileCache service Also: removes the -re ffmpeg option. If this option is passed, ffmpeg won't write to fs-capacitor (and the cache file) as fast as possible. In other words, the cache file won't finish writing until the entire stream has been played. --- .env.example | 3 + README.md | 4 ++ src/index.ts | 3 + src/inversify.config.ts | 2 + src/managers/player.ts | 10 ++-- src/models/file-cache.ts | 4 +- src/models/key-value-cache.ts | 2 +- src/models/settings.ts | 2 +- src/models/shortcut.ts | 2 +- src/services/config.ts | 12 +++- src/services/file-cache.ts | 109 ++++++++++++++++++++++++++++++++++ src/services/player.ts | 53 ++++------------- 12 files changed, 154 insertions(+), 52 deletions(-) create mode 100644 src/services/file-cache.ts diff --git a/.env.example b/.env.example index 9d757da..db26bda 100644 --- a/.env.example +++ b/.env.example @@ -3,3 +3,6 @@ DATA_DIR= YOUTUBE_API_KEY= SPOTIFY_CLIENT_ID= SPOTIFY_CLIENT_SECRET= + +# Optional +# CACHE_LIMIT=2GB diff --git a/README.md b/README.md index 43ce9b9..ac6e0d3 100644 --- a/README.md +++ b/README.md @@ -69,3 +69,7 @@ services: 5. `yarn start` (or `npm run start`) **Note**: if you're on Windows, you may need to manually set the ffmpeg path. See [#345](https://github.com/codetheweb/muse/issues/345) for details. + +#### Advanced + +By default, Muse limits the total cache size to around 2 GB. If you want to change this, set the environment variable `CACHE_LIMIT`. For example, `CACHE_LIMIT=512MB` or `CACHE_LIMIT=10GB`. diff --git a/src/index.ts b/src/index.ts index 5bf5bab..383faef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,7 @@ import {TYPES} from './types.js'; import Bot from './bot.js'; import {sequelize} from './utils/db.js'; import Config from './services/config.js'; +import FileCacheProvider from './services/file-cache.js'; const bot = container.get(TYPES.Bot); @@ -18,5 +19,7 @@ const bot = container.get(TYPES.Bot); await sequelize.sync({alter: true}); + await container.get(TYPES.FileCache).cleanup(); + await bot.listen(); })(); diff --git a/src/inversify.config.ts b/src/inversify.config.ts index 95ed234..c277f7a 100644 --- a/src/inversify.config.ts +++ b/src/inversify.config.ts @@ -28,6 +28,7 @@ import Shuffle from './commands/shuffle.js'; import Skip from './commands/skip.js'; import Unskip from './commands/unskip.js'; import ThirdParty from './services/third-party.js'; +import FileCacheProvider from './services/file-cache.js'; import KeyValueCacheProvider from './services/key-value-cache.js'; const container = new Container(); @@ -76,6 +77,7 @@ container.bind(TYPES.Config).toConstantValue(new ConfigProvider()); // Static libraries container.bind(TYPES.ThirdParty).to(ThirdParty); +container.bind(TYPES.FileCache).to(FileCacheProvider); container.bind(TYPES.KeyValueCache).to(KeyValueCacheProvider); export default container; diff --git a/src/managers/player.ts b/src/managers/player.ts index 02e4ba0..5d816b8 100644 --- a/src/managers/player.ts +++ b/src/managers/player.ts @@ -2,25 +2,25 @@ import {inject, injectable} from 'inversify'; import {Client} from 'discord.js'; import {TYPES} from '../types.js'; import Player from '../services/player.js'; -import Config from '../services/config.js'; +import FileCacheProvider from '../services/file-cache.js'; @injectable() export default class { private readonly guildPlayers: Map; - private readonly cacheDir: string; private readonly discordClient: Client; + private readonly fileCache: FileCacheProvider; - constructor(@inject(TYPES.Config) config: Config, @inject(TYPES.Client) client: Client) { + constructor(@inject(TYPES.FileCache) fileCache: FileCacheProvider, @inject(TYPES.Client) client: Client) { this.guildPlayers = new Map(); - this.cacheDir = config.CACHE_DIR; this.discordClient = client; + this.fileCache = fileCache; } get(guildId: string): Player { let player = this.guildPlayers.get(guildId); if (!player) { - player = new Player(this.cacheDir, this.discordClient); + player = new Player(this.discordClient, this.fileCache); this.guildPlayers.set(guildId, player); } diff --git a/src/models/file-cache.ts b/src/models/file-cache.ts index 4846ba3..dbac6d3 100644 --- a/src/models/file-cache.ts +++ b/src/models/file-cache.ts @@ -1,13 +1,13 @@ import {Table, Column, PrimaryKey, Model} from 'sequelize-typescript'; @Table -export default class FileCache extends Model { +export default class FileCache extends Model { @PrimaryKey @Column hash!: string; @Column - kbits!: number; + bytes!: number; @Column accessedAt!: Date; diff --git a/src/models/key-value-cache.ts b/src/models/key-value-cache.ts index 795ff75..5072538 100644 --- a/src/models/key-value-cache.ts +++ b/src/models/key-value-cache.ts @@ -2,7 +2,7 @@ import {Table, Column, PrimaryKey, Model} from 'sequelize-typescript'; import sequelize from 'sequelize'; @Table -export default class KeyValueCache extends Model { +export default class KeyValueCache extends Model { @PrimaryKey @Column key!: string; diff --git a/src/models/settings.ts b/src/models/settings.ts index 29c2b3e..3318c47 100644 --- a/src/models/settings.ts +++ b/src/models/settings.ts @@ -1,7 +1,7 @@ import {Table, Column, PrimaryKey, Model, Default} from 'sequelize-typescript'; @Table -export default class Settings extends Model { +export default class Settings extends Model { @PrimaryKey @Column guildId!: string; diff --git a/src/models/shortcut.ts b/src/models/shortcut.ts index 7ce1177..4ec88ed 100644 --- a/src/models/shortcut.ts +++ b/src/models/shortcut.ts @@ -1,7 +1,7 @@ import {Table, Column, PrimaryKey, Model, AutoIncrement, Index} from 'sequelize-typescript'; @Table -export default class Shortcut extends Model { +export default class Shortcut extends Model { @PrimaryKey @AutoIncrement @Column diff --git a/src/services/config.ts b/src/services/config.ts index 759a27a3..96b161c 100644 --- a/src/services/config.ts +++ b/src/services/config.ts @@ -1,6 +1,8 @@ import dotenv from 'dotenv'; import {injectable} from 'inversify'; import path from 'path'; +import xbytes from 'xbytes'; +import {ConditionalKeys} from 'type-fest'; dotenv.config(); export const DATA_DIR = path.resolve(process.env.DATA_DIR ? process.env.DATA_DIR : './data'); @@ -12,6 +14,7 @@ const CONFIG_MAP = { SPOTIFY_CLIENT_SECRET: process.env.SPOTIFY_CLIENT_SECRET, DATA_DIR, CACHE_DIR: path.join(DATA_DIR, 'cache'), + CACHE_LIMIT_IN_BYTES: xbytes.parseSize(process.env.CACHE_LIMIT ?? '2GB'), } as const; @injectable() @@ -22,6 +25,7 @@ export default class Config { readonly SPOTIFY_CLIENT_SECRET!: string; readonly DATA_DIR!: string; readonly CACHE_DIR!: string; + readonly CACHE_LIMIT_IN_BYTES!: number; constructor() { for (const [key, value] of Object.entries(CONFIG_MAP)) { @@ -30,7 +34,13 @@ export default class Config { process.exit(1); } - this[key as keyof typeof CONFIG_MAP] = value; + if (typeof value === 'number') { + this[key as ConditionalKeys] = value; + } else if (typeof value === 'string') { + this[key as ConditionalKeys] = value; + } else { + throw new Error(`Unsupported type for ${key}`); + } } } } diff --git a/src/services/file-cache.ts b/src/services/file-cache.ts new file mode 100644 index 0000000..0ecf279 --- /dev/null +++ b/src/services/file-cache.ts @@ -0,0 +1,109 @@ +import {inject, injectable} from 'inversify'; +import sequelize from 'sequelize'; +import {FileCache} from '../models/index.js'; +import {TYPES} from '../types.js'; +import Config from './config.js'; +import {promises as fs, createWriteStream} from 'fs'; +import path from 'path'; + +@injectable() +export default class FileCacheProvider { + private readonly config: Config; + + constructor(@inject(TYPES.Config) config: Config) { + this.config = config; + } + + /** + * Returns path to cached file if it exists, otherwise throws an error. + * Updates the `accessedAt` property of the cached file. + * @param hash lookup key + */ + async getPathFor(hash: string): Promise { + const model = await FileCache.findByPk(hash); + + if (!model) { + throw new Error('File is not cached'); + } + + const resolvedPath = path.join(this.config.CACHE_DIR, hash); + + try { + await fs.access(resolvedPath); + } catch (_: unknown) { + await FileCache.destroy({where: {hash}}); + + throw new Error('File is not cached'); + } + + await model.update({accessedAt: new Date()}); + + return resolvedPath; + } + + /** + * Returns a write stream for the given hash key. + * The stream handles saving a new file and will + * update the database after the stream is finished. + * @param hash lookup key + */ + createWriteStream(hash: string) { + const tmpPath = path.join(this.config.CACHE_DIR, 'tmp', hash); + const finalPath = path.join(this.config.CACHE_DIR, hash); + + const stream = createWriteStream(tmpPath); + + stream.on('finish', async () => { + // Only move if size is non-zero (may have errored out) + const stats = await fs.stat(tmpPath); + + if (stats.size !== 0) { + await fs.rename(tmpPath, finalPath); + } + + await FileCache.create({hash, bytes: stats.size, accessedAt: new Date()}); + + await this.evictOldestIfNecessary(); + }); + + return stream; + } + + /** + * Deletes orphaned cache files and evicts files if + * necessary. Should be run on program startup so files + * will be evicted if the cache limit has changed. + */ + async cleanup() { + await this.removeOrphans(); + await this.evictOldestIfNecessary(); + } + + private async evictOldestIfNecessary() { + const [{dataValues: {totalSizeBytes}}] = await FileCache.findAll({ + attributes: [ + [sequelize.fn('sum', sequelize.col('bytes')), 'totalSizeBytes'], + ], + }) as unknown as [{dataValues: {totalSizeBytes: number}}]; + + if (totalSizeBytes > this.config.CACHE_LIMIT_IN_BYTES) { + const oldest = await FileCache.findOne({ + order: [ + ['accessedAt', 'ASC'], + ], + }); + + if (oldest) { + await oldest.destroy(); + await fs.unlink(path.join(this.config.CACHE_DIR, oldest.hash)); + } + + // Continue to evict until we're under the limit + await this.evictOldestIfNecessary(); + } + } + + private async removeOrphans() { + // TODO + } +} diff --git a/src/services/player.ts b/src/services/player.ts index c49e8c6..2738a91 100644 --- a/src/services/player.ts +++ b/src/services/player.ts @@ -1,7 +1,5 @@ import {VoiceChannel, Snowflake, Client, TextChannel} from 'discord.js'; -import {promises as fs, createWriteStream} from 'fs'; -import {Readable, PassThrough} from 'stream'; -import path from 'path'; +import {Readable} from 'stream'; import hasha from 'hasha'; import ytdl from 'ytdl-core'; import {WriteStream} from 'fs-capacitor'; @@ -9,6 +7,7 @@ import ffmpeg from 'fluent-ffmpeg'; import shuffle from 'array-shuffle'; import errorMsg from '../utils/error-msg.js'; import {AudioPlayer, AudioPlayerStatus, createAudioPlayer, createAudioResource, joinVoiceChannel, StreamType, VoiceConnection, VoiceConnectionStatus} from '@discordjs/voice'; +import FileCacheProvider from './file-cache.js'; export interface QueuedPlaylist { title: string; @@ -35,7 +34,6 @@ export default class { public voiceConnection: VoiceConnection | null = null; private queue: QueuedSong[] = []; private queuePosition = 0; - private readonly cacheDir: string; private audioPlayer: AudioPlayer | null = null; private nowPlaying: QueuedSong | null = null; private playPositionInterval: NodeJS.Timeout | undefined; @@ -44,10 +42,11 @@ export default class { private positionInSeconds = 0; private readonly discordClient: Client; + private readonly fileCache: FileCacheProvider; - constructor(cacheDir: string, client: Client) { - this.cacheDir = cacheDir; + constructor(client: Client, fileCache: FileCacheProvider) { this.discordClient = client; + this.fileCache = fileCache; } async connect(channel: VoiceChannel): Promise { @@ -283,40 +282,24 @@ export default class { return this.queueSize() === 0; } - private getCachedPath(url: string): string { - return path.join(this.cacheDir, hasha(url)); - } - - private getCachedPathTemp(url: string): string { - return path.join(this.cacheDir, 'tmp', hasha(url)); - } - - private async isCached(url: string): Promise { - try { - await fs.access(this.getCachedPath(url)); - - return true; - } catch (_: unknown) { - return false; - } + private getHashForCache(url: string): string { + return hasha(url); } private async getStream(url: string, options: {seek?: number} = {}): Promise { - const cachedPath = this.getCachedPath(url); - let ffmpegInput = ''; const ffmpegInputOptions: string[] = []; let shouldCacheVideo = false; let format: ytdl.videoFormat | undefined; - if (await this.isCached(url)) { - ffmpegInput = cachedPath; + try { + ffmpegInput = await this.fileCache.getPathFor(this.getHashForCache(url)); if (options.seek) { ffmpegInputOptions.push('-ss', options.seek.toString()); } - } else { + } catch { // Not yet cached, must download const info = await ytdl.getInfo(url); @@ -367,7 +350,6 @@ export default class { '1', '-reconnect_delay_max', '5', - '-re', ]); if (options.seek) { @@ -386,8 +368,7 @@ export default class { .on('error', error => { console.error(error); reject(error); - }) - .pipe() as PassThrough; + }); const capacitor = new WriteStream(); @@ -395,17 +376,7 @@ export default class { // Cache video if necessary if (shouldCacheVideo) { - const cacheTempPath = this.getCachedPathTemp(url); - const cacheStream = createWriteStream(cacheTempPath); - - cacheStream.on('finish', async () => { - // Only move if size is non-zero (may have errored out) - const stats = await fs.stat(cacheTempPath); - - if (stats.size !== 0) { - await fs.rename(cacheTempPath, cachedPath); - } - }); + const cacheStream = this.fileCache.createWriteStream(this.getHashForCache(url)); capacitor.createReadStream().pipe(cacheStream); }