democratic-csi/bin/democratic-csi

507 lines
14 KiB
JavaScript
Executable File

#!/usr/bin/env -S node --expose-gc ${NODE_OPTIONS_CSI_1} ${NODE_OPTIONS_CSI_2} ${NODE_OPTIONS_CSI_3} ${NODE_OPTIONS_CSI_4}
/**
* keep the shebang line length under 128
* https://github.com/democratic-csi/democratic-csi/issues/171
*/
const yaml = require("js-yaml");
const fs = require("fs");
const { grpc } = require("../src/utils/grpc");
const { stringify } = require("../src/utils/general");
// Parsed driver configuration. It is populated as a side effect of the
// --driver-config-file configParser below rather than merged into `args`.
let options;

/**
 * Command line / environment (DEMOCRATIC_CSI_* prefix) argument definition.
 * `args` holds the parsed result.
 */
const args = require("yargs")
  .env("DEMOCRATIC_CSI")
  .scriptName("democratic-csi")
  .usage("$0 [options]")
  .option("driver-config-file", {
    describe: "provide a path to driver config file",
    config: true,
    /**
     * Loads the driver config file into `options`, trying JSON first and
     * YAML second.
     *
     * @param {string} path - location of the config file
     * @returns {boolean} always `true`; the parsed document is deliberately
     *   kept out of the yargs result and stored in `options` instead
     * @throws {Error} when the file cannot be read or parsed as an object
     */
    configParser: (path) => {
      // the rest of the script dereferences `options.driver`, so scalars and
      // empty documents are treated as parse failures
      const isConfigObject = (value) =>
        value !== null && typeof value === "object";

      try {
        const parsedJson = JSON.parse(fs.readFileSync(path, "utf-8"));
        if (isConfigObject(parsedJson)) {
          options = parsedJson;
          return true;
        }
      } catch (e) {
        // not JSON (or unreadable): fall through to the YAML attempt, which
        // reports the failure if it cannot load the file either
      }

      let failure;
      try {
        const parsedYaml = yaml.load(fs.readFileSync(path, "utf8"));
        if (isConfigObject(parsedYaml)) {
          options = parsedYaml;
          return true;
        }
        failure = new Error("document is not an object");
      } catch (e) {
        failure = e;
      }

      // surface the underlying reason instead of silently discarding it
      const reason = failure && failure.message ? failure.message : failure;
      throw new Error(
        "failed parsing config file: " + path + " (" + String(reason) + ")"
      );
    },
  })
  .demandOption(["driver-config-file"], "driver-config-file is required")
  .option("log-level", {
    describe: "log level",
    choices: ["error", "warn", "info", "verbose", "debug", "silly"],
  })
  .option("csi-version", {
    describe: "version of the csi spec to load",
    choices: [
      "0.2.0",
      "0.3.0",
      "1.0.0",
      "1.1.0",
      "1.2.0",
      "1.3.0",
      "1.4.0",
      "1.5.0",
    ],
  })
  .demandOption(["csi-version"], "csi-version is required")
  .option("csi-name", {
    describe: "name to use for driver registration",
  })
  .demandOption(["csi-name"], "csi-name is required")
  .option("csi-mode", {
    describe: "mode of the controller",
    choices: ["controller", "node"],
    type: "array",
    default: ["controller", "node"],
  })
  .demandOption(["csi-mode"], "csi-mode is required")
  .option("server-address", {
    describe: "listen address for the server",
    type: "string",
  })
  .option("server-port", {
    describe: "listen port for the server",
    type: "number",
  })
  .option("server-socket", {
    describe: "listen socket for the server",
    type: "string",
  })
  .option("server-socket-permissions-mode", {
    describe: "permissions on the socket file for the server",
    type: "string",
    default: "0600", // os default is 755
  })
  .version()
  .help().argv;

// A tcp listener is only bound when BOTH address and port are supplied, so a
// lone address or port must not count as a usable listener.
const hasTcpListener = Boolean(args.serverAddress && args.serverPort);
if (!args.serverSocket && !hasTcpListener) {
  console.log("must listen on tcp and/or unix socket");
  process.exit(1);
}
// `package` is a reserved word in strict mode / ES modules, so the manifest is
// bound to a different local name; the driver factory still receives it under
// the `package` key.
const packageJson = require("../package.json");
args.version = packageJson.version;

const protoLoader = require("@grpc/proto-loader");
const LRU = require("lru-cache");
const cache = new LRU({ max: 500 });
const { logger } = require("../src/utils/logger");
const { GrpcError } = require("../src/utils/grpc");
const GeneralUtils = require("../src/utils/general");

if (args.logLevel) {
  logger.level = args.logLevel;
}

