democratic-csi/src/driver/index.js

2655 lines
88 KiB
JavaScript

const _ = require("lodash");
const cp = require("child_process");
const os = require("os");
const fs = require("fs");
const CsiProxyClient = require("../utils/csi_proxy_client").CsiProxyClient;
const k8s = require("@kubernetes/client-node");
const { GrpcError, grpc } = require("../utils/grpc");
const { Mount } = require("../utils/mount");
const { OneClient } = require("../utils/oneclient");
const { Filesystem } = require("../utils/filesystem");
const { ISCSI } = require("../utils/iscsi");
const registry = require("../utils/registry");
const semver = require("semver");
const GeneralUtils = require("../utils/general");
const { Zetabyte } = require("../utils/zfs");
const __REGISTRY_NS__ = "CsiBaseDriver";
const NODE_OS_DRIVER_CSI_PROXY = "csi-proxy";
const NODE_OS_DRIVER_POSIX = "posix";
/**
* Common code shared between all drivers.
*
* This is **NOT** meant to work as a proxy for the grpc calls; that is, it
* should not also operate as a facade that directly handles the requests
* to the platform.
*/
class CsiBaseDriver {
constructor(ctx, options) {
this.ctx = ctx;
this.options = options || {};
if (!this.options.hasOwnProperty("node")) {
this.options.node = {};
}
if (!this.options.node.hasOwnProperty("format")) {
this.options.node.format = {};
}
if (!this.options.node.hasOwnProperty("mount")) {
this.options.node.mount = {};
}
if (!this.options.node.mount.hasOwnProperty("checkFilesystem")) {
this.options.node.mount.checkFilesystem = {};
}
}
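/**
* abstract way of retrieving values from parameters/secrets
* in order of preference:
* - democratic-csi.org/{instance_id}/{key}
* - democratic-csi.org/{driver}/{key}
* - {key}
*
* @param {*} parameters map of parameter/secret names to values
* @param {*} key un-prefixed name of the value to retrieve
* @param {*} driver driver name used in the prefix (defaults to `options.driver`)
* @param {*} instance_id instance id used in the prefix (defaults to `options.instance_id`)
*/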
getNormalizedParameterValue(parameters, key, driver, instance_id) {
const normalized = this.getNormalizedParameters(
parameters,
driver,
instance_id
);
return normalized[key];
}
getNormalizedParameters(parameters, driver, instance_id) {
const normalized = JSON.parse(JSON.stringify(parameters));
const base_key = "democratic-csi.org";
driver = driver || this.options.driver;
instance_id = instance_id || this.options.instance_id;
for (const key in parameters) {
let normalizedKey;
let prefixLength;
if (instance_id && key.startsWith(`${base_key}/${instance_id}/`)) {
prefixLength = `${base_key}/${instance_id}/`.length;
normalizedKey = key.slice(prefixLength);
normalized[normalizedKey] = parameters[key];
delete normalized[key];
}
if (driver && key.startsWith(`${base_key}/${driver}/`)) {
prefixLength = `${base_key}/${driver}/`.length;
normalizedKey = key.slice(prefixLength);
normalized[normalizedKey] = parameters[key];
delete normalized[key];
}
if (key.startsWith(`${base_key}/`)) {
prefixLength = `${base_key}/`.length;
normalizedKey = key.slice(prefixLength);
normalized[normalizedKey] = parameters[key];
delete normalized[key];
}
}
return normalized;
}
/**
* Get an instance of the Filesystem class
*
* @returns Filesystem
*/
getDefaultFilesystemInstance() {
return registry.get(
`${__REGISTRY_NS__}:default_filesystem_instance`,
() => {
return new Filesystem();
}
);
}
/**
* Get an instance of the Mount class
*
* @returns Mount
*/
getDefaultMountInstance() {
return registry.get(`${__REGISTRY_NS__}:default_mount_instance`, () => {
const filesystem = this.getDefaultFilesystemInstance();
return new Mount({ filesystem });
});
}
/**
* Get an instance of the ISCSI class
*
* @returns ISCSI
*/
getDefaultISCSIInstance() {
return registry.get(`${__REGISTRY_NS__}:default_iscsi_instance`, () => {
return new ISCSI();
});
}
getDefaultZetabyteInstance() {
return registry.get(`${__REGISTRY_NS__}:default_zb_instance`, () => {
return new Zetabyte({
idempotent: true,
paths: {
zfs: "zfs",
zpool: "zpool",
sudo: "sudo",
chroot: "chroot",
},
//logger: driver.ctx.logger,
executor: {
spawn: function () {
const command = `${arguments[0]} ${arguments[1].join(" ")}`;
return cp.exec(command);
},
},
log_commands: true,
});
});
}
getDefaultOneClientInstance() {
return registry.get(`${__REGISTRY_NS__}:default_oneclient_instance`, () => {
return new OneClient();
});
}
/**
*
* @returns CsiProxyClient
*/
getDefaultCsiProxyClientInstance() {
return registry.get(`${__REGISTRY_NS__}:default_csi_proxy_instance`, () => {
const options = {};
options.services = _.get(this.options, "node.csiProxy.services", {});
return new CsiProxyClient(options);
});
}
getDefaultKubernetsConfigInstance() {
return registry.get(
`${__REGISTRY_NS__}:default_kubernetes_config_instance`,
() => {
const kc = new k8s.KubeConfig();
kc.loadFromDefault();
return kc;
}
);
}
getCsiProxyEnabled() {
const defaultValue = process.platform == "win32";
return _.get(this.options, "node.csiProxy.enabled", defaultValue);
}
getNodeIsWindows() {
return process.platform == "win32";
}
__getNodeOsDriver() {
if (this.getNodeIsWindows() || this.getCsiProxyEnabled()) {
return NODE_OS_DRIVER_CSI_PROXY;
}
return NODE_OS_DRIVER_POSIX;
}
getMountFlagValue(mount_flags = [], flag = "") {
for (let i = mount_flags.length - 1; i >= 0; i--) {
const mount_flag = mount_flags[i];
if (mount_flag.startsWith(`${flag}=`)) {
return mount_flag.split("=", 2)[1] || "";
}
}
}
async getDerivedVolumeContextDriver() {
const driver = this;
let d = _.get(driver.options, "_private.csi.volume.volumeContext.driver");
if (
!d &&
(process.env.KUBERNETES_SERVICE_HOST ||
process.env.KUBERNETES_SERVICE_PORT)
) {
// test for k8s
d = "kubernetes";
}
if (!d) {
// test for Nomad
}
if (!d && process.env.CSI_SANITY == 1) {
d = "memory";
}
return d;
}
/**
* Used predominantly with windows due to limitations with the csi-proxy
*
* @param {*} call
* @returns
*/
async getDerivedVolumeContext(call) {
const driver = this;
const volume_id = call.request.volume_id;
const d = await driver.getDerivedVolumeContextDriver();
driver.ctx.logger.debug(`looking up volume_context using driver: ${d}`);
let volume_context;
switch (d) {
case "memory":
driver.volume_context_cache = driver.volume_context_cache || {};
volume_context = driver.volume_context_cache[volume_id];
break;
case "kubernetes":
const kc = driver.getDefaultKubernetsConfigInstance();
const k8sApi = kc.makeApiClient(k8s.CoreV1Api);
async function findPVByDriverHandle(driver, volumeHandle) {
if (!driver || !volumeHandle) {
return;
}
let pv;
let pvs;
let kcontinue;
do {
pvs = await k8sApi.listPersistentVolume(
undefined,
undefined,
kcontinue,
undefined,
undefined,
undefined // limit
);
pv = pvs.body.items.find((item) => {
return (
item.spec.csi.driver == driver &&
item.spec.csi.volumeHandle == volumeHandle
);
});
kcontinue = pvs.body.metadata._continue;
} while (!pv && pvs.body.metadata._continue);
return pv;
}
const pv = await findPVByDriverHandle(
driver.ctx.args.csiName,
volume_id
);
if (pv) {
volume_context = pv.spec.csi.volumeAttributes;
}
break;
default:
throw new Error(`unknow derived volume context driver: ${d}`);
}
//if (!volume_context) {
// throw new Error(`failed to retrieve volume_context for ${volume_id}`);
//}
driver.ctx.logger.debug(
"retrived derived volume_context %j",
volume_context
);
return volume_context;
}
/**
* Should only be used for testing purposes, generally these details should
* come from a CO or some other stateful storage mechanism
*
* @param {*} volume_id
* @param {*} volume_context
*/
async setVolumeContextCache(volume_id, volume_context) {
const driver = this;
if (process.env.CSI_SANITY == 1) {
if (!driver.volume_context_cache) {
driver.volume_context_cache = {};
}
if (!driver.volume_context_cache[volume_id]) {
driver.ctx.logger.debug(
"setting volume_context_cache %s %j",
volume_id,
volume_context
);
driver.volume_context_cache[volume_id] = volume_context;
}
}
}
/**
* Translates a `name` to a `volume_id`. Generally the purpose is to shorten
* the value of `volume_id` to play nicely with scenarios that do not support
* long names (ie: smb share, etc)
*
* @param {*} name
* @returns
*/
async getVolumeIdFromName(name) {
const driver = this;
const strategy = _.get(
driver.options,
"_private.csi.volume.idHash.strategy",
""
);
switch (strategy.toLowerCase()) {
case "md5":
return GeneralUtils.md5(name);
case "crc32":
return GeneralUtils.crc32(name);
case "crc16":
return GeneralUtils.crc16(name);
default:
return name;
}
}
async GetPluginInfo(call) {
return {
name: this.ctx.args.csiName,
vendor_version: this.ctx.args.version,
};
}
async GetPluginCapabilities(call) {
let capabilities;
const response = {
capabilities: [],
};
//UNKNOWN = 0;
// CONTROLLER_SERVICE indicates that the Plugin provides RPCs for
// the ControllerService. Plugins SHOULD provide this capability.
// In rare cases certain plugins MAY wish to omit the
// ControllerService entirely from their implementation, but such
// SHOULD NOT be the common case.
// The presence of this capability determines whether the CO will
// attempt to invoke the REQUIRED ControllerService RPCs, as well
// as specific RPCs as indicated by ControllerGetCapabilities.
//CONTROLLER_SERVICE = 1;
// VOLUME_ACCESSIBILITY_CONSTRAINTS indicates that the volumes for
// this plugin MAY NOT be equally accessible by all nodes in the
// cluster. The CO MUST use the topology information returned by
// CreateVolumeRequest along with the topology information
// returned by NodeGetInfo to ensure that a given volume is
// accessible from a given node when scheduling workloads.
//VOLUME_ACCESSIBILITY_CONSTRAINTS = 2;
capabilities = this.options.service.identity.capabilities.service || [
"UNKNOWN",
];
capabilities.forEach((item) => {
response.capabilities.push({
service: { type: item },
});
});
//UNKNOWN = 0;
// ONLINE indicates that volumes may be expanded when published to
// a node. When a Plugin implements this capability it MUST
// implement either the EXPAND_VOLUME controller capability or the
// EXPAND_VOLUME node capability or both. When a plugin supports
// ONLINE volume expansion and also has the EXPAND_VOLUME
// controller capability then the plugin MUST support expansion of
// volumes currently published and available on a node. When a
// plugin supports ONLINE volume expansion and also has the
// EXPAND_VOLUME node capability then the plugin MAY support
// expansion of node-published volume via NodeExpandVolume.
//
// Example 1: Given a shared filesystem volume (e.g. GlusterFs),
// the Plugin may set the ONLINE volume expansion capability and
// implement ControllerExpandVolume but not NodeExpandVolume.
//
// Example 2: Given a block storage volume type (e.g. EBS), the
// Plugin may set the ONLINE volume expansion capability and
// implement both ControllerExpandVolume and NodeExpandVolume.
//
// Example 3: Given a Plugin that supports volume expansion only
// upon a node, the Plugin may set the ONLINE volume
// expansion capability and implement NodeExpandVolume but not
// ControllerExpandVolume.
//ONLINE = 1;
// OFFLINE indicates that volumes currently published and
// available on a node SHALL NOT be expanded via
// ControllerExpandVolume. When a plugin supports OFFLINE volume
// expansion it MUST implement either the EXPAND_VOLUME controller
// capability or both the EXPAND_VOLUME controller capability and
// the EXPAND_VOLUME node capability.
//
// Example 1: Given a block storage volume type (e.g. Azure Disk)
// that does not support expansion of "node-attached" (i.e.
// controller-published) volumes, the Plugin may indicate
// OFFLINE volume expansion support and implement both
// ControllerExpandVolume and NodeExpandVolume.
//OFFLINE = 2;
capabilities = this.options.service.identity.capabilities
.volume_expansion || ["UNKNOWN"];
capabilities.forEach((item) => {
response.capabilities.push({
volume_expansion: { type: item },
});
});
return response;
}
async Probe(call) {
return { ready: { value: true } };
}
async ControllerGetCapabilities(call) {
let capabilities;
const response = {
capabilities: [],
};
//UNKNOWN = 0;
//CREATE_DELETE_VOLUME = 1;
//PUBLISH_UNPUBLISH_VOLUME = 2;
//LIST_VOLUMES = 3;
//GET_CAPACITY = 4;
// Currently the only way to consume a snapshot is to create
// a volume from it. Therefore plugins supporting
// CREATE_DELETE_SNAPSHOT MUST support creating volume from
// snapshot.
//CREATE_DELETE_SNAPSHOT = 5;
//LIST_SNAPSHOTS = 6;
// Plugins supporting volume cloning at the storage level MAY
// report this capability. The source volume MUST be managed by
// the same plugin. Not all volume sources and parameters
// combinations MAY work.
//CLONE_VOLUME = 7;
// Indicates the SP supports ControllerPublishVolume.readonly
// field.
//PUBLISH_READONLY = 8;
// See VolumeExpansion for details.
//EXPAND_VOLUME = 9;
capabilities = this.options.service.controller.capabilities.rpc || [
"UNKNOWN",
];
capabilities.forEach((item) => {
response.capabilities.push({
rpc: { type: item },
});
});
return response;
}
async NodeGetCapabilities(call) {
let capabilities;
const response = {
capabilities: [],
};
//UNKNOWN = 0;
//STAGE_UNSTAGE_VOLUME = 1;
// If Plugin implements GET_VOLUME_STATS capability
// then it MUST implement NodeGetVolumeStats RPC
// call for fetching volume statistics.
//GET_VOLUME_STATS = 2;
// See VolumeExpansion for details.
//EXPAND_VOLUME = 3;
capabilities = this.options.service.node.capabilities.rpc || ["UNKNOWN"];
capabilities.forEach((item) => {
response.capabilities.push({
rpc: { type: item },
});
});
return response;
}
async NodeGetInfo(call) {
return {
node_id: process.env.CSI_NODE_ID || os.hostname(),
max_volumes_per_node: 0,
};
}
/**
* https://kubernetes-csi.github.io/docs/raw-block.html
* --feature-gates=BlockVolume=true,CSIBlockVolume=true
*
* StagingTargetPath is always a directory even for block volumes
*
* NOTE: stage gets called every time publish does
*
* @param {*} call
*/
async NodeStageVolume(call) {
const driver = this;
const mount = driver.getDefaultMountInstance();
const filesystem = driver.getDefaultFilesystemInstance();
const iscsi = driver.getDefaultISCSIInstance();
let result;
let device;
let block_device_info;
const volume_id = call.request.volume_id;
if (!volume_id) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_id`);
}
const staging_target_path = call.request.staging_target_path;
if (!staging_target_path) {
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`missing staging_target_path`
);
}
const capability = call.request.volume_capability;
if (!capability || Object.keys(capability).length === 0) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing capability`);
}
const access_type = capability.access_type || "mount";
const volume_context = call.request.volume_context;
let fs_type = _.get(capability, "mount.fs_type");
let mount_flags;
let volume_mount_group;
const node_attach_driver = volume_context.node_attach_driver;
const block_path = staging_target_path + "/block_device";
const bind_mount_flags = [];
bind_mount_flags.push("defaults");
const normalizedSecrets = this.getNormalizedParameters(
call.request.secrets,
call.request.volume_context.provisioner_driver,
call.request.volume_context.provisioner_driver_instance_id
);
/*
let mount_options = await mount.getMountOptions(staging_target_path);
console.log(mount_options);
console.log(await mount.getMountOptionValue(mount_options, "stripe"));
console.log(await mount.getMountOptionPresent(mount_options, "stripee"));
throw new Error("foobar");
*/
if (access_type == "mount") {
mount_flags = capability.mount.mount_flags || [];
// yaml mount_flags
if (_.get(driver.options, "node.mount.mount_flags")) {
mount_flags.push(
..._.get(driver.options, "node.mount.mount_flags").split(",")
);
}
// add secrets mount_flags
if (normalizedSecrets.mount_flags) {
mount_flags.push(...normalizedSecrets.mount_flags.split(","));
}
switch (node_attach_driver) {
case "oneclient":
// move along
break;
default:
mount_flags.push("defaults");
// https://github.com/karelzak/util-linux/issues/1429
//mount_flags.push("x-democratic-csi.managed");
//mount_flags.push("x-democratic-csi.staged");
break;
}
if (
semver.satisfies(driver.ctx.csiVersion, ">=1.5.0") &&
driver.options.service.node.capabilities.rpc.includes(
"VOLUME_MOUNT_GROUP"
)
) {
volume_mount_group = capability.mount.volume_mount_group; // in k8s this is derrived from the fsgroup in the pod security context
}
}
if (call.request.volume_context.provisioner_driver == "node-manual") {
result = await this.assertCapabilities([capability], node_attach_driver);
if (!result.valid) {
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`invalid capability: ${result.message}`
);
}
} else {
result = await this.assertCapabilities([capability]);
if (!result.valid) {
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`invalid capability: ${result.message}`
);
}
}
switch (driver.__getNodeOsDriver()) {
case NODE_OS_DRIVER_POSIX:
// csi spec stipulates that staging_target_path is a directory even for block mounts
result = await filesystem.pathExists(staging_target_path);
if (!result) {
await filesystem.mkdir(staging_target_path, ["-p", "-m", "0750"]);
}
// get the `device` set
switch (node_attach_driver) {
case "nfs":
case "lustre":
device = `${volume_context.server}:${volume_context.share}`;
break;
case "smb":
device = `//${volume_context.server}/${volume_context.share}`;
// if not present add guest
let has_username = mount_flags.some((element) => {
element = element.trim().toLowerCase();
return element.startsWith("username=");
});
// prevents driver from hanging on stdin waiting for a password to be entered at the cli
if (!has_username) {
let has_guest = mount_flags.some((element) => {
element = element.trim().toLowerCase();
return element === "guest";
});
if (!has_guest) {
mount_flags.push("guest");
}
if (volume_mount_group) {
mount_flags.push(`gid=${volume_mount_group}`);
}
}
break;
case "iscsi":
let portals = [];
if (volume_context.portal) {
portals.push(volume_context.portal.trim());
}
if (volume_context.portals) {
volume_context.portals.split(",").forEach((portal) => {
portals.push(portal.trim());
});
}
// ensure full portal value
portals = portals.map((value) => {
if (!value.includes(":")) {
value += ":3260";
}
return value.trim();
});
// ensure unique entries only
portals = [...new Set(portals)];
// stores actual device paths after iscsi login
let iscsiDevices = [];
// stores configuration of targets/iqn/luns to connect to
let iscsiConnections = [];
for (let portal of portals) {
iscsiConnections.push({
portal,
iqn: volume_context.iqn,
lun: volume_context.lun,
});
}
/**
* TODO: allow sending in iscsiConnection in a raw/manual format
* TODO: allow option to determine if send_targets should be invoked
* TODO: allow option to control whether nodedb entry should be created by driver
* TODO: allow option to control whether nodedb entry should be deleted by driver
*/
for (let iscsiConnection of iscsiConnections) {
// create DB entry
// https://library.netapp.com/ecmdocs/ECMP1654943/html/GUID-8EC685B4-8CB6-40D8-A8D5-031A3899BCDC.html
// put these options in place to force targets managed by csi to be explicitly attached (in the case of unclearn shutdown etc)
let nodeDB = {
"node.startup": "manual",
//"node.session.scan": "manual",
};
const nodeDBKeyPrefix = "node-db.";
for (const key in normalizedSecrets) {
if (key.startsWith(nodeDBKeyPrefix)) {
nodeDB[key.substr(nodeDBKeyPrefix.length)] =
normalizedSecrets[key];
}
}
// create 'DB' entry
await iscsi.iscsiadm.createNodeDBEntry(
iscsiConnection.iqn,
iscsiConnection.portal,
nodeDB
);
// login
await iscsi.iscsiadm.login(
iscsiConnection.iqn,
iscsiConnection.portal
);
// get associated session
let session = await iscsi.iscsiadm.getSession(
iscsiConnection.iqn,
iscsiConnection.portal
);
// rescan in scenarios when login previously occurred but volumes never appeared
await iscsi.iscsiadm.rescanSession(session);
// find device name
device = `/dev/disk/by-path/ip-${iscsiConnection.portal}-iscsi-${iscsiConnection.iqn}-lun-${iscsiConnection.lun}`;
let deviceByPath = device;
// can take some time for device to show up, loop for some period
result = await filesystem.pathExists(device);
let timer_start = Math.round(new Date().getTime() / 1000);
let timer_max = 30;
let deviceCreated = result;
while (!result) {
await GeneralUtils.sleep(2000);
result = await filesystem.pathExists(device);
if (result) {
deviceCreated = true;
break;
}
let current_time = Math.round(new Date().getTime() / 1000);
if (!result && current_time - timer_start > timer_max) {
driver.ctx.logger.warn(
`hit timeout waiting for device node to appear: ${device}`
);
break;
}
}
if (deviceCreated) {
device = await filesystem.realpath(device);
iscsiDevices.push(device);
driver.ctx.logger.info(
`successfully logged into portal ${iscsiConnection.portal} and created device ${deviceByPath} with realpath ${device}`
);
}
}
// let things settle
// this will help in dm scenarios
await GeneralUtils.sleep(2000);
// filter duplicates
iscsiDevices = iscsiDevices.filter((value, index, self) => {
return self.indexOf(value) === index;
});
// only throw an error if we were not able to attach to *any* devices
if (iscsiDevices.length < 1) {
throw new GrpcError(
grpc.status.UNKNOWN,
`unable to attach any iscsi devices`
);
}
if (iscsiDevices.length != iscsiConnections.length) {
driver.ctx.logger.warn(
`failed to attach all iscsi devices/targets/portals`
);
// TODO: allow a parameter to control this behavior in some form
if (false) {
throw new GrpcError(
grpc.status.UNKNOWN,
`unable to attach all iscsi devices`
);
}
}
// compare all device-mapper slaves with the newly created devices
// if any of the new devices are device-mapper slaves treat this as a
// multipath scenario
let allDeviceMapperSlaves =
await filesystem.getAllDeviceMapperSlaveDevices();
let commonDevices = allDeviceMapperSlaves.filter((value) =>
iscsiDevices.includes(value)
);
const useMultipath =
iscsiConnections.length > 1 || commonDevices.length > 0;
// discover multipath device to use
if (useMultipath) {
device = await filesystem.getDeviceMapperDeviceFromSlaves(
iscsiDevices,
false
);
if (!device) {
throw new GrpcError(
grpc.status.UNKNOWN,
`failed to discover multipath device`
);
}
}
break;
case "hostpath":
result = await mount.pathIsMounted(staging_target_path);
// if not mounted, mount
if (!result) {
await mount.bindMount(volume_context.path, staging_target_path);
return {};
} else {
return {};
}
break;
case "oneclient":
let oneclient = driver.getDefaultOneClientInstance();
device = "oneclient";
result = await mount.deviceIsMountedAtPath(
device,
staging_target_path
);
if (result) {
return {};
}
if (volume_context.space_names) {
volume_context.space_names.split(",").forEach((space) => {
mount_flags.push("--space", space);
});
}
if (volume_context.space_ids) {
volume_context.space_ids.split(",").forEach((space) => {
mount_flags.push("--space-id", space);
});
}
if (normalizedSecrets.token) {
mount_flags.push("-t", normalizedSecrets.token);
} else {
if (volume_context.token) {
mount_flags.push("-t", volume_context.token);
}
}
result = await oneclient.mount(
staging_target_path,
["-H", volume_context.server].concat(mount_flags)
);
if (result) {
return {};
}
throw new GrpcError(
grpc.status.UNKNOWN,
`failed to mount oneclient: ${volume_context.server}`
);
break;
case "zfs-local":
// TODO: make this a geneic zb instance (to ensure works with node-manual driver)
const zb = driver.getDefaultZetabyteInstance();
result = await zb.zfs.get(`${volume_context.zfs_asset_name}`, [
"type",
"mountpoint",
]);
result = result[`${volume_context.zfs_asset_name}`];
switch (result.type.value) {
case "filesystem":
if (result.mountpoint.value != "legacy") {
// zfs set mountpoint=legacy <dataset>
// zfs inherit mountpoint <dataset>
await zb.zfs.set(`${volume_context.zfs_asset_name}`, {
mountpoint: "legacy",
});
}
device = `${volume_context.zfs_asset_name}`;
if (!fs_type) {
fs_type = "zfs";
}
break;
case "volume":
device = `/dev/zvol/${volume_context.zfs_asset_name}`;
break;
default:
throw new GrpcError(
grpc.status.UNKNOWN,
`unknown zfs asset type: ${result.type.value}`
);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
}
// deal with `device` now that we have one
switch (access_type) {
case "mount":
let is_block = false;
switch (node_attach_driver) {
case "iscsi":
is_block = true;
break;
case "zfs-local":
is_block = device.startsWith("/dev/zvol/");
break;
}
// format device
if (is_block) {
// block specific logic
if (!fs_type) {
fs_type = "ext4";
}
if (fs_type == "ntfs") {
block_device_info = await filesystem.getBlockDevice(device);
let partition_count =
await filesystem.getBlockDevicePartitionCount(device);
if (partition_count > 0) {
device = await filesystem.getBlockDeviceLargestPartition(
device
);
} else {
// partion/gpt
await filesystem.partitionDevice(
device,
"gpt",
"EBD0A0A2-B9E5-4433-87C0-68B6B72699C7"
);
device = await filesystem.getBlockDeviceLargestPartition(
device
);
}
}
if (await filesystem.isBlockDevice(device)) {
// format
result = await filesystem.deviceIsFormatted(device);
if (!result) {
let formatOptions = _.get(
driver.options.node.format,
[fs_type, "customOptions"],
[]
);
if (!Array.isArray(formatOptions)) {
formatOptions = [];
}
await filesystem.formatDevice(device, fs_type, formatOptions);
}
let fs_info = await filesystem.getDeviceFilesystemInfo(device);
fs_type = fs_info.type;
// fsck
result = await mount.deviceIsMountedAtPath(
device,
staging_target_path
);
if (!result) {
// https://github.com/democratic-csi/democratic-csi/issues/52#issuecomment-768463401
let checkFilesystem =
driver.options.node.mount.checkFilesystem[fs_type] || {};
if (checkFilesystem.enabled) {
await filesystem.checkFilesystem(
device,
fs_type,
checkFilesystem.customOptions || [],
checkFilesystem.customFilesystemOptions || []
);
}
}
}
}
// set default fs_type if still unset
if (!fs_type) {
switch (node_attach_driver) {
case "nfs":
fs_type = "nfs";
break;
case "lustre":
fs_type = "lustre";
break;
case "smb":
fs_type = "cifs";
break;
case "iscsi":
fs_type = "ext4";
break;
default:
break;
}
}
// mount `device`
result = await mount.deviceIsMountedAtPath(
device,
staging_target_path
);
if (!result) {
// expand fs if necessary
if (await filesystem.isBlockDevice(device)) {
// go ahead and expand fs (this covers cloned setups where expand is not explicitly invoked)
switch (fs_type) {
case "exfat":
case "ntfs":
case "vfat":
//await filesystem.checkFilesystem(device, fs_info.type);
await filesystem.expandFilesystem(device, fs_type);
break;
}
}
let mount_fs_type = fs_type;
if (mount_fs_type == "ntfs") {
mount_fs_type = "ntfs3";
}
// handle volume_mount_group where appropriate
if (volume_mount_group) {
switch (fs_type) {
case "exfat":
case "ntfs":
case "vfat":
mount_flags.push(`gid=${volume_mount_group}`);
break;
}
}
await mount.mount(
device,
staging_target_path,
["-t", mount_fs_type].concat(["-o", mount_flags.join(",")])
);
}
// expand fs if necessary
if (await filesystem.isBlockDevice(device)) {
// go ahead and expand fs (this covers cloned setups where expand is not explicitly invoked)
switch (fs_type) {
case "ext3":
case "ext4":
case "ext4dev":
//await filesystem.checkFilesystem(device, fs_info.type);
try {
await filesystem.expandFilesystem(device, fs_type);
} catch (err) {
// mount is clean and rw, but it will not expand until clean umount has been done
// failed to execute filesystem command: resize2fs /dev/sda, response: {"code":1,"stdout":"Couldn't find valid filesystem superblock.\n","stderr":"resize2fs 1.44.5 (15-Dec-2018)\nresize2fs: Superblock checksum does not match superblock while trying to open /dev/sda\n"}
// /dev/sda on /var/lib/kubelet/plugins/kubernetes.io/csi/pv/pvc-4a80757e-5e87-475d-826f-44fcc4719348/globalmount type ext4 (rw,relatime,stripe=256)
if (
err.code == 1 &&
err.stdout.includes("find valid filesystem superblock") &&
err.stderr.includes("checksum does not match superblock")
) {
driver.ctx.logger.warn(
`successful mount, unsuccessful fs resize: attempting abnormal umount/mount/resize2fs to clear things up ${staging_target_path} (${device})`
);
// try an unmount/mount/fsck cycle again just to clean things up
await mount.umount(staging_target_path, []);
await mount.mount(
device,
staging_target_path,
["-t", fs_type].concat(["-o", mount_flags.join(",")])
);
await filesystem.expandFilesystem(device, fs_type);
} else {
throw err;
}
}
break;
case "btrfs":
case "xfs":
//await filesystem.checkFilesystem(device, fs_info.type);
await filesystem.expandFilesystem(
staging_target_path,
fs_type
);
break;
case "exfat":
case "ntfs":
case "vfat":
// noop
break;
default:
// unsupported filesystem
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`unsupported/unknown filesystem ${fs_type}`
);
}
}
break;
case "block":
//result = await mount.deviceIsMountedAtPath(device, block_path);
result = await mount.deviceIsMountedAtPath("dev", block_path);
if (!result) {
result = await filesystem.pathExists(staging_target_path);
if (!result) {
await filesystem.mkdir(staging_target_path, [
"-p",
"-m",
"0750",
]);
}
result = await filesystem.pathExists(block_path);
if (!result) {
await filesystem.touch(block_path);
}
await mount.bindMount(device, block_path, [
"-o",
bind_mount_flags.join(","),
]);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported access_type: ${access_type}`
);
}
break;
case NODE_OS_DRIVER_CSI_PROXY:
// sanity check node_attach_driver
if (!["smb", "iscsi"].includes(node_attach_driver)) {
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`csi-proxy does not work with node_attach_driver: ${node_attach_driver}`
);
}
// sanity check fs_type
if (fs_type && !["ntfs", "cifs"].includes(fs_type)) {
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`csi-proxy does not work with fs_type: ${fs_type}`
);
}
// load up the client instance
const csiProxyClient = driver.getDefaultCsiProxyClientInstance();
switch (node_attach_driver) {
case "smb":
/**
* smb mount creates a link at this location and if the dir already exists
* it explodes
*
* if path exists but is NOT symlink delete it
*/
result = await csiProxyClient.FilesystemPathExists(
staging_target_path
);
if (result) {
result = await csiProxyClient.FilesystemIsSymlink(
staging_target_path
);
if (!result) {
await csiProxyClient.executeRPC("filesystem", "Rmdir", {
path: staging_target_path,
});
}
}
device = `//${volume_context.server}/${volume_context.share}`;
const username = driver.getMountFlagValue(mount_flags, "username");
const password = driver.getMountFlagValue(mount_flags, "password");
if (!username || !password) {
throw new Error("username and password required");
}
try {
await csiProxyClient.executeRPC("smb", "NewSmbGlobalMapping", {
// convert path separator for windows style path
remote_path:
filesystem.covertUnixSeparatorToWindowsSeparator(device),
local_path: staging_target_path,
username: `${volume_context.server}\\${username}`,
password,
});
} catch (e) {
let details = _.get(e, "details", "");
if (!details.includes("ResourceExists")) {
throw e;
} else {
// path should be a symlink if already present
result = await csiProxyClient.executeRPC(
"filesystem",
"IsSymlink",
{ path: staging_target_path }
);
if (!_.get(result, "is_symlink", false)) {
throw e;
}
}
}
break;
case "iscsi":
switch (access_type) {
case "mount":
let portals = [];
if (volume_context.portal) {
portals.push(volume_context.portal.trim());
}
if (volume_context.portals) {
volume_context.portals.split(",").forEach((portal) => {
portals.push(portal.trim());
});
}
// ensure full portal value
portals = portals.map((value) => {
if (!value.includes(":")) {
value += ":3260";
}
return value.trim();
});
// ensure unique entries only
portals = [...new Set(portals)];
// stores actual device paths after iscsi login
let iscsiDevices = [];
// stores configuration of targets/iqn/luns to connect to
let iscsiConnections = [];
for (let portal of portals) {
iscsiConnections.push({
portal,
iqn: volume_context.iqn,
lun: volume_context.lun,
});
}
// no multipath support yet
// https://github.com/kubernetes-csi/csi-proxy/pull/99
for (let iscsiConnection of iscsiConnections) {
// add target portal
let parts = iscsiConnection.portal.split(":");
let target_address = parts[0];
let target_port = parts[1] || "3260";
let target_portal = {
target_address,
target_port,
};
// this is idempotent
await csiProxyClient.executeRPC("iscsi", "AddTargetPortal", {
target_portal,
});
// login
try {
let auth_type = "NONE";
let chap_username = "";
let chap_secret = "";
if (
normalizedSecrets[
"node-db.node.session.auth.authmethod"
] == "CHAP"
) {
// set auth_type
if (
normalizedSecrets[
"node-db.node.session.auth.username"
] &&
normalizedSecrets[
"node-db.node.session.auth.password"
] &&
normalizedSecrets[
"node-db.node.session.auth.username_in"
] &&
normalizedSecrets[
"node-db.node.session.auth.password_in"
]
) {
auth_type = "MUTUAL_CHAP";
} else if (
normalizedSecrets[
"node-db.node.session.auth.username"
] &&
normalizedSecrets["node-db.node.session.auth.password"]
) {
auth_type = "ONE_WAY_CHAP";
}
// set credentials
if (
normalizedSecrets[
"node-db.node.session.auth.username"
] &&
normalizedSecrets["node-db.node.session.auth.password"]
) {
chap_username =
normalizedSecrets[
"node-db.node.session.auth.username"
];
chap_secret =
normalizedSecrets[
"node-db.node.session.auth.password"
];
}
}
await csiProxyClient.executeRPC("iscsi", "ConnectTarget", {
target_portal,
iqn: iscsiConnection.iqn,
/**
* NONE
* ONE_WAY_CHAP
* MUTUAL_CHAP
*/
auth_type,
chap_username,
chap_secret,
});
} catch (e) {
let details = _.get(e, "details", "");
if (
!details.includes(
"The target has already been logged in via an iSCSI session"
)
) {
throw e;
}
}
// discover?
//await csiProxyClient.executeRPC("iscsi", "DiscoverTargetPortal", {
// target_portal,
//});
// rescan
await csiProxyClient.executeRPC("disk", "Rescan");
// get device
result = await csiProxyClient.executeRPC(
"iscsi",
"GetTargetDisks",
{
target_portal,
iqn: iscsiConnection.iqn,
}
);
// TODO: this is a gross assumption since we currently only allow 1 lun per target
// iterate this response and find disk
//result = await csiProxyClient.executeRPC("disk", "ListDiskLocations");
let diskIds = _.get(result, "diskIDs", []);
if (diskIds.length != 1) {
throw new Error(
`${diskIds.length} disks on the target, no way to know which is the relevant disk`
);
}
let disk_number = diskIds[0];
result = await csiProxyClient.executeRPC(
"volume",
"ListVolumesOnDisk",
{ disk_number }
);
let node_volume_id;
node_volume_id =
await csiProxyClient.getVolumeIdFromDiskNumber(disk_number);
if (!node_volume_id) {
// this is technically idempotent call so should not hurt anything if already initialized
await csiProxyClient.executeRPC("disk", "PartitionDisk", {
disk_number,
});
node_volume_id =
await csiProxyClient.getVolumeIdFromDiskNumber(
disk_number
);
}
if (!node_volume_id) {
throw new Error(
"failed to create/discover volume for disk"
);
}
result = await csiProxyClient.executeRPC(
"volume",
"IsVolumeFormatted",
{ volume_id: node_volume_id }
);
// format device
if (!result.formatted) {
await csiProxyClient.executeRPC("volume", "FormatVolume", {
volume_id: node_volume_id,
});
}
// ensure staging path present
result = await csiProxyClient.FilesystemPathExists(
staging_target_path
);
if (!result) {
await csiProxyClient.executeRPC("filesystem", "Mkdir", {
path: staging_target_path,
});
}
// mount up!
try {
result = await csiProxyClient.executeRPC(
"volume",
"MountVolume",
{
volume_id: node_volume_id,
target_path: staging_target_path,
}
);
} catch (e) {
// assume for now that if something is mounted in the location it the desired volume
let details = _.get(e, "details", "");
if (
!details.includes(
"The requested access path is already in use"
)
) {
throw e;
}
}
// let things settle
// this will help in dm scenarios
await GeneralUtils.sleep(2000);
// windows does not support multipath currently
// break if we make it this far
break;
}
break;
case "block":
default:
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`access_type ${access_type} unsupported`
);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
break;
}
break;
default:
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`unkown NODE OS DRIVER: ${driver.__getNodeOsDriver()}`
);
}
return {};
}
/**
* NOTE: only gets called when the last pod on the node using the volume is removed
*
* 1. unmount fs
* 2. logout of iscsi if necessary
*
* @param {*} call
*/
async NodeUnstageVolume(call) {
const driver = this;
const mount = driver.getDefaultMountInstance();
const filesystem = driver.getDefaultFilesystemInstance();
const iscsi = driver.getDefaultISCSIInstance();
let result;
let is_block = false;
let is_device_mapper = false;
let block_device_info;
let access_type = "mount";
const volume_id = call.request.volume_id;
if (!volume_id) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_id`);
}
const staging_target_path = call.request.staging_target_path;
if (!staging_target_path) {
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`missing staging_target_path`
);
}
const block_path = staging_target_path + "/block_device";
let normalized_staging_path = staging_target_path;
const umount_args = [];
const umount_force_extra_args = ["--force", "--lazy"];
//result = await mount.pathIsMounted(block_path);
//result = await mount.pathIsMounted(staging_target_path)
// TODO: use the x-* mount options to detect if we should delete target
switch (driver.__getNodeOsDriver()) {
case NODE_OS_DRIVER_POSIX:
try {
result = await mount.pathIsMounted(block_path);
} catch (err) {
/**
* on stalled fs such as nfs, even findmnt will return immediately for the base mount point
* so in the case of timeout here (base mount point and then a file/folder beneath it) we almost certainly are not a block device
* AND the fs is probably stalled
*/
if (err.timeout) {
driver.ctx.logger.warn(
`detected stale mount, attempting to force unmount: ${normalized_staging_path}`
);
await mount.umount(
normalized_staging_path,
umount_args.concat(umount_force_extra_args)
);
result = false; // assume we are *NOT* a block device at this point
} else {
throw err;
}
}
if (result) {
is_block = true;
access_type = "block";
block_device_info = await filesystem.getBlockDevice(block_path);
normalized_staging_path = block_path;
} else {
result = await mount.pathIsMounted(staging_target_path);
if (result) {
let device = await mount.getMountPointDevice(staging_target_path);
result = await filesystem.isBlockDevice(device);
if (result) {
is_block = true;
block_device_info = await filesystem.getBlockDevice(device);
}
}
}
result = await mount.pathIsMounted(normalized_staging_path);
if (result) {
try {
result = await mount.umount(normalized_staging_path, umount_args);
} catch (err) {
if (err.timeout) {
driver.ctx.logger.warn(
`hit timeout waiting to unmount path: ${normalized_staging_path}`
);
result = await mount.getMountDetails(normalized_staging_path);
switch (result.fstype) {
case "nfs":
case "nfs4":
driver.ctx.logger.warn(
`detected stale nfs filesystem, attempting to force unmount: ${normalized_staging_path}`
);
result = await mount.umount(
normalized_staging_path,
umount_args.concat(umount_force_extra_args)
);
break;
default:
throw err;
}
} else {
throw err;
}
}
}
if (is_block) {
let realBlockDeviceInfos = [];
// detect if is a multipath device
is_device_mapper = await filesystem.isDeviceMapperDevice(
block_device_info.path
);
if (is_device_mapper) {
let realBlockDevices = await filesystem.getDeviceMapperDeviceSlaves(
block_device_info.path
);
for (const realBlockDevice of realBlockDevices) {
realBlockDeviceInfos.push(
await filesystem.getBlockDevice(realBlockDevice)
);
}
} else {
realBlockDeviceInfos = [block_device_info];
}
// TODO: this could be made async to detach all simultaneously
for (const block_device_info_i of realBlockDeviceInfos) {
if (await filesystem.deviceIsIscsi(block_device_info_i.path)) {
let parent_block_device = await filesystem.getBlockDeviceParent(
block_device_info_i.path
);
// figure out which iscsi session this belongs to and logout
// scan /dev/disk/by-path/ip-*?
// device = `/dev/disk/by-path/ip-${volume_context.portal}-iscsi-${volume_context.iqn}-lun-${volume_context.lun}`;
// parse output from `iscsiadm -m session -P 3`
let sessions = await iscsi.iscsiadm.getSessionsDetails();
for (let i = 0; i < sessions.length; i++) {
let session = sessions[i];
let is_attached_to_session = false;
if (
session.attached_scsi_devices &&
session.attached_scsi_devices.host &&
session.attached_scsi_devices.host.devices
) {
is_attached_to_session =
session.attached_scsi_devices.host.devices.some(
(device) => {
if (
device.attached_scsi_disk == parent_block_device.name
) {
return true;
}
return false;
}
);
}
if (is_attached_to_session) {
let timer_start;
let timer_max;
timer_start = Math.round(new Date().getTime() / 1000);
timer_max = 30;
let loggedOut = false;
while (!loggedOut) {
try {
await iscsi.iscsiadm.logout(session.target, [
session.persistent_portal,
]);
loggedOut = true;
} catch (err) {
await GeneralUtils.sleep(2000);
let current_time = Math.round(
new Date().getTime() / 1000
);
if (current_time - timer_start > timer_max) {
// not throwing error for now as future invocations would not enter code path anyhow
loggedOut = true;
//throw new GrpcError(
// grpc.status.UNKNOWN,
// `hit timeout trying to logout of iscsi target: ${session.persistent_portal}`
//);
}
}
}
timer_start = Math.round(new Date().getTime() / 1000);
timer_max = 30;
let deletedEntry = false;
while (!deletedEntry) {
try {
await iscsi.iscsiadm.deleteNodeDBEntry(
session.target,
session.persistent_portal
);
deletedEntry = true;
} catch (err) {
await GeneralUtils.sleep(2000);
let current_time = Math.round(
new Date().getTime() / 1000
);
if (current_time - timer_start > timer_max) {
// not throwing error for now as future invocations would not enter code path anyhow
deletedEntry = true;
//throw new GrpcError(
// grpc.status.UNKNOWN,
// `hit timeout trying to delete iscsi node DB entry: ${session.target}, ${session.persistent_portal}`
//);
}
}
}
}
}
}
}
}
if (access_type == "block") {
// remove touched file
result = await filesystem.pathExists(block_path);
if (result) {
result = await filesystem.rm(block_path);
}
}
result = await filesystem.pathExists(staging_target_path);
if (result) {
result = await filesystem.rmdir(staging_target_path);
}
break;
case NODE_OS_DRIVER_CSI_PROXY:
// load up the client instance
const csiProxyClient = driver.getDefaultCsiProxyClientInstance();
// for testing purposes
const volume_context = await driver.getDerivedVolumeContext(call);
if (!volume_context) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`unable to retrieve volume_context for volume: ${volume_id}`
);
}
const node_attach_driver = volume_context.node_attach_driver;
async function removePath(p) {
// remove staging path
try {
await csiProxyClient.executeRPC("filesystem", "Rmdir", {
path: p,
// remove all contents under the directory
//force: false,
});
} catch (e) {
let details = _.get(e, "details", "");
if (
!details.includes("The system cannot find the file specified")
) {
throw e;
}
}
}
switch (node_attach_driver) {
case "smb":
try {
await csiProxyClient.executeRPC("smb", "RemoveSmbGlobalMapping", {
remote_path: `\\\\${volume_context.server}\\${volume_context.share}`,
});
} catch (e) {
let details = _.get(e, "details", "");
if (!details.includes("No MSFT_SmbGlobalMapping objects found")) {
throw e;
}
}
break;
case "iscsi":
let target_portal = {
target_address: volume_context.portal.split(":")[0],
target_port: volume_context.portal.split(":")[1] || 3260,
};
let iqn = volume_context.iqn;
let node_volume_id;
// ok to be null/undefined
node_volume_id = await csiProxyClient.getVolumeIdFromIscsiTarget(
target_portal,
iqn
);
if (node_volume_id) {
// write volume cache
await csiProxyClient.executeRPC("volume", "WriteVolumeCache", {
volume_id: node_volume_id,
});
// umount first
try {
await csiProxyClient.executeRPC("volume", "UnmountVolume", {
volume_id: node_volume_id,
target_path: staging_target_path,
});
} catch (e) {
let details = _.get(e, "details", "");
if (!details.includes("The access path is not valid")) {
throw e;
}
}
}
try {
await csiProxyClient.executeRPC("iscsi", "DisconnectTarget", {
target_portal,
iqn,
});
} catch (e) {
let details = _.get(e, "details", "");
if (!details.includes("ObjectNotFound")) {
throw e;
}
}
try {
await csiProxyClient.executeRPC("iscsi", "RemoveTargetPortal", {
target_portal,
});
} catch (e) {
let details = _.get(e, "details", "");
if (!details.includes("ObjectNotFound")) {
throw e;
}
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
}
// remove staging path
await removePath(normalized_staging_path);
break;
default:
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`unkown NODE OS DRIVER: ${driver.__getNodeOsDriver()}`
);
}
return {};
}
async NodePublishVolume(call) {
const driver = this;
const mount = driver.getDefaultMountInstance();
const filesystem = driver.getDefaultFilesystemInstance();
let result;
const volume_id = call.request.volume_id;
if (!volume_id) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_id`);
}
const staging_target_path = call.request.staging_target_path || "";
const target_path = call.request.target_path;
if (!target_path) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing target_path`);
}
const capability = call.request.volume_capability;
if (!capability || Object.keys(capability).length === 0) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing capability`);
}
const access_type = capability.access_type || "mount";
let mount_flags;
let volume_mount_group;
const readonly = call.request.readonly;
const volume_context = call.request.volume_context;
const bind_mount_flags = [];
const node_attach_driver = volume_context.node_attach_driver;
if (access_type == "mount") {
mount_flags = capability.mount.mount_flags || [];
bind_mount_flags.push(...mount_flags);
if (
semver.satisfies(driver.ctx.csiVersion, ">=1.5.0") &&
driver.options.service.node.capabilities.rpc.includes(
"VOLUME_MOUNT_GROUP"
)
) {
volume_mount_group = capability.mount.volume_mount_group; // in k8s this is derrived from the fsgroup in the pod security context
}
}
bind_mount_flags.push("defaults");
// https://github.com/karelzak/util-linux/issues/1429
//bind_mount_flags.push("x-democratic-csi.managed");
//bind_mount_flags.push("x-democratic-csi.published");
if (readonly) bind_mount_flags.push("ro");
// , "x-democratic-csi.ro"
switch (driver.__getNodeOsDriver()) {
case NODE_OS_DRIVER_POSIX:
switch (node_attach_driver) {
case "nfs":
case "smb":
case "lustre":
case "oneclient":
case "hostpath":
case "iscsi":
case "zfs-local":
// ensure appropriate directories/files
switch (access_type) {
case "mount":
// ensure directory exists
result = await filesystem.pathExists(target_path);
if (!result) {
await filesystem.mkdir(target_path, ["-p", "-m", "0750"]);
}
break;
case "block":
// ensure target_path directory exists as target path should be a file
let target_dir = await filesystem.dirname(target_path);
result = await filesystem.pathExists(target_dir);
if (!result) {
await filesystem.mkdir(target_dir, ["-p", "-m", "0750"]);
}
// ensure target file exists
result = await filesystem.pathExists(target_path);
if (!result) {
await filesystem.touch(target_path);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unsupported/unknown access_type ${access_type}`
);
}
// ensure bind mount
if (staging_target_path) {
let normalized_staging_device;
let normalized_staging_path;
if (access_type == "block") {
normalized_staging_path = staging_target_path + "/block_device";
} else {
normalized_staging_path = staging_target_path;
}
// sanity check to ensure the staged path is actually mounted
result = await mount.pathIsMounted(normalized_staging_path);
if (!result) {
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`staging path is not mounted: ${normalized_staging_path}`
);
}
result = await mount.pathIsMounted(target_path);
// if not mounted, mount
if (!result) {
await mount.bindMount(normalized_staging_path, target_path, [
"-o",
bind_mount_flags.join(","),
]);
} else {
// if is mounted, ensure proper source
if (access_type == "block") {
normalized_staging_device = "dev"; // special syntax for single file bind mounts
} else {
normalized_staging_device = await mount.getMountPointDevice(
staging_target_path
);
}
result = await mount.deviceIsMountedAtPath(
normalized_staging_device,
target_path
);
if (!result) {
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`it appears something else is already mounted at ${target_path}`
);
}
}
return {};
}
// unsupported filesystem
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`only staged configurations are valid`
);
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
}
break;
case NODE_OS_DRIVER_CSI_PROXY:
switch (node_attach_driver) {
//case "nfs":
case "smb":
//case "lustre":
//case "oneclient":
//case "hostpath":
case "iscsi":
//case "zfs-local":
// ensure appropriate directories/files
switch (access_type) {
case "mount":
break;
case "block":
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unsupported/unknown access_type ${access_type}`
);
}
// ensure bind mount
if (staging_target_path) {
const csiProxyClient = driver.getDefaultCsiProxyClientInstance();
let normalized_staging_path;
if (access_type == "block") {
normalized_staging_path = staging_target_path + "/block_device";
} else {
normalized_staging_path = staging_target_path;
}
// source path
result = await csiProxyClient.FilesystemPathExists(
normalized_staging_path
);
if (!result) {
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`staging path is not mounted: ${normalized_staging_path}`
);
}
// target path
result = await csiProxyClient.FilesystemPathExists(target_path);
// already published
if (result) {
result = await csiProxyClient.FilesystemIsSymlink(target_path);
if (!result) {
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`target path exists but is not a symlink as it should be: ${target_path}`
);
}
return {};
}
// create symlink
await csiProxyClient.executeRPC("filesystem", "CreateSymlink", {
source_path: normalized_staging_path,
target_path,
});
return {};
}
// unsupported filesystem
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`only staged configurations are valid`
);
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
}
break;
default:
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`unkown NODE OS DRIVER: ${driver.__getNodeOsDriver()}`
);
}
}
async NodeUnpublishVolume(call) {
const driver = this;
const mount = driver.getDefaultMountInstance();
const filesystem = driver.getDefaultFilesystemInstance();
let result;
const volume_id = call.request.volume_id;
if (!volume_id) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_id`);
}
const target_path = call.request.target_path;
if (!target_path) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing target_path`);
}
const umount_args = [];
const umount_force_extra_args = ["--force", "--lazy"];
switch (driver.__getNodeOsDriver()) {
case NODE_OS_DRIVER_POSIX:
try {
result = await mount.pathIsMounted(target_path);
} catch (err) {
// running findmnt on non-existant paths return immediately
// the only time this should timeout is on a stale fs
// so if timeout is hit we should be near certain it is indeed mounted
if (err.timeout) {
driver.ctx.logger.warn(
`detected stale mount, attempting to force unmount: ${target_path}`
);
await mount.umount(
target_path,
umount_args.concat(umount_force_extra_args)
);
result = false; // assume we have fully unmounted
} else {
throw err;
}
}
if (result) {
try {
result = await mount.umount(target_path, umount_args);
} catch (err) {
if (err.timeout) {
driver.ctx.logger.warn(
`hit timeout waiting to unmount path: ${target_path}`
);
// bind mounts do show the 'real' fs details
result = await mount.getMountDetails(target_path);
switch (result.fstype) {
case "nfs":
case "nfs4":
driver.ctx.logger.warn(
`detected stale nfs filesystem, attempting to force unmount: ${target_path}`
);
result = await mount.umount(
target_path,
umount_args.concat(umount_force_extra_args)
);
break;
default:
throw err;
}
} else {
throw err;
}
}
}
result = await filesystem.pathExists(target_path);
if (result) {
if (fs.lstatSync(target_path).isDirectory()) {
result = await filesystem.rmdir(target_path);
} else {
result = await filesystem.rm([target_path]);
}
}
break;
case NODE_OS_DRIVER_CSI_PROXY:
const csiProxyClient = driver.getDefaultCsiProxyClientInstance();
result = await csiProxyClient.FilesystemPathExists(target_path);
if (!result) {
return {};
}
result = await csiProxyClient.executeRPC("filesystem", "IsSymlink", {
path: target_path,
});
if (!result.is_symlink) {
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`target path is not a symlink ${target_path}`
);
}
await csiProxyClient.executeRPC("filesystem", "Rmdir", {
path: target_path,
});
break;
default:
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`unkown NODE OS DRIVER: ${driver.__getNodeOsDriver()}`
);
}
return {};
}
async NodeGetVolumeStats(call) {
const driver = this;
const mount = driver.getDefaultMountInstance();
const filesystem = driver.getDefaultFilesystemInstance();
let result;
let device_path;
let access_type;
const volume_id = call.request.volume_id;
if (!volume_id) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_id`);
}
const volume_path = call.request.volume_path;
const block_path = volume_path + "/block_device";
if (!volume_path) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_path`);
}
let res = {};
//VOLUME_CONDITION
if (
semver.satisfies(driver.ctx.csiVersion, ">=1.3.0") &&
driver.options.service.node.capabilities.rpc.includes("VOLUME_CONDITION")
) {
// TODO: let drivers fill ths in
let abnormal = false;
let message = "OK";
res.volume_condition = { abnormal, message };
}
switch (driver.__getNodeOsDriver()) {
case NODE_OS_DRIVER_POSIX:
if (
(await mount.isBindMountedBlockDevice(volume_path)) ||
(await mount.isBindMountedBlockDevice(block_path))
) {
device_path = block_path;
access_type = "block";
} else {
device_path = volume_path;
access_type = "mount";
}
switch (access_type) {
case "mount":
if (!(await mount.pathIsMounted(device_path))) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`nothing mounted at path: ${device_path}`
);
}
result = await mount.getMountDetails(device_path, [
"avail",
"size",
"used",
]);
res.usage = [
{
available: result.avail,
total: result.size,
used: result.used,
unit: "BYTES",
},
];
break;
case "block":
if (!(await filesystem.pathExists(device_path))) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`nothing mounted at path: ${device_path}`
);
}
result = await filesystem.getBlockDevice(device_path);
res.usage = [
{
total: result.size,
unit: "BYTES",
},
];
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unsupported/unknown access_type ${access_type}`
);
}
break;
case NODE_OS_DRIVER_CSI_PROXY:
const csiProxyClient = driver.getDefaultCsiProxyClientInstance();
const volume_context = await driver.getDerivedVolumeContext(call);
if (!volume_context) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`unable to retrieve volume_context for volume: ${volume_id}`
);
}
const node_attach_driver = volume_context.node_attach_driver;
// ensure path is mounted
result = await csiProxyClient.FilesystemPathExists(volume_path);
if (!result) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`volume_path ${volume_path} is not currently mounted`
);
}
switch (node_attach_driver) {
case "smb":
res.usage = [{ total: 0, unit: "BYTES" }];
break;
case "iscsi":
let node_volume_id =
await csiProxyClient.getVolumeIdFromIscsiTarget(
volume_context.portal,
volume_context.iqn
);
result = await csiProxyClient.executeRPC(
"volume",
"GetVolumeStats",
{
volume_id: node_volume_id,
}
);
res.usage = [
{
available: result.total_bytes - result.used_bytes,
total: result.total_bytes,
used: result.used_bytes,
unit: "BYTES",
},
];
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
}
break;
default:
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`unkown NODE OS DRIVER: ${driver.__getNodeOsDriver()}`
);
}
return res;
}
/**
* https://kubernetes-csi.github.io/docs/volume-expansion.html
* allowVolumeExpansion: true
* --feature-gates=ExpandCSIVolumes=true
* --feature-gates=ExpandInUsePersistentVolumes=true
*
* @param {*} call
*/
async NodeExpandVolume(call) {
const driver = this;
const mount = driver.getDefaultMountInstance();
const filesystem = driver.getDefaultFilesystemInstance();
let device;
let fs_info;
let device_path;
let access_type;
let is_block = false;
let is_formatted;
let fs_type;
let is_device_mapper = false;
let result;
const volume_id = call.request.volume_id;
if (!volume_id) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_id`);
}
const volume_path = call.request.volume_path;
if (!volume_path) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_path`);
}
const block_path = volume_path + "/block_device";
const capacity_range = call.request.capacity_range;
const volume_capability = call.request.volume_capability;
switch (driver.__getNodeOsDriver()) {
case NODE_OS_DRIVER_POSIX:
if (
(await mount.isBindMountedBlockDevice(volume_path)) ||
(await mount.isBindMountedBlockDevice(block_path))
) {
access_type = "block";
device_path = block_path;
} else {
access_type = "mount";
device_path = volume_path;
}
try {
device = await mount.getMountPointDevice(device_path);
is_formatted = await filesystem.deviceIsFormatted(device);
is_block = await filesystem.isBlockDevice(device);
} catch (err) {
if (err.code == 1) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`volume_path ${volume_path} is not currently mounted`
);
}
}
if (is_block) {
let rescan_devices = [];
// detect if is a multipath device
is_device_mapper = await filesystem.isDeviceMapperDevice(device);
if (is_device_mapper) {
// NOTE: want to make sure we scan the dm device *after* all the underlying slaves
rescan_devices = await filesystem.getDeviceMapperDeviceSlaves(
device
);
}
rescan_devices.push(device);
for (let sdevice of rescan_devices) {
// TODO: technically rescan is only relevant/available for remote drives
// such as iscsi etc, should probably limit this call as appropriate
// for now crudely checking the scenario inside the method itself
await filesystem.rescanDevice(sdevice);
}
// let things settle
// it appears the dm devices can take a second to figure things out
if (is_device_mapper || true) {
await GeneralUtils.sleep(2000);
}
if (is_formatted && access_type == "mount") {
fs_info = await filesystem.getDeviceFilesystemInfo(device);
fs_type = fs_info.type;
if (fs_type) {
switch (fs_type) {
case "ext3":
case "ext4":
case "ext4dev":
//await filesystem.checkFilesystem(device, fs_info.type);
await filesystem.expandFilesystem(device, fs_type);
break;
case "btrfs":
case "xfs":
let mount_info = await mount.getMountDetails(device_path);
if (["btrfs", "xfs"].includes(mount_info.fstype)) {
//await filesystem.checkFilesystem(device, fs_info.type);
await filesystem.expandFilesystem(device_path, fs_type);
}
break;
case "exfat":
case "ntfs":
case "vfat":
// TODO: return error here, cannot be expanded while online
//await filesystem.checkFilesystem(device, fs_info.type);
//await filesystem.expandFilesystem(device, fs_type);
break;
default:
// unsupported filesystem
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`unsupported/unknown filesystem ${fs_type}`
);
}
}
} else {
//block device unformatted
return {};
}
} else {
// not block device
return {};
}
break;
case NODE_OS_DRIVER_CSI_PROXY:
const csiProxyClient = driver.getDefaultCsiProxyClientInstance();
const volume_context = await driver.getDerivedVolumeContext(call);
if (!volume_context) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`unable to retrieve volume_context for volume: ${volume_id}`
);
}
const node_attach_driver = volume_context.node_attach_driver;
// ensure path is mounted
result = await csiProxyClient.FilesystemPathExists(volume_path);
if (!result) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`volume_path ${volume_path} is not currently mounted`
);
}
switch (node_attach_driver) {
case "iscsi":
const node_volume_id =
await csiProxyClient.getVolumeIdFromIscsiTarget(
volume_context.portal,
volume_context.iqn
);
const disk_number =
await csiProxyClient.getDiskNumberFromIscsiTarget(
volume_context.portal,
volume_context.iqn
);
if (node_volume_id) {
const required_bytes = _.get(
call.request,
"capacity_range.required_bytes"
);
if (required_bytes) {
await csiProxyClient.executeRPC("disk", "Rescan");
try {
await csiProxyClient.executeRPC("volume", "ResizeVolume", {
volume_id: node_volume_id,
resize_bytes: required_bytes,
});
} catch (e) {
let details = _.get(e, "details", "");
// seems to be a false positive
if (
!details.includes(
"The size of the extent is less than the minimum of 1MB"
)
) {
throw e;
}
await csiProxyClient.executeRPC("disk", "GetDiskStats", {
disk_number,
});
result = await csiProxyClient.executeRPC(
"volume",
"GetVolumeStats",
{
volume_id: node_volume_id,
}
);
let diff = Math.abs(result.total_bytes - required_bytes);
let percentage_diff = parseInt((diff / required_bytes) * 100);
/**
* 15MB is used by the 1ast partition on the initialized disk
*
* 100MB
* TODO: possibly change this to a percentage instead of absolute numbers
*/
let max_delta = 104857600;
driver.ctx.logger.debug(
"resize diff %s (%s%%)",
diff,
percentage_diff
);
if (diff > max_delta) {
throw new GrpcError(
grpc.status.OUT_OF_RANGE,
`expanded size ${result.total_bytes} is too far off (${diff}) from requested size (${required_bytes})`
);
}
}
}
} else {
throw new GrpcError(grpc.status.NOT_FOUND, `cannot find volume`);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
}
break;
default:
throw new GrpcError(
grpc.status.UNIMPLEMENTED,
`unkown NODE OS DRIVER: ${driver.__getNodeOsDriver()}`
);
}
return {};
}
}
module.exports.CsiBaseDriver = CsiBaseDriver;