support restic and kopia snapshot for *-client and local-hostpath drivers

Signed-off-by: Travis Glenn Hansen <travisghansen@yahoo.com>
This commit is contained in:
Travis Glenn Hansen 2024-03-18 10:51:26 -06:00
parent 6570181506
commit c6c39975f2
1 changed files with 598 additions and 32 deletions

View File

@ -4,9 +4,20 @@ const { GrpcError, grpc } = require("../../utils/grpc");
const cp = require("child_process");
const fs = require("fs");
const fse = require("fs-extra");
const Kopia = require("../../utils/kopia").Kopia;
const os = require("os");
const path = require("path");
const registry = require("../../utils/registry");
const Restic = require("../../utils/restic").Restic;
const semver = require("semver");
const __REGISTRY_NS__ = "ControllerClientCommonDriver";
// https://forum.restic.net/t/how-to-prevent-two-restic-tasks-concurrently/6859/5
const SNAPSHOTS_CUT_IN_FLIGHT = new Set();
const SNAPSHOTS_RESTORE_IN_FLIGHT = new Set();
const DEFAULT_SNAPSHOT_DRIVER = "filecopy";
/**
* Crude nfs-client driver which simply creates directories to be mounted
* and uses rsync for cloning/snapshots
@ -102,6 +113,21 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
//options.service.node.capabilities.rpc.push("VOLUME_MOUNT_GROUP");
}
}
if (this.ctx.args.csiMode.includes("controller")) {
setInterval(() => {
this.ctx.logger.info("snapshots cut in flight", {
names: [...SNAPSHOTS_CUT_IN_FLIGHT],
count: SNAPSHOTS_CUT_IN_FLIGHT.size,
});
}, 30 * 1000);
setInterval(() => {
this.ctx.logger.info("snapshots restore in flight", {
names: [...SNAPSHOTS_RESTORE_IN_FLIGHT],
count: SNAPSHOTS_RESTORE_IN_FLIGHT.size,
});
}, 30 * 1000);
}
}
getAccessModes(capability) {
@ -429,6 +455,90 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
return p.replaceAll(path.posix.sep, path.win32.sep);
}
async getResticClient() {
const driver = this;
return registry.get(`${__REGISTRY_NS__}:restic`, () => {
const config_key = driver.getConfigKey();
const restic_env = _.get(
driver.options[config_key],
"snapshots.restic.env",
{}
);
const restic_global_flags = _.get(
driver.options[config_key],
"snapshots.restic.global_flags",
[]
);
const client = new Restic({
env: restic_env,
logger: driver.ctx.logger,
global_flags: restic_global_flags,
});
let hostname = driver.ctx.args.csiName;
if (driver.options.driver == "local-hostpath") {
let nodename = process.env.CSI_NODE_ID || os.hostname();
hostname = `${hostname}-${nodename}`;
}
return client;
});
}
async getKopiaClient() {
const driver = this;
return registry.getAsync(`${__REGISTRY_NS__}:kopia`, async () => {
const config_key = driver.getConfigKey();
const kopia_env = _.get(
driver.options[config_key],
"snapshots.kopia.env",
{}
);
const kopia_global_flags = _.get(
driver.options[config_key],
"snapshots.kopia.global_flags",
[]
);
const client = new Kopia({
env: kopia_env,
logger: driver.ctx.logger,
global_flags: kopia_global_flags,
});
let hostname = driver.ctx.args.csiName;
if (driver.options.driver == "local-hostpath") {
let nodename = process.env.CSI_NODE_ID || os.hostname();
hostname = `${hostname}-${nodename}`;
}
let username = "democratic-csi";
await client.repositoryConnect([
"--override-hostname",
hostname,
"--override-username",
username,
"from-config",
"--token",
_.get(driver.options[config_key], "snapshots.kopia.config_token", ""),
]);
//let repositoryStatus = await client.repositoryStatus();
//console.log(repositoryStatus);
client.hostname = hostname;
client.username = username;
return client;
});
}
/**
* Create a volume doing in essence the following:
* 1. create directory
@ -442,9 +552,10 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
async CreateVolume(call) {
const driver = this;
let config_key = this.getConfigKey();
let volume_id = await driver.getVolumeIdFromCall(call);
let volume_content_source = call.request.volume_content_source;
const config_key = driver.getConfigKey();
const volume_id = await driver.getVolumeIdFromCall(call);
const volume_content_source = call.request.volume_content_source;
const instance_id = driver.options.instance_id;
if (
call.request.volume_capabilities &&
@ -518,13 +629,117 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
// create dataset
if (volume_content_source) {
let snapshot_driver;
let snapshot_id;
if (volume_content_source.type == "snapshot") {
snapshot_id = volume_content_source.snapshot.snapshot_id;
// get parsed variant of driver to allow snapshotter to work with all
// drivers simultaneously
const parsed_snapshot_id = new URLSearchParams(snapshot_id);
if (parsed_snapshot_id.get("snapshot_driver")) {
snapshot_id = parsed_snapshot_id.get("snapshot_id");
snapshot_driver = parsed_snapshot_id.get("snapshot_driver");
} else {
snapshot_driver = "filecopy";
}
}
switch (volume_content_source.type) {
// must be available when adverstising CREATE_DELETE_SNAPSHOT
// simply clone
case "snapshot":
source_path = driver.getControllerSnapshotPath(
volume_content_source.snapshot.snapshot_id
);
switch (snapshot_driver) {
case "filecopy":
{
source_path = driver.getControllerSnapshotPath(snapshot_id);
if (!(await driver.directoryExists(source_path))) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`invalid volume_content_source path: ${source_path}`
);
}
driver.ctx.logger.debug(
"controller volume source path: %s",
source_path
);
await driver.cloneDir(source_path, volume_path);
}
break;
case "restic":
{
const restic = await driver.getResticClient();
let options = [];
await restic.init();
// find snapshot
options = [snapshot_id];
const snapshots = await restic.snapshots(options);
if (!snapshots.length > 0) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`invalid restic snapshot volume_content_source: ${snapshot_id}`
);
}
const snapshot = snapshots[snapshots.length - 1];
// restore snapshot
// --verify?
options = [
`${snapshot.id}:${snapshot.paths[0]}`,
"--target",
volume_path,
"--sparse",
"--host",
restic.hostname,
];
// technically same snapshot could be getting restored to multiple volumes simultaneously
// ensure we add target path as part of the key
SNAPSHOTS_RESTORE_IN_FLIGHT.add(
`${snapshot_id}:${volume_path}`
);
await restic.restore(options).finally(() => {
SNAPSHOTS_RESTORE_IN_FLIGHT.delete(
`${snapshot_id}:${volume_path}`
);
});
}
break;
case "kopia":
{
const kopia = await driver.getKopiaClient();
const snapshot = await kopia.snapshotGet(snapshot_id);
if (!snapshot) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`invalid restic snapshot volume_content_source: ${snapshot_id}`
);
}
/**
* --[no-]write-files-atomically
* --[no-]write-sparse-files
*/
let options = [
"--write-sparse-files",
snapshot_id,
volume_path,
];
await kopia.snapshotRestore(options);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown snapthot driver: ${snapshot_driver}`
);
}
break;
// must be available when adverstising CLONE_VOLUME
// create snapshot first, then clone
@ -532,24 +747,26 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
source_path = driver.getControllerVolumePath(
volume_content_source.volume.volume_id
);
if (!(await driver.directoryExists(source_path))) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`invalid volume_content_source path: ${source_path}`
);
}
driver.ctx.logger.debug(
"controller volume source path: %s",
source_path
);
await driver.cloneDir(source_path, volume_path);
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`invalid volume_content_source type: ${volume_content_source.type}`
);
break;
}
if (!(await driver.directoryExists(source_path))) {
throw new GrpcError(
grpc.status.NOT_FOUND,
`invalid volume_content_source path: ${source_path}`
);
}
driver.ctx.logger.debug("controller source path: %s", source_path);
await driver.cloneDir(source_path, volume_path);
}
// set mode
@ -627,7 +844,7 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
async DeleteVolume(call) {
const driver = this;
let volume_id = call.request.volume_id;
const volume_id = call.request.volume_id;
if (!volume_id) {
throw new GrpcError(
@ -728,14 +945,49 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
}
/**
* Create snapshot is meant to be a syncronous call to 'cut' the snapshot
* in the case of rsync/restic/kopia/etc tooling a 'cut' can take a very
* long time. It was deemed appropriate to continue to wait vs making the
* call async with `ready_to_use` false.
*
* Restic:
* With restic the idea is to keep the tree scoped to each volume. Each
* new snapshot for the same volume should have a parent of the most recently
* cut snapshot for the same volume. Behind the scenes restic is applying
* dedup logic globally in the repo so efficiency should still be extremely
* efficient.
*
* Kopia:
*
*
* https://github.com/container-storage-interface/spec/blob/master/spec.md#createsnapshot
*
* @param {*} call
*/
async CreateSnapshot(call) {
const driver = this;
const config_key = driver.getConfigKey();
let snapshot_driver = _.get(
driver.options[config_key],
"snapshots.default_driver",
DEFAULT_SNAPSHOT_DRIVER
);
// randomize driver for testing
//if (process.env.CSI_SANITY == "1") {
// call.request.parameters.driver = ["filecopy", "restic", "kopia"].random();
//}
if (call.request.parameters.driver) {
snapshot_driver = call.request.parameters.driver;
}
const instance_id = driver.options.instance_id;
let response;
// both these are required
let source_volume_id = call.request.source_volume_id;
const source_volume_id = call.request.source_volume_id;
let name = call.request.name;
if (!source_volume_id) {
@ -770,17 +1022,262 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
name = name.replace(/[^a-z0-9_\-:.+]+/gi, "");
driver.ctx.logger.verbose("cleansed snapshot name: %s", name);
const snapshot_id = `${source_volume_id}-${name}`;
const volume_path = driver.getControllerVolumePath(source_volume_id);
const snapshot_path = driver.getControllerSnapshotPath(snapshot_id);
//const volume_path = "/home/thansen/beets/";
//const volume_path = "/var/lib/docker/";
// do NOT overwrite existing snapshot
if (!(await driver.directoryExists(snapshot_path))) {
await driver.cloneDir(volume_path, snapshot_path);
let snapshot_id;
let size_bytes = 0;
let ready_to_use = true;
let snapshot_date = new Date();
switch (snapshot_driver) {
case "filecopy":
{
snapshot_id = `${source_volume_id}-${name}`;
const snapshot_path = driver.getControllerSnapshotPath(snapshot_id);
const snapshot_dir_exists = await driver.directoryExists(
snapshot_path
);
// do NOT overwrite existing snapshot
if (!snapshot_dir_exists) {
SNAPSHOTS_CUT_IN_FLIGHT.add(name);
await driver.cloneDir(volume_path, snapshot_path).finally(() => {
SNAPSHOTS_CUT_IN_FLIGHT.delete(name);
});
driver.ctx.logger.info(
`filecopy backup finished: snapshot_id=${snapshot_id}, path=${volume_path}`
);
} else {
driver.ctx.logger.debug(
`filecopy backup already cut: ${snapshot_id}`
);
}
size_bytes = await driver.getDirectoryUsage(snapshot_path);
}
break;
case "restic":
{
const restic = await driver.getResticClient();
const group_by_options = ["--group-by", "host,paths,tags"];
let snapshot_exists = false;
// --tag specified multiple times is OR logic, comma-separated is AND logic
let base_tag_option = `source=democratic-csi`;
base_tag_option += `,csi_volume_id=${source_volume_id}`;
if (instance_id) {
base_tag_option += `csi_instance_id=${instance_id}`;
}
let options = [];
/**
* ensure repo has been initted
*
* it is expected that at a minimum the following env vars are set
* RESTIC_PASSWORD
* RESTIC_REPOSITORY
*/
options = [];
await restic.init();
// see if snapshot already exist with matching tags, etc
options = [
"--path",
volume_path.replace(/\/$/, ""),
"--host",
restic.hostname,
];
// when searching for existing snapshot include name
response = await restic.snapshots(
options
.concat(group_by_options)
.concat(["--tag", base_tag_option + `,csi_snapshot_name=${name}`])
);
if (response.length > 0) {
snapshot_exists = true;
const snapshot = response[response.length - 1];
driver.ctx.logger.debug(
`restic backup already cut: ${snapshot.id}`
);
const stats = await restic.stats([snapshot.id]);
snapshot_id = snapshot.id;
snapshot_date = new Date(snapshot.time);
size_bytes = stats.total_size;
}
if (!snapshot_exists) {
// --no-scan do not run scanner to estimate size of backup
// -x, --one-file-system exclude other file systems, don't cross filesystem boundaries and subvolumes
options = [
"--host",
restic.hostname,
"--one-file-system",
//"--no-scan",
];
// backup with minimal tags to ensure a sane parent for the volume (since tags are included in group_by)
SNAPSHOTS_CUT_IN_FLIGHT.add(name);
response = await restic
.backup(
volume_path,
options
.concat(group_by_options)
.concat(["--tag", base_tag_option])
)
.finally(() => {
SNAPSHOTS_CUT_IN_FLIGHT.delete(name);
});
response.parsed.reverse();
let summary = response.parsed.find((message) => {
return message.message_type == "summary";
});
snapshot_id = summary.snapshot_id;
driver.ctx.logger.info(
`restic backup finished: snapshot_id=${snapshot_id}, path=${volume_path}, total_duration=${
summary.total_duration | 0
}s`
);
const stats = await restic.stats([snapshot_id]);
size_bytes = stats.total_size;
// only apply these tags at creation, do NOT use for search above etc
let add_tags = `csi_snapshot_name=${name}`;
let config_tags = _.get(
driver.options[config_key],
"snapshots.restic.tags",
[]
);
if (config_tags.length > 0) {
add_tags += `,${config_tags.join(",")}`;
}
await restic.tag([
"--path",
volume_path.replace(/\/$/, ""),
"--host",
restic.hostname,
"--add",
add_tags,
snapshot_id,
]);
// this is ugly, the tag operation should output the new id, so we
// must resort to full query of all snapshots for the volume
// find snapshot using `original` id as adding tags creates a new id
options = [
"--path",
volume_path.replace(/\/$/, ""),
"--host",
restic.hostname,
];
response = await restic.snapshots(
options
.concat(group_by_options)
.concat([
"--tag",
`${base_tag_option},csi_snapshot_name=${name}`,
])
);
let original_snapshot_id = snapshot_id;
let snapshot = response.find((snapshot) => {
return snapshot.original == original_snapshot_id;
});
if (!snapshot) {
throw new GrpcError(
grpc.status.UNKNOWN,
`failed to find snapshot post-tag operation: snapshot_id=${original_snapshot_id}`
);
}
snapshot_id = snapshot.id;
driver.ctx.logger.info(
`restic backup successfully applied additional tags: new_snapshot_id=${snapshot_id}, original_snapshot_id=${original_snapshot_id} path=${volume_path}`
);
}
}
break;
case "kopia":
{
const kopia = await driver.getKopiaClient();
let options = [];
let snapshot_exists = false;
// --tags specified multiple times means snapshot must contain ALL supplied tags
let tags = [];
tags.push(`source:democratic-csi`);
tags.push(`csi_volume_id:${source_volume_id}`);
if (instance_id) {
tags.push(`csi_instance_id:${instance_id}`);
}
tags.push(`csi_snapshot_name:${name}`);
options = ["--no-storage-stats", "--no-delta"];
tags.forEach((item) => {
options.push("--tags", item);
});
options.push(
`${kopia.username}@${kopia.hostname}:${volume_path.replace(
/\/$/,
""
)}`
);
response = await kopia.snapshotList(options);
if (response.length > 0) {
snapshot_exists = true;
const snapshot = response[response.length - 1];
driver.ctx.logger.debug(
`kopia snapshot already cut: ${snapshot.id}`
);
snapshot_id = snapshot.id;
snapshot_date = new Date(snapshot.startTime); // maybe use endTime?
size_bytes = snapshot.stats.totalSize;
}
if (!snapshot_exists) {
// create snapshot
options = [];
tags.forEach((item) => {
options.push("--tags", item);
});
options.push(volume_path);
SNAPSHOTS_CUT_IN_FLIGHT.add(name);
response = await kopia.snapshotCreate(options).finally(() => {
SNAPSHOTS_CUT_IN_FLIGHT.delete(name);
});
snapshot_id = response.id;
snapshot_date = new Date(response.startTime); // maybe use endTime?
let snapshot_end_date = new Date(response.endTime);
let total_duration =
Math.abs(snapshot_end_date.getTime() - snapshot_date.getTime()) /
1000;
size_bytes = response.rootEntry.summ.size;
driver.ctx.logger.info(
`kopia backup finished: snapshot_id=${snapshot_id}, path=${volume_path}, total_duration=${
total_duration | 0
}s`
);
}
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown snapthot driver: ${snapshot_driver}`
);
}
let size_bytes = await driver.getDirectoryUsage(snapshot_path);
return {
snapshot: {
/**
@ -788,14 +1285,17 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
* is needed to create a volume from this snapshot.
*/
size_bytes,
snapshot_id,
snapshot_id: new URLSearchParams({
snapshot_driver,
snapshot_id,
}).toString(),
source_volume_id: source_volume_id,
//https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/timestamp.proto
creation_time: {
seconds: Math.round(new Date().getTime() / 1000),
seconds: Math.round(snapshot_date.getTime() / 1000),
nanos: 0,
},
ready_to_use: true,
ready_to_use,
},
};
}
@ -809,7 +1309,11 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
async DeleteSnapshot(call) {
const driver = this;
const snapshot_id = call.request.snapshot_id;
let snapshot_id = call.request.snapshot_id;
let snapshot_driver;
const config_key = driver.getConfigKey();
const instance_id = driver.options.instance_id;
let response;
if (!snapshot_id) {
throw new GrpcError(
@ -818,8 +1322,70 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
);
}
const snapshot_path = driver.getControllerSnapshotPath(snapshot_id);
await driver.deleteDir(snapshot_path);
// get parsed variant of driver to allow snapshotter to work with all
// drivers simultaneously
const parsed_snapshot_id = new URLSearchParams(snapshot_id);
if (parsed_snapshot_id.get("snapshot_driver")) {
snapshot_id = parsed_snapshot_id.get("snapshot_id");
snapshot_driver = parsed_snapshot_id.get("snapshot_driver");
} else {
snapshot_driver = "filecopy";
}
switch (snapshot_driver) {
case "filecopy":
{
const snapshot_path = driver.getControllerSnapshotPath(snapshot_id);
await driver.deleteDir(snapshot_path);
}
break;
case "restic":
{
let prune = _.get(
driver.options[config_key],
"snapshots.restic.prune",
false
);
if (typeof prune != "boolean") {
prune = String(prune);
if (["true", "yes", "1"].includes(prune.toLowerCase())) {
prune = true;
} else {
prune = false;
}
}
const restic = await driver.getResticClient();
let options = [];
await restic.init();
// we preempt with this check to prevent locking the repo when snapshot does not exist
const snapshot_exists = await restic.snapshot_exists(snapshot_id);
if (snapshot_exists) {
options = [];
if (prune) {
options.push("--prune");
}
options.push(snapshot_id);
await restic.forget(options);
}
}
break;
case "kopia":
{
const kopia = await driver.getKopiaClient();
let options = [snapshot_id];
await kopia.snapshotDelete(options);
}
break;
default:
throw new GrpcError(
grpc.status.INVALID_ARGUMENT,
`unknown snapthot driver: ${snapshot_driver}`
);
}
return {};
}