From 882c884f2241f5f43daa00ff127d37b6c73acc20 Mon Sep 17 00:00:00 2001 From: Danil Uzlov <36223296+d-uzlov@users.noreply.github.com> Date: Wed, 26 Mar 2025 13:10:49 +0000 Subject: [PATCH] refactor proxy: create separate class for driver cache --- src/driver/controller-proxy/index.js | 341 +++++++++++++++++---------- 1 file changed, 215 insertions(+), 126 deletions(-) diff --git a/src/driver/controller-proxy/index.js b/src/driver/controller-proxy/index.js index d9177da..18be2fc 100644 --- a/src/driver/controller-proxy/index.js +++ b/src/driver/controller-proxy/index.js @@ -14,25 +14,21 @@ const NODE_TOPOLOGY_KEY_NAME = "org.democratic-csi.topology/node"; class CsiProxyDriver extends CsiBaseDriver { constructor(ctx, options) { super(...arguments); - this.options.proxy.configFolder = path.normalize(this.options.proxy.configFolder); - if (this.options.proxy.configFolder.slice(-1) == '/') { - this.options.proxy.configFolder = this.options.proxy.configFolder.slice(0, -1); + + this.initCapabilities(); + + let configFolder = path.normalize(this.options.proxy.configFolder); + if (configFolder.slice(-1) == '/') { + configFolder = configFolder.slice(0, -1); } - // corresponding storage class could be deleted without notice - // let's delete entry from cache after 1 hour, so it can be cleaned by GC - // one hour seems long enough to avoid recreating frequently used drivers - // creating a new instance after long inactive period shouldn't be a problem - const oneMinuteInMs = 1000 * 60; - this.enableCacheTimeout = this.options.proxy.cacheTimeoutMinutes != -1; - this.cacheTimeout = (this.options.proxy.cacheTimeoutMinutes ?? 60) * oneMinuteInMs; - if (!this.enableCacheTimeout) { - this.ctx.logger.info("driver cache is permanent"); - } else { - this.ctx.logger.info(`driver cache timeout is ${this.options.proxy.cacheTimeoutMinutes} minutes`); - } + const timeoutMinutes = this.options.proxy.cacheTimeoutMinutes ?? 60; + const defaultOptions = this.options; + this.driverCache = new DriverCache(ctx, configFolder, timeoutMinutes, defaultOptions); + } - options = options || {}; + initCapabilities() { + const options = this.options; options.service = options.service || {}; options.service.identity = options.service.identity || {}; options.service.controller = options.service.controller || {}; @@ -121,6 +117,12 @@ class CsiProxyDriver extends CsiBaseDriver { } } + getCleanupHandlers() { + const cacheCleanup = this.driverCache.getCleanupHandlers(); + // this.cleanup is not modified, concat returns a new object + return this.cleanup.concat(cacheCleanup); + } + parseVolumeHandle(handle, prefix = volumeIdPrefix) { if (!handle.startsWith(prefix)) { throw new GrpcError( @@ -139,105 +141,6 @@ class CsiProxyDriver extends CsiBaseDriver { return prefix + connectionName + '/' + handle; } - // returns real driver object - // internally drivers are cached and deleted on timeout - lookUpConnection(connectionName) { - const configFolder = this.options.proxy.configFolder; - const configPath = configFolder + '/' + connectionName + '.yaml'; - - if (this.timeout == 0) { - // when timeout is 0, force creating a new driver on each request - return this.createDriverFromFile(configPath); - } - - const driverPlaceholder = { - connectionName: connectionName, - fileTime: 0, - driver: null, - }; - const cachedDriver = this.ctx.registry.get(`controller:driver/connection=${connectionName}`, driverPlaceholder); - if (cachedDriver.timer !== null) { - clearTimeout(cachedDriver.timer); - cachedDriver.timer = null; - } - if (this.enableCacheTimeout) { - cachedDriver.timer = setTimeout(() => { - this.ctx.logger.info("removing inactive connection: %s", connectionName); - this.ctx.registry.delete(`controller:driver/connection=${connectionName}`); - cachedDriver.timer = null; - }, this.timeout); - } - - const fileTime = this.getFileTime(configPath); - if (cachedDriver.fileTime != fileTime) { - this.ctx.logger.debug("connection version is old: file time %d != %d", cachedDriver.fileTime, fileTime); - cachedDriver.fileTime = fileTime; - this.ctx.logger.info("creating a new connection: %s", connectionName); - cachedDriver.driver = this.createDriverFromFile(configPath); - } - return cachedDriver.driver; - } - - getFileTime(path) { - try { - const configFileStats = fs.statSync(path); - this.ctx.logger.debug("file time for '%s' is: %d", path, configFileStats.mtime); - return configFileStats.mtime.getTime(); - } catch (e) { - this.ctx.logger.error("fs.statSync failed: %s", e.toString()); - throw e; - } - } - - createDriverFromFile(configPath) { - const fileOptions = this.createOptionsFromFile(configPath); - const mergedOptions = structuredClone(this.options); - _.merge(mergedOptions, fileOptions); - return this.createRealDriver(mergedOptions); - } - - createOptionsFromFile(configPath) { - this.ctx.logger.debug("loading config: %s", configPath); - try { - return yaml.load(fs.readFileSync(configPath, "utf8")); - } catch (e) { - this.ctx.logger.error("failed parsing config file: %s", e.toString()); - throw e; - } - } - - validateDriverType(driver) { - const unsupportedDrivers = [ - "zfs-local-", - "local-hostpath", - "objectivefs", - "proxy", - ]; - for (const prefix in unsupportedDrivers) { - if (driver.startsWith(prefix)) { - throw new GrpcError( - grpc.status.INVALID_ARGUMENT, - `proxy is not supported for driver: ${mergedOptions.driver}` - ); - } - } - } - - createRealDriver(options) { - this.validateDriverType(options.driver); - const realContext = Object.assign({}, this.ctx); - realContext.registry = new Registry(); - const realDriver = this.ctx.factory(realContext, options); - if (realDriver.constructor.name == this.constructor.name) { - throw new GrpcError( - grpc.status.INVALID_ARGUMENT, - `cyclic dependency: proxy on proxy` - ); - } - this.ctx.logger.debug("using driver %s", realDriver.constructor.name); - return realDriver; - } - async checkAndRun(driver, methodName, call, defaultValue) { if(typeof driver[methodName] !== 'function') { if (defaultValue) return defaultValue; @@ -254,7 +157,7 @@ class CsiProxyDriver extends CsiBaseDriver { async controllerRunWrapper(methodName, call, defaultValue) { const volumeHandle = this.parseVolumeHandle(call.request.volume_id); - const driver = this.lookUpConnection(volumeHandle.connectionName); + const driver = this.driverCache.lookUpConnection(volumeHandle.connectionName); call.request.volume_id = volumeHandle.realHandle; return await this.checkAndRun(driver, methodName, call, defaultValue); } @@ -272,7 +175,7 @@ class CsiProxyDriver extends CsiBaseDriver { ); } const connectionName = parameters.connection; - const driver = this.lookUpConnection(connectionName); + const driver = this.driverCache.lookUpConnection(connectionName); return await this.checkAndRun(driver, 'GetCapacity', call, { available_capacity: Number.MAX_SAFE_INTEGER, }); @@ -287,7 +190,7 @@ class CsiProxyDriver extends CsiBaseDriver { ); } const connectionName = parameters.connection; - const driver = this.lookUpConnection(connectionName); + const driver = this.driverCache.lookUpConnection(connectionName); switch (call.request.volume_content_source?.type) { case "snapshot": { @@ -341,7 +244,7 @@ class CsiProxyDriver extends CsiBaseDriver { async CreateSnapshot(call) { const volumeHandle = this.parseVolumeHandle(call.request.source_volume_id); - const driver = this.lookUpConnection(volumeHandle.connectionName); + const driver = this.driverCache.lookUpConnection(volumeHandle.connectionName); call.request.source_volume_id = volumeHandle.realHandle; const result = await this.checkAndRun(driver, 'CreateSnapshot', call); result.snapshot.source_volume_id = this.decorateVolumeHandle(connectionName, result.snapshot.source_volume_id); @@ -351,7 +254,7 @@ class CsiProxyDriver extends CsiBaseDriver { async DeleteSnapshot(call) { const volumeHandle = this.parseVolumeHandle(call.request.snapshot_id, snapshotIdPrefix); - const driver = this.lookUpConnection(volumeHandle.connectionName); + const driver = this.driverCache.lookUpConnection(volumeHandle.connectionName); call.request.snapshot_id = volumeHandle.realHandle; return await this.checkAndRun(driver, 'DeleteSnapshot', call); } @@ -374,10 +277,12 @@ class CsiProxyDriver extends CsiBaseDriver { lookUpNodeDriver(call) { const driverType = call.request.volume_context.provisioner_driver; + // there is no cache timeout for node drivers + // because drivers are not updated dynamically return this.ctx.registry.get(`node:driver/${driverType}`, () => { const driverOptions = structuredClone(this.options); driverOptions.driver = driverType; - return this.createRealDriver(driverOptions); + return this.driverCache.createRealDriver(driverOptions); }); } @@ -392,13 +297,197 @@ class CsiProxyDriver extends CsiBaseDriver { node_id: nodeName, max_volumes_per_node: 0, }; - result.accessible_topology = { - segments: { - [NODE_TOPOLOGY_KEY_NAME]: nodeName, - }, - }; + const topologyType = this.options.proxy.nodeTopology?.type ?? 'cluster'; + const prefix = this.options.proxy.nodeTopology?.prefix ?? TOPOLOGY_DEFAULT_PREFIX; + switch (topologyType) { + case 'cluster': + result.accessible_topology = { + segments: { + [prefix + '/cluster']: 'local', + }, + }; + break + case 'node': + result.accessible_topology = { + segments: { + [prefix + '/node']: nodeName, + }, + }; + break + default: + throw new GrpcError( + grpc.status.INVALID_ARGUMENT, + `proxy: unknown node topology type: ${topologyType}` + ); + } return result; } } +class DriverCache { + constructor(ctx, configFolder, timeoutMinutes, defaultOptions) { + this.driverCache = {}; + this.ctx = ctx; + this.defaultOptions = defaultOptions; + this.configFolder = configFolder; + + // Corresponding storage class could be deleted without notice. + // We can delete drivers that weren't requested for a long time. + // User can configure cache timeout so that driver re-creation is not too frequent. + + this.enableCacheTimeout = timeoutMinutes != -1; + if (this.enableCacheTimeout) { + const oneMinuteInMs = 1000 * 60; + this.cacheTimeoutMs = timeoutMinutes * oneMinuteInMs; + this.ctx.logger.info(`driver cache timeout is ${timeoutMinutes} minutes`); + } else { + this.ctx.logger.info("driver cache is permanent"); + } + } + + getCleanupHandlers() { + const result = []; + for (const connectionName in this.driverCache) { + result.push(() => this.removeCacheEntry(connectionName)); + } + return result; + } + + // returns real driver object + // internally drivers are cached and deleted on timeout + lookUpConnection(connectionName) { + const configPath = this.configFolder + '/' + connectionName + '.yaml'; + + if (this.timeout == 0) { + // when timeout is 0, force creating a new driver on each request + return this.createDriverFromFile(configPath); + } + + let cachedDriver = this.driverCache[connectionName]; + if (!cachedDriver) { + cachedDriver = { + connectionName: connectionName, + fileTime: 0, + driver: null, + }; + this.driverCache[connectionName] = cachedDriver; + } + if (cachedDriver.timer !== null) { + clearTimeout(cachedDriver.timer); + cachedDriver.timer = null; + } + if (this.enableCacheTimeout) { + cachedDriver.timer = setTimeout(() => { + this.ctx.logger.info("removing inactive connection: %s", connectionName); + this.removeCacheEntry(cachedDriver.driver); + }, this.timeout); + } + + const fileTime = this.getFileTime(configPath); + if (cachedDriver.fileTime != fileTime) { + this.ctx.logger.debug("connection version is old: file time %d != %d", cachedDriver.fileTime, fileTime); + this.runDriverCleanup(cachedDriver.driver); + cachedDriver.fileTime = fileTime; + cachedDriver.driver = this.createDriverFromFile(configPath); + } + return cachedDriver.driver; + } + + removeCacheEntry(connectionName) { + const cacheEntry = this.driverCache[connectionName]; + if (!cacheEntry) { + return; + } + this.ctx.logger.debug("removing %s from cache", connectionName); + delete this.driverCache[connectionName]; + if (cacheEntry.timer) { + clearTimeout(cacheEntry.timer); + cacheEntry.timer = null; + } + const driver = cacheEntry.driver; + cachedDriver.fileTime = 0; + cacheEntry.driver = null; + this.runDriverCleanup(driver); + } + + runDriverCleanup(driver) { + if (!driver) { + return; + } + if (typeof driver.getCleanupHandlers !== 'function') { + this.ctx.logger.debug("old driver does not support cleanup"); + return; + } + const cleanup = driver.getCleanupHandlers(); + if (cleanup.length == 0) { + this.ctx.logger.debug("old driver does not require any cleanup"); + return; + } + this.ctx.logger.debug("running %d cleanup functions", cleanup.length); + for (const cleanupFunc of cleanup) { + cleanupFunc(); + } + } + + getFileTime(path) { + try { + const configFileStats = fs.statSync(path); + this.ctx.logger.debug("file time for '%s' is: %d", path, configFileStats.mtime); + return configFileStats.mtime.getTime(); + } catch (e) { + this.ctx.logger.error("fs.statSync failed: %s", e.toString()); + throw e; + } + } + + createDriverFromFile(configPath) { + this.ctx.logger.info("creating new driver from file: %s", configPath); + const fileOptions = this.createOptionsFromFile(configPath); + const mergedOptions = structuredClone(this.defaultOptions); + _.merge(mergedOptions, fileOptions); + return this.createRealDriver(mergedOptions); + } + + createOptionsFromFile(configPath) { + this.ctx.logger.debug("loading config: %s", configPath); + try { + return yaml.load(fs.readFileSync(configPath, "utf8")); + } catch (e) { + this.ctx.logger.error("failed parsing config file: %s", e.toString()); + throw e; + } + } + + createRealDriver(options) { + this.validateDriverType(options.driver); + const realContext = Object.assign({}, this.ctx); + realContext.registry = new Registry(); + const realDriver = this.ctx.factory(realContext, options); + if (realDriver.constructor.name == this.constructor.name) { + throw new GrpcError( + grpc.status.INVALID_ARGUMENT, + `cyclic dependency: proxy on proxy` + ); + } + this.ctx.logger.debug("using driver %s", realDriver.constructor.name); + return realDriver; + } + + validateDriverType(driver) { + const unsupportedDrivers = [ + "zfs-local-ephemeral-inline", + "objectivefs", + "proxy", + ]; + for (const prefix in unsupportedDrivers) { + if (driver.startsWith(prefix)) { + throw new GrpcError( + grpc.status.INVALID_ARGUMENT, + `proxy is not supported for driver: ${mergedOptions.driver}` + ); + } + } + } +} + module.exports.CsiProxyDriver = CsiProxyDriver;