// the CSI_VERSION environment variable takes precedence over --csi-version
const csiVersion = process.env.CSI_VERSION || args.csiVersion || "1.5.0";
const PROTO_PATH = __dirname + "/../csi_proto/csi-v" + csiVersion + ".proto";

// Suggested options for similarity to existing grpc.load behavior
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
});

const protoDescriptor = grpc.loadPackageDefinition(packageDefinition);

// NOTE(review): the upstream CSI 0.x protos declare `package csi.v0` while the
// 1.x protos declare `package csi.v1`; fall back to v0 so the 0.2.0 / 0.3.0
// choices can resolve their services. Confirm against the bundled proto files.
const csiPackage = protoDescriptor.csi || {};
const csi = csiPackage.v1 || csiPackage.v0;
if (!csi) {
  logger.error(
    "csi proto %s does not define a csi.v1 or csi.v0 package",
    PROTO_PATH
  );
  process.exit(1);
}

logger.info("initializing csi driver: %s", options.driver);

let driver;
try {
  driver = require("../src/driver/factory").factory(
    { logger, args, cache, package: packageJson, csiVersion },
    options
  );
} catch (err) {
  logger.error(`${err.toString()} ${err.stack}`);
  process.exit(1);
}

// TODO: consider alternatives that work 100% with replicas > 1
// doing this in memory is crude and currently only works reliably with 1
// replica per controller (due to potential cross external-foo leaders,
// ie: snapshot operations can lock volumes delete, but different pods may be
// leader for each respectively)
// having said that, it should be very effective as is
const operationLock = new Set();
/**
 * Common entry point for every CSI RPC.
 *
 * Logs a redacted copy of the request, enforces the in-memory operation
 * locks, invokes the driver method named `serviceMethodName` and reports the
 * outcome through `callback`. Failures are reported through `callback` as
 * well; a thrown `GrpcError` is passed through unchanged and anything else is
 * wrapped with the INTERNAL status code.
 *
 * @param {object} call - grpc call object; `call.request` carries the message
 * @param {function} callback - grpc unary callback `(error, response)`
 * @param {string} serviceMethodName - name of the driver method to invoke
 * @returns {Promise<void>}
 */
async function requestHandlerProxy(call, callback, serviceMethodName) {
  try {
    // Build a loggable copy of the call. This happens inside the try block so
    // a serialisation failure is reported to the caller instead of leaving
    // the request without any response.
    const cleansedCall = JSON.parse(stringify(call));
    delete cleansedCall.call;
    delete cleansedCall.canceled;
    for (const key in cleansedCall) {
      if (key.startsWith("_")) {
        delete cleansedCall[key];
      }
    }
    // never log secret material
    for (const key in cleansedCall.request) {
      if (key.includes("secret")) {
        cleansedCall.request[key] = "redacted";
      }
    }

    logger.info(
      "new request - driver: %s method: %s call: %j",
      driver.constructor.name,
      serviceMethodName,
      cleansedCall
    );

    const lockKeys = GeneralUtils.lockKeysFromRequest(call, serviceMethodName);
    // keys actually held by THIS request; stays empty when the lock check
    // fails so locks owned by another in-flight request are never released
    let heldLockKeys = [];
    if (lockKeys.length > 0) {
      logger.debug("operation lock keys: %j", lockKeys);

      // check locks
      if (lockKeys.some((key) => operationLock.has(key))) {
        throw new GrpcError(
          grpc.status.ABORTED,
          "operation locked due to in progress operation(s): " +
            JSON.stringify(lockKeys)
        );
      }

      // Acquire locks in the same synchronous step as the check. Any `await`
      // between check and acquisition would let a concurrent request for the
      // same keys pass the check as well.
      lockKeys.forEach((key) => operationLock.add(key));
      heldLockKeys = lockKeys;
    }

    let response;
    try {
      // for testing purposes
      //await GeneralUtils.sleep(10000);

      // for CI/testing purposes
      if (
        ["NodePublishVolume", "NodeStageVolume"].includes(serviceMethodName)
      ) {
        await driver.setVolumeContextCache(
          call.request.volume_id,
          call.request.volume_context
        );
      }

      response = await driver[serviceMethodName](call);
    } finally {
      // release locks regardless of the outcome
      heldLockKeys.forEach((key) => operationLock.delete(key));
    }

    // for CI/testing purposes
    if (serviceMethodName === "CreateVolume") {
      await driver.setVolumeContextCache(
        response.volume.volume_id,
        response.volume.volume_context
      );
    }

    logger.info(
      "new response - driver: %s method: %s response: %j",
      driver.constructor.name,
      serviceMethodName,
      response
    );

    callback(null, response);
  } catch (e) {
    let message;
    if (e instanceof Error) {
      message = e.toString();
      if (e.stack) {
        message += ` ${e.stack}`;
      }
    } else {
      message = stringify(e);
    }

    logger.error(
      "handler error - driver: %s method: %s error: %s",
      driver.constructor.name,
      serviceMethodName,
      message
    );

    // `e` may be any thrown value (including null/undefined), so guard the
    // property access
    if (e && e.name === "GrpcError") {
      callback(e);
    } else {
      // TODO: only show real error string in development mode
      callback({ code: grpc.status.INTERNAL, message });
    }
  }
}
// RPC names registered for each CSI service. Every handler simply forwards to
// requestHandlerProxy with its own name.
const IDENTITY_SERVICE_METHODS = [
  "GetPluginInfo",
  "GetPluginCapabilities",
  "Probe",
];

