1076 lines
34 KiB
JavaScript
1076 lines
34 KiB
JavaScript
const os = require("os");
|
|
const fs = require("fs");
|
|
const { GrpcError, grpc } = require("../utils/grpc");
|
|
const { Mount } = require("../utils/mount");
|
|
const { Filesystem } = require("../utils/filesystem");
|
|
const { ISCSI } = require("../utils/iscsi");
|
|
const semver = require("semver");
|
|
const sleep = require("../utils/general").sleep;
|
|
|
|
/**
 * Common code shared between all drivers.
 *
 * This is **NOT** meant to work as a proxy for the grpc calls, meaning it
 * should not also operate as a facade directly handling the requests to the
 * platform.
 */
|
|
class CsiBaseDriver {
  /**
   * @param {*} ctx shared application context; this class reads `ctx.args`,
   *   `ctx.logger` and `ctx.csiVersion` from it
   * @param {*} options driver configuration; this class reads `driver`,
   *   `instance_id` and `service.{identity,controller,node}.capabilities`
   */
  constructor(ctx, options) {
    this.ctx = ctx;
    this.options = options;
  }

  /**
   * abstract way of retrieving values from parameters/secrets
   * in order of preference:
   * - democratic-csi.org/{instance_id}/{key}
   * - democratic-csi.org/{driver}/{key}
   * - democratic-csi.org/{key}
   * - {key}
   *
   * @param {*} parameters raw parameters/secrets object
   * @param {*} key un-prefixed key to look up
   * @param {*} driver optional driver name (defaults to `options.driver`)
   * @param {*} instance_id optional instance id (defaults to `options.instance_id`)
   * @returns {*} the resolved value or `undefined` when the key is absent
   */
  getNormalizedParameterValue(parameters, key, driver, instance_id) {
    const normalized = this.getNormalizedParameters(
      parameters,
      driver,
      instance_id
    );
    return normalized[key];
  }

  /**
   * Returns a copy of `parameters` where every `democratic-csi.org/...`
   * prefixed key has been replaced by its un-prefixed form.
   *
   * Only the most specific matching prefix is stripped from a key and, when
   * several source keys normalize to the same name, the most specific one wins
   * regardless of the order in which the keys appear:
   * instance_id prefix > driver prefix > bare `democratic-csi.org/` prefix >
   * un-prefixed key.
   *
   * @param {*} parameters raw parameters/secrets object (null/undefined is treated as empty)
   * @param {*} driver optional driver name (defaults to `options.driver`)
   * @param {*} instance_id optional instance id (defaults to `options.instance_id`)
   * @returns {object} new object, the input is never modified
   */
  getNormalizedParameters(parameters, driver, instance_id) {
    const source = parameters || {};
    const normalized = JSON.parse(JSON.stringify(source));
    const base_key = "democratic-csi.org";
    driver = driver || this.options.driver;
    instance_id = instance_id || this.options.instance_id;

    // ordered from most to least specific, the index doubles as the rank
    const prefixes = [];
    if (instance_id) {
      prefixes.push(`${base_key}/${instance_id}/`);
    }
    if (driver) {
      prefixes.push(`${base_key}/${driver}/`);
    }
    prefixes.push(`${base_key}/`);

    const prefixedEntries = [];
    for (const key of Object.keys(source)) {
      const rank = prefixes.findIndex((prefix) => key.startsWith(prefix));
      if (rank === -1) {
        continue;
      }

      delete normalized[key];
      prefixedEntries.push({
        rank,
        normalizedKey: key.slice(prefixes[rank].length),
        value: source[key],
      });
    }

    // apply least specific first so more specific entries overwrite them
    prefixedEntries.sort((a, b) => b.rank - a.rank);
    for (const entry of prefixedEntries) {
      normalized[entry.normalizedKey] = entry.value;
    }

    return normalized;
  }

  /**
   * @param {*} call
   * @returns {object} plugin name and version taken from `ctx.args`
   */
  async GetPluginInfo(call) {
    return {
      name: this.ctx.args.csiName,
      vendor_version: this.ctx.args.version,
    };
  }

  /**
   * Builds the response from `options.service.identity.capabilities`
   * (`service` and `volume_expansion`), each falling back to `["UNKNOWN"]`.
   *
   * @param {*} call
   * @returns {object} `{ capabilities: [...] }`
   */
  async GetPluginCapabilities(call) {
    const response = {
      capabilities: [],
    };

    //UNKNOWN = 0;
    // CONTROLLER_SERVICE indicates that the Plugin provides RPCs for
    // the ControllerService. Plugins SHOULD provide this capability.
    // In rare cases certain plugins MAY wish to omit the
    // ControllerService entirely from their implementation, but such
    // SHOULD NOT be the common case.
    // The presence of this capability determines whether the CO will
    // attempt to invoke the REQUIRED ControllerService RPCs, as well
    // as specific RPCs as indicated by ControllerGetCapabilities.
    //CONTROLLER_SERVICE = 1;

    // VOLUME_ACCESSIBILITY_CONSTRAINTS indicates that the volumes for
    // this plugin MAY NOT be equally accessible by all nodes in the
    // cluster. The CO MUST use the topology information returned by
    // CreateVolumeRequest along with the topology information
    // returned by NodeGetInfo to ensure that a given volume is
    // accessible from a given node when scheduling workloads.
    //VOLUME_ACCESSIBILITY_CONSTRAINTS = 2;
    const services = this.options.service.identity.capabilities.service || [
      "UNKNOWN",
    ];

    services.forEach((item) => {
      response.capabilities.push({
        service: { type: item },
      });
    });

    //UNKNOWN = 0;
    // ONLINE indicates that volumes may be expanded when published to
    // a node. When a Plugin implements this capability it MUST
    // implement either the EXPAND_VOLUME controller capability or the
    // EXPAND_VOLUME node capability or both. When a plugin supports
    // ONLINE volume expansion and also has the EXPAND_VOLUME
    // controller capability then the plugin MUST support expansion of
    // volumes currently published and available on a node. When a
    // plugin supports ONLINE volume expansion and also has the
    // EXPAND_VOLUME node capability then the plugin MAY support
    // expansion of node-published volume via NodeExpandVolume.
    //
    // Example 1: Given a shared filesystem volume (e.g. GlusterFs),
    //   the Plugin may set the ONLINE volume expansion capability and
    //   implement ControllerExpandVolume but not NodeExpandVolume.
    //
    // Example 2: Given a block storage volume type (e.g. EBS), the
    //   Plugin may set the ONLINE volume expansion capability and
    //   implement both ControllerExpandVolume and NodeExpandVolume.
    //
    // Example 3: Given a Plugin that supports volume expansion only
    //   upon a node, the Plugin may set the ONLINE volume
    //   expansion capability and implement NodeExpandVolume but not
    //   ControllerExpandVolume.
    //ONLINE = 1;

    // OFFLINE indicates that volumes currently published and
    // available on a node SHALL NOT be expanded via
    // ControllerExpandVolume. When a plugin supports OFFLINE volume
    // expansion it MUST implement either the EXPAND_VOLUME controller
    // capability or both the EXPAND_VOLUME controller capability and
    // the EXPAND_VOLUME node capability.
    //
    // Example 1: Given a block storage volume type (e.g. Azure Disk)
    //   that does not support expansion of "node-attached" (i.e.
    //   controller-published) volumes, the Plugin may indicate
    //   OFFLINE volume expansion support and implement both
    //   ControllerExpandVolume and NodeExpandVolume.
    //OFFLINE = 2;
    const expansions = this.options.service.identity.capabilities
      .volume_expansion || ["UNKNOWN"];

    expansions.forEach((item) => {
      response.capabilities.push({
        volume_expansion: { type: item },
      });
    });

    return response;
  }

  /**
   * @param {*} call
   * @returns {object} always reports ready
   */
  async Probe(call) {
    return { ready: { value: true } };
  }

  /**
   * Builds the response from `options.service.controller.capabilities.rpc`,
   * falling back to `["UNKNOWN"]`.
   *
   * @param {*} call
   * @returns {object} `{ capabilities: [...] }`
   */
  async ControllerGetCapabilities(call) {
    const response = {
      capabilities: [],
    };

    //UNKNOWN = 0;
    //CREATE_DELETE_VOLUME = 1;
    //PUBLISH_UNPUBLISH_VOLUME = 2;
    //LIST_VOLUMES = 3;
    //GET_CAPACITY = 4;
    // Currently the only way to consume a snapshot is to create
    // a volume from it. Therefore plugins supporting
    // CREATE_DELETE_SNAPSHOT MUST support creating volume from
    // snapshot.
    //CREATE_DELETE_SNAPSHOT = 5;
    //LIST_SNAPSHOTS = 6;

    // Plugins supporting volume cloning at the storage level MAY
    // report this capability. The source volume MUST be managed by
    // the same plugin. Not all volume sources and parameters
    // combinations MAY work.
    //CLONE_VOLUME = 7;

    // Indicates the SP supports ControllerPublishVolume.readonly
    // field.
    //PUBLISH_READONLY = 8;

    // See VolumeExpansion for details.
    //EXPAND_VOLUME = 9;
    const rpcs = this.options.service.controller.capabilities.rpc || [
      "UNKNOWN",
    ];

    rpcs.forEach((item) => {
      response.capabilities.push({
        rpc: { type: item },
      });
    });

    return response;
  }

  /**
   * Builds the response from `options.service.node.capabilities.rpc`,
   * falling back to `["UNKNOWN"]`.
   *
   * @param {*} call
   * @returns {object} `{ capabilities: [...] }`
   */
  async NodeGetCapabilities(call) {
    const response = {
      capabilities: [],
    };

    //UNKNOWN = 0;
    //STAGE_UNSTAGE_VOLUME = 1;
    // If Plugin implements GET_VOLUME_STATS capability
    // then it MUST implement NodeGetVolumeStats RPC
    // call for fetching volume statistics.
    //GET_VOLUME_STATS = 2;
    // See VolumeExpansion for details.
    //EXPAND_VOLUME = 3;
    const rpcs = this.options.service.node.capabilities.rpc || ["UNKNOWN"];

    rpcs.forEach((item) => {
      response.capabilities.push({
        rpc: { type: item },
      });
    });

    return response;
  }

  /**
   * @param {*} call
   * @returns {object} node id (`CSI_NODE_ID` env var or the hostname) and
   *   `max_volumes_per_node: 0`
   */
  async NodeGetInfo(call) {
    return {
      node_id: process.env.CSI_NODE_ID || os.hostname(),
      max_volumes_per_node: 0,
    };
  }

  /**
   * https://kubernetes-csi.github.io/docs/raw-block.html
   * --feature-gates=BlockVolume=true,CSIBlockVolume=true
   *
   * StagingTargetPath is always a directory even for block volumes
   *
   * NOTE: stage gets called every time publish does
   *
   * Steps:
   * 1. validate the capability via `this.assertCapabilities` (not defined in
   *    this class; presumably provided by the concrete driver)
   * 2. ensure the staging directory exists
   * 3. resolve the device for the `node_attach_driver` (nfs/smb/iscsi); for
   *    iscsi this logs in to every portal and optionally resolves the
   *    multipath device
   * 4. mount the device (access_type `mount`) or bind mount it to
   *    `{staging_target_path}/block_device` (access_type `block`)
   *
   * @param {*} call
   * @returns {object} empty object on success
   * @throws {GrpcError} INVALID_ARGUMENT for invalid capability, unknown
   *   node_attach_driver or unknown access_type; UNKNOWN when no iscsi device
   *   could be attached or the multipath device cannot be found;
   *   FAILED_PRECONDITION for unsupported filesystems on block devices
   */
  async NodeStageVolume(call) {
    const driver = this;
    const mount = new Mount();
    const filesystem = new Filesystem();
    const iscsi = new ISCSI();
    let result;
    let device;

    const staging_target_path = call.request.staging_target_path;
    const capability = call.request.volume_capability;
    const access_type = capability.access_type || "mount";
    const volume_context = call.request.volume_context;
    let fs_type;
    let mount_flags;
    const node_attach_driver = volume_context.node_attach_driver;
    const block_path = staging_target_path + "/block_device";
    const bind_mount_flags = ["defaults"];

    const normalizedSecrets = this.getNormalizedParameters(
      call.request.secrets,
      volume_context.provisioner_driver,
      volume_context.provisioner_driver_instance_id
    );

    if (access_type == "mount") {
      fs_type = capability.mount.fs_type;
      // copy the flags so the array owned by the request is never mutated
      mount_flags = [...(capability.mount.mount_flags || [])];
      // add secrets mount_flags
      if (normalizedSecrets.mount_flags) {
        mount_flags.push(normalizedSecrets.mount_flags);
      }
      mount_flags.push("defaults");
    }

    // node-manual volumes are validated against the attach driver
    if (volume_context.provisioner_driver == "node-manual") {
      result = await this.assertCapabilities([capability], node_attach_driver);
    } else {
      result = await this.assertCapabilities([capability]);
    }

    if (!result.valid) {
      throw new GrpcError(
        grpc.status.INVALID_ARGUMENT,
        `invalid capability: ${result.message}`
      );
    }

    // csi spec stipulates that staging_target_path is a directory even for block mounts
    result = await filesystem.pathExists(staging_target_path);
    if (!result) {
      await filesystem.mkdir(staging_target_path, ["-p", "-m", "0750"]);
    }

    switch (node_attach_driver) {
      case "nfs":
        device = `${volume_context.server}:${volume_context.share}`;
        break;
      case "smb":
        device = `//${volume_context.server}/${volume_context.share}`;
        break;
      case "iscsi": {
        let portals = [];
        if (volume_context.portal) {
          portals.push(volume_context.portal.trim());
        }

        if (volume_context.portals) {
          volume_context.portals.split(",").forEach((portal) => {
            portals.push(portal.trim());
          });
        }

        // ignore empty entries (ie: stray/trailing commas) and ensure full
        // portal value
        portals = portals
          .filter((value) => value.length > 0)
          .map((value) => {
            if (!value.includes(":")) {
              value += ":3260";
            }

            return value.trim();
          });

        // ensure unique entries only
        portals = [...new Set(portals)];

        let iscsiDevices = [];

        for (const portal of portals) {
          // create DB entry
          // https://library.netapp.com/ecmdocs/ECMP1654943/html/GUID-8EC685B4-8CB6-40D8-A8D5-031A3899BCDC.html
          // put these options in place to force targets managed by csi to be explicitly attached (in the case of unclean shutdown etc)
          const nodeDB = {
            "node.startup": "manual",
            //"node.session.scan": "manual",
          };
          const nodeDBKeyPrefix = "node-db.";
          for (const key in normalizedSecrets) {
            if (key.startsWith(nodeDBKeyPrefix)) {
              nodeDB[key.slice(nodeDBKeyPrefix.length)] =
                normalizedSecrets[key];
            }
          }
          await iscsi.iscsiadm.createNodeDBEntry(
            volume_context.iqn,
            portal,
            nodeDB
          );
          // login
          await iscsi.iscsiadm.login(volume_context.iqn, portal);

          // get associated session
          const session = await iscsi.iscsiadm.getSession(
            volume_context.iqn,
            portal
          );

          // rescan in scenarios when login previously occurred but volumes never appeared
          await iscsi.iscsiadm.rescanSession(session);

          // find device name
          device = `/dev/disk/by-path/ip-${portal}-iscsi-${volume_context.iqn}-lun-${volume_context.lun}`;
          const deviceByPath = device;

          // can take some time for device to show up, loop for some period
          let deviceCreated = await filesystem.pathExists(device);
          const timer_start = Math.round(new Date().getTime() / 1000);
          const timer_max = 30;
          while (!deviceCreated) {
            await sleep(2000);
            deviceCreated = await filesystem.pathExists(device);
            if (deviceCreated) {
              break;
            }

            const current_time = Math.round(new Date().getTime() / 1000);
            if (current_time - timer_start > timer_max) {
              driver.ctx.logger.warn(
                `hit timeout waiting for device node to appear: ${device}`
              );
              break;
            }
          }

          if (deviceCreated) {
            device = await filesystem.realpath(device);
            iscsiDevices.push(device);

            driver.ctx.logger.info(
              `successfully logged into portal ${portal} and created device ${deviceByPath} with realpath ${device}`
            );
          }
        }

        // let things settle
        // this will help in dm scenarios
        await sleep(2000);

        // filter duplicates
        iscsiDevices = [...new Set(iscsiDevices)];

        // only throw an error if we were not able to attach to *any* devices
        if (iscsiDevices.length < 1) {
          throw new GrpcError(
            grpc.status.UNKNOWN,
            `unable to attach any iscsi devices`
          );
        }

        if (iscsiDevices.length != portals.length) {
          // TODO: allow a parameter to control whether this should be fatal
          driver.ctx.logger.warn(
            `failed to attach all iscsi devices/targets/portals`
          );
        }

        // compare all device-mapper slaves with the newly created devices
        // if any of the new devices are device-mapper slaves treat this as a
        // multipath scenario
        const allDeviceMapperSlaves = await filesystem.getAllDeviceMapperSlaveDevices();
        const commonDevices = allDeviceMapperSlaves.filter((value) =>
          iscsiDevices.includes(value)
        );

        const useMultipath = portals.length > 1 || commonDevices.length > 0;

        // discover multipath device to use
        if (useMultipath) {
          device = await filesystem.getDeviceMapperDeviceFromSlaves(
            iscsiDevices,
            false
          );

          if (!device) {
            throw new GrpcError(
              grpc.status.UNKNOWN,
              `failed to discover multipath device`
            );
          }
        }
        break;
      }
      default:
        throw new GrpcError(
          grpc.status.INVALID_ARGUMENT,
          `unknown/unsupported node_attach_driver: ${node_attach_driver}`
        );
    }

    switch (access_type) {
      case "mount":
        // block specific logic
        if (
          node_attach_driver === "iscsi" &&
          (await filesystem.isBlockDevice(device))
        ) {
          // format
          result = await filesystem.deviceIsFormatted(device);
          if (!result) {
            await filesystem.formatDevice(device, fs_type);
          }

          // use the filesystem actually present on the device
          const fs_info = await filesystem.getDeviceFilesystemInfo(device);
          fs_type = fs_info.type;

          // fsck
          // TODO: add a parameter to control this behavior (run only when
          // the device is not yet mounted at staging_target_path)
          // https://github.com/democratic-csi/democratic-csi/issues/52#issuecomment-768463401
          //await filesystem.checkFilesystem(device, fs_type);
        }

        result = await mount.deviceIsMountedAtPath(device, staging_target_path);
        if (!result) {
          await mount.mount(
            device,
            staging_target_path,
            ["-t", fs_type].concat(["-o", mount_flags.join(",")])
          );
        }

        if (await filesystem.isBlockDevice(device)) {
          // go ahead and expand fs (this covers cloned setups where expand is not explicitly invoked)
          switch (fs_type) {
            case "ext4":
            case "ext3":
            case "ext4dev":
              //await filesystem.checkFilesystem(device, fs_info.type);
              await filesystem.expandFilesystem(device, fs_type);
              break;
            case "xfs":
              //await filesystem.checkFilesystem(device, fs_info.type);
              // xfs is expanded via the mount point rather than the device
              await filesystem.expandFilesystem(staging_target_path, fs_type);
              break;
            default:
              // unsupported filesystem
              throw new GrpcError(
                grpc.status.FAILED_PRECONDITION,
                `unsupported/unknown filesystem ${fs_type}`
              );
          }
        }

        break;
      case "block":
        //result = await mount.deviceIsMountedAtPath(device, block_path);
        result = await mount.deviceIsMountedAtPath("dev", block_path);
        if (!result) {
          result = await filesystem.pathExists(staging_target_path);
          if (!result) {
            await filesystem.mkdir(staging_target_path, ["-p", "-m", "0750"]);
          }

          // bind mounting a device requires an existing file as the target
          result = await filesystem.pathExists(block_path);
          if (!result) {
            await filesystem.touch(block_path);
          }

          await mount.bindMount(device, block_path, [
            "-o",
            bind_mount_flags.join(","),
          ]);
        }
        break;
      default:
        throw new GrpcError(
          grpc.status.INVALID_ARGUMENT,
          `unknown/unsupported access_type: ${access_type}`
        );
    }

    return {};
  }

