Merge pull request #420 from Hellysonrp/mutex

This commit is contained in:
Max Isom 2021-12-17 19:45:13 -05:00 committed by GitHub
commit d8fc7d39e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 12 deletions

View file

@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased] ## [Unreleased]
### Fixed
- Fixes a race condition in the file cache service (see #420)
## [0.1.0] ## [0.1.0]
### Added ### Added

View file

@ -63,7 +63,8 @@
"new-cap": "off", "new-cap": "off",
"@typescript-eslint/no-unused-vars": "off", "@typescript-eslint/no-unused-vars": "off",
"@typescript-eslint/no-unused-vars-experimental": "error", "@typescript-eslint/no-unused-vars-experimental": "error",
"@typescript-eslint/prefer-readonly-parameter-types": "off" "@typescript-eslint/prefer-readonly-parameter-types": "off",
"@typescript-eslint/no-implicit-any-catch": "off"
} }
}, },
"husky": { "husky": {

View file

@ -5,9 +5,12 @@ import sequelize from 'sequelize';
import {FileCache} from '../models/index.js'; import {FileCache} from '../models/index.js';
import {TYPES} from '../types.js'; import {TYPES} from '../types.js';
import Config from './config.js'; import Config from './config.js';
import PQueue from 'p-queue';
import debug from '../utils/debug.js';
@injectable() @injectable()
export default class FileCacheProvider { export default class FileCacheProvider {
private static readonly evictionQueue = new PQueue({concurrency: 1});
private readonly config: Config; private readonly config: Config;
constructor(@inject(TYPES.Config) config: Config) { constructor(@inject(TYPES.Config) config: Config) {
@ -58,10 +61,14 @@ export default class FileCacheProvider {
const stats = await fs.stat(tmpPath); const stats = await fs.stat(tmpPath);
if (stats.size !== 0) { if (stats.size !== 0) {
try {
await fs.rename(tmpPath, finalPath); await fs.rename(tmpPath, finalPath);
}
await FileCache.create({hash, bytes: stats.size, accessedAt: new Date()}); await FileCache.create({hash, bytes: stats.size, accessedAt: new Date()});
} catch (error) {
debug('Errored when moving a finished cache file:', error);
}
}
await this.evictOldestIfNecessary(); await this.evictOldestIfNecessary();
}); });
@ -80,13 +87,19 @@ export default class FileCacheProvider {
} }
private async evictOldestIfNecessary() { private async evictOldestIfNecessary() {
const [{dataValues: {totalSizeBytes}}] = await FileCache.findAll({ void FileCacheProvider.evictionQueue.add(this.evictOldest.bind(this));
attributes: [
[sequelize.fn('sum', sequelize.col('bytes')), 'totalSizeBytes'],
],
}) as unknown as [{dataValues: {totalSizeBytes: number}}];
if (totalSizeBytes > this.config.CACHE_LIMIT_IN_BYTES) { return FileCacheProvider.evictionQueue.onEmpty();
}
private async evictOldest() {
debug('Evicting oldest files...');
let totalSizeBytes = await this.getDiskUsageInBytes();
let numOfEvictedFiles = 0;
// Continue to evict until we're under the limit
/* eslint-disable no-await-in-loop */
while (totalSizeBytes > this.config.CACHE_LIMIT_IN_BYTES) {
const oldest = await FileCache.findOne({ const oldest = await FileCache.findOne({
order: [ order: [
['accessedAt', 'ASC'], ['accessedAt', 'ASC'],
@ -96,22 +109,111 @@ export default class FileCacheProvider {
if (oldest) { if (oldest) {
await oldest.destroy(); await oldest.destroy();
await fs.unlink(path.join(this.config.CACHE_DIR, oldest.hash)); await fs.unlink(path.join(this.config.CACHE_DIR, oldest.hash));
debug(`${oldest.hash} has been evicted`);
numOfEvictedFiles++;
} }
// Continue to evict until we're under the limit totalSizeBytes = await this.getDiskUsageInBytes();
await this.evictOldestIfNecessary(); }
/* eslint-enable no-await-in-loop */
if (numOfEvictedFiles > 0) {
debug(`${numOfEvictedFiles} files have been evicted`);
} else {
debug(`No files needed to be evicted. Total size of the cache is currently ${totalSizeBytes} bytes, and the cache limit is ${this.config.CACHE_LIMIT_IN_BYTES} bytes.`);
} }
} }
private async removeOrphans() { private async removeOrphans() {
// Check filesystem direction (do files exist on the disk but not in the database?)
for await (const dirent of await fs.opendir(this.config.CACHE_DIR)) { for await (const dirent of await fs.opendir(this.config.CACHE_DIR)) {
if (dirent.isFile()) { if (dirent.isFile()) {
const model = await FileCache.findByPk(dirent.name); const model = await FileCache.findByPk(dirent.name);
if (!model) { if (!model) {
debug(`${dirent.name} was present on disk but was not in the database. Removing from disk.`);
await fs.unlink(path.join(this.config.CACHE_DIR, dirent.name)); await fs.unlink(path.join(this.config.CACHE_DIR, dirent.name));
} }
} }
} }
// Check database direction (do entries exist in the database but not on the disk?)
for await (const model of this.getFindAllIterable()) {
const filePath = path.join(this.config.CACHE_DIR, model.hash);
try {
await fs.access(filePath);
} catch {
debug(`${model.hash} was present in database but was not on disk. Removing from database.`);
await model.destroy();
}
}
}
/**
* Pulls from the database rather than the filesystem,
* so may be slightly inaccurate.
* @returns the total size of the cache in bytes
*/
private async getDiskUsageInBytes() {
const [{dataValues: {totalSizeBytes}}] = await FileCache.findAll({
attributes: [
[sequelize.fn('sum', sequelize.col('bytes')), 'totalSizeBytes'],
],
}) as unknown as [{dataValues: {totalSizeBytes: number}}];
return totalSizeBytes;
}
/**
* An efficient way to iterate over all rows.
* @returns an iterable for the result of FileCache.findAll()
*/
private getFindAllIterable() {
const limit = 50;
let previousCreatedAt: Date | null = null;
let models: FileCache[] = [];
const fetchNextBatch = async () => {
let where = {};
if (previousCreatedAt) {
where = {
createdAt: {
[sequelize.Op.gt]: previousCreatedAt,
},
};
}
models = await FileCache.findAll({
where,
limit,
order: ['createdAt'],
});
if (models.length > 0) {
previousCreatedAt = models[models.length - 1].createdAt as Date;
}
};
return {
[Symbol.asyncIterator]() {
return {
async next() {
if (models.length === 0) {
await fetchNextBatch();
}
if (models.length === 0) {
// Must return value here for types to be inferred correctly
return {done: true, value: null as unknown as FileCache};
}
return {value: models.shift()!, done: false};
},
};
},
};
} }
} }