Move file caching logic to new FileCache service

Also: removes the -re ffmpeg option.
If this option is passed, ffmpeg won't write to fs-capacitor (and the cache file) as fast as possible.
In other words, the cache file won't finish writing until the entire stream has been played.
This commit is contained in:
Max Isom 2021-11-19 12:13:45 -05:00
parent 04d8f8d390
commit f5149dfaba
No known key found for this signature in database
GPG key ID: 25C9B1A7F6798880
12 changed files with 154 additions and 52 deletions

View file

@ -3,3 +3,6 @@ DATA_DIR=
YOUTUBE_API_KEY= YOUTUBE_API_KEY=
SPOTIFY_CLIENT_ID= SPOTIFY_CLIENT_ID=
SPOTIFY_CLIENT_SECRET= SPOTIFY_CLIENT_SECRET=
# Optional
# CACHE_LIMIT=2GB

View file

@ -69,3 +69,7 @@ services:
5. `yarn start` (or `npm run start`) 5. `yarn start` (or `npm run start`)
**Note**: if you're on Windows, you may need to manually set the ffmpeg path. See [#345](https://github.com/codetheweb/muse/issues/345) for details. **Note**: if you're on Windows, you may need to manually set the ffmpeg path. See [#345](https://github.com/codetheweb/muse/issues/345) for details.
#### Advanced
By default, Muse limits the total cache size to around 2 GB. If you want to change this, set the environment variable `CACHE_LIMIT`. For example, `CACHE_LIMIT=512MB` or `CACHE_LIMIT=10GB`.

View file

@ -5,6 +5,7 @@ import {TYPES} from './types.js';
import Bot from './bot.js'; import Bot from './bot.js';
import {sequelize} from './utils/db.js'; import {sequelize} from './utils/db.js';
import Config from './services/config.js'; import Config from './services/config.js';
import FileCacheProvider from './services/file-cache.js';
const bot = container.get<Bot>(TYPES.Bot); const bot = container.get<Bot>(TYPES.Bot);
@ -18,5 +19,7 @@ const bot = container.get<Bot>(TYPES.Bot);
await sequelize.sync({alter: true}); await sequelize.sync({alter: true});
await container.get<FileCacheProvider>(TYPES.FileCache).cleanup();
await bot.listen(); await bot.listen();
})(); })();

View file

@ -28,6 +28,7 @@ import Shuffle from './commands/shuffle.js';
import Skip from './commands/skip.js'; import Skip from './commands/skip.js';
import Unskip from './commands/unskip.js'; import Unskip from './commands/unskip.js';
import ThirdParty from './services/third-party.js'; import ThirdParty from './services/third-party.js';
import FileCacheProvider from './services/file-cache.js';
import KeyValueCacheProvider from './services/key-value-cache.js'; import KeyValueCacheProvider from './services/key-value-cache.js';
const container = new Container(); const container = new Container();
@ -76,6 +77,7 @@ container.bind(TYPES.Config).toConstantValue(new ConfigProvider());
// Static libraries // Static libraries
container.bind(TYPES.ThirdParty).to(ThirdParty); container.bind(TYPES.ThirdParty).to(ThirdParty);
container.bind(TYPES.FileCache).to(FileCacheProvider);
container.bind(TYPES.KeyValueCache).to(KeyValueCacheProvider); container.bind(TYPES.KeyValueCache).to(KeyValueCacheProvider);
export default container; export default container;

View file

@ -2,25 +2,25 @@ import {inject, injectable} from 'inversify';
import {Client} from 'discord.js'; import {Client} from 'discord.js';
import {TYPES} from '../types.js'; import {TYPES} from '../types.js';
import Player from '../services/player.js'; import Player from '../services/player.js';
import Config from '../services/config.js'; import FileCacheProvider from '../services/file-cache.js';
@injectable() @injectable()
export default class { export default class {
private readonly guildPlayers: Map<string, Player>; private readonly guildPlayers: Map<string, Player>;
private readonly cacheDir: string;
private readonly discordClient: Client; private readonly discordClient: Client;
private readonly fileCache: FileCacheProvider;
constructor(@inject(TYPES.Config) config: Config, @inject(TYPES.Client) client: Client) { constructor(@inject(TYPES.FileCache) fileCache: FileCacheProvider, @inject(TYPES.Client) client: Client) {
this.guildPlayers = new Map(); this.guildPlayers = new Map();
this.cacheDir = config.CACHE_DIR;
this.discordClient = client; this.discordClient = client;
this.fileCache = fileCache;
} }
get(guildId: string): Player { get(guildId: string): Player {
let player = this.guildPlayers.get(guildId); let player = this.guildPlayers.get(guildId);
if (!player) { if (!player) {
player = new Player(this.cacheDir, this.discordClient); player = new Player(this.discordClient, this.fileCache);
this.guildPlayers.set(guildId, player); this.guildPlayers.set(guildId, player);
} }

View file

@ -1,13 +1,13 @@
import {Table, Column, PrimaryKey, Model} from 'sequelize-typescript'; import {Table, Column, PrimaryKey, Model} from 'sequelize-typescript';
@Table @Table
export default class FileCache extends Model<FileCache> { export default class FileCache extends Model {
@PrimaryKey @PrimaryKey
@Column @Column
hash!: string; hash!: string;
@Column @Column
kbits!: number; bytes!: number;
@Column @Column
accessedAt!: Date; accessedAt!: Date;

View file

@ -2,7 +2,7 @@ import {Table, Column, PrimaryKey, Model} from 'sequelize-typescript';
import sequelize from 'sequelize'; import sequelize from 'sequelize';
@Table @Table
export default class KeyValueCache extends Model<KeyValueCache> { export default class KeyValueCache extends Model {
@PrimaryKey @PrimaryKey
@Column @Column
key!: string; key!: string;

View file

@ -1,7 +1,7 @@
import {Table, Column, PrimaryKey, Model, Default} from 'sequelize-typescript'; import {Table, Column, PrimaryKey, Model, Default} from 'sequelize-typescript';
@Table @Table
export default class Settings extends Model<Settings> { export default class Settings extends Model {
@PrimaryKey @PrimaryKey
@Column @Column
guildId!: string; guildId!: string;

View file

@ -1,7 +1,7 @@
import {Table, Column, PrimaryKey, Model, AutoIncrement, Index} from 'sequelize-typescript'; import {Table, Column, PrimaryKey, Model, AutoIncrement, Index} from 'sequelize-typescript';
@Table @Table
export default class Shortcut extends Model<Shortcut> { export default class Shortcut extends Model {
@PrimaryKey @PrimaryKey
@AutoIncrement @AutoIncrement
@Column @Column

View file

@ -1,6 +1,8 @@
import dotenv from 'dotenv'; import dotenv from 'dotenv';
import {injectable} from 'inversify'; import {injectable} from 'inversify';
import path from 'path'; import path from 'path';
import xbytes from 'xbytes';
import {ConditionalKeys} from 'type-fest';
dotenv.config(); dotenv.config();
export const DATA_DIR = path.resolve(process.env.DATA_DIR ? process.env.DATA_DIR : './data'); export const DATA_DIR = path.resolve(process.env.DATA_DIR ? process.env.DATA_DIR : './data');
@ -12,6 +14,7 @@ const CONFIG_MAP = {
SPOTIFY_CLIENT_SECRET: process.env.SPOTIFY_CLIENT_SECRET, SPOTIFY_CLIENT_SECRET: process.env.SPOTIFY_CLIENT_SECRET,
DATA_DIR, DATA_DIR,
CACHE_DIR: path.join(DATA_DIR, 'cache'), CACHE_DIR: path.join(DATA_DIR, 'cache'),
CACHE_LIMIT_IN_BYTES: xbytes.parseSize(process.env.CACHE_LIMIT ?? '2GB'),
} as const; } as const;
@injectable() @injectable()
@ -22,6 +25,7 @@ export default class Config {
readonly SPOTIFY_CLIENT_SECRET!: string; readonly SPOTIFY_CLIENT_SECRET!: string;
readonly DATA_DIR!: string; readonly DATA_DIR!: string;
readonly CACHE_DIR!: string; readonly CACHE_DIR!: string;
readonly CACHE_LIMIT_IN_BYTES!: number;
constructor() { constructor() {
for (const [key, value] of Object.entries(CONFIG_MAP)) { for (const [key, value] of Object.entries(CONFIG_MAP)) {
@ -30,7 +34,13 @@ export default class Config {
process.exit(1); process.exit(1);
} }
this[key as keyof typeof CONFIG_MAP] = value; if (typeof value === 'number') {
this[key as ConditionalKeys<typeof CONFIG_MAP, number>] = value;
} else if (typeof value === 'string') {
this[key as ConditionalKeys<typeof CONFIG_MAP, string>] = value;
} else {
throw new Error(`Unsupported type for ${key}`);
}
} }
} }
} }

109
src/services/file-cache.ts Normal file
View file

@ -0,0 +1,109 @@
import {inject, injectable} from 'inversify';
import sequelize from 'sequelize';
import {FileCache} from '../models/index.js';
import {TYPES} from '../types.js';
import Config from './config.js';
import {promises as fs, createWriteStream} from 'fs';
import path from 'path';
@injectable()
export default class FileCacheProvider {
private readonly config: Config;
constructor(@inject(TYPES.Config) config: Config) {
this.config = config;
}
/**
* Returns path to cached file if it exists, otherwise throws an error.
* Updates the `accessedAt` property of the cached file.
* @param hash lookup key
*/
async getPathFor(hash: string): Promise<string> {
const model = await FileCache.findByPk(hash);
if (!model) {
throw new Error('File is not cached');
}
const resolvedPath = path.join(this.config.CACHE_DIR, hash);
try {
await fs.access(resolvedPath);
} catch (_: unknown) {
await FileCache.destroy({where: {hash}});
throw new Error('File is not cached');
}
await model.update({accessedAt: new Date()});
return resolvedPath;
}
/**
* Returns a write stream for the given hash key.
* The stream handles saving a new file and will
* update the database after the stream is finished.
* @param hash lookup key
*/
createWriteStream(hash: string) {
const tmpPath = path.join(this.config.CACHE_DIR, 'tmp', hash);
const finalPath = path.join(this.config.CACHE_DIR, hash);
const stream = createWriteStream(tmpPath);
stream.on('finish', async () => {
// Only move if size is non-zero (may have errored out)
const stats = await fs.stat(tmpPath);
if (stats.size !== 0) {
await fs.rename(tmpPath, finalPath);
}
await FileCache.create({hash, bytes: stats.size, accessedAt: new Date()});
await this.evictOldestIfNecessary();
});
return stream;
}
/**
* Deletes orphaned cache files and evicts files if
* necessary. Should be run on program startup so files
* will be evicted if the cache limit has changed.
*/
async cleanup() {
await this.removeOrphans();
await this.evictOldestIfNecessary();
}
private async evictOldestIfNecessary() {
const [{dataValues: {totalSizeBytes}}] = await FileCache.findAll({
attributes: [
[sequelize.fn('sum', sequelize.col('bytes')), 'totalSizeBytes'],
],
}) as unknown as [{dataValues: {totalSizeBytes: number}}];
if (totalSizeBytes > this.config.CACHE_LIMIT_IN_BYTES) {
const oldest = await FileCache.findOne({
order: [
['accessedAt', 'ASC'],
],
});
if (oldest) {
await oldest.destroy();
await fs.unlink(path.join(this.config.CACHE_DIR, oldest.hash));
}
// Continue to evict until we're under the limit
await this.evictOldestIfNecessary();
}
}
private async removeOrphans() {
// TODO
}
}

View file

@ -1,7 +1,5 @@
import {VoiceChannel, Snowflake, Client, TextChannel} from 'discord.js'; import {VoiceChannel, Snowflake, Client, TextChannel} from 'discord.js';
import {promises as fs, createWriteStream} from 'fs'; import {Readable} from 'stream';
import {Readable, PassThrough} from 'stream';
import path from 'path';
import hasha from 'hasha'; import hasha from 'hasha';
import ytdl from 'ytdl-core'; import ytdl from 'ytdl-core';
import {WriteStream} from 'fs-capacitor'; import {WriteStream} from 'fs-capacitor';
@ -9,6 +7,7 @@ import ffmpeg from 'fluent-ffmpeg';
import shuffle from 'array-shuffle'; import shuffle from 'array-shuffle';
import errorMsg from '../utils/error-msg.js'; import errorMsg from '../utils/error-msg.js';
import {AudioPlayer, AudioPlayerStatus, createAudioPlayer, createAudioResource, joinVoiceChannel, StreamType, VoiceConnection, VoiceConnectionStatus} from '@discordjs/voice'; import {AudioPlayer, AudioPlayerStatus, createAudioPlayer, createAudioResource, joinVoiceChannel, StreamType, VoiceConnection, VoiceConnectionStatus} from '@discordjs/voice';
import FileCacheProvider from './file-cache.js';
export interface QueuedPlaylist { export interface QueuedPlaylist {
title: string; title: string;
@ -35,7 +34,6 @@ export default class {
public voiceConnection: VoiceConnection | null = null; public voiceConnection: VoiceConnection | null = null;
private queue: QueuedSong[] = []; private queue: QueuedSong[] = [];
private queuePosition = 0; private queuePosition = 0;
private readonly cacheDir: string;
private audioPlayer: AudioPlayer | null = null; private audioPlayer: AudioPlayer | null = null;
private nowPlaying: QueuedSong | null = null; private nowPlaying: QueuedSong | null = null;
private playPositionInterval: NodeJS.Timeout | undefined; private playPositionInterval: NodeJS.Timeout | undefined;
@ -44,10 +42,11 @@ export default class {
private positionInSeconds = 0; private positionInSeconds = 0;
private readonly discordClient: Client; private readonly discordClient: Client;
private readonly fileCache: FileCacheProvider;
constructor(cacheDir: string, client: Client) { constructor(client: Client, fileCache: FileCacheProvider) {
this.cacheDir = cacheDir;
this.discordClient = client; this.discordClient = client;
this.fileCache = fileCache;
} }
async connect(channel: VoiceChannel): Promise<void> { async connect(channel: VoiceChannel): Promise<void> {
@ -283,40 +282,24 @@ export default class {
return this.queueSize() === 0; return this.queueSize() === 0;
} }
private getCachedPath(url: string): string { private getHashForCache(url: string): string {
return path.join(this.cacheDir, hasha(url)); return hasha(url);
}
private getCachedPathTemp(url: string): string {
return path.join(this.cacheDir, 'tmp', hasha(url));
}
private async isCached(url: string): Promise<boolean> {
try {
await fs.access(this.getCachedPath(url));
return true;
} catch (_: unknown) {
return false;
}
} }
private async getStream(url: string, options: {seek?: number} = {}): Promise<Readable> { private async getStream(url: string, options: {seek?: number} = {}): Promise<Readable> {
const cachedPath = this.getCachedPath(url);
let ffmpegInput = ''; let ffmpegInput = '';
const ffmpegInputOptions: string[] = []; const ffmpegInputOptions: string[] = [];
let shouldCacheVideo = false; let shouldCacheVideo = false;
let format: ytdl.videoFormat | undefined; let format: ytdl.videoFormat | undefined;
if (await this.isCached(url)) { try {
ffmpegInput = cachedPath; ffmpegInput = await this.fileCache.getPathFor(this.getHashForCache(url));
if (options.seek) { if (options.seek) {
ffmpegInputOptions.push('-ss', options.seek.toString()); ffmpegInputOptions.push('-ss', options.seek.toString());
} }
} else { } catch {
// Not yet cached, must download // Not yet cached, must download
const info = await ytdl.getInfo(url); const info = await ytdl.getInfo(url);
@ -367,7 +350,6 @@ export default class {
'1', '1',
'-reconnect_delay_max', '-reconnect_delay_max',
'5', '5',
'-re',
]); ]);
if (options.seek) { if (options.seek) {
@ -386,8 +368,7 @@ export default class {
.on('error', error => { .on('error', error => {
console.error(error); console.error(error);
reject(error); reject(error);
}) });
.pipe() as PassThrough;
const capacitor = new WriteStream(); const capacitor = new WriteStream();
@ -395,17 +376,7 @@ export default class {
// Cache video if necessary // Cache video if necessary
if (shouldCacheVideo) { if (shouldCacheVideo) {
const cacheTempPath = this.getCachedPathTemp(url); const cacheStream = this.fileCache.createWriteStream(this.getHashForCache(url));
const cacheStream = createWriteStream(cacheTempPath);
cacheStream.on('finish', async () => {
// Only move if size is non-zero (may have errored out)
const stats = await fs.stat(cacheTempPath);
if (stats.size !== 0) {
await fs.rename(cacheTempPath, cachedPath);
}
});
capacitor.createReadStream().pipe(cacheStream); capacitor.createReadStream().pipe(cacheStream);
} }