  /**
   * NOTE: only gets called when the last pod on the node using the volume is removed
   *
   * 1. unmount fs
   * 2. logout of iscsi if necessary
   * 3. remove the `block_device` file (block access) and the staging directory
   *
   * Failures to logout/delete the iscsi node DB entry are retried for a
   * limited time and then only logged, they never fail the request.
   *
   * @param {*} call
   * @returns {object} empty object on success
   * @throws {GrpcError} INVALID_ARGUMENT when staging_target_path is missing
   */
  async NodeUnstageVolume(call) {
    const driver = this;
    const mount = new Mount();
    const filesystem = new Filesystem();
    const iscsi = new ISCSI();
    let result;
    let is_block = false;
    let block_device_info;
    let access_type = "mount";

    const staging_target_path = call.request.staging_target_path;
    if (!staging_target_path) {
      throw new GrpcError(
        grpc.status.INVALID_ARGUMENT,
        `missing staging_target_path`
      );
    }

    const block_path = staging_target_path + "/block_device";
    let normalized_staging_path = staging_target_path;
    const umount_args = []; // --force

    /**
     * Runs `operation` until it succeeds, pausing 2 seconds after every
     * failure. Gives up (without throwing) once more than 30 seconds elapsed.
     */
    const retryUntilTimeout = async (operation, timeoutMessage) => {
      const timer_start = Math.round(new Date().getTime() / 1000);
      const timer_max = 30;
      for (;;) {
        try {
          await operation();
          return;
        } catch (err) {
          await sleep(2000);
          const current_time = Math.round(new Date().getTime() / 1000);
          if (current_time - timer_start > timer_max) {
            // not throwing error for now as future invocations would not enter code path anyhow
            driver.ctx.logger.warn(`${timeoutMessage}: ${err}`);
            return;
          }
        }
      }
    };

    // a mounted block_device file means the volume was staged as raw block
    result = await mount.pathIsMounted(block_path);
    if (result) {
      is_block = true;
      access_type = "block";
      block_device_info = await filesystem.getBlockDevice(block_path);
      normalized_staging_path = block_path;
    } else {
      result = await mount.pathIsMounted(staging_target_path);
      if (result) {
        const device = await mount.getMountPointDevice(staging_target_path);
        result = await filesystem.isBlockDevice(device);
        if (result) {
          is_block = true;
          block_device_info = await filesystem.getBlockDevice(device);
        }
      }
    }

    result = await mount.pathIsMounted(normalized_staging_path);
    if (result) {
      result = await mount.umount(normalized_staging_path, umount_args);
    }

    if (is_block) {
      let realBlockDeviceInfos = [];
      // detect if is a multipath device
      const is_device_mapper = await filesystem.isDeviceMapperDevice(
        block_device_info.path
      );

      if (is_device_mapper) {
        const realBlockDevices = await filesystem.getDeviceMapperDeviceSlaves(
          block_device_info.path
        );
        for (const realBlockDevice of realBlockDevices) {
          realBlockDeviceInfos.push(
            await filesystem.getBlockDevice(realBlockDevice)
          );
        }
      } else {
        realBlockDeviceInfos = [block_device_info];
      }

      // TODO: this could be made async to detach all simultaneously
      for (const block_device_info_i of realBlockDeviceInfos) {
        if (block_device_info_i.tran != "iscsi") {
          continue;
        }

        // figure out which iscsi session this belongs to and logout
        // scan /dev/disk/by-path/ip-*?
        // device = `/dev/disk/by-path/ip-${volume_context.portal}-iscsi-${volume_context.iqn}-lun-${volume_context.lun}`;
        // parse output from `iscsiadm -m session -P 3`
        const sessions = await iscsi.iscsiadm.getSessionsDetails();
        for (const session of sessions) {
          let is_attached_to_session = false;

          if (
            session.attached_scsi_devices &&
            session.attached_scsi_devices.host &&
            session.attached_scsi_devices.host.devices
          ) {
            is_attached_to_session = session.attached_scsi_devices.host.devices.some(
              (device) => device.attached_scsi_disk == block_device_info_i.name
            );
          }

          if (!is_attached_to_session) {
            continue;
          }

          await retryUntilTimeout(
            () =>
              iscsi.iscsiadm.logout(session.target, [
                session.persistent_portal,
              ]),
            `hit timeout trying to logout of iscsi target: ${session.persistent_portal}`
          );

          await retryUntilTimeout(
            () =>
              iscsi.iscsiadm.deleteNodeDBEntry(
                session.target,
                session.persistent_portal
              ),
            `hit timeout trying to delete iscsi node DB entry: ${session.target}, ${session.persistent_portal}`
          );
        }
      }
    }

    if (access_type == "block") {
      // remove touched file
      result = await filesystem.pathExists(block_path);
      if (result) {
        result = await filesystem.rm(block_path);
      }
    }

    result = await filesystem.pathExists(staging_target_path);
    if (result) {
      result = await filesystem.rmdir(staging_target_path);
    }

    return {};
  }

