import {promises as fs, createWriteStream} from 'fs';
import path from 'path';
import {inject, injectable} from 'inversify';
import sequelize from 'sequelize';
import {FileCache} from '../models/index.js';
import {TYPES} from '../types.js';
import Config from './config.js';
import PQueue from 'p-queue';
import debug from '../utils/debug.js';

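/**
 * Disk-backed file cache. Cached files live in CACHE_DIR and are tracked in
 * the FileCache table; once the total size exceeds CACHE_LIMIT_IN_BYTES, the
 * least recently accessed files are evicted.
 */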
@injectable()
export default class FileCacheProvider {
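  // concurrency: 1 ensures eviction tasks run one at a time.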
  private readonly evictionQueue = new PQueue({concurrency: 1});
  private readonly config: Config;

  constructor(@inject(TYPES.Config) config: Config) {
    this.config = config;
  }

  /**
   * Returns path to cached file if it exists, otherwise throws an error.
   * Updates the `accessedAt` property of the cached file.
   * @param hash lookup key
   */
  async getPathFor(hash: string): Promise<string> {
    const model = await FileCache.findByPk(hash);

    if (!model) {
      throw new Error('File is not cached');
    }

    const resolvedPath = path.join(this.config.CACHE_DIR, hash);

    try {
      await fs.access(resolvedPath);
    } catch (_: unknown) {
      await FileCache.destroy({where: {hash}});

      throw new Error('File is not cached');
    }

    await model.update({accessedAt: new Date()});

    return resolvedPath;
  }

  /**
   * Returns a write stream for the given hash key.
   * The stream handles saving a new file and will
   * update the database after the stream is closed.
   * @param hash lookup key
   */
  createWriteStream(hash: string) {
    const tmpPath = path.join(this.config.CACHE_DIR, 'tmp', hash);
    const finalPath = path.join(this.config.CACHE_DIR, hash);
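
    // Data is written to the tmp/ subdirectory first and is only renamed into
    // the cache (and recorded in the database) when the stream closes with a
    // non-zero size.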
    const stream = createWriteStream(tmpPath);

    stream.on('close', async () => {
      // Only move if size is non-zero (may have errored out)
      const stats = await fs.stat(tmpPath);

      if (stats.size !== 0) {
        try {
          await fs.rename(tmpPath, finalPath);

          await FileCache.create({hash, bytes: stats.size, accessedAt: new Date()});
        } catch (error) {
          debug('Errored when moving a finished cache file:', error);
        }
      }

      await this.evictOldestIfNecessary();
    });

    return stream;
  }

  /**
   * Deletes orphaned cache files and evicts files if
   * necessary. Should be run on program startup so files
   * will be evicted if the cache limit has changed.
   */
  async cleanup() {
    await this.removeOrphans();
    await this.evictOldestIfNecessary();
  }
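
  // Adds a single evictOldest task to the queue (unless one is already queued
  // or running) and resolves once the queue is empty.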
  private async evictOldestIfNecessary() {
    if (this.evictionQueue.size === 0 && this.evictionQueue.pending === 0) {
      debug('Adding evictOldest task to queue');
      void this.evictionQueue.add(this.evictOldest.bind(this));
    }

    return this.evictionQueue.onEmpty();
  }
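
  // If the total cached size exceeds CACHE_LIMIT_IN_BYTES, deletes the least
  // recently accessed file and re-queues itself until the cache is back under
  // the limit.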
  private async evictOldest() {
    debug('Evicting oldest (if found)');
    const [{dataValues: {totalSizeBytes}}] = await FileCache.findAll({
      attributes: [
        [sequelize.fn('sum', sequelize.col('bytes')), 'totalSizeBytes'],
      ],
    }) as unknown as [{dataValues: {totalSizeBytes: number}}];

    if (totalSizeBytes > this.config.CACHE_LIMIT_IN_BYTES) {
      const oldest = await FileCache.findOne({
        order: [
          ['accessedAt', 'ASC'],
        ],
      });

      if (oldest) {
        await oldest.destroy();
        await fs.unlink(path.join(this.config.CACHE_DIR, oldest.hash));
      }

      // Continue to evict until we're under the limit
      debug('Scheduling another eviction');
      void this.evictionQueue.add(this.evictOldest.bind(this));
    }

    debug('Finished evictOldest');
  }
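
  // Deletes any file in CACHE_DIR that has no corresponding row in the
  // FileCache table.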
  private async removeOrphans() {
    for await (const dirent of await fs.opendir(this.config.CACHE_DIR)) {
      if (dirent.isFile()) {
        const model = await FileCache.findByPk(dirent.name);

        if (!model) {
          await fs.unlink(path.join(this.config.CACHE_DIR, dirent.name));
        }
      }
    }
  }
}