const CONTROLLER_SERVICE_METHODS = [
  "CreateVolume",
  "DeleteVolume",
  "ControllerPublishVolume",
  "ControllerUnpublishVolume",
  "ValidateVolumeCapabilities",
  "ControllerGetVolume",
  "ListVolumes",
  "GetCapacity",
  "ControllerGetCapabilities",
  "CreateSnapshot",
  "DeleteSnapshot",
  "ListSnapshots",
  "ControllerExpandVolume",
];

const NODE_SERVICE_METHODS = [
  "NodeStageVolume",
  "NodeUnstageVolume",
  "NodePublishVolume",
  "NodeUnpublishVolume",
  "NodeGetVolumeStats",
  "NodeExpandVolume",
  "NodeGetCapabilities",
  "NodeGetInfo",
];

/**
 * Builds a grpc service implementation object whose handlers forward to
 * requestHandlerProxy.
 *
 * The method name is captured in a closure instead of being read from
 * `arguments.callee`, which is deprecated and throws in strict mode.
 *
 * @param {string[]} methodNames - RPC names to create handlers for
 * @returns {object} map of method name to `(call, callback)` handler
 */
function buildServiceHandlers(methodNames) {
  const handlers = {};
  for (const methodName of methodNames) {
    handlers[methodName] = async (call, callback) => {
      try {
        await requestHandlerProxy(call, callback, methodName);
      } catch (err) {
        // The proxy reports failures through `callback` itself; anything that
        // still escapes is logged so it does not become an unhandled
        // rejection. The callback is not invoked here because it may already
        // have been called.
        logger.error(
          "unhandled handler failure - method: %s error: %s",
          methodName,
          err && err.stack ? err.stack : String(err)
        );
      }
    };
  }
  return handlers;
}

/**
 * Creates the grpc server and registers the CSI services: Identity always,
 * Controller and Node depending on the configured csi modes.
 *
 * @returns {object} the configured (not yet bound or started) grpc server
 */
function getServer() {
  const server = new grpc.Server();

  // Identity Service
  server.addService(
    csi.Identity.service,
    buildServiceHandlers(IDENTITY_SERVICE_METHODS)
  );

  // Controller Service
  if (args.csiMode.includes("controller")) {
    server.addService(
      csi.Controller.service,
      buildServiceHandlers(CONTROLLER_SERVICE_METHODS)
    );
  }

  // Node Service
  if (args.csiMode.includes("node")) {
    server.addService(
      csi.Node.service,
      buildServiceHandlers(NODE_SERVICE_METHODS)
    );
  }

  return server;
}
// https://grpc.github.io/grpc/node/grpc.Server.html
const csiServer = getServer();

// bind targets; an empty string means "do not bind this transport"
let bindAddress = "";
let bindSocket = "";

// tcp is only used when both address and port are provided
if (args.serverAddress && args.serverPort) {
  bindAddress = `${args.serverAddress}:${args.serverPort}`;
}