  /**
   * Bind mounts the staged volume to `target_path`, creating the target
   * directory (mount access) or target file (block access) when missing.
   *
   * @param {*} call
   * @returns {object} empty object on success
   * @throws {GrpcError} INVALID_ARGUMENT for unknown node_attach_driver or
   *   access_type; FAILED_PRECONDITION when no staging_target_path was given
   *   or when something else is already mounted at target_path
   */
  async NodePublishVolume(call) {
    const mount = new Mount();
    const filesystem = new Filesystem();
    let result;

    const staging_target_path = call.request.staging_target_path || "";
    const target_path = call.request.target_path;
    const capability = call.request.volume_capability;
    const access_type = capability.access_type || "mount";
    const readonly = call.request.readonly;
    const volume_context = call.request.volume_context;
    const bind_mount_flags = [];
    const node_attach_driver = volume_context.node_attach_driver;

    if (access_type == "mount") {
      const mount_flags = capability.mount.mount_flags || [];
      bind_mount_flags.push(...mount_flags);
    }

    bind_mount_flags.push("defaults");
    if (readonly) bind_mount_flags.push("ro");

    if (!["nfs", "smb", "iscsi"].includes(node_attach_driver)) {
      throw new GrpcError(
        grpc.status.INVALID_ARGUMENT,
        `unknown/unsupported node_attach_driver: ${node_attach_driver}`
      );
    }

    // ensure appropriate directories/files
    switch (access_type) {
      case "mount":
        // ensure directory exists
        result = await filesystem.pathExists(target_path);
        if (!result) {
          await filesystem.mkdir(target_path, ["-p", "-m", "0750"]);
        }
        break;
      case "block": {
        // ensure target_path directory exists as target path should be a file
        const target_dir = await filesystem.dirname(target_path);
        result = await filesystem.pathExists(target_dir);
        if (!result) {
          await filesystem.mkdir(target_dir, ["-p", "-m", "0750"]);
        }

        // ensure target file exists
        result = await filesystem.pathExists(target_path);
        if (!result) {
          await filesystem.touch(target_path);
        }
        break;
      }
      default:
        throw new GrpcError(
          grpc.status.INVALID_ARGUMENT,
          `unsupported/unknown access_type ${access_type}`
        );
    }

    // publishing without a prior stage is not supported
    if (!staging_target_path) {
      throw new GrpcError(
        grpc.status.FAILED_PRECONDITION,
        `only staged configurations are valid`
      );
    }

    // ensure bind mount
    const normalized_staging_path =
      access_type == "block"
        ? staging_target_path + "/block_device"
        : staging_target_path;

    result = await mount.pathIsMounted(target_path);
    // if not mounted, mount
    if (!result) {
      await mount.bindMount(normalized_staging_path, target_path, [
        "-o",
        bind_mount_flags.join(","),
      ]);
      return {};
    }

    // if is mounted, ensure proper source
    let normalized_staging_device;
    if (access_type == "block") {
      normalized_staging_device = "dev"; // special syntax for single file bind mounts
    } else {
      normalized_staging_device = await mount.getMountPointDevice(
        staging_target_path
      );
    }

    result = await mount.deviceIsMountedAtPath(
      normalized_staging_device,
      target_path
    );
    if (!result) {
      throw new GrpcError(
        grpc.status.FAILED_PRECONDITION,
        `it appears something else is already mounted at ${target_path}`
      );
    }

    return {};
  }

  /**
   * Unmounts `target_path` (when mounted) and removes the leftover
   * directory/file.
   *
   * @param {*} call
   * @returns {object} empty object on success
   */
  async NodeUnpublishVolume(call) {
    const mount = new Mount();
    const filesystem = new Filesystem();
    let result;

    const target_path = call.request.target_path;
    const umount_args = []; // --force

    result = await mount.pathIsMounted(target_path);
    if (result) {
      result = await mount.umount(target_path, umount_args);
    }

    result = await filesystem.pathExists(target_path);
    if (result) {
      // mount access publishes to a directory, block access to a file
      if (fs.lstatSync(target_path).isDirectory()) {
        result = await filesystem.rmdir(target_path);
      } else {
        result = await filesystem.rm([target_path]);
      }
    }

    return {};
  }

  /**
   * Reports usage for the volume at `volume_path`: available/total/used bytes
   * for mounted filesystems, total bytes only for raw block devices. When the
   * CSI version is >=1.3.0 and the node advertises `VOLUME_CONDITION` a static
   * healthy `volume_condition` is included.
   *
   * @param {*} call
   * @returns {object} `{ usage: [...], volume_condition?: {...} }`
   * @throws {GrpcError} INVALID_ARGUMENT when volume_path is missing
   */
  async NodeGetVolumeStats(call) {
    const driver = this;
    const mount = new Mount();
    const filesystem = new Filesystem();
    let result;
    let device_path;
    let access_type;

    const volume_path = call.request.volume_path;
    if (!volume_path) {
      throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_path`);
    }

    const block_path = volume_path + "/block_device";
    const res = {};

    //VOLUME_CONDITION
    const node_rpc_capabilities =
      driver.options.service.node.capabilities.rpc || [];
    if (
      semver.satisfies(driver.ctx.csiVersion, ">=1.3.0") &&
      node_rpc_capabilities.includes("VOLUME_CONDITION")
    ) {
      // TODO: let drivers fill this in
      const abnormal = false;
      const message = "OK";
      res.volume_condition = { abnormal, message };
    }

    if (
      (await mount.isBindMountedBlockDevice(volume_path)) ||
      (await mount.isBindMountedBlockDevice(block_path))
    ) {
      device_path = block_path;
      access_type = "block";
    } else {
      device_path = volume_path;
      access_type = "mount";
    }

    switch (access_type) {
      case "mount":
        result = await mount.getMountDetails(device_path);

        res.usage = [
          {
            available: result.avail,
            total: result.size,
            used: result.used,
            unit: "BYTES",
          },
        ];
        break;
      case "block":
        result = await filesystem.getBlockDevice(device_path);

        res.usage = [
          {
            total: result.size,
            unit: "BYTES",
          },
        ];
        break;
      default:
        throw new GrpcError(
          grpc.status.INVALID_ARGUMENT,
          `unsupported/unknown access_type ${access_type}`
        );
    }

    return res;
  }

  /**
   * https://kubernetes-csi.github.io/docs/volume-expansion.html
   * allowVolumeExpansion: true
   * --feature-gates=ExpandCSIVolumes=true
   * --feature-gates=ExpandInUsePersistentVolumes=true
   *
   * Rescans the block device(s) backing `volume_path` and, for formatted
   * volumes with mount access, grows the filesystem. Non block devices and
   * unformatted/raw block volumes are left untouched.
   *
   * @param {*} call
   * @returns {object} empty object on success
   * @throws {GrpcError} INVALID_ARGUMENT when volume_path is missing;
   *   FAILED_PRECONDITION when the path is not mounted or the filesystem is
   *   unsupported
   */
  async NodeExpandVolume(call) {
    const mount = new Mount();
    const filesystem = new Filesystem();
    let device;
    let device_path;
    let access_type;
    let is_block = false;
    let is_formatted;

    const volume_path = call.request.volume_path;
    if (!volume_path) {
      throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_path`);
    }

    const block_path = volume_path + "/block_device";

    if (
      (await mount.isBindMountedBlockDevice(volume_path)) ||
      (await mount.isBindMountedBlockDevice(block_path))
    ) {
      access_type = "block";
      device_path = block_path;
    } else {
      access_type = "mount";
      device_path = volume_path;
    }

    try {
      device = await mount.getMountPointDevice(device_path);
      is_formatted = await filesystem.deviceIsFormatted(device);
      is_block = await filesystem.isBlockDevice(device);
    } catch (err) {
      if (err.code == 1) {
        throw new GrpcError(
          grpc.status.FAILED_PRECONDITION,
          `volume_path ${volume_path} is not currently mounted`
        );
      }

      // other failures are tolerated (the volume is then treated as a non
      // block device) but should not go unnoticed
      this.ctx.logger.warn(
        `failed to inspect device for volume_path ${volume_path}: ${err}`
      );
    }

    if (!is_block) {
      // not block device
      return {};
    }

    let rescan_devices = [];
    // detect if is a multipath device
    const is_device_mapper = await filesystem.isDeviceMapperDevice(device);
    if (is_device_mapper) {
      // NOTE: want to make sure we scan the dm device *after* all the underlying slaves
      rescan_devices = await filesystem.getDeviceMapperDeviceSlaves(device);
    }

    rescan_devices.push(device);

    for (const sdevice of rescan_devices) {
      await filesystem.rescanDevice(sdevice);
    }

    // let things settle
    // it appears the dm devices can take a second to figure things out
    await sleep(2000);

    if (!(is_formatted && access_type == "mount")) {
      //block device unformatted
      return {};
    }

    const fs_info = await filesystem.getDeviceFilesystemInfo(device);
    const fs_type = fs_info.type;
    if (fs_type) {
      switch (fs_type) {
        case "ext4":
        case "ext3":
        case "ext4dev":
          //await filesystem.checkFilesystem(device, fs_info.type);
          await filesystem.expandFilesystem(device, fs_type);
          break;
        case "xfs": {
          // xfs is expanded via the mount point, make sure it is the xfs mount
          const mount_info = await mount.getMountDetails(device_path);
          if (mount_info.fstype == "xfs") {
            //await filesystem.checkFilesystem(device, fs_info.type);
            await filesystem.expandFilesystem(device_path, fs_type);
          }
          break;
        }
        default:
          // unsupported filesystem
          throw new GrpcError(
            grpc.status.FAILED_PRECONDITION,
            `unsupported/unknown filesystem ${fs_type}`
          );
      }
    }

    return {};
  }
}
|
|
module.exports.CsiBaseDriver = CsiBaseDriver;
|