implement crude locks to prevent unwanted concurrency in requests

This commit is contained in:
Travis Glenn Hansen 2021-01-06 17:28:13 -07:00
parent 599767d2f9
commit 26727620c6
3 changed files with 91 additions and 7 deletions

View File

@ -37,6 +37,10 @@ Predominantly 3 things are needed:
- deploy the driver into the cluster (`helm` chart provided with sample
`values.yaml`)
## Guides
- https://jonathangazeley.com/2021/01/05/using-truenas-to-provide-persistent-storage-for-kubernetes/
## Node Prep
You should install/configure the requirements for both nfs and iscsi.

View File

@ -75,6 +75,9 @@ const protoLoader = require("@grpc/proto-loader");
const LRU = require("lru-cache");
const cache = new LRU({ max: 500 });
const { logger } = require("../src/utils/logger");
const { GrpcError } = require("../src/utils/grpc");
const GeneralUtils = require("../src/utils/general");
if (args.logLevel) {
logger.level = args.logLevel;
}
@ -106,6 +109,14 @@ try {
process.exit(1);
}
// TODO: consider alternatives that work 100% with replicas > 1
// doing this in memory is crude and currently only works reliably with 1
// replica per controller (due to potentional cross external-foo leaders,
// ie: snapshot operations can lock volumes delete, but different pods may be
// leader for each respectively)
// having said that, it should be very effective as is
let operationLock = new Set();
async function requestHandlerProxy(call, callback, serviceMethodName) {
const cleansedCall = JSON.parse(JSON.stringify(call));
for (const key in cleansedCall.request) {
@ -120,8 +131,47 @@ async function requestHandlerProxy(call, callback, serviceMethodName) {
serviceMethodName,
cleansedCall
);
//const response = await handler.call(driver, call);
const response = await driver[serviceMethodName](call);
const lockKeys = GeneralUtils.lockKeysFromRequest(call, serviceMethodName);
if (lockKeys.length > 0) {
logger.debug("operation lock keys: %j", lockKeys);
// check locks
lockKeys.forEach((key) => {
if (operationLock.has(key)) {
throw new GrpcError(
grpc.status.ABORTED,
"operation locked due to in progress operation(s): " +
JSON.stringify(lockKeys)
);
}
});
}
let response;
let responseError;
try {
// aquire locks
if (lockKeys.length > 0) {
lockKeys.forEach((key) => {
operationLock.add(key);
});
}
response = await driver[serviceMethodName](call);
} catch (e) {
responseError = e;
} finally {
// release locks
if (lockKeys.length > 0) {
lockKeys.forEach((key) => {
operationLock.delete(key);
});
}
}
if (responseError) {
throw responseError;
}
logger.info(
"new response - driver: %s method: %s response: %j",
driver.constructor.name,

View File

@ -1,7 +1,37 @@
function sleep(ms){
return new Promise(resolve=>{
setTimeout(resolve,ms)
})
function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
module.exports.sleep = sleep;
function lockKeysFromRequest(call, serviceMethodName) {
switch (serviceMethodName) {
// controller
case "CreateVolume":
return ["create_volume_name_" + call.request.name];
case "DeleteVolume":
case "ControllerExpandVolume":
return ["volume_id_" + call.request.volume_id];
case "CreateSnapshot":
return [
"create_snapshot_name_" + call.request.name,
"volume_id_" + call.request.source_volume_id,
];
case "DeleteSnapshot":
return ["snapshot_id_" + call.request.snapshot_id];
// node
case "NodeStageVolume":
case "NodeUnstageVolume":
case "NodePublishVolume":
case "NodeUnpublishVolume":
case "NodeExpandVolume":
return ["volume_id_" + call.request.volume_id];
default:
return [];
}
}
module.exports.sleep = sleep;
module.exports.lockKeysFromRequest = lockKeysFromRequest;