refactor proxy: create separate class for driver cache
This commit is contained in:
parent
05e72fc288
commit
882c884f22
|
|
@ -14,25 +14,21 @@ const NODE_TOPOLOGY_KEY_NAME = "org.democratic-csi.topology/node";
|
||||||
class CsiProxyDriver extends CsiBaseDriver {
|
class CsiProxyDriver extends CsiBaseDriver {
|
||||||
constructor(ctx, options) {
|
constructor(ctx, options) {
|
||||||
super(...arguments);
|
super(...arguments);
|
||||||
this.options.proxy.configFolder = path.normalize(this.options.proxy.configFolder);
|
|
||||||
if (this.options.proxy.configFolder.slice(-1) == '/') {
|
this.initCapabilities();
|
||||||
this.options.proxy.configFolder = this.options.proxy.configFolder.slice(0, -1);
|
|
||||||
|
let configFolder = path.normalize(this.options.proxy.configFolder);
|
||||||
|
if (configFolder.slice(-1) == '/') {
|
||||||
|
configFolder = configFolder.slice(0, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// corresponding storage class could be deleted without notice
|
const timeoutMinutes = this.options.proxy.cacheTimeoutMinutes ?? 60;
|
||||||
// let's delete entry from cache after 1 hour, so it can be cleaned by GC
|
const defaultOptions = this.options;
|
||||||
// one hour seems long enough to avoid recreating frequently used drivers
|
this.driverCache = new DriverCache(ctx, configFolder, timeoutMinutes, defaultOptions);
|
||||||
// creating a new instance after long inactive period shouldn't be a problem
|
|
||||||
const oneMinuteInMs = 1000 * 60;
|
|
||||||
this.enableCacheTimeout = this.options.proxy.cacheTimeoutMinutes != -1;
|
|
||||||
this.cacheTimeout = (this.options.proxy.cacheTimeoutMinutes ?? 60) * oneMinuteInMs;
|
|
||||||
if (!this.enableCacheTimeout) {
|
|
||||||
this.ctx.logger.info("driver cache is permanent");
|
|
||||||
} else {
|
|
||||||
this.ctx.logger.info(`driver cache timeout is ${this.options.proxy.cacheTimeoutMinutes} minutes`);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
options = options || {};
|
initCapabilities() {
|
||||||
|
const options = this.options;
|
||||||
options.service = options.service || {};
|
options.service = options.service || {};
|
||||||
options.service.identity = options.service.identity || {};
|
options.service.identity = options.service.identity || {};
|
||||||
options.service.controller = options.service.controller || {};
|
options.service.controller = options.service.controller || {};
|
||||||
|
|
@ -121,6 +117,12 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getCleanupHandlers() {
|
||||||
|
const cacheCleanup = this.driverCache.getCleanupHandlers();
|
||||||
|
// this.cleanup is not modified, concat returns a new object
|
||||||
|
return this.cleanup.concat(cacheCleanup);
|
||||||
|
}
|
||||||
|
|
||||||
parseVolumeHandle(handle, prefix = volumeIdPrefix) {
|
parseVolumeHandle(handle, prefix = volumeIdPrefix) {
|
||||||
if (!handle.startsWith(prefix)) {
|
if (!handle.startsWith(prefix)) {
|
||||||
throw new GrpcError(
|
throw new GrpcError(
|
||||||
|
|
@ -139,105 +141,6 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
return prefix + connectionName + '/' + handle;
|
return prefix + connectionName + '/' + handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns real driver object
|
|
||||||
// internally drivers are cached and deleted on timeout
|
|
||||||
lookUpConnection(connectionName) {
|
|
||||||
const configFolder = this.options.proxy.configFolder;
|
|
||||||
const configPath = configFolder + '/' + connectionName + '.yaml';
|
|
||||||
|
|
||||||
if (this.timeout == 0) {
|
|
||||||
// when timeout is 0, force creating a new driver on each request
|
|
||||||
return this.createDriverFromFile(configPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
const driverPlaceholder = {
|
|
||||||
connectionName: connectionName,
|
|
||||||
fileTime: 0,
|
|
||||||
driver: null,
|
|
||||||
};
|
|
||||||
const cachedDriver = this.ctx.registry.get(`controller:driver/connection=${connectionName}`, driverPlaceholder);
|
|
||||||
if (cachedDriver.timer !== null) {
|
|
||||||
clearTimeout(cachedDriver.timer);
|
|
||||||
cachedDriver.timer = null;
|
|
||||||
}
|
|
||||||
if (this.enableCacheTimeout) {
|
|
||||||
cachedDriver.timer = setTimeout(() => {
|
|
||||||
this.ctx.logger.info("removing inactive connection: %s", connectionName);
|
|
||||||
this.ctx.registry.delete(`controller:driver/connection=${connectionName}`);
|
|
||||||
cachedDriver.timer = null;
|
|
||||||
}, this.timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
const fileTime = this.getFileTime(configPath);
|
|
||||||
if (cachedDriver.fileTime != fileTime) {
|
|
||||||
this.ctx.logger.debug("connection version is old: file time %d != %d", cachedDriver.fileTime, fileTime);
|
|
||||||
cachedDriver.fileTime = fileTime;
|
|
||||||
this.ctx.logger.info("creating a new connection: %s", connectionName);
|
|
||||||
cachedDriver.driver = this.createDriverFromFile(configPath);
|
|
||||||
}
|
|
||||||
return cachedDriver.driver;
|
|
||||||
}
|
|
||||||
|
|
||||||
getFileTime(path) {
|
|
||||||
try {
|
|
||||||
const configFileStats = fs.statSync(path);
|
|
||||||
this.ctx.logger.debug("file time for '%s' is: %d", path, configFileStats.mtime);
|
|
||||||
return configFileStats.mtime.getTime();
|
|
||||||
} catch (e) {
|
|
||||||
this.ctx.logger.error("fs.statSync failed: %s", e.toString());
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
createDriverFromFile(configPath) {
|
|
||||||
const fileOptions = this.createOptionsFromFile(configPath);
|
|
||||||
const mergedOptions = structuredClone(this.options);
|
|
||||||
_.merge(mergedOptions, fileOptions);
|
|
||||||
return this.createRealDriver(mergedOptions);
|
|
||||||
}
|
|
||||||
|
|
||||||
createOptionsFromFile(configPath) {
|
|
||||||
this.ctx.logger.debug("loading config: %s", configPath);
|
|
||||||
try {
|
|
||||||
return yaml.load(fs.readFileSync(configPath, "utf8"));
|
|
||||||
} catch (e) {
|
|
||||||
this.ctx.logger.error("failed parsing config file: %s", e.toString());
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
validateDriverType(driver) {
|
|
||||||
const unsupportedDrivers = [
|
|
||||||
"zfs-local-",
|
|
||||||
"local-hostpath",
|
|
||||||
"objectivefs",
|
|
||||||
"proxy",
|
|
||||||
];
|
|
||||||
for (const prefix in unsupportedDrivers) {
|
|
||||||
if (driver.startsWith(prefix)) {
|
|
||||||
throw new GrpcError(
|
|
||||||
grpc.status.INVALID_ARGUMENT,
|
|
||||||
`proxy is not supported for driver: ${mergedOptions.driver}`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
createRealDriver(options) {
|
|
||||||
this.validateDriverType(options.driver);
|
|
||||||
const realContext = Object.assign({}, this.ctx);
|
|
||||||
realContext.registry = new Registry();
|
|
||||||
const realDriver = this.ctx.factory(realContext, options);
|
|
||||||
if (realDriver.constructor.name == this.constructor.name) {
|
|
||||||
throw new GrpcError(
|
|
||||||
grpc.status.INVALID_ARGUMENT,
|
|
||||||
`cyclic dependency: proxy on proxy`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
this.ctx.logger.debug("using driver %s", realDriver.constructor.name);
|
|
||||||
return realDriver;
|
|
||||||
}
|
|
||||||
|
|
||||||
async checkAndRun(driver, methodName, call, defaultValue) {
|
async checkAndRun(driver, methodName, call, defaultValue) {
|
||||||
if(typeof driver[methodName] !== 'function') {
|
if(typeof driver[methodName] !== 'function') {
|
||||||
if (defaultValue) return defaultValue;
|
if (defaultValue) return defaultValue;
|
||||||
|
|
@ -254,7 +157,7 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
|
|
||||||
async controllerRunWrapper(methodName, call, defaultValue) {
|
async controllerRunWrapper(methodName, call, defaultValue) {
|
||||||
const volumeHandle = this.parseVolumeHandle(call.request.volume_id);
|
const volumeHandle = this.parseVolumeHandle(call.request.volume_id);
|
||||||
const driver = this.lookUpConnection(volumeHandle.connectionName);
|
const driver = this.driverCache.lookUpConnection(volumeHandle.connectionName);
|
||||||
call.request.volume_id = volumeHandle.realHandle;
|
call.request.volume_id = volumeHandle.realHandle;
|
||||||
return await this.checkAndRun(driver, methodName, call, defaultValue);
|
return await this.checkAndRun(driver, methodName, call, defaultValue);
|
||||||
}
|
}
|
||||||
|
|
@ -272,7 +175,7 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
const connectionName = parameters.connection;
|
const connectionName = parameters.connection;
|
||||||
const driver = this.lookUpConnection(connectionName);
|
const driver = this.driverCache.lookUpConnection(connectionName);
|
||||||
return await this.checkAndRun(driver, 'GetCapacity', call, {
|
return await this.checkAndRun(driver, 'GetCapacity', call, {
|
||||||
available_capacity: Number.MAX_SAFE_INTEGER,
|
available_capacity: Number.MAX_SAFE_INTEGER,
|
||||||
});
|
});
|
||||||
|
|
@ -287,7 +190,7 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
const connectionName = parameters.connection;
|
const connectionName = parameters.connection;
|
||||||
const driver = this.lookUpConnection(connectionName);
|
const driver = this.driverCache.lookUpConnection(connectionName);
|
||||||
|
|
||||||
switch (call.request.volume_content_source?.type) {
|
switch (call.request.volume_content_source?.type) {
|
||||||
case "snapshot": {
|
case "snapshot": {
|
||||||
|
|
@ -341,7 +244,7 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
|
|
||||||
async CreateSnapshot(call) {
|
async CreateSnapshot(call) {
|
||||||
const volumeHandle = this.parseVolumeHandle(call.request.source_volume_id);
|
const volumeHandle = this.parseVolumeHandle(call.request.source_volume_id);
|
||||||
const driver = this.lookUpConnection(volumeHandle.connectionName);
|
const driver = this.driverCache.lookUpConnection(volumeHandle.connectionName);
|
||||||
call.request.source_volume_id = volumeHandle.realHandle;
|
call.request.source_volume_id = volumeHandle.realHandle;
|
||||||
const result = await this.checkAndRun(driver, 'CreateSnapshot', call);
|
const result = await this.checkAndRun(driver, 'CreateSnapshot', call);
|
||||||
result.snapshot.source_volume_id = this.decorateVolumeHandle(connectionName, result.snapshot.source_volume_id);
|
result.snapshot.source_volume_id = this.decorateVolumeHandle(connectionName, result.snapshot.source_volume_id);
|
||||||
|
|
@ -351,7 +254,7 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
|
|
||||||
async DeleteSnapshot(call) {
|
async DeleteSnapshot(call) {
|
||||||
const volumeHandle = this.parseVolumeHandle(call.request.snapshot_id, snapshotIdPrefix);
|
const volumeHandle = this.parseVolumeHandle(call.request.snapshot_id, snapshotIdPrefix);
|
||||||
const driver = this.lookUpConnection(volumeHandle.connectionName);
|
const driver = this.driverCache.lookUpConnection(volumeHandle.connectionName);
|
||||||
call.request.snapshot_id = volumeHandle.realHandle;
|
call.request.snapshot_id = volumeHandle.realHandle;
|
||||||
return await this.checkAndRun(driver, 'DeleteSnapshot', call);
|
return await this.checkAndRun(driver, 'DeleteSnapshot', call);
|
||||||
}
|
}
|
||||||
|
|
@ -374,10 +277,12 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
|
|
||||||
lookUpNodeDriver(call) {
|
lookUpNodeDriver(call) {
|
||||||
const driverType = call.request.volume_context.provisioner_driver;
|
const driverType = call.request.volume_context.provisioner_driver;
|
||||||
|
// there is no cache timeout for node drivers
|
||||||
|
// because drivers are not updated dynamically
|
||||||
return this.ctx.registry.get(`node:driver/${driverType}`, () => {
|
return this.ctx.registry.get(`node:driver/${driverType}`, () => {
|
||||||
const driverOptions = structuredClone(this.options);
|
const driverOptions = structuredClone(this.options);
|
||||||
driverOptions.driver = driverType;
|
driverOptions.driver = driverType;
|
||||||
return this.createRealDriver(driverOptions);
|
return this.driverCache.createRealDriver(driverOptions);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -392,13 +297,197 @@ class CsiProxyDriver extends CsiBaseDriver {
|
||||||
node_id: nodeName,
|
node_id: nodeName,
|
||||||
max_volumes_per_node: 0,
|
max_volumes_per_node: 0,
|
||||||
};
|
};
|
||||||
|
const topologyType = this.options.proxy.nodeTopology?.type ?? 'cluster';
|
||||||
|
const prefix = this.options.proxy.nodeTopology?.prefix ?? TOPOLOGY_DEFAULT_PREFIX;
|
||||||
|
switch (topologyType) {
|
||||||
|
case 'cluster':
|
||||||
result.accessible_topology = {
|
result.accessible_topology = {
|
||||||
segments: {
|
segments: {
|
||||||
[NODE_TOPOLOGY_KEY_NAME]: nodeName,
|
[prefix + '/cluster']: 'local',
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
break
|
||||||
|
case 'node':
|
||||||
|
result.accessible_topology = {
|
||||||
|
segments: {
|
||||||
|
[prefix + '/node']: nodeName,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
throw new GrpcError(
|
||||||
|
grpc.status.INVALID_ARGUMENT,
|
||||||
|
`proxy: unknown node topology type: ${topologyType}`
|
||||||
|
);
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class DriverCache {
|
||||||
|
constructor(ctx, configFolder, timeoutMinutes, defaultOptions) {
|
||||||
|
this.driverCache = {};
|
||||||
|
this.ctx = ctx;
|
||||||
|
this.defaultOptions = defaultOptions;
|
||||||
|
this.configFolder = configFolder;
|
||||||
|
|
||||||
|
// Corresponding storage class could be deleted without notice.
|
||||||
|
// We can delete drivers that weren't requested for a long time.
|
||||||
|
// User can configure cache timeout so that driver re-creation is not too frequent.
|
||||||
|
|
||||||
|
this.enableCacheTimeout = timeoutMinutes != -1;
|
||||||
|
if (this.enableCacheTimeout) {
|
||||||
|
const oneMinuteInMs = 1000 * 60;
|
||||||
|
this.cacheTimeoutMs = timeoutMinutes * oneMinuteInMs;
|
||||||
|
this.ctx.logger.info(`driver cache timeout is ${timeoutMinutes} minutes`);
|
||||||
|
} else {
|
||||||
|
this.ctx.logger.info("driver cache is permanent");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getCleanupHandlers() {
|
||||||
|
const result = [];
|
||||||
|
for (const connectionName in this.driverCache) {
|
||||||
|
result.push(() => this.removeCacheEntry(connectionName));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns real driver object
|
||||||
|
// internally drivers are cached and deleted on timeout
|
||||||
|
lookUpConnection(connectionName) {
|
||||||
|
const configPath = this.configFolder + '/' + connectionName + '.yaml';
|
||||||
|
|
||||||
|
if (this.timeout == 0) {
|
||||||
|
// when timeout is 0, force creating a new driver on each request
|
||||||
|
return this.createDriverFromFile(configPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
let cachedDriver = this.driverCache[connectionName];
|
||||||
|
if (!cachedDriver) {
|
||||||
|
cachedDriver = {
|
||||||
|
connectionName: connectionName,
|
||||||
|
fileTime: 0,
|
||||||
|
driver: null,
|
||||||
|
};
|
||||||
|
this.driverCache[connectionName] = cachedDriver;
|
||||||
|
}
|
||||||
|
if (cachedDriver.timer !== null) {
|
||||||
|
clearTimeout(cachedDriver.timer);
|
||||||
|
cachedDriver.timer = null;
|
||||||
|
}
|
||||||
|
if (this.enableCacheTimeout) {
|
||||||
|
cachedDriver.timer = setTimeout(() => {
|
||||||
|
this.ctx.logger.info("removing inactive connection: %s", connectionName);
|
||||||
|
this.removeCacheEntry(cachedDriver.driver);
|
||||||
|
}, this.timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
const fileTime = this.getFileTime(configPath);
|
||||||
|
if (cachedDriver.fileTime != fileTime) {
|
||||||
|
this.ctx.logger.debug("connection version is old: file time %d != %d", cachedDriver.fileTime, fileTime);
|
||||||
|
this.runDriverCleanup(cachedDriver.driver);
|
||||||
|
cachedDriver.fileTime = fileTime;
|
||||||
|
cachedDriver.driver = this.createDriverFromFile(configPath);
|
||||||
|
}
|
||||||
|
return cachedDriver.driver;
|
||||||
|
}
|
||||||
|
|
||||||
|
removeCacheEntry(connectionName) {
|
||||||
|
const cacheEntry = this.driverCache[connectionName];
|
||||||
|
if (!cacheEntry) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.ctx.logger.debug("removing %s from cache", connectionName);
|
||||||
|
delete this.driverCache[connectionName];
|
||||||
|
if (cacheEntry.timer) {
|
||||||
|
clearTimeout(cacheEntry.timer);
|
||||||
|
cacheEntry.timer = null;
|
||||||
|
}
|
||||||
|
const driver = cacheEntry.driver;
|
||||||
|
cachedDriver.fileTime = 0;
|
||||||
|
cacheEntry.driver = null;
|
||||||
|
this.runDriverCleanup(driver);
|
||||||
|
}
|
||||||
|
|
||||||
|
runDriverCleanup(driver) {
|
||||||
|
if (!driver) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (typeof driver.getCleanupHandlers !== 'function') {
|
||||||
|
this.ctx.logger.debug("old driver does not support cleanup");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const cleanup = driver.getCleanupHandlers();
|
||||||
|
if (cleanup.length == 0) {
|
||||||
|
this.ctx.logger.debug("old driver does not require any cleanup");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.ctx.logger.debug("running %d cleanup functions", cleanup.length);
|
||||||
|
for (const cleanupFunc of cleanup) {
|
||||||
|
cleanupFunc();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getFileTime(path) {
|
||||||
|
try {
|
||||||
|
const configFileStats = fs.statSync(path);
|
||||||
|
this.ctx.logger.debug("file time for '%s' is: %d", path, configFileStats.mtime);
|
||||||
|
return configFileStats.mtime.getTime();
|
||||||
|
} catch (e) {
|
||||||
|
this.ctx.logger.error("fs.statSync failed: %s", e.toString());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
createDriverFromFile(configPath) {
|
||||||
|
this.ctx.logger.info("creating new driver from file: %s", configPath);
|
||||||
|
const fileOptions = this.createOptionsFromFile(configPath);
|
||||||
|
const mergedOptions = structuredClone(this.defaultOptions);
|
||||||
|
_.merge(mergedOptions, fileOptions);
|
||||||
|
return this.createRealDriver(mergedOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
createOptionsFromFile(configPath) {
|
||||||
|
this.ctx.logger.debug("loading config: %s", configPath);
|
||||||
|
try {
|
||||||
|
return yaml.load(fs.readFileSync(configPath, "utf8"));
|
||||||
|
} catch (e) {
|
||||||
|
this.ctx.logger.error("failed parsing config file: %s", e.toString());
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
createRealDriver(options) {
|
||||||
|
this.validateDriverType(options.driver);
|
||||||
|
const realContext = Object.assign({}, this.ctx);
|
||||||
|
realContext.registry = new Registry();
|
||||||
|
const realDriver = this.ctx.factory(realContext, options);
|
||||||
|
if (realDriver.constructor.name == this.constructor.name) {
|
||||||
|
throw new GrpcError(
|
||||||
|
grpc.status.INVALID_ARGUMENT,
|
||||||
|
`cyclic dependency: proxy on proxy`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
this.ctx.logger.debug("using driver %s", realDriver.constructor.name);
|
||||||
|
return realDriver;
|
||||||
|
}
|
||||||
|
|
||||||
|
validateDriverType(driver) {
|
||||||
|
const unsupportedDrivers = [
|
||||||
|
"zfs-local-ephemeral-inline",
|
||||||
|
"objectivefs",
|
||||||
|
"proxy",
|
||||||
|
];
|
||||||
|
for (const prefix in unsupportedDrivers) {
|
||||||
|
if (driver.startsWith(prefix)) {
|
||||||
|
throw new GrpcError(
|
||||||
|
grpc.status.INVALID_ARGUMENT,
|
||||||
|
`proxy is not supported for driver: ${mergedOptions.driver}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
module.exports.CsiProxyDriver = CsiProxyDriver;
|
module.exports.CsiProxyDriver = CsiProxyDriver;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue