initial commit

This commit is contained in:
Travis Glenn Hansen 2019-11-21 14:59:57 -07:00
commit 13e24d3bdd
27 changed files with 15319 additions and 0 deletions

2
.gitignore vendored Normal file

@@ -0,0 +1,2 @@
node_modules
dev

23
.travis.yml Normal file

@@ -0,0 +1,23 @@
# if: tag IS present
language: node_js
node_js:
- "12"
sudo: required
services:
- docker
install: true
script: true
deploy:
- provider: script
script: bash .travis/docker-release.sh
skip_cleanup: true
on:
repo: democratic-csi/democratic-csi
all_branches: true
condition: $TRAVIS_BRANCH =~ ^master|next$
- provider: script
script: bash .travis/docker-release.sh
skip_cleanup: true
on:
repo: democratic-csi/democratic-csi
tags: true

22
.travis/docker-release.sh Executable file

@@ -0,0 +1,22 @@
#!/bin/bash
echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
export DOCKER_ORG="democraticcsi"
export DOCKER_PROJECT="democratic-csi"
export DOCKER_REPO="${DOCKER_ORG}/${DOCKER_PROJECT}"
if [[ -n "${TRAVIS_TAG}" ]];then
docker build --pull -t ${DOCKER_REPO}:${TRAVIS_TAG} .
docker push ${DOCKER_REPO}:${TRAVIS_TAG}
elif [[ -n "${TRAVIS_BRANCH}" ]];then
if [[ "${TRAVIS_BRANCH}" == "master" ]];then
docker build --pull -t ${DOCKER_REPO}:latest .
docker push ${DOCKER_REPO}:latest
else
docker build --pull -t ${DOCKER_REPO}:${TRAVIS_BRANCH} .
docker push ${DOCKER_REPO}:${TRAVIS_BRANCH}
fi
else
:
fi

56
Dockerfile Normal file

@@ -0,0 +1,56 @@
FROM debian:10-slim
RUN apt-get update && apt-get install -y locales && rm -rf /var/lib/apt/lists/* \
&& localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
ENV LANG en_US.utf8
# install node
ENV NODE_VERSION=v12.13.1
ENV NODE_DISTRO=linux-x64
RUN apt-get update && \
apt-get install -y wget xz-utils && \
wget https://nodejs.org/dist/${NODE_VERSION}/node-${NODE_VERSION}-${NODE_DISTRO}.tar.xz && \
mkdir -p /usr/local/lib/nodejs && \
tar -xJvf node-${NODE_VERSION}-${NODE_DISTRO}.tar.xz -C /usr/local/lib/nodejs && \
rm node-${NODE_VERSION}-${NODE_DISTRO}.tar.xz && \
rm -rf /var/lib/apt/lists/*
ENV PATH=/usr/local/lib/nodejs/node-${NODE_VERSION}-${NODE_DISTRO}/bin:$PATH
# node service requirements
RUN apt-get update && \
apt-get install -y xfsprogs fatresize dosfstools open-iscsi lsscsi sg3-utils multipath-tools scsitools nfs-common sudo && \
rm -rf /var/lib/apt/lists/*
# controller requirements
RUN apt-get update && \
apt-get install -y ansible && \
rm -rf /var/lib/apt/lists/*
# npm requirements
RUN apt-get update && \
apt-get install -y python make && \
rm -rf /var/lib/apt/lists/*
# install wrappers
ADD docker/iscsiadm /usr/local/sbin
RUN chmod +x /usr/local/sbin/iscsiadm
# Run as a non-root user
RUN useradd --create-home csi \
&& mkdir /home/csi/app \
&& chown -R csi: /home/csi
WORKDIR /home/csi/app
USER csi
COPY package*.json ./
RUN npm install
COPY --chown=csi:csi . .
# root is required at runtime for mounts/iscsiadm in the node service
USER root
EXPOSE 50051
ENTRYPOINT [ "bin/democratic-csi" ]

250
bin/democratic-csi Executable file

@@ -0,0 +1,250 @@
#!/usr/bin/env -S node --nouse-idle-notification --expose-gc --max-old-space-size=8192
const yaml = require("js-yaml");
const fs = require("fs");
let options;
const args = require("yargs")
.env("DEMOCRATIC_CSI")
.scriptName("democratic-csi")
.usage("$0 [options]")
.option("driver", {
alias: "d",
describe: "driver",
choices: ["freenas-nfs", "freenas-iscsi"]
})
.demandOption(["driver"], "driver is required")
.option("driver-config-file", {
describe: "provide a path to driver config file",
config: true,
configParser: path => {
try {
// try JSON first
options = JSON.parse(fs.readFileSync(path, "utf-8"));
return true;
} catch (e) {}
try {
// fall back to YAML
options = yaml.safeLoad(fs.readFileSync(path, "utf8"));
return true;
} catch (e) {}
throw new Error("failed parsing config file: " + path);
}
})
.demandOption(["driver-config-file"], "driver-config-file is required")
.option("log-level", {
describe: "log level",
choices: ["error", "warn", "info", "verbose", "debug", "silly"]
})
.option("csi-version", {
describe: "versin of the csi spec to load",
choices: ["0.2.0", "0.3.0", "1.0.0", "1.1.0", "1.2.0"]
})
.demandOption(["csi-version"], "csi-version is required")
.option("csi-name", {
describe: "name to use for driver registration"
})
.demandOption(["csi-name"], "csi-name is required")
.option("csi-mode", {
describe: "mode of the controller",
choices: ["controller", "node"],
type: "array",
default: ["controller", "node"]
})
.demandOption(["csi-mode"], "csi-mode is required")
.option("server-address", {
describe: "listen address for the server",
default: "0.0.0.0"
})
.option("server-port", {
describe: "listen port for the server",
default: 50051,
type: "number"
})
.version()
.help().argv;
const package = require("../package.json");
args.version = package.version;
const grpc = require("grpc");
const protoLoader = require("@grpc/proto-loader");
const LRU = require("lru-cache");
const cache = new LRU({ max: 500 });
const { logger } = require("../src/utils/logger");
if (args.logLevel) {
logger.level = args.logLevel;
}
// csi-version is a demanded option above; prefer it over the raw env var
const csiVersion = args.csiVersion || process.env.CSI_VERSION || "1.1.0";
const PROTO_PATH = __dirname + "/../csi_proto/csi-v" + csiVersion + ".proto";
// Suggested options for similarity to existing grpc.load behavior
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
const protoDescriptor = grpc.loadPackageDefinition(packageDefinition);
const csi = protoDescriptor.csi.v1;
// include available drivers
const { FreeNASDriver } = require("../src/driver/freenas");
logger.info("initializing csi driver: %s", args.driver);
let driver;
switch (args.driver) {
case "freenas-nfs":
case "freenas-iscsi":
driver = new FreeNASDriver({ logger, args, cache, package }, options);
break;
default:
logger.error("invalid csi driver: %s", args.driver);
break;
}
async function requestHandlerProxy(call, callback, serviceMethodName) {
try {
logger.debug(
"new request - driver: %s method: %s call: %j",
driver.constructor.name,
serviceMethodName,
call
);
//const response = await handler.call(driver, call);
const response = await driver[serviceMethodName](call);
logger.debug(
"new response - driver: %s method: %s response: %j",
driver.constructor.name,
serviceMethodName,
response
);
callback(null, response);
} catch (e) {
logger.error(
"handler error - driver: %s method: %s error: %s",
driver.constructor.name,
serviceMethodName,
JSON.stringify(e)
);
console.log(e);
if (e.name == "GrpcError") {
callback(e);
} else {
// TODO: only show real error string in development mode
const message = true
? e.toString()
: "unknown error, please inspect service logs";
callback({ code: grpc.status.INTERNAL, message });
}
}
}
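// NOTE: the grpc handlers below pass `arguments.callee.name` so each service
// method proxies to the driver method of the same name; `arguments.callee`
// is unavailable in strict mode, and this file is plain (sloppy-mode)
// CommonJS, which is why the pattern works.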
function getServer() {
var server = new grpc.Server();
// Identity Service
server.addService(csi.Identity.service, {
async GetPluginInfo(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async GetPluginCapabilities(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async Probe(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
}
});
// Controller Service
if (args.csiMode.includes("controller")) {
server.addService(csi.Controller.service, {
async CreateVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async DeleteVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async ControllerPublishVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async ControllerUnpublishVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async ValidateVolumeCapabilities(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async ListVolumes(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async GetCapacity(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async ControllerGetCapabilities(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async CreateSnapshot(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async DeleteSnapshot(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async ListSnapshots(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async ControllerExpandVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
}
});
}
// Node Service
if (args.csiMode.includes("node")) {
server.addService(csi.Node.service, {
async NodeStageVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async NodeUnstageVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async NodePublishVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async NodeUnpublishVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async NodeGetVolumeStats(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async NodeExpandVolume(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async NodeGetCapabilities(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
},
async NodeGetInfo(call, callback) {
requestHandlerProxy(call, callback, arguments.callee.name);
}
});
}
return server;
}
// https://grpc.github.io/grpc/node/grpc.Server.html
const csiServer = getServer();
let bindAddress = `${args.serverAddress}:${args.serverPort}`;
logger.info(
"starting csi server - name: %s, version: %s, driver: %s, mode: %s, csi version: %s, address: %s",
args.csiName,
args.version,
args.driver,
args.csiMode.join(","),
args.csiVersion,
bindAddress
);
csiServer.bind(bindAddress, grpc.ServerCredentials.createInsecure());
csiServer.start();
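For reference, a minimal client sketch exercising the server started above. The address mirrors the server-address/server-port defaults in this file and the proto path mirrors the repo layout; both are assumptions for illustration, not part of the commit.

const grpc = require("grpc");
const protoLoader = require("@grpc/proto-loader");

// load the same csi v1.1.0 proto the server uses by default
const packageDefinition = protoLoader.loadSync(
  __dirname + "/../csi_proto/csi-v1.1.0.proto",
  { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }
);
const csi = grpc.loadPackageDefinition(packageDefinition).csi.v1;

// Identity service client against the insecure default listen address
const client = new csi.Identity(
  "127.0.0.1:50051",
  grpc.credentials.createInsecure()
);
client.Probe({}, (error, response) => {
  if (error) throw error;
  console.log(response); // expected: { ready: { value: true } }
});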

673
csi_proto/csi-v0.2.0.proto Normal file

@@ -0,0 +1,673 @@
syntax = "proto3";
package csi.v0;
option go_package = "csi";
service Identity {
rpc GetPluginInfo(GetPluginInfoRequest)
returns (GetPluginInfoResponse) {}
rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
returns (GetPluginCapabilitiesResponse) {}
rpc Probe (ProbeRequest)
returns (ProbeResponse) {}
}
service Controller {
rpc CreateVolume (CreateVolumeRequest)
returns (CreateVolumeResponse) {}
rpc DeleteVolume (DeleteVolumeRequest)
returns (DeleteVolumeResponse) {}
rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
returns (ControllerPublishVolumeResponse) {}
rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
returns (ControllerUnpublishVolumeResponse) {}
rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
returns (ValidateVolumeCapabilitiesResponse) {}
rpc ListVolumes (ListVolumesRequest)
returns (ListVolumesResponse) {}
rpc GetCapacity (GetCapacityRequest)
returns (GetCapacityResponse) {}
rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
returns (ControllerGetCapabilitiesResponse) {}
}
service Node {
rpc NodeStageVolume (NodeStageVolumeRequest)
returns (NodeStageVolumeResponse) {}
rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
returns (NodeUnstageVolumeResponse) {}
rpc NodePublishVolume (NodePublishVolumeRequest)
returns (NodePublishVolumeResponse) {}
rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
returns (NodeUnpublishVolumeResponse) {}
rpc NodeGetId (NodeGetIdRequest)
returns (NodeGetIdResponse) {}
rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
returns (NodeGetCapabilitiesResponse) {}
}
message GetPluginInfoRequest {
}
message GetPluginInfoResponse {
// The name MUST follow reverse domain name notation format
// (https://en.wikipedia.org/wiki/Reverse_domain_name_notation).
// It SHOULD include the plugin's host company name and the plugin
// name, to minimize the possibility of collisions. It MUST be 63
// characters or less, beginning and ending with an alphanumeric
// character ([a-z0-9A-Z]) with dashes (-), underscores (_),
// dots (.), and alphanumerics between. This field is REQUIRED.
string name = 1;
// This field is REQUIRED. Value of this field is opaque to the CO.
string vendor_version = 2;
// This field is OPTIONAL. Values are opaque to the CO.
map<string, string> manifest = 3;
}
message GetPluginCapabilitiesRequest {
}
message GetPluginCapabilitiesResponse {
// All the capabilities that the controller service supports. This
// field is OPTIONAL.
repeated PluginCapability capabilities = 2;
}
// Specifies a capability of the plugin.
message PluginCapability {
message Service {
enum Type {
UNKNOWN = 0;
// CONTROLLER_SERVICE indicates that the Plugin provides RPCs for
// the ControllerService. Plugins SHOULD provide this capability.
// In rare cases certain plugins may wish to omit the
// ControllerService entirely from their implementation, but such
// SHOULD NOT be the common case.
// The presence of this capability determines whether the CO will
// attempt to invoke the REQUIRED ControllerService RPCs, as well
// as specific RPCs as indicated by ControllerGetCapabilities.
CONTROLLER_SERVICE = 1;
}
Type type = 1;
}
oneof type {
// Service that the plugin supports.
Service service = 1;
}
}
message ProbeRequest {
}
message ProbeResponse {
// Intentionally empty.
}
message CreateVolumeRequest {
// The suggested name for the storage space. This field is REQUIRED.
// It serves two purposes:
// 1) Idempotency - This name is generated by the CO to achieve
// idempotency. If `CreateVolume` fails, the volume may or may not
// be provisioned. In this case, the CO may call `CreateVolume`
// again, with the same name, to ensure the volume exists. The
// Plugin should ensure that multiple `CreateVolume` calls for the
// same name do not result in more than one piece of storage
// provisioned corresponding to that name. If a Plugin is unable to
// enforce idempotency, the CO's error recovery logic could result
// in multiple (unused) volumes being provisioned.
// 2) Suggested name - Some storage systems allow callers to specify
// an identifier by which to refer to the newly provisioned
// storage. If a storage system supports this, it can optionally
// use this name as the identifier for the new volume.
string name = 1;
// This field is OPTIONAL. This allows the CO to specify the capacity
// requirement of the volume to be provisioned. If not specified, the
// Plugin MAY choose an implementation-defined capacity range.
CapacityRange capacity_range = 2;
// The capabilities that the provisioned volume MUST have: the Plugin
// MUST provision a volume that could satisfy ALL of the
// capabilities specified in this list. The Plugin MUST assume that
// the CO MAY use the provisioned volume later with ANY of the
// capabilities specified in this list. This also enables the CO to do
// early validation: if ANY of the specified volume capabilities are
// not supported by the Plugin, the call SHALL fail. This field is
// REQUIRED.
repeated VolumeCapability volume_capabilities = 3;
// Plugin specific parameters passed in as opaque key-value pairs.
// This field is OPTIONAL. The Plugin is responsible for parsing and
// validating these parameters. COs will treat these as opaque.
map<string, string> parameters = 4;
// Secrets required by plugin to complete volume creation request.
// A secret is a string to string map where the key identifies the
// name of the secret (e.g. "username" or "password"), and the value
// contains the secret data (e.g. "bob" or "abc123").
// Each key MUST consist of alphanumeric characters, '-', '_' or '.'.
// Each value MUST contain a valid string. An SP MAY choose to accept
// binary (non-string) data by using a binary-to-text encoding scheme,
// like base64.
// An SP SHALL advertise the requirements for required secret keys and
// values in documentation.
// CO SHALL permit passing through the required secrets.
// A CO MAY pass the same secrets to all RPCs, therefore the keys for
// all unique secrets that an SP expects must be unique across all CSI
// operations.
// This information is sensitive and MUST be treated as such (not
// logged, etc.) by the CO.
// This field is OPTIONAL.
map<string, string> controller_create_secrets = 5;
}
message CreateVolumeResponse {
// Contains all attributes of the newly created volume that are
// relevant to the CO along with information required by the Plugin
// to uniquely identify the volume. This field is REQUIRED.
Volume volume = 1;
}
// Specify a capability of a volume.
message VolumeCapability {
// Indicate that the volume will be accessed via the block device API.
message BlockVolume {
// Intentionally empty, for now.
}
// Indicate that the volume will be accessed via the filesystem API.
message MountVolume {
// The filesystem type. This field is OPTIONAL.
// An empty string is equal to an unspecified field value.
string fs_type = 1;
// The mount options that can be used for the volume. This field is
// OPTIONAL. `mount_flags` MAY contain sensitive information.
// Therefore, the CO and the Plugin MUST NOT leak this information
// to untrusted entities. The total size of this repeated field
// SHALL NOT exceed 4 KiB.
repeated string mount_flags = 2;
}
// Specify how a volume can be accessed.
message AccessMode {
enum Mode {
UNKNOWN = 0;
// Can only be published once as read/write on a single node, at
// any given time.
SINGLE_NODE_WRITER = 1;
// Can only be published once as readonly on a single node, at
// any given time.
SINGLE_NODE_READER_ONLY = 2;
// Can be published as readonly at multiple nodes simultaneously.
MULTI_NODE_READER_ONLY = 3;
// Can be published at multiple nodes simultaneously. Only one of
// the node can be used as read/write. The rest will be readonly.
MULTI_NODE_SINGLE_WRITER = 4;
// Can be published as read/write at multiple nodes
// simultaneously.
MULTI_NODE_MULTI_WRITER = 5;
}
// This field is REQUIRED.
Mode mode = 1;
}
// Specifies what API the volume will be accessed using. One of the
// following fields MUST be specified.
oneof access_type {
BlockVolume block = 1;
MountVolume mount = 2;
}
// This is a REQUIRED field.
AccessMode access_mode = 3;
}
// The capacity of the storage space in bytes. To specify an exact size,
// `required_bytes` and `limit_bytes` can be set to the same value. At
// least one of the these fields MUST be specified.
message CapacityRange {
// Volume must be at least this big. This field is OPTIONAL.
// A value of 0 is equal to an unspecified field value.
// The value of this field MUST NOT be negative.
int64 required_bytes = 1;
// Volume must not be bigger than this. This field is OPTIONAL.
// A value of 0 is equal to an unspecified field value.
// The value of this field MUST NOT be negative.
int64 limit_bytes = 2;
}
// The information about a provisioned volume.
message Volume {
// The capacity of the volume in bytes. This field is OPTIONAL. If not
// set (value of 0), it indicates that the capacity of the volume is
// unknown (e.g., NFS share).
// The value of this field MUST NOT be negative.
int64 capacity_bytes = 1;
// Contains identity information for the created volume. This field is
// REQUIRED. The identity information will be used by the CO in
// subsequent calls to refer to the provisioned volume.
string id = 2;
// Attributes reflect static properties of a volume and MUST be passed
// to volume validation and publishing calls.
// Attributes SHALL be opaque to a CO. Attributes SHALL NOT be mutable
// and SHALL be safe for the CO to cache. Attributes SHOULD NOT
// contain sensitive information. Attributes MAY NOT uniquely identify
// a volume. A volume uniquely identified by `id` SHALL always report
// the same attributes. This field is OPTIONAL and when present MUST
// be passed to volume validation and publishing calls.
map<string,string> attributes = 3;
}
message DeleteVolumeRequest {
// The ID of the volume to be deprovisioned.
// This field is REQUIRED.
string volume_id = 1;
// Secrets required by plugin to complete volume deletion request.
// A secret is a string to string map where the key identifies the
// name of the secret (e.g. "username" or "password"), and the value
// contains the secret data (e.g. "bob" or "abc123").
// Each key MUST consist of alphanumeric characters, '-', '_' or '.'.
// Each value MUST contain a valid string. An SP MAY choose to accept
// binary (non-string) data by using a binary-to-text encoding scheme,
// like base64.
// An SP SHALL advertise the requirements for required secret keys and
// values in documentation.
// CO SHALL permit passing through the required secrets.
// A CO MAY pass the same secrets to all RPCs, therefore the keys for
// all unique secrets that an SP expects must be unique across all CSI
// operations.
// This information is sensitive and MUST be treated as such (not
// logged, etc.) by the CO.
// This field is OPTIONAL.
map<string, string> controller_delete_secrets = 2;
}
message DeleteVolumeResponse {
}
message ControllerPublishVolumeRequest {
// The ID of the volume to be used on a node.
// This field is REQUIRED.
string volume_id = 1;
// The ID of the node. This field is REQUIRED. The CO SHALL set this
// field to match the node ID returned by `NodeGetId`.
string node_id = 2;
// The capability of the volume the CO expects the volume to have.
// This is a REQUIRED field.
VolumeCapability volume_capability = 3;
// Whether to publish the volume in readonly mode. This field is
// REQUIRED.
bool readonly = 4;
// Secrets required by plugin to complete controller publish volume
// request.
// A secret is a string to string map where the key identifies the
// name of the secret (e.g. "username" or "password"), and the value
// contains the secret data (e.g. "bob" or "abc123").
// Each key MUST consist of alphanumeric characters, '-', '_' or '.'.
// Each value MUST contain a valid string. An SP MAY choose to accept
// binary (non-string) data by using a binary-to-text encoding scheme,
// like base64.
// An SP SHALL advertise the requirements for required secret keys and
// values in documentation.
// CO SHALL permit passing through the required secrets.
// A CO MAY pass the same secrets to all RPCs, therefore the keys for
// all unique secrets that an SP expects must be unique across all CSI
// operations.
// This information is sensitive and MUST be treated as such (not
// logged, etc.) by the CO.
// This field is OPTIONAL.
map<string, string> controller_publish_secrets = 5;
// Attributes of the volume to be used on a node. This field is
// OPTIONAL and MUST match the attributes of the Volume identified
// by `volume_id`.
map<string,string> volume_attributes = 6;
}
message ControllerPublishVolumeResponse {
// The SP specific information that will be passed to the Plugin in
// the subsequent `NodeStageVolume` or `NodePublishVolume` calls
// for the given volume.
// This information is opaque to the CO. This field is OPTIONAL.
map<string, string> publish_info = 1;
}
message ControllerUnpublishVolumeRequest {
// The ID of the volume. This field is REQUIRED.
string volume_id = 1;
// The ID of the node. This field is OPTIONAL. The CO SHOULD set this
// field to match the node ID returned by `NodeGetId` or leave it
// unset. If the value is set, the SP MUST unpublish the volume from
// the specified node. If the value is unset, the SP MUST unpublish
// the volume from all nodes it is published to.
string node_id = 2;
// Secrets required by plugin to complete controller unpublish volume
// request. This SHOULD be the same secrets passed to the
// ControllerPublishVolume.
// call for the specified volume.
// A secret is a string to string map where the key identifies the
// name of the secret (e.g. "username" or "password"), and the value
// contains the secret data (e.g. "bob" or "abc123").
// Each key MUST consist of alphanumeric characters, '-', '_' or '.'.
// Each value MUST contain a valid string. An SP MAY choose to accept
// binary (non-string) data by using a binary-to-text encoding scheme,
// like base64.
// An SP SHALL advertise the requirements for required secret keys and
// values in documentation.
// CO SHALL permit passing through the required secrets.
// A CO MAY pass the same secrets to all RPCs, therefore the keys for
// all unique secrets that an SP expects must be unique across all CSI
// operations.
// This information is sensitive and MUST be treated as such (not
// logged, etc.) by the CO.
// This field is OPTIONAL.
map<string, string> controller_unpublish_secrets = 3;
}
message ControllerUnpublishVolumeResponse {
}
message ValidateVolumeCapabilitiesRequest {
// The ID of the volume to check. This field is REQUIRED.
string volume_id = 1;
// The capabilities that the CO wants to check for the volume. This
// call SHALL return "supported" only if all the volume capabilities
// specified below are supported. This field is REQUIRED.
repeated VolumeCapability volume_capabilities = 2;
// Attributes of the volume to check. This field is OPTIONAL and MUST
// match the attributes of the Volume identified by `volume_id`.
map<string,string> volume_attributes = 3;
}
message ValidateVolumeCapabilitiesResponse {
// True if the Plugin supports the specified capabilities for the
// given volume. This field is REQUIRED.
bool supported = 1;
// Message to the CO if `supported` above is false. This field is
// OPTIONAL.
// An empty string is equal to an unspecified field value.
string message = 2;
}
message ListVolumesRequest {
// If specified (non-zero value), the Plugin MUST NOT return more
// entries than this number in the response. If the actual number of
// entries is more than this number, the Plugin MUST set `next_token`
// in the response which can be used to get the next page of entries
// in the subsequent `ListVolumes` call. This field is OPTIONAL. If
// not specified (zero value), it means there is no restriction on the
// number of entries that can be returned.
// The value of this field MUST NOT be negative.
int32 max_entries = 1;
// A token to specify where to start paginating. Set this field to
// `next_token` returned by a previous `ListVolumes` call to get the
// next page of entries. This field is OPTIONAL.
// An empty string is equal to an unspecified field value.
string starting_token = 2;
}
message ListVolumesResponse {
message Entry {
Volume volume = 1;
}
repeated Entry entries = 1;
// This token allows you to get the next page of entries for
// `ListVolumes` request. If the number of entries is larger than
// `max_entries`, use the `next_token` as a value for the
// `starting_token` field in the next `ListVolumes` request. This
// field is OPTIONAL.
// An empty string is equal to an unspecified field value.
string next_token = 2;
}
message GetCapacityRequest {
// If specified, the Plugin SHALL report the capacity of the storage
// that can be used to provision volumes that satisfy ALL of the
// specified `volume_capabilities`. These are the same
// `volume_capabilities` the CO will use in `CreateVolumeRequest`.
// This field is OPTIONAL.
repeated VolumeCapability volume_capabilities = 1;
// If specified, the Plugin SHALL report the capacity of the storage
// that can be used to provision volumes with the given Plugin
// specific `parameters`. These are the same `parameters` the CO will
// use in `CreateVolumeRequest`. This field is OPTIONAL.
map<string, string> parameters = 2;
}
message GetCapacityResponse {
// The available capacity of the storage that can be used to
// provision volumes. If `volume_capabilities` or `parameters` is
// specified in the request, the Plugin SHALL take those into
// consideration when calculating the available capacity of the
// storage. This field is REQUIRED.
// The value of this field MUST NOT be negative.
int64 available_capacity = 1;
}
message ControllerGetCapabilitiesRequest {
}
message ControllerGetCapabilitiesResponse {
// All the capabilities that the controller service supports. This
// field is OPTIONAL.
repeated ControllerServiceCapability capabilities = 2;
}
// Specifies a capability of the controller service.
message ControllerServiceCapability {
message RPC {
enum Type {
UNKNOWN = 0;
CREATE_DELETE_VOLUME = 1;
PUBLISH_UNPUBLISH_VOLUME = 2;
LIST_VOLUMES = 3;
GET_CAPACITY = 4;
}
Type type = 1;
}
oneof type {
// RPC that the controller supports.
RPC rpc = 1;
}
}
message NodeStageVolumeRequest {
// The ID of the volume to publish. This field is REQUIRED.
string volume_id = 1;
// The CO SHALL set this field to the value returned by
// `ControllerPublishVolume` if the corresponding Controller Plugin
// has `PUBLISH_UNPUBLISH_VOLUME` controller capability, and SHALL be
// left unset if the corresponding Controller Plugin does not have
// this capability. This is an OPTIONAL field.
map<string, string> publish_info = 2;
// The path to which the volume will be published. It MUST be an
// absolute path in the root filesystem of the process serving this
// request. The CO SHALL ensure that there is only one
// staging_target_path per volume.
// This is a REQUIRED field.
string staging_target_path = 3;
// The capability of the volume the CO expects the volume to have.
// This is a REQUIRED field.
VolumeCapability volume_capability = 4;
// Secrets required by plugin to complete node stage volume request.
// A secret is a string to string map where the key identifies the
// name of the secret (e.g. "username" or "password"), and the value
// contains the secret data (e.g. "bob" or "abc123").
// Each key MUST consist of alphanumeric characters, '-', '_' or '.'.
// Each value MUST contain a valid string. An SP MAY choose to accept
// binary (non-string) data by using a binary-to-text encoding scheme,
// like base64.
// An SP SHALL advertise the requirements for required secret keys and
// values in documentation.
// CO SHALL permit passing through the required secrets.
// A CO MAY pass the same secrets to all RPCs, therefore the keys for
// all unique secrets that an SP expects must be unique across all CSI
// operations.
// This information is sensitive and MUST be treated as such (not
// logged, etc.) by the CO.
// This field is OPTIONAL.
map<string, string> node_stage_secrets = 5;
// Attributes of the volume to publish. This field is OPTIONAL and
// MUST match the attributes of the VolumeInfo identified by
// `volume_id`.
map<string,string> volume_attributes = 6;
}
message NodeStageVolumeResponse {
}
message NodeUnstageVolumeRequest {
// The ID of the volume. This field is REQUIRED.
string volume_id = 1;
// The path at which the volume was published. It MUST be an absolute
// path in the root filesystem of the process serving this request.
// This is a REQUIRED field.
string staging_target_path = 2;
}
message NodeUnstageVolumeResponse {
}
message NodePublishVolumeRequest {
// The ID of the volume to publish. This field is REQUIRED.
string volume_id = 1;
// The CO SHALL set this field to the value returned by
// `ControllerPublishVolume` if the corresponding Controller Plugin
// has `PUBLISH_UNPUBLISH_VOLUME` controller capability, and SHALL be
// left unset if the corresponding Controller Plugin does not have
// this capability. This is an OPTIONAL field.
map<string, string> publish_info = 2;
// The path to which the device was mounted by `NodeStageVolume`.
// It MUST be an absolute path in the root filesystem of the process
// serving this request.
// It MUST be set if the Node Plugin implements the
// `STAGE_UNSTAGE_VOLUME` node capability.
// This is an OPTIONAL field.
string staging_target_path = 3;
// The path to which the volume will be published. It MUST be an
// absolute path in the root filesystem of the process serving this
// request. The CO SHALL ensure uniqueness of target_path per volume.
// The CO SHALL ensure that the path exists, and that the process
// serving the request has `read` and `write` permissions to the path.
// This is a REQUIRED field.
string target_path = 4;
// The capability of the volume the CO expects the volume to have.
// This is a REQUIRED field.
VolumeCapability volume_capability = 5;
// Whether to publish the volume in readonly mode. This field is
// REQUIRED.
bool readonly = 6;
// Secrets required by plugin to complete node publish volume request.
// A secret is a string to string map where the key identifies the
// name of the secret (e.g. "username" or "password"), and the value
// contains the secret data (e.g. "bob" or "abc123").
// Each key MUST consist of alphanumeric characters, '-', '_' or '.'.
// Each value MUST contain a valid string. An SP MAY choose to accept
// binary (non-string) data by using a binary-to-text encoding scheme,
// like base64.
// An SP SHALL advertise the requirements for required secret keys and
// values in documentation.
// CO SHALL permit passing through the required secrets.
// A CO MAY pass the same secrets to all RPCs, therefore the keys for
// all unique secrets that an SP expects must be unique across all CSI
// operations.
// This information is sensitive and MUST be treated as such (not
// logged, etc.) by the CO.
// This field is OPTIONAL.
map<string, string> node_publish_secrets = 7;
// Attributes of the volume to publish. This field is OPTIONAL and
// MUST match the attributes of the Volume identified by
// `volume_id`.
map<string,string> volume_attributes = 8;
}
message NodePublishVolumeResponse {
}
message NodeUnpublishVolumeRequest {
// The ID of the volume. This field is REQUIRED.
string volume_id = 1;
// The path at which the volume was published. It MUST be an absolute
// path in the root filesystem of the process serving this request.
// This is a REQUIRED field.
string target_path = 2;
}
message NodeUnpublishVolumeResponse {
}
message NodeGetIdRequest {
}
message NodeGetIdResponse {
// The ID of the node as understood by the SP which SHALL be used by
// CO in subsequent `ControllerPublishVolume`.
// This is a REQUIRED field.
string node_id = 1;
}
message NodeGetCapabilitiesRequest {
}
message NodeGetCapabilitiesResponse {
// All the capabilities that the node service supports. This field
// is OPTIONAL.
repeated NodeServiceCapability capabilities = 1;
}
// Specifies a capability of the node service.
message NodeServiceCapability {
message RPC {
enum Type {
UNKNOWN = 0;
STAGE_UNSTAGE_VOLUME = 1;
}
Type type = 1;
}
oneof type {
// RPC that the controller supports.
RPC rpc = 1;
}
}
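The comments above pin down the wire shape of each message; as a worked example (all values illustrative), an exact-size 10 GiB single-writer mount volume under this spec is requested by setting required_bytes equal to limit_bytes:

// illustrative CreateVolumeRequest (csi v0.2.0): idempotent by name,
// exactly 10 GiB, mountable as ext4 by a single writer
const createVolumeRequest = {
  name: "pvc-example-0001", // CO-generated; reused on retry for idempotency
  capacity_range: {
    required_bytes: 10 * 1024 * 1024 * 1024, // exact size: required == limit
    limit_bytes: 10 * 1024 * 1024 * 1024
  },
  volume_capabilities: [
    {
      mount: { fs_type: "ext4", mount_flags: [] },
      access_mode: { mode: "SINGLE_NODE_WRITER" }
    }
  ],
  parameters: {} // plugin-specific, opaque to the CO
};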

1093
csi_proto/csi-v0.3.0.proto Normal file

File diff suppressed because it is too large

1203
csi_proto/csi-v1.0.0.proto Normal file

File diff suppressed because it is too large

1306
csi_proto/csi-v1.1.0.proto Normal file

File diff suppressed because it is too large

1369
csi_proto/csi-v1.2.0.proto Normal file

File diff suppressed because it is too large

5
docker/iscsiadm Normal file

@@ -0,0 +1,5 @@
#!/bin/bash
# https://engineering.docker.com/2019/07/road-to-containing-iscsi/
chroot /host /usr/bin/env -i PATH="/sbin:/bin:/usr/bin" iscsiadm "${@:1}"
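A sketch of how this wrapper gets used: the Dockerfile above installs it at /usr/local/sbin/iscsiadm, so any code in the container that spawns iscsiadm transparently runs the host's binary via chroot. The call below is illustrative only.

// sketch: from inside the container, `iscsiadm` resolves to the wrapper above
const { execFile } = require("child_process");

execFile("iscsiadm", ["-m", "session"], (error, stdout, stderr) => {
  if (error) {
    console.error("iscsiadm failed:", stderr);
    return;
  }
  console.log(stdout); // active iscsi sessions on the host
});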

59
examples/freenas-iscsi.yaml Normal file

@@ -0,0 +1,59 @@
httpConnection:
protocol: http
host: server address
port: 80
username: root
password:
allowInsecure: true
sshConnection:
host: server address
port: 22
username: root
# use either password or key
password: ""
privateKey: |
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
zfs:
# total volume name (zvol/<datasetParentName>/<pvc name>) length cannot exceed 63 chars
# https://www.ixsystems.com/documentation/freenas/11.2-U5/storage.html#zfs-zvol-config-opts-tab
# standard volume naming overhead is 46 chars
# datasetParentName should therefore be 17 chars or less
datasetParentName: tank/k8s/b/vols
detachedSnapshotsDatasetParentName: tank/k8s/b/snaps
# "" (inherit), lz4, gzip-9, etc
zvolCompression:
# "" (inherit), on, off, verify
zvolDedup:
zvolEnableReservation: false
# 512, 1K, 2K, 4K, 8K, 16K, 64K, 128K default is 16K
zvolBlocksize:
iscsi:
targetPortal: "server:3261"
targetPortals: []
# leave empty to omit usage of -I with iscsiadm
interface:
namePrefix: csi-
nameSuffix: "-clustera"
# add as many as needed
targetGroups:
# get the correct ID from the "portal" section in the UI
- targetGroupPortalGroup: 1
# get the correct ID from the "initiators" section in the UI
targetGroupInitiatorGroup: 1
# None, CHAP, or CHAP Mutual
targetGroupAuthType: None
# get the correct ID from the "Authorized Access" section of the UI
# only required if using CHAP
targetGroupAuthGroup:
extentInsecureTpc: true
extentXenCompat: false
extentDisablePhysicalBlocksize: true
# 512, 1024, 2048, or 4096
extentBlocksize: 512
# "" (let FreeNAS decide, currently defaults to SSD), Unknown, SSD, 5400, 7200, 10000, 15000
extentRpm: "SSD"
# 0-100 (0 == ignore)
extentAvailThreshold: 0
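A quick sketch of the length arithmetic from the zfs comment above; the 46-character overhead figure comes from that comment and the helper is illustrative, not part of the driver.

// validate datasetParentName against the 63-char zvol name limit
// (63 total - 46 chars of standard naming overhead = 17 chars available)
const ZVOL_NAME_LIMIT = 63;
const NAMING_OVERHEAD = 46;

function assertDatasetParentName(datasetParentName) {
  const available = ZVOL_NAME_LIMIT - NAMING_OVERHEAD; // 17
  if (datasetParentName.length > available) {
    throw new Error(
      `datasetParentName "${datasetParentName}" is ` +
        `${datasetParentName.length} chars; max is ${available}`
    );
  }
}

assertDatasetParentName("tank/k8s/b/vols"); // 15 chars, ok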

34
examples/freenas-nfs.yaml Normal file

@@ -0,0 +1,34 @@
httpConnection:
protocol: http
host: server address
port: 80
username: root
password:
allowInsecure: true
sshConnection:
host: server address
port: 22
username: root
# use either password or key
password: ""
privateKey: |
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
zfs:
datasetParentName: tank/k8s/a/vols
detachedSnapshotsDatasetParentName: tank/k8s/a/snaps
datasetEnableQuotas: true
datasetEnableReservation: false
datasetPermissionsMode: "0777"
datasetPermissionsUser: root
datasetPermissionsGroup: wheel
nfs:
shareHost: server address
shareAlldirs: false
shareAllowedHosts: []
shareAllowedNetworks: []
shareMaprootUser: root
shareMaprootGroup: wheel
shareMapallUser: ""
shareMapallGroup: ""

2397
package-lock.json generated Normal file

File diff suppressed because it is too large

34
package.json Normal file

@@ -0,0 +1,34 @@
{
"name": "democratic-csi",
"version": "0.1.0",
"description": "kubernetes csi driver framework",
"main": "bin/democratic-csi",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "./bin/democratic-csi"
},
"author": "Travis Glenn Hansen <travisghansen@yahoo.com>",
"bugs": {
"url": "https://github.com/democratic-csi/democratic-csi/issues"
},
"homepage": "https://github.com/democratic-csi/democratic-csi#readme",
"license": "MIT",
"repository": {
"type": "git",
"url": "https://github.com/democratic-csi/democratic-csi.git"
},
"dependencies": {
"@grpc/proto-loader": "^0.5.3",
"bunyan": "^1.8.12",
"eslint": "^6.6.0",
"grpc": "^1.24.2",
"js-yaml": "^3.13.1",
"lru-cache": "^5.1.1",
"request": "^2.88.0",
"ssh2": "^0.8.6",
"uri-js": "^4.2.2",
"uuid": "^3.3.3",
"winston": "^3.2.1",
"yargs": "^15.0.2"
}
}

File diff suppressed because it is too large

163
src/driver/freenas/http/index.js Normal file

@@ -0,0 +1,163 @@
const request = require("request");
const URI = require("uri-js");
const USER_AGENT = "democratic-csi-driver";
class Client {
constructor(options = {}) {
this.options = options;
this.logger = console;
// default to v1.0 for now
if (!this.options.apiVersion) {
this.options.apiVersion = 1;
}
}
getBaseURL() {
const server = this.options;
const options = {
scheme: server.protocol,
host: server.host,
port: server.port,
//userinfo: server.username + ":" + server.password,
path: server.apiVersion == 1 ? "/api/v1.0" : "/api/v2.0"
};
return URI.serialize(options);
}
setApiVersion(apiVersion) {
this.options.apiVersion = apiVersion;
}
getApiVersion() {
return this.options.apiVersion;
}
log_response(error, response, body, options) {
this.logger.debug("FREENAS HTTP REQUEST: " + JSON.stringify(options));
this.logger.debug("FREENAS HTTP ERROR: " + error);
// response is undefined when the request itself failed
this.logger.debug("FREENAS HTTP STATUS: " + (response && response.statusCode));
this.logger.debug(
"FREENAS HTTP HEADERS: " + JSON.stringify(response && response.headers)
);
this.logger.debug("FREENAS HTTP BODY: " + JSON.stringify(body));
}
async get(endpoint, data) {
const client = this;
// curl -X GET "http://bitness.lan/api/v2.0/core/ping" -H "accept: */*"
if (this.options.apiVersion == 1 && !endpoint.endsWith("/")) {
endpoint += "/";
}
return new Promise((resolve, reject) => {
const options = {
method: "GET",
url: this.getBaseURL() + endpoint,
headers: {
Accept: "*/*",
"User-Agent": USER_AGENT
},
json: true,
qs: data
};
request(options, function(err, res, body) {
client.log_response(...arguments, options);
if (err) {
reject(err);
return;
}
resolve(res);
}).auth(client.options.username, client.options.password);
});
}
async post(endpoint, data) {
const client = this;
// curl -X POST "http://bitness.lan/api/v2.0/core/get_methods" -H "accept: */*" -H "Content-Type: application/json" -d "\"string\""
if (this.options.apiVersion == 1 && !endpoint.endsWith("/")) {
endpoint += "/";
}
return new Promise((resolve, reject) => {
const options = {
method: "POST",
url: this.getBaseURL() + endpoint,
headers: {
Accept: "*/*",
"User-Agent": USER_AGENT
},
json: true,
body: data
};
request(options, function(err, res, body) {
client.log_response(...arguments, options);
if (err) {
reject(err);
return;
}
resolve(res);
}).auth(client.options.username, client.options.password);
});
}
async put(endpoint, data) {
const client = this;
// curl -X PUT "http://bitness.lan/api/v2.0/sharing/smb/id/1" -H "accept: */*" -H "Content-Type: application/json" -d "{\"path\":\"string\",\"home\":true,\"name\":\"string\",\"comment\":\"string\",\"ro\":true,\"browsable\":true,\"timemachine\":true,\"recyclebin\":true,\"showhiddenfiles\":true,\"guestok\":true,\"guestonly\":true,\"abe\":true,\"hostsallow\":[null],\"hostsdeny\":[null],\"vfsobjects\":[null],\"storage_task\":0,\"auxsmbconf\":\"string\",\"default_permissions\":true}"
if (this.options.apiVersion == 1 && !endpoint.endsWith("/")) {
endpoint += "/";
}
return new Promise((resolve, reject) => {
const options = {
method: "PUT",
url: this.getBaseURL() + endpoint,
headers: {
Accept: "*/*",
"User-Agent": USER_AGENT
},
json: true,
body: data
};
request(options, function(err, res, body) {
client.log_response(...arguments, options);
if (err) {
reject(err);
return;
}
resolve(res);
}).auth(client.options.username, client.options.password);
});
}
//Unauthorized
async delete(endpoint, data) {
const client = this;
// curl -X DELETE "http://bitness.lan/api/v2.0/sharing/smb/id/1" -H "accept: */*" -H "Content-Type: application/json" -d "{}"
if (this.options.apiVersion == 1 && !endpoint.endsWith("/")) {
endpoint += "/";
}
return new Promise((resolve, reject) => {
const options = {
method: "DELETE",
url: this.getBaseURL() + endpoint,
headers: {
Accept: "*/*",
"User-Agent": USER_AGENT
},
json: true,
body: data
};
request(options, function(err, res, body) {
client.log_response(...arguments, options);
if (err) {
reject(err);
return;
}
resolve(res);
}).auth(client.options.username, client.options.password);
});
}
}
module.exports.Client = Client;
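A usage sketch for the Client above; the host and credentials are placeholders, and apiVersion 2 selects the /api/v2.0 base path via getBaseURL.

// sketch: ping a FreeNAS host with the v2.0 API
const { Client } = require("./index"); // path illustrative
const client = new Client({
  protocol: "http",
  host: "freenas.example.com",
  port: 80,
  username: "root",
  password: "secret",
  apiVersion: 2
});

client.get("/core/ping").then(response => {
  console.log(response.statusCode); // 200 when reachable and authorized
});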

1027
src/driver/freenas/index.js Normal file

File diff suppressed because it is too large

798
src/driver/index.js Normal file

@@ -0,0 +1,798 @@
const os = require("os");
const path = require("path");
const grpc = require("grpc");
const fs = require("fs");
const { GrpcError } = require("../utils/grpc");
const { Mount } = require("../utils/mount");
const { Filesystem } = require("../utils/filesystem");
const { ISCSI } = require("../utils/iscsi");
const sleep = require("../utils/general").sleep;
/**
* common code shared between all drivers
* this is **NOT** meant to work as a proxy
* for the grpc calls meaning, it should not
* also operate as a facade handling directly
* the requests to the platform
*/
class CsiBaseDriver {
constructor(ctx, options) {
this.ctx = ctx;
this.options = options;
}
async GetPluginInfo(call) {
return {
name: this.ctx.args.csiName,
vendor_version: this.ctx.args.version
};
}
async GetPluginCapabilities(call) {
let capabilities;
const response = {
capabilities: []
};
//UNKNOWN = 0;
// CONTROLLER_SERVICE indicates that the Plugin provides RPCs for
// the ControllerService. Plugins SHOULD provide this capability.
// In rare cases certain plugins MAY wish to omit the
// ControllerService entirely from their implementation, but such
// SHOULD NOT be the common case.
// The presence of this capability determines whether the CO will
// attempt to invoke the REQUIRED ControllerService RPCs, as well
// as specific RPCs as indicated by ControllerGetCapabilities.
//CONTROLLER_SERVICE = 1;
// VOLUME_ACCESSIBILITY_CONSTRAINTS indicates that the volumes for
// this plugin MAY NOT be equally accessible by all nodes in the
// cluster. The CO MUST use the topology information returned by
// CreateVolumeRequest along with the topology information
// returned by NodeGetInfo to ensure that a given volume is
// accessible from a given node when scheduling workloads.
//VOLUME_ACCESSIBILITY_CONSTRAINTS = 2;
capabilities = this.options.service.identity.capabilities.service || [
"UNKNOWN"
];
capabilities.forEach(item => {
response.capabilities.push({
service: { type: item }
});
});
//UNKNOWN = 0;
// ONLINE indicates that volumes may be expanded when published to
// a node. When a Plugin implements this capability it MUST
// implement either the EXPAND_VOLUME controller capability or the
// EXPAND_VOLUME node capability or both. When a plugin supports
// ONLINE volume expansion and also has the EXPAND_VOLUME
// controller capability then the plugin MUST support expansion of
// volumes currently published and available on a node. When a
// plugin supports ONLINE volume expansion and also has the
// EXPAND_VOLUME node capability then the plugin MAY support
// expansion of node-published volume via NodeExpandVolume.
//
// Example 1: Given a shared filesystem volume (e.g. GlusterFs),
// the Plugin may set the ONLINE volume expansion capability and
// implement ControllerExpandVolume but not NodeExpandVolume.
//
// Example 2: Given a block storage volume type (e.g. EBS), the
// Plugin may set the ONLINE volume expansion capability and
// implement both ControllerExpandVolume and NodeExpandVolume.
//
// Example 3: Given a Plugin that supports volume expansion only
// upon a node, the Plugin may set the ONLINE volume
// expansion capability and implement NodeExpandVolume but not
// ControllerExpandVolume.
//ONLINE = 1;
// OFFLINE indicates that volumes currently published and
// available on a node SHALL NOT be expanded via
// ControllerExpandVolume. When a plugin supports OFFLINE volume
// expansion it MUST implement either the EXPAND_VOLUME controller
// capability or both the EXPAND_VOLUME controller capability and
// the EXPAND_VOLUME node capability.
//
// Example 1: Given a block storage volume type (e.g. Azure Disk)
// that does not support expansion of "node-attached" (i.e.
// controller-published) volumes, the Plugin may indicate
// OFFLINE volume expansion support and implement both
// ControllerExpandVolume and NodeExpandVolume.
//OFFLINE = 2;
capabilities = this.options.service.identity.capabilities
.volume_expansion || ["UNKNOWN"];
capabilities.forEach(item => {
response.capabilities.push({
volume_expansion: { type: item }
});
});
return response;
}
async Probe(call) {
return { ready: { value: true } };
}
async ControllerGetCapabilities(call) {
let capabilities;
const response = {
capabilities: []
};
//UNKNOWN = 0;
//CREATE_DELETE_VOLUME = 1;
//PUBLISH_UNPUBLISH_VOLUME = 2;
//LIST_VOLUMES = 3;
//GET_CAPACITY = 4;
// Currently the only way to consume a snapshot is to create
// a volume from it. Therefore plugins supporting
// CREATE_DELETE_SNAPSHOT MUST support creating volume from
// snapshot.
//CREATE_DELETE_SNAPSHOT = 5;
//LIST_SNAPSHOTS = 6;
// Plugins supporting volume cloning at the storage level MAY
// report this capability. The source volume MUST be managed by
// the same plugin. Not all volume sources and parameters
// combinations MAY work.
//CLONE_VOLUME = 7;
// Indicates the SP supports ControllerPublishVolume.readonly
// field.
//PUBLISH_READONLY = 8;
// See VolumeExpansion for details.
//EXPAND_VOLUME = 9;
capabilities = this.options.service.controller.capabilities.rpc || [
"UNKNOWN"
];
capabilities.forEach(item => {
response.capabilities.push({
rpc: { type: item }
});
});
return response;
}
async NodeGetCapabilities(call) {
let capabilities;
const response = {
capabilities: []
};
//UNKNOWN = 0;
//STAGE_UNSTAGE_VOLUME = 1;
// If Plugin implements GET_VOLUME_STATS capability
// then it MUST implement NodeGetVolumeStats RPC
// call for fetching volume statistics.
//GET_VOLUME_STATS = 2;
// See VolumeExpansion for details.
//EXPAND_VOLUME = 3;
capabilities = this.options.service.node.capabilities.rpc || ["UNKNOWN"];
capabilities.forEach(item => {
response.capabilities.push({
rpc: { type: item }
});
});
return response;
}
async NodeGetInfo(call) {
return {
node_id: process.env.CSI_NODE_ID || os.hostname(),
max_volumes_per_node: 0
};
}
/**
* https://kubernetes-csi.github.io/docs/raw-block.html
* --feature-gates=BlockVolume=true,CSIBlockVolume=true
*
* StagingTargetPath is always a directory even for block volumes
*
* NOTE: stage gets called every time publish does
*
* @param {*} call
*/
async NodeStageVolume(call) {
const mount = new Mount();
const filesystem = new Filesystem();
const iscsi = new ISCSI();
let result;
let device;
const volume_id = call.request.volume_id;
const staging_target_path = call.request.staging_target_path;
const capability = call.request.volume_capability;
const access_type = capability.access_type || "mount";
const volume_context = call.request.volume_context;
let fs_type;
let mount_flags;
const node_attach_driver = volume_context.node_attach_driver;
const block_path = staging_target_path + "/block_device";
const bind_mount_flags = [];
bind_mount_flags.push("defaults");
if (access_type == "mount") {
fs_type = capability.mount.fs_type;
mount_flags = capability.mount.mount_flags || [];
mount_flags.push("defaults");
}
result = await this.assertCapabilities([capability]);
if (!result.valid) {
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`invalid capability: ${result.message}`
);
}
// csi spec stipulates that staging_target_path is a directory even for block mounts
result = await filesystem.pathExists(staging_target_path);
if (!result) {
await filesystem.mkdir(staging_target_path, ["-p", "-m", "0750"]);
}
switch (node_attach_driver) {
case "nfs":
device = `${volume_context.server}:${volume_context.share}`;
break;
case "iscsi":
// create DB entry
let nodeDB = {};
const nodeDBKeyPrefix = "node-db.";
for (const key in call.request.secrets) {
if (key.startsWith(nodeDBKeyPrefix)) {
nodeDB[key.substr(nodeDBKeyPrefix.length)] =
call.request.secrets[key];
}
}
await iscsi.iscsiadm.createNodeDBEntry(
volume_context.iqn,
volume_context.portal,
nodeDB
);
// login
await iscsi.iscsiadm.login(volume_context.iqn, volume_context.portal);
// find device name
device = `/dev/disk/by-path/ip-${volume_context.portal}-iscsi-${volume_context.iqn}-lun-${volume_context.lun}`;
// can take some time for device to show up, loop for some period
result = await filesystem.pathExists(device);
let timer_start = Math.round(new Date().getTime() / 1000);
let timer_max = 30;
while (!result) {
await sleep(2000);
result = await filesystem.pathExists(device);
let current_time = Math.round(new Date().getTime() / 1000);
if (!result && current_time - timer_start > timer_max) {
throw new GrpcError(
grpc.status.UNKNOWN,
`hit timeout waiting for device node to appear: ${device}`
);
}
}
device = await filesystem.realpath(device);
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
}
switch (access_type) {
case "mount":
if (await filesystem.isBlockDevice(device)) {
// format
result = await filesystem.deviceIsFormatted(device);
if (!result) {
await filesystem.formatDevice(device, fs_type);
}
let fs_info = await filesystem.getDeviceFilesystemInfo(device);
fs_type = fs_info.type;
// fsck
result = await mount.deviceIsMountedAtPath(
device,
staging_target_path
);
if (!result) {
await filesystem.checkFilesystem(device, fs_type);
}
}
result = await mount.deviceIsMountedAtPath(device, staging_target_path);
if (!result) {
await mount.mount(
device,
staging_target_path,
["-t", fs_type].concat(["-o", mount_flags.join(",")])
);
}
if (await filesystem.isBlockDevice(device)) {
// go ahead and expand fs (this covers cloned setups where expand is not explicitly invoked)
switch (fs_type) {
case "ext4":
case "ext3":
case "ext4dev":
//await filesystem.checkFilesystem(device, fs_info.type);
await filesystem.expandFilesystem(device, fs_type);
break;
case "xfs":
//await filesystem.checkFilesystem(device, fs_info.type);
await filesystem.expandFilesystem(staging_target_path, fs_type);
break;
default:
// unsupported filesystem
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`unsupported/unknown filesystem ${fs_type}`
);
}
}
break;
case "block":
//result = await mount.deviceIsMountedAtPath(device, block_path);
result = await mount.deviceIsMountedAtPath("dev", block_path);
if (!result) {
result = await filesystem.pathExists(staging_target_path);
if (!result) {
await filesystem.mkdir(staging_target_path, ["-p", "-m", "0750"]);
}
result = await filesystem.pathExists(block_path);
if (!result) {
await filesystem.touch(block_path);
}
await mount.bindMount(device, block_path, [
"-o",
bind_mount_flags.join(",")
]);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported access_type: ${access_type}`
);
}
return {};
}
/**
* NOTE: only gets called when the last pod on the node using the volume is removed
*
* 1. unmount fs
* 2. logout of iscsi if neccessary
*
* @param {*} call
*/
async NodeUnstageVolume(call) {
const mount = new Mount();
const filesystem = new Filesystem();
const iscsi = new ISCSI();
let result;
let is_block = false;
let block_device_info;
let access_type = "mount";
const volume_id = call.request.volume_id;
const staging_target_path = call.request.staging_target_path;
const block_path = staging_target_path + "/block_device";
let normalized_staging_path = staging_target_path;
if (!staging_target_path) {
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`missing staging_target_path`
);
}
//result = await mount.pathIsMounted(block_path);
//result = await mount.pathIsMounted(staging_target_path)
result = await mount.pathIsMounted(block_path);
if (result) {
is_block = true;
access_type = "block";
block_device_info = await filesystem.getBlockDevice(block_path);
normalized_staging_path = block_path;
} else {
result = await mount.pathIsMounted(staging_target_path);
if (result) {
let device = await mount.getMountPointDevice(staging_target_path);
result = await filesystem.isBlockDevice(device);
if (result) {
is_block = true;
block_device_info = await filesystem.getBlockDevice(device);
}
}
}
if (is_block) {
if (block_device_info.tran == "iscsi") {
// figure out which iscsi session this belongs to and logout
// scan /dev/disk/by-path/ip-*?
// device = `/dev/disk/by-path/ip-${volume_context.portal}-iscsi-${volume_context.iqn}-lun-${volume_context.lun}`;
// parse output from `iscsiadm -m session -P 3`
let sessions = await iscsi.iscsiadm.getSessionsDetails();
for (let i = 0; i < sessions.length; i++) {
let session = sessions[i];
let is_attached_to_session = false;
if (
session.attached_scsi_devices &&
session.attached_scsi_devices.host &&
session.attached_scsi_devices.host.devices
) {
is_attached_to_session = session.attached_scsi_devices.host.devices.some(
device => {
if (device.attached_scsi_disk == block_device_info.name) {
return true;
}
return false;
}
);
}
if (is_attached_to_session) {
await iscsi.iscsiadm.logout(session.target, [
session.persistent_portal
]);
let timer_start = Math.round(new Date().getTime() / 1000);
let timer_max = 30;
let deletedEntry = false;
while (!deletedEntry) {
try {
await iscsi.iscsiadm.deleteNodeDBEntry(
session.target,
session.persistent_portal
);
deletedEntry = true;
} catch (err) {
await sleep(2000);
let current_time = Math.round(new Date().getTime() / 1000);
if (current_time - timer_start > timer_max) {
// not throwing error for now as future invocations would not enter code path anyhow
deletedEntry = true;
//throw new GrpcError(
// grpc.status.UNKNOWN,
// `hit timeout trying to delete iscsi node DB entry: ${session.target}, ${session.persistent_portal}`
//);
}
}
}
}
}
}
}
result = await mount.pathIsMounted(normalized_staging_path);
if (result) {
result = await mount.umount(normalized_staging_path, ["--force"]);
}
if (access_type == "block") {
// remove touched file
result = await filesystem.pathExists(block_path);
if (result) {
result = await filesystem.rm(block_path);
}
}
result = await filesystem.pathExists(staging_target_path);
if (result) {
result = await filesystem.rmdir(staging_target_path);
}
return {};
}
async NodePublishVolume(call) {
const mount = new Mount();
const filesystem = new Filesystem();
let result;
const volume_id = call.request.volume_id;
const staging_target_path = call.request.staging_target_path || "";
const target_path = call.request.target_path;
const capability = call.request.volume_capability;
const access_type = capability.access_type || "mount";
const readonly = call.request.readonly;
const volume_context = call.request.volume_context;
const bind_mount_flags = [];
const node_attach_driver = volume_context.node_attach_driver;
if (access_type == "mount") {
let mount_flags = capability.mount.mount_flags || [];
bind_mount_flags.push(...mount_flags);
}
bind_mount_flags.push("defaults");
if (readonly) bind_mount_flags.push("ro");
switch (node_attach_driver) {
case "nfs":
case "iscsi":
// ensure appropriate directories/files
switch (access_type) {
case "mount":
// ensure directory exists
result = await filesystem.pathExists(target_path);
if (!result) {
await filesystem.mkdir(target_path, ["-p", "-m", "0750"]);
}
break;
case "block":
// ensure target_path directory exists as target path should be a file
let target_dir = await filesystem.dirname(target_path);
result = await filesystem.pathExists(target_dir);
if (!result) {
await filesystem.mkdir(target_dir, ["-p", "-m", "0750"]);
}
// ensure target file exists
result = await filesystem.pathExists(target_path);
if (!result) {
await filesystem.touch(target_path);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unsupported/unknown access_type ${access_type}`
);
}
// ensure bind mount
if (staging_target_path) {
let normalized_staging_device;
let normalized_staging_path;
if (access_type == "block") {
normalized_staging_path = staging_target_path + "/block_device";
} else {
normalized_staging_path = staging_target_path;
}
result = await mount.pathIsMounted(target_path);
// if not mounted, mount
if (!result) {
await mount.bindMount(normalized_staging_path, target_path, [
"-o",
bind_mount_flags.join(",")
]);
} else {
// if is mounted, ensure proper source
if (access_type == "block") {
normalized_staging_device = "dev"; // special syntax for single file bind mounts
} else {
normalized_staging_device = await mount.getMountPointDevice(
staging_target_path
);
}
result = await mount.deviceIsMountedAtPath(
normalized_staging_device,
target_path
);
if (!result) {
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`it appears something else is already mounted at ${target_path}`
);
}
}
return {};
}
// unstaged configurations are not supported
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`only staged configurations are valid`
);
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown/unsupported node_attach_driver: ${node_attach_driver}`
);
}
return {};
}
async NodeUnpublishVolume(call) {
const mount = new Mount();
const filesystem = new Filesystem();
let result;
const volume_id = call.request.volume_id;
const target_path = call.request.target_path;
result = await mount.pathIsMounted(target_path);
if (result) {
result = await mount.umount(target_path, ["--force"]);
}
result = await filesystem.pathExists(target_path);
if (result) {
if (fs.lstatSync(target_path).isDirectory()) {
result = await filesystem.rmdir(target_path);
} else {
result = await filesystem.rm([target_path]);
}
}
return {};
}
async NodeGetVolumeStats(call) {
const mount = new Mount();
const filesystem = new Filesystem();
let result;
let device_path;
let access_type;
const volume_id = call.request.volume_id;
const volume_path = call.request.volume_path;
if (!volume_path) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_path`);
}
const block_path = volume_path + "/block_device";
if (
(await mount.isBindMountedBlockDevice(volume_path)) ||
(await mount.isBindMountedBlockDevice(block_path))
) {
device_path = block_path;
access_type = "block";
} else {
device_path = volume_path;
access_type = "mount";
}
switch (access_type) {
case "mount":
result = await mount.getMountDetails(device_path);
return {
usage: [
{
available: result.avail,
total: result.size,
used: result.used,
unit: "BYTES"
}
]
};
case "block":
result = await filesystem.getBlockDevice(device_path);
return {
usage: [
{
total: result.size,
unit: "BYTES"
}
]
};
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unsupported/unknown access_type ${access_type}`
);
}
}
/**
* https://kubernetes-csi.github.io/docs/volume-expansion.html
* allowVolumeExpansion: true
* --feature-gates=ExpandCSIVolumes=true
* --feature-gates=ExpandInUsePersistentVolumes=true
*
* @param {*} call
*/
async NodeExpandVolume(call) {
const mount = new Mount();
const filesystem = new Filesystem();
let device;
let fs_info;
let device_path;
let access_type;
let is_block = false;
let is_formatted;
let fs_type;
const volume_id = call.request.volume_id;
const volume_path = call.request.volume_path;
const capacity_range = call.request.capacity_range;
const volume_capability = call.request.volume_capability;
if (!volume_path) {
throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_path`);
}
const block_path = volume_path + "/block_device";
if (
(await mount.isBindMountedBlockDevice(volume_path)) ||
(await mount.isBindMountedBlockDevice(block_path))
) {
access_type = "block";
device_path = block_path;
} else {
access_type = "mount";
device_path = volume_path;
}
try {
device = await mount.getMountPointDevice(device_path);
is_formatted = await filesystem.deviceIsFormatted(device);
is_block = await filesystem.isBlockDevice(device);
} catch (err) {
if (err.code == 1) {
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`volume_path ${volume_path} is not currently mounted`
);
}
// surface unexpected failures instead of silently continuing
throw err;
}
if (is_block) {
await filesystem.rescanDevice(device);
if (is_formatted && access_type == "mount") {
fs_info = await filesystem.getDeviceFilesystemInfo(device);
fs_type = fs_info.type;
if (fs_type) {
switch (fs_type) {
case "ext4":
case "ext3":
case "ext4dev":
//await filesystem.checkFilesystem(device, fs_info.type);
await filesystem.expandFilesystem(device, fs_type);
break;
case "xfs":
let mount_info = await mount.getMountDetails(device_path);
if (mount_info.fstype == "xfs") {
//await filesystem.checkFilesystem(device, fs_info.type);
await filesystem.expandFilesystem(device_path, fs_type);
}
break;
default:
// unsupported filesystem
throw new GrpcError(
grpc.status.FAILED_PRECONDITION,
`unsupported/unknown filesystem ${fs_type}`
);
}
}
} else {
//block device unformatted
return {};
}
} else {
// not block device
return {};
}
return {};
}
}
module.exports.CsiBaseDriver = CsiBaseDriver;

471
src/utils/filesystem.js Normal file

@ -0,0 +1,471 @@
const cp = require("child_process");
const fs = require("fs");
/**
* https://github.com/kubernetes/kubernetes/tree/master/pkg/util/mount
* https://github.com/kubernetes/kubernetes/blob/master/pkg/util/mount/mount_linux.go
*/
class Filesystem {
constructor(options = {}) {
const filesystem = this;
filesystem.options = options;
options.paths = options.paths || {};
if (!options.paths.sudo) {
options.paths.sudo = "/usr/bin/sudo";
}
if (!options.timeout) {
options.timeout = 10 * 60 * 1000;
}
if (!options.executor) {
options.executor = {
spawn: cp.spawn
};
}
}
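// hypothetical usage sketch:
//   const filesystem = new Filesystem({ sudo: true });
//   const info = await filesystem.getBlockDevice("/dev/sdb");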
/**
* Attempt to discover if device is a block device
*
* @param {*} device
*/
async isBlockDevice(device) {
const filesystem = this;
if (!device.startsWith("/")) {
return false;
}
const device_path = await filesystem.realpath(device);
const blockdevices = await filesystem.getAllBlockDevices();
return blockdevices.some(i => i.path == device_path);
}
/**
* create symlink
*
* @param {*} target
* @param {*} link
* @param {*} options
*/
async symlink(target, link, options = []) {
const filesystem = this;
let args = ["-s"];
args = args.concat(options);
args = args.concat([target, link]);
try {
await filesystem.exec("ln", args);
} catch (err) {
throw err;
}
}
/**
* remove the given path(s)
*
* @param {*} options
*/
async rm(options = []) {
const filesystem = this;
let args = [];
args = args.concat(options);
try {
await filesystem.exec("rm", args);
} catch (err) {
throw err;
}
}
/**
* touch a path
* @param {*} path
*/
async touch(path, options = []) {
const filesystem = this;
let args = [];
args = args.concat(options);
args.push(path);
try {
await filesystem.exec("touch", args);
} catch (err) {
throw err;
}
}
/**
* strip the last component from a path (dirname)
* @param {*} path
*/
async dirname(path) {
const filesystem = this;
let args = [];
args.push(path);
let result;
try {
result = await filesystem.exec("dirname", args);
return result.stdout.trim();
} catch (err) {
throw err;
}
}
/**
* lsblk -a -b -l -J -O
*/
async getAllBlockDevices() {
const filesystem = this;
let args = ["-a", "-b", "-l", "-J", "-O"];
let result;
try {
result = await filesystem.exec("lsblk", args);
const parsed = JSON.parse(result.stdout);
return parsed.blockdevices;
} catch (err) {
throw err;
}
}
/**
* lsblk -a -b -l -J -O <device>
*/
async getBlockDevice(device) {
const filesystem = this;
device = await filesystem.realpath(device);
let args = ["-a", "-b", "-l", "-J", "-O"];
args.push(device);
let result;
try {
result = await filesystem.exec("lsblk", args);
const parsed = JSON.parse(result.stdout);
return parsed.blockdevices[0];
} catch (err) {
throw err;
}
}
/**
* blkid -p -o export <device>
*
* @param {*} device
*/
async deviceIsFormatted(device) {
const filesystem = this;
let args = ["-p", "-o", "export", device];
let result;
try {
result = await filesystem.exec("blkid", args);
} catch (err) {
if (err.code == 2) {
return false;
}
throw err;
}
return true;
}
/**
* blkid -p -o export <device>
*
* @param {*} device
*/
async getDeviceFilesystemInfo(device) {
const filesystem = this;
let args = ["-p", "-o", "export", device];
let result;
try {
result = await filesystem.exec("blkid", args);
const entries = result.stdout.trim().split("\n");
const properties = {};
let fields, key, value;
entries.forEach(entry => {
fields = entry.split("=");
key = fields[0].toLowerCase();
value = fields.slice(1).join("="); // value may itself contain "="
properties[key] = value;
});
return properties;
} catch (err) {
throw err;
}
}
/**
* mkfs.<fstype> [<options>] device
*
* @param {*} device
* @param {*} fstype
* @param {*} options
*/
async formatDevice(device, fstype, options = []) {
const filesystem = this;
let args = [];
args = args.concat(options);
switch (fstype) {
case "vfat":
args = args.concat(["-I"]);
break;
}
args.push(device);
let result;
try {
result = await filesystem.exec("mkfs." + fstype, args);
return result;
} catch (err) {
throw err;
}
}
async realpath(path) {
const filesystem = this;
let args = [path];
let result;
try {
result = await filesystem.exec("realpath", args);
return result.stdout.trim();
} catch (err) {
throw err;
}
}
async rescanDevice(device) {
const filesystem = this;
let result;
let device_name;
result = await filesystem.isBlockDevice(device);
if (!result) {
throw new Error(
`cannot rescan device ${device} because it is not a block device`
);
}
result = await filesystem.realpath(device);
device_name = result.split("/").pop();
// echo 1 > /sys/block/sdb/device/rescan
const sys_file = `/sys/block/${device_name}/device/rescan`;
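// note: writing the rescan trigger requires root privileges and will
// throw EACCES otherwise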
fs.writeFileSync(sys_file, "1");
}
/**
* expand a given filesystem
*
* @param {*} device
* @param {*} fstype
* @param {*} options
*/
async expandFilesystem(device, fstype, options = []) {
const filesystem = this;
let command;
let args = [];
let result;
switch (fstype.toLowerCase()) {
case "ext4":
case "ext3":
case "ext4dev":
command = "resize2fs";
args = args.concat(options);
args.push(device);
break;
case "xfs":
command = "xfs_growfs";
args = args.concat(options);
args.push(device); // in this case should be a mounted path
break;
case "vfat":
// must be unmounted
command = "fatresize";
args = args.concat(options);
args = args.concat(["-s", "max"]);
args.push(device);
break;
}
try {
result = await filesystem.exec(command, args);
return result;
} catch (err) {
throw err;
}
}
/**
* check/repair a given filesystem
*
* fsck [options] -- [fs-options] [<filesystem> ...]
*
* @param {*} device
* @param {*} fstype
* @param {*} options
* @param {*} fsoptions
*/
async checkFilesystem(device, fstype, options = [], fsoptions = []) {
const filesystem = this;
let command;
let args = [];
let result;
switch (fstype.toLowerCase()) {
case "ext4":
case "ext3":
case "ext4dev":
command = "fsck";
args = args.concat(options);
args.push(device);
args.push("--");
args = args.concat(fsoptions);
args.push("-f");
args.push("-p");
break;
case "xfs":
command = "xfs_repair";
args = args.concat(["-o", "force_geometry"]);
args = args.concat(options);
args.push(device);
break;
default:
command = "fsck";
args = args.concat(options);
args.push(device);
args.push("--");
args = args.concat(fsoptions);
break;
}
try {
result = await filesystem.exec(command, args);
return result;
} catch (err) {
throw err;
}
}
/**
* mkdir [<options>] <path>
*
* @param {*} path
* @param {*} options
*/
async mkdir(path, options = []) {
const filesystem = this;
let args = [];
args = args.concat(options);
args.push(path);
try {
await filesystem.exec("mkdir", args);
} catch (err) {
throw err;
}
return true;
}
/**
* rmdir [<options>] <path>
*
* @param {*} path
* @param {*} options
*/
async rmdir(path, options = []) {
const filesystem = this;
let args = [];
args = args.concat(options);
args.push(path);
try {
await filesystem.exec("rmdir", args);
} catch (err) {
throw err;
}
return true;
}
/**
*
* @param {*} path
*/
async pathExists(path) {
const filesystem = this;
let args = [];
args.push(path);
try {
await filesystem.exec("stat", args);
} catch (err) {
return false;
}
return true;
}
exec(command, args, options) {
const filesystem = this;
args = args || [];
let timeout;
let stdout = "";
let stderr = "";
if (filesystem.options.sudo) {
args.unshift(command);
command = filesystem.options.paths.sudo;
}
console.log("executing fileystem command: %s %s", command, args.join(" "));
const child = filesystem.options.executor.spawn(command, args, options);
let didTimeout = false;
if (options && options.timeout) {
timeout = setTimeout(() => {
didTimeout = true;
child.kill(options.killSignal || "SIGTERM");
}, options.timeout);
}
return new Promise((resolve, reject) => {
child.stdout.on("data", function(data) {
stdout = stdout + data;
});
child.stderr.on("data", function(data) {
stderr = stderr + data;
});
child.on("close", function(code) {
const result = { code, stdout, stderr };
if (timeout) {
clearTimeout(timeout);
}
if (code) {
console.log(
"failed to execute filesystem command: %s, response: %j",
[command].concat(args).join(" "),
result
);
reject(result);
} else {
resolve(result);
}
});
});
}
}
module.exports.Filesystem = Filesystem;

7
src/utils/general.js Normal file

@ -0,0 +1,7 @@
function sleep(ms) {
return new Promise(resolve => {
setTimeout(resolve, ms);
});
}
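// usage sketch: await sleep(2000); // pause ~2 seconds inside an async function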
module.exports.sleep = sleep;

9
src/utils/grpc.js Normal file

@ -0,0 +1,9 @@
class GrpcError {
constructor(code, message = "") {
this.name = "GrpcError";
this.code = code;
this.message = message;
}
}
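// usage sketch: throw new GrpcError(grpc.status.INVALID_ARGUMENT, `missing volume_id`);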
module.exports.GrpcError = GrpcError;

483
src/utils/iscsi.js Normal file

@ -0,0 +1,483 @@
const cp = require("child_process");
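// iscsiadm prints the literal string "<empty>" for unset values; normalize those to null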
function getIscsiValue(value) {
if (value == "<empty>") return null;
return value;
}
class ISCSI {
constructor(options = {}) {
const iscsi = this;
iscsi.options = options;
options.paths = options.paths || {};
if (!options.paths.iscsiadm) {
options.paths.iscsiadm = "iscsiadm";
}
if (!options.paths.sudo) {
options.paths.sudo = "/usr/bin/sudo";
}
if (!options.timeout) {
options.timeout = 10 * 60 * 1000;
}
if (!options.executor) {
options.executor = {
spawn: cp.spawn
};
}
iscsi.iscsiadm = {
/**
* iscsiadm -m iface -o show
* iface_name transport_name,hwaddress,ipaddress,net_ifacename,initiatorname
*/
async listInterfaces() {
let args = [];
args = args.concat(["-m", "iface", "-o", "show"]);
const result = await iscsi.exec(options.paths.iscsiadm, args);
// return empty list if no stdout data
if (!result.stdout) {
return [];
}
const entries = result.stdout.trim().split("\n");
const interfaces = [];
let fields;
entries.forEach(entry => {
fields = entry.split(" ");
interfaces.push({
iface_name: fields[0],
transport_name: fields[1].split(",")[0],
hwaddress: getIscsiValue(fields[1].split(",")[1]),
ipaddress: getIscsiValue(fields[1].split(",")[2]),
net_ifacename: getIscsiValue(fields[1].split(",")[3]),
initiatorname: getIscsiValue(fields[1].split(",")[4])
});
});
return interfaces;
},
/**
* iscsiadm -m iface -o show -I <iface>
*
* @param {*} iface
*/
async showInterface(iface) {
let args = [];
args = args.concat(["-m", "iface", "-o", "show", "-I", iface]);
let result = await iscsi.exec(options.paths.iscsiadm, args);
const entries = result.stdout.trim().split("\n");
const i = {};
let fields, key, value;
entries.forEach(entry => {
if (entry.startsWith("#")) return;
fields = entry.split("=");
key = fields[0].trim();
value = fields[1].trim();
i[key] = getIscsiValue(value);
});
return i;
},
/**
* iscsiadm --mode node -T <target> -p <portal> -o new
*
* @param {*} tgtIQN
* @param {*} portal
* @param {*} attributes
*/
async createNodeDBEntry(tgtIQN, portal, attributes = {}) {
let args = [];
args = args.concat([
"-m",
"node",
"-T",
tgtIQN,
"-p",
portal,
"-o",
"new"
]);
await iscsi.exec(options.paths.iscsiadm, args);
for (let attribute in attributes) {
let args = [];
args = args.concat([
"-m",
"node",
"-T",
tgtIQN,
"-p",
portal,
"-o",
"update",
"--name",
attribute,
"--value",
attributes[attribute]
]);
await iscsi.exec(options.paths.iscsiadm, args);
}
},
/**
* iscsiadm --mode node -T <target> -p <portal> -o delete
*
* @param {*} tgtIQN
* @param {*} portal
*/
async deleteNodeDBEntry(tgtIQN, portal) {
let args = [];
args = args.concat([
"-m",
"node",
"-T",
tgtIQN,
"-p",
portal,
"-o",
"delete"
]);
await iscsi.exec(options.paths.iscsiadm, args);
},
/**
* iscsiadm -m session
*/
async getSessions() {
let args = [];
args = args.concat(["-m", "session"]);
let result;
try {
result = await iscsi.exec(options.paths.iscsiadm, args);
} catch (err) {
// no active sessions
if (err.code == 21) {
result = err;
} else {
throw err;
}
}
// return empty list if no stdout data
if (!result.stdout) {
return [];
}
// protocol: [id] ip:port,target_portal_group_tag targetname
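// e.g. (hypothetical values): tcp: [1] 10.0.0.10:3260,1 iqn.2005-10.org.freenas.ctl:mytarget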
const entries = result.stdout.trim().split("\n");
const sessions = [];
let fields;
entries.forEach(entry => {
fields = entry.split(" ");
sessions.push({
protocol: entry.split(":")[0],
id: fields[1].replace("[", "").replace("]", ""),
portal: fields[2].split(",")[0],
target_portal_group_tag: fields[2].split(",")[1],
iqn: fields[3].split(":")[0],
target: fields[3].split(":")[1]
});
});
return sessions;
},
/**
* iscsiadm -m session -P 3
*/
async getSessionsDetails() {
let args = [];
args = args.concat(["-m", "session", "-P", "3"]);
let result;
try {
result = await iscsi.exec(options.paths.iscsiadm, args);
} catch (err) {
// no active sessions
if (err.code == 21) {
result = err;
} else {
throw err;
}
}
// return empty list if no stdout data
if (!result.stdout) {
return [];
}
let sessionGroups = [];
let currentSession = [];
// protocol: [id] ip:port,target_portal_group_tag targetname
const entries = result.stdout.trim().split("\n");
// remove first 2 lines
entries.shift();
entries.shift();
for (let i = 0; i < entries.length; i++) {
let entry = entries[i];
if (entry.startsWith("Target:")) {
if (currentSession.length > 0) {
sessionGroups.push(currentSession);
}
currentSession = [entry];
} else {
currentSession.push(entry);
}
if (i + 1 == entries.length) {
sessionGroups.push(currentSession);
}
}
const sessions = [];
for (let i = 0; i < sessionGroups.length; i++) {
let sessionLines = sessionGroups[i];
let session = {};
let currentSection;
for (let j = 0; j < sessionLines.length; j++) {
let line = sessionLines[j].trim();
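// a line consisting solely of "*" characters is the banner iscsiadm
// prints around each section title; the title is on the following line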
let uniqueChars = String.prototype.concat(...new Set(line));
if (uniqueChars == "*") {
currentSection = sessionLines[j + 1]
.trim()
.toLowerCase()
.replace(/ /g, "_")
.replace(/\W/g, "");
j++;
j++;
continue;
}
let key = line
.split(":", 1)[0]
.trim()
.replace(/ /g, "_")
.replace(/\W/g, "");
let value = line
.split(":")
.slice(1)
.join(":")
.trim();
if (currentSection) {
session[currentSection] = session[currentSection] || {};
switch (currentSection) {
case "attached_scsi_devices":
key = key.toLowerCase();
if (key == "host_number") {
session[currentSection]["host"] = {
number: value.split("\t")[0],
state: value
.split("\t")
.slice(1)
.join("\t")
.split(":")
.slice(1)
.join(":")
.trim()
};
while (
sessionLines[j + 1] &&
sessionLines[j + 1].trim().startsWith("scsi")
) {
session[currentSection]["host"]["devices"] =
session[currentSection]["host"]["devices"] || [];
let line1p = sessionLines[j + 1].split(" ");
let line2 = sessionLines[j + 2];
let line2p = "";
if (line2) {
line2p = line2.split(" ");
session[currentSection]["host"]["devices"].push({
channel: line1p[2],
id: line1p[4],
lun: line1p[6],
attached_scsi_disk: line2p[3].split("\t")[0],
state: line2
.trim()
.split("\t")
.slice(1)
.join("\t")
.split(":")
.slice(1)
.join(":")
.trim()
});
}
j++;
j++;
}
continue;
}
break;
case "negotiated_iscsi_params":
key = key.charAt(0).toLowerCase() + key.slice(1);
key = key.replace(
/[A-Z]/g,
letter => `_${letter.toLowerCase()}`
);
break;
}
key = key.toLowerCase();
session[currentSection][key] = value;
} else {
key = key.toLowerCase();
if (key == "target") {
value = value.split(" ")[0];
}
session[key.trim()] = value.trim();
}
}
sessions.push(session);
}
return sessions;
},
/**
* iscsiadm -m discovery -t st -p <portal>
*
* @param {*} portal
*/
async discoverTargets(portal) {
let args = [];
args = args.concat(["-m", "discovery"]);
args = args.concat(["-t", "sendtargets"]);
args = args.concat(["-p", portal]);
let result;
try {
result = await iscsi.exec(options.paths.iscsiadm, args);
} catch (err) {
throw err;
}
// return empty list if no stdout data
if (!result.stdout) {
return [];
}
const entries = result.stdout.trim().split("\n");
const targets = [];
entries.forEach(entry => {
targets.push({
portal: entry.split(",")[0],
target_portal_group_tag: entry.split(" ")[0].split(",")[1],
iqn: entry.split(" ")[1].split(":")[0],
target: entry.split(" ")[1].split(":")[1]
});
});
return targets;
},
/**
* iscsiadm -m node -T <target> -p <portal> -l
*
* @param {*} tgtIQN
* @param {*} portal
*/
async login(tgtIQN, portal) {
let args = [];
args = args.concat(["-m", "node", "-T", tgtIQN, "-p", portal, "-l"]);
try {
await iscsi.exec(options.paths.iscsiadm, args);
} catch (err) {
// already logged in
if (err.code == 15) {
return true;
}
throw err;
}
return true;
},
/**
* iscsiadm -m node -T <target> -p <portal> -u
*
* @param {*} tgtIQN
* @param {*} portals
*/
async logout(tgtIQN, portals) {
let args = [];
args = args.concat(["-m", "node", "-T", tgtIQN]);
if (!Array.isArray(portals)) {
portals = [portals];
}
// await each logout so sessions are actually gone before returning
for (let p of portals) {
try {
await iscsi.exec(options.paths.iscsiadm, args.concat(["-p", p, "-u"]));
} catch (err) {
// 21 = no matching sessions
if (err.code != 21) {
throw err;
}
}
}
return true;
},
async deleteDBEntry(tgtIQN) {} // TODO: not yet implemented
};
}
exec(command, args, options) {
const iscsi = this;
args = args || [];
let timeout;
let stdout = "";
let stderr = "";
if (iscsi.options.sudo) {
args.unshift(command);
command = iscsi.options.paths.sudo;
}
console.log("executing iscsi command: %s %s", command, args.join(" "));
const child = iscsi.options.executor.spawn(command, args, options);
let didTimeout = false;
if (options && options.timeout) {
timeout = setTimeout(() => {
didTimeout = true;
child.kill(options.killSignal || "SIGTERM");
}, options.timeout);
}
return new Promise((resolve, reject) => {
child.stdout.on("data", function(data) {
stdout = stdout + data;
});
child.stderr.on("data", function(data) {
stderr = stderr + data;
});
child.on("close", function(code) {
const result = { code, stdout, stderr };
if (timeout) {
clearTimeout(timeout);
}
if (code) {
reject(result);
} else {
resolve(result);
}
});
});
}
}
module.exports.ISCSI = ISCSI;

116
src/utils/logger.js Normal file

@ -0,0 +1,116 @@
/**
* Levels
*
* error: 0
* warn: 1
* info: 2
* verbose: 3
* debug: 4
* silly: 5
*/
const winston = require("winston");
const bunyan = require("bunyan");
const env = process.env.NODE_ENV || "development";
let level = process.env.DEMOCRATIC_CSI_LOG_LEVEL || null;
if (!level) {
if (env == "production") {
level = "info";
} else {
level = "verbose";
}
}
let formatters;
let defaultMeta;
if (env == "production") {
formatters = [winston.format.json()];
defaultMeta = { service: "democratic-csi" };
} else {
formatters = [winston.format.colorize(), winston.format.simple()];
defaultMeta = {};
}
const logger = winston.createLogger({
level: level,
format: winston.format.combine(
winston.format.errors({ stack: true }),
winston.format.splat(),
...formatters
),
defaultMeta: defaultMeta,
transports: [
new winston.transports.Console({
handleExceptions: true
})
]
});
/**
* A Bunyan raw stream object (i.e. has a `.write(rec)` method that takes a
* Bunyan log record) that shims logging to a given Winston logger.
*
* @param {winston.Logger} wlog is a Winston Logger to which to shim.
*/
function Bunyan2Winston(wlog) {
this.wlog = wlog;
}
Bunyan2Winston.prototype.write = function write(rec) {
// Map to the appropriate Winston log level (by default 'info', 'warn'
// or 'error') and call signature: `wlog.log(level, msg, metadata)`.
var wlevel;
if (rec.level <= bunyan.INFO) {
wlevel = "info";
} else if (rec.level <= bunyan.WARN) {
wlevel = "warn";
} else {
wlevel = "error";
}
// Note: We are *modifying* the log record here. This could be a problem
// if our Bunyan logger had other streams. This one doesn't.
var msg = rec.msg;
delete rec.msg;
// Remove internal bunyan fields that won't mean anything outside of
// a bunyan context.
delete rec.v;
delete rec.level;
// TODO: more?
// Note: Winston doesn't handle *objects* in the 'metadata' field well
// (e.g. the Bunyan record 'time' field is a Date instance, 'req' and
// 'res' are typically objects). With 'json: true' on a Winston transport
// it is a bit better, but still messes up 'date'. What exactly to do
// here is perhaps user-preference.
rec.time = String(rec.time);
//Object.keys(rec).forEach(function (key) {
// if (typeof(rec[key]) === "object") {
// rec[key] = JSON.stringify(rec[key])
// }
//});
this.wlog.log(wlevel, msg, rec);
};
// Create a Bunyan-compatible logger that shims records to our winston logger.
var shim = bunyan.createLogger({
name: "democratic-csi",
streams: [
{
type: "raw",
level: "trace",
stream: new Bunyan2Winston(logger)
}
]
});
logger.bunyan = shim;
//global.console = logger;
module.exports = {
logger: logger
};

328
src/utils/mount.js Normal file

@ -0,0 +1,328 @@
const cp = require("child_process");
const { Filesystem } = require("../utils/filesystem");
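// findmnt options shared by all queries below: -b reports sizes in bytes, -J emits JSON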
const FINDMNT_COMMON_OPTIONS = [
"--output",
"source,target,fstype,label,options,avail,size,used",
"-b",
"-J"
];
class Mount {
constructor(options = {}) {
const mount = this;
mount.options = options;
options.paths = options.paths || {};
if (!options.paths.mount) {
options.paths.mount = "mount";
}
if (!options.paths.umount) {
options.paths.umount = "umount";
}
if (!options.paths.findmnt) {
options.paths.findmnt = "findmnt";
}
if (!options.paths.sudo) {
options.paths.sudo = "/usr/bin/sudo";
}
if (!options.timeout) {
options.timeout = 10 * 60 * 1000;
}
if (!options.executor) {
options.executor = {
spawn: cp.spawn
};
}
}
/**
* findmnt --source <device> --output source,target,fstype,label,options,avail,size,used -b -J
*
* @param {*} device
*/
async deviceIsMounted(device) {
const filesystem = new Filesystem();
if (device.startsWith("/")) {
device = await filesystem.realpath(device);
}
const mount = this;
let args = [];
args = args.concat(["--source", device]);
args = args.concat(FINDMNT_COMMON_OPTIONS);
let result;
try {
result = await mount.exec(mount.options.paths.findmnt, args);
} catch (err) {
// no results
if (err.code == 1) {
return false;
} else {
throw err;
}
}
return true;
}
/**
* findmnt --mountpoint / --output source,target,fstype,label,options,avail,size,used -b -J
*
* @param {*} path
*/
async pathIsMounted(path) {
const mount = this;
let args = [];
args = args.concat(["--mountpoint", path]);
args = args.concat(FINDMNT_COMMON_OPTIONS);
let result;
try {
result = await mount.exec(mount.options.paths.findmnt, args);
} catch (err) {
// no results
if (err.code == 1) {
return false;
} else if (
err.code == 32 &&
err.stderr &&
err.stderr.includes("No such file or directory")
) {
return false;
} else {
throw err;
}
}
return true;
}
/**
* findmnt --source <device> --mountpoint <path> --output source,target,fstype,label,options,avail,size,used -b -J
*
* @param {*} device
* @param {*} path
*/
async deviceIsMountedAtPath(device, path) {
const filesystem = new Filesystem();
if (device.startsWith("/")) {
device = await filesystem.realpath(device);
}
const mount = this;
let args = [];
args = args.concat(["--source", device]);
args = args.concat(["--mountpoint", path]);
args = args.concat(FINDMNT_COMMON_OPTIONS);
let result;
try {
result = await mount.exec(mount.options.paths.findmnt, args);
} catch (err) {
// no results
if (err.code == 1) {
return false;
} else {
throw err;
}
}
return true;
}
/**
* findmnt --mountpoint / --output source,target,fstype,label,options,avail,size,used -b -J
*
* @param {*} path
*/
async getMountDetails(path) {
const mount = this;
let args = [];
args = args.concat(["--mountpoint", path]);
args = args.concat(FINDMNT_COMMON_OPTIONS);
let result;
try {
result = await mount.exec(mount.options.paths.findmnt, args);
const parsed = JSON.parse(result.stdout);
return parsed.filesystems[0];
} catch (err) {
throw err;
}
}
/**
* Get the device (source) at the given mount point
*
* @param {*} path
*/
async getMountPointDevice(path) {
const mount = this;
const result = await mount.getMountDetails(path);
if (result.fstype == "devtmpfs") {
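// findmnt reports the source of a single-file bind mount as the devtmpfs
// filesystem with the backing device in brackets; rebuild the /dev path: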
// dev[/sdb]
let source = "/";
source += result.source;
source = source.replace("[", "");
source = source.replace("]", "");
return source.trim();
}
return result.source.trim();
}
/**
* very specifically looking for *devices* vs *filesystems/directories* which were bind mounted
*
* @param {*} path
*/
async isBindMountedBlockDevice(path) {
const filesystem = new Filesystem();
const mount = this;
const is_mounted = await mount.pathIsMounted(path);
if (!is_mounted) {
return false;
}
const mount_info = await mount.getMountDetails(path);
const is_block = await filesystem.isBlockDevice(path);
if (mount_info.fstype == "devtmpfs" && is_block) {
return true;
}
return false;
}
/**
* Get the filesystem type at mount point
*
* @param {*} path
*/
async getMountPointFsType(path) {
const mount = this;
const result = await mount.getMountDetails(path);
return result.fstype;
}
/**
* mount [options] <source> <directory>
*
* @param {*} source
* @param {*} target
* @param {*} options
*/
async mount(source, target, options = []) {
const mount = this;
let args = [];
args = args.concat(options);
args = args.concat([source, target]);
let result;
try {
result = await mount.exec(mount.options.paths.mount, args);
return result;
} catch (err) {
throw err;
}
}
/**
* mount --bind [options] <source> <target>
*
* @param {*} source
* @param {*} target
* @param {*} options
*/
async bindMount(source, target, options = []) {
const mount = this;
let args = [];
args.push("--bind");
args = args.concat(options);
args = args.concat([source, target]);
let result;
try {
result = await mount.exec(mount.options.paths.mount, args);
return result;
} catch (err) {
throw err;
}
}
/**
* umount [options] <source> | <directory>
*
* @param {*} target
* @param {*} options
*/
async umount(target, options = []) {
const mount = this;
let args = [];
args = args.concat(options);
args.push(target);
try {
await mount.exec(mount.options.paths.umount, args);
} catch (err) {
if (err.code == 32) {
return true;
} else {
throw err;
}
}
return true;
}
exec(command, args, options) {
const mount = this;
args = args || [];
let timeout;
let stdout = "";
let stderr = "";
if (mount.options.sudo) {
args.unshift(command);
command = mount.options.paths.sudo;
}
console.log("executing mount command: %s %s", command, args.join(" "));
const child = mount.options.executor.spawn(command, args, options);
let didTimeout = false;
if (options && options.timeout) {
timeout = setTimeout(() => {
didTimeout = true;
child.kill(options.killSignal || "SIGTERM");
}, options.timeout);
}
return new Promise((resolve, reject) => {
child.stdout.on("data", function(data) {
stdout = stdout + data;
});
child.stderr.on("data", function(data) {
stderr = stderr + data;
});
child.on("close", function(code) {
const result = { code, stdout, stderr };
if (timeout) {
clearTimeout(timeout);
}
if (code) {
reject(result);
} else {
resolve(result);
}
});
});
}
}
module.exports.Mount = Mount;

97
src/utils/ssh.js Normal file

@ -0,0 +1,97 @@
var Client = require("ssh2").Client;
class SshClient {
constructor(options = {}) {
this.options = options;
this.options.connection = this.options.connection || {};
if (this.options.logger) {
this.logger = this.options.logger;
} else {
this.logger = console;
}
}
/**
* Build a command line from the name and given args
* TODO: escape the arguments
*
* @param {*} name
* @param {*} args
*/
buildCommand(name, args = []) {
args.unshift(name);
return args.join(" ");
}
debug() {
this.logger.silly(...arguments);
}
async exec(command, options = {}, stream_proxy = null) {
const client = this;
return new Promise((resolve, reject) => {
var conn = new Client();
if (client.options.connection.debug == true) {
client.options.connection.debug = function(msg) {
client.debug(msg);
};
}
conn
.on("error", function(err) {
client.debug("Client :: error");
reject(err);
})
.on("ready", function() {
client.debug("Client :: ready");
conn.exec(command, options, function(err, stream) {
if (err) return reject(err);
let stderr;
let stdout;
stream
.on("close", function(code, signal) {
client.debug(
"Stream :: close :: code: " + code + ", signal: " + signal
);
if (stream_proxy) {
stream_proxy.emit("close", ...arguments);
}
resolve({ stderr, stdout, code, signal });
conn.end();
})
.on("data", function(data) {
client.debug("STDOUT: " + data);
if (stream_proxy) {
stream_proxy.stdout.emit("data", ...arguments);
}
if (stdout == undefined) {
stdout = "";
}
stdout = stdout.concat(data);
})
.stderr.on("data", function(data) {
client.debug("STDERR: " + data);
if (stream_proxy) {
stream_proxy.stderr.emit("data", ...arguments);
}
if (stderr == undefined) {
stderr = "";
}
stderr = stderr.concat(data);
});
});
})
.connect(client.options.connection);
if (stream_proxy) {
stream_proxy.on("kill", signal => {
conn.end();
});
}
});
}
}
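// hypothetical usage sketch (connection options pass through to ssh2):
//   const client = new SshClient({ connection: { host: "server.example.com", username: "root", password: "..." } });
//   const result = await client.exec("zfs list");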
module.exports.SshClient = SshClient;

1620
src/utils/zfs.js Normal file

File diff suppressed because it is too large