From 26727620c6f4549d1bddbfdbe6fd3f13233847ad Mon Sep 17 00:00:00 2001 From: Travis Glenn Hansen Date: Wed, 6 Jan 2021 17:28:13 -0700 Subject: [PATCH] implement crude locks to prevent unwanted concurrency in requests --- README.md | 4 ++++ bin/democratic-csi | 54 ++++++++++++++++++++++++++++++++++++++++++-- src/utils/general.js | 40 ++++++++++++++++++++++++++++---- 3 files changed, 91 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 8e96930..2999738 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,10 @@ Predominantly 3 things are needed: - deploy the driver into the cluster (`helm` chart provided with sample `values.yaml`) +## Guides + +- https://jonathangazeley.com/2021/01/05/using-truenas-to-provide-persistent-storage-for-kubernetes/ + ## Node Prep You should install/configure the requirements for both nfs and iscsi. diff --git a/bin/democratic-csi b/bin/democratic-csi index 43b9c38..dacd918 100755 --- a/bin/democratic-csi +++ b/bin/democratic-csi @@ -75,6 +75,9 @@ const protoLoader = require("@grpc/proto-loader"); const LRU = require("lru-cache"); const cache = new LRU({ max: 500 }); const { logger } = require("../src/utils/logger"); +const { GrpcError } = require("../src/utils/grpc"); +const GeneralUtils = require("../src/utils/general"); + if (args.logLevel) { logger.level = args.logLevel; } @@ -106,6 +109,14 @@ try { process.exit(1); } +// TODO: consider alternatives that work 100% with replicas > 1 +// doing this in memory is crude and currently only works reliably with 1 +// replica per controller (due to potentional cross external-foo leaders, +// ie: snapshot operations can lock volumes delete, but different pods may be +// leader for each respectively) +// having said that, it should be very effective as is +let operationLock = new Set(); + async function requestHandlerProxy(call, callback, serviceMethodName) { const cleansedCall = JSON.parse(JSON.stringify(call)); for (const key in cleansedCall.request) { @@ -120,8 +131,47 @@ async function requestHandlerProxy(call, callback, serviceMethodName) { serviceMethodName, cleansedCall ); - //const response = await handler.call(driver, call); - const response = await driver[serviceMethodName](call); + + const lockKeys = GeneralUtils.lockKeysFromRequest(call, serviceMethodName); + if (lockKeys.length > 0) { + logger.debug("operation lock keys: %j", lockKeys); + // check locks + lockKeys.forEach((key) => { + if (operationLock.has(key)) { + throw new GrpcError( + grpc.status.ABORTED, + "operation locked due to in progress operation(s): " + + JSON.stringify(lockKeys) + ); + } + }); + } + + let response; + let responseError; + try { + // aquire locks + if (lockKeys.length > 0) { + lockKeys.forEach((key) => { + operationLock.add(key); + }); + } + response = await driver[serviceMethodName](call); + } catch (e) { + responseError = e; + } finally { + // release locks + if (lockKeys.length > 0) { + lockKeys.forEach((key) => { + operationLock.delete(key); + }); + } + } + + if (responseError) { + throw responseError; + } + logger.info( "new response - driver: %s method: %s response: %j", driver.constructor.name, diff --git a/src/utils/general.js b/src/utils/general.js index 872a894..bde918d 100644 --- a/src/utils/general.js +++ b/src/utils/general.js @@ -1,7 +1,37 @@ -function sleep(ms){ - return new Promise(resolve=>{ - setTimeout(resolve,ms) - }) +function sleep(ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); } -module.exports.sleep = sleep; \ No newline at end of file +function lockKeysFromRequest(call, serviceMethodName) { + switch (serviceMethodName) { + // controller + case "CreateVolume": + return ["create_volume_name_" + call.request.name]; + case "DeleteVolume": + case "ControllerExpandVolume": + return ["volume_id_" + call.request.volume_id]; + case "CreateSnapshot": + return [ + "create_snapshot_name_" + call.request.name, + "volume_id_" + call.request.source_volume_id, + ]; + case "DeleteSnapshot": + return ["snapshot_id_" + call.request.snapshot_id]; + + // node + case "NodeStageVolume": + case "NodeUnstageVolume": + case "NodePublishVolume": + case "NodeUnpublishVolume": + case "NodeExpandVolume": + return ["volume_id_" + call.request.volume_id]; + + default: + return []; + } +} + +module.exports.sleep = sleep; +module.exports.lockKeysFromRequest = lockKeysFromRequest;