if (args.serverSocket) {
  // Normalise to a lowercase `unix://` prefix regardless of how the caller
  // spelled it. The socket cleanup code elsewhere in this file strips the
  // prefix with a case-sensitive pattern.
  bindSocket = "unix://" + args.serverSocket.replace(/^unix:\/\//i, "");
}

logger.info(
  "starting csi server - name: %s, version: %s, driver: %s, mode: %s, csi version: %s, address: %s, socket: %s",
  args.csiName,
  args.version,
  options.driver,
  args.csiMode.join(","),
  // report the effective version, which the CSI_VERSION env var may override
  csiVersion,
  bindAddress,
  bindSocket
);
// process events that trigger a server shutdown
const SHUTDOWN_EVENTS = [
  `SIGINT`,
  `SIGUSR1`,
  `SIGUSR2`,
  `uncaughtException`,
  `SIGTERM`,
];

/**
 * Maps a shutdown-triggering process event to a numeric exit code.
 *
 * Listeners receive the signal name (signals) or the Error object
 * (uncaughtException). Neither is a valid argument for process.exit(), so an
 * explicit integer is derived instead.
 *
 * @param {string} eventType - name of the process event being handled
 * @returns {number} 1 for `uncaughtException`, 0 for signal-driven shutdown
 */
function exitCodeForShutdownEvent(eventType) {
  return eventType === "uncaughtException" ? 1 : 0;
}

SHUTDOWN_EVENTS.forEach((eventType) => {
  process.on(eventType, async (detail) => {
    const exitCode = exitCodeForShutdownEvent(eventType);
    console.log(
      `running server shutdown, event: ${eventType}, exit code: ${exitCode}`
    );
    if (eventType === "uncaughtException") {
      // `detail` is the thrown value; log it so the stack is not lost
      console.log("uncaught exception", detail);
    }

    // attempt clean shutdown of in-flight requests
    try {
      await new Promise((resolve, reject) => {
        try {
          csiServer.tryShutdown(() => {
            resolve();
          });
        } catch (e) {
          reject(e);
        }
      });

      console.log(`grpc server gracefully closed all connections`);
    } catch (e) {
      console.log("failed to cleanly shutdown grpc server", e);
    }

    // NOTE: if the shutdown above finishes cleanly the socket will already be removed
    let socketPath = bindSocket;
    socketPath = socketPath.replace(/^unix:\/\//g, "");
    if (socketPath && fs.existsSync(socketPath)) {
      let fsStat = fs.statSync(socketPath);
      if (fsStat.isSocket()) {
        fs.unlinkSync(socketPath);
        console.log(`removed grpc socket ${socketPath}`);
      }
    }

    console.log("server fully shutdown, exiting");
    process.exit(exitCode);
  });
});
// Optional diagnostics, each switched on by setting its environment variable
// to "1".

// periodic dump of process.memoryUsage(), one line per field, in MB
if (process.env.LOG_MEMORY_USAGE == "1") {
  const memoryLogPeriod = process.env.LOG_MEMORY_USAGE_INTERVAL || 5000;
  const bytesToMegabytes = (bytes) =>
    Math.round((bytes / 1024 / 1024) * 100) / 100;

  setInterval(function logMemoryUsage() {
    console.log("logging memory usages due to LOG_MEMORY_USAGE env var");
    const usage = process.memoryUsage();
    Object.keys(usage).forEach((field) => {
      console.log(
        `[${new Date()}] Memory Usage: ${field} ${bytesToMegabytes(
          usage[field]
        )} MB`
      );
    });
  }, memoryLogPeriod);
}

// periodic forced garbage collection; a no-op when global.gc is not defined
if (process.env.MANUAL_GC == "1") {
  const gcPeriod = process.env.MANUAL_GC_INTERVAL || 60000;

  setInterval(function invokeGc() {
    console.log("gc invoked due to MANUAL_GC env var");
    try {
      global.gc && global.gc();
    } catch (e) {
      // best effort only: failures are intentionally ignored
    }
  }, gcPeriod);
}

// periodic dump of the grpc server's `sessions` property
if (process.env.LOG_GRPC_SESSIONS == "1") {
  setInterval(function dumpSessions() {
    console.log("dumping sessions");
    try {
      const sessions = csiServer.sessions;
      console.log(sessions);
    } catch (e) {
      // best effort only: failures are intentionally ignored
    }
  }, 5000);
}
// Only bind and start the server when this file is executed directly.
if (require.main === module) {
  /**
   * Promise adapter around the callback-style `csiServer.bindAsync`, always
   * using insecure server credentials.
   *
   * @param {string} target - address or unix socket uri to bind
   * @returns {Promise<void>} resolves once bound, rejects with the bind error
   */
  const bindInsecure = (target) =>
    new Promise((resolve, reject) => {
      csiServer.bindAsync(
        target,
        grpc.ServerCredentials.createInsecure(),
        (err) => {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        }
      );
    });

  /**
   * Binds the configured tcp and/or unix socket listeners and then starts the
   * server.
   */
  const startServer = async () => {
    if (bindAddress) {
      await bindInsecure(bindAddress);
    }

    if (bindSocket) {
      const socketPath = bindSocket.replace(/^unix:\/\//g, "");

      // remove a socket file left at the same path before binding
      if (socketPath && fs.existsSync(socketPath)) {
        const existing = fs.statSync(socketPath);
        if (existing.isSocket()) {
          fs.unlinkSync(socketPath);
        }
      }

      await bindInsecure(bindSocket);

      // apply the configured permissions to the freshly created socket file
      fs.chmodSync(socketPath, args["server-socket-permissions-mode"]);
    }

    csiServer.start();
  };

  // any failure while binding or starting is fatal
  startServer().catch((e) => {
    console.log(e);
    process.exit(1);
  });
}