diff --git a/src/driver/controller-client-common/index.js b/src/driver/controller-client-common/index.js
index 434213b..fe7b7b6 100644
--- a/src/driver/controller-client-common/index.js
+++ b/src/driver/controller-client-common/index.js
@@ -4,9 +4,20 @@ const { GrpcError, grpc } = require("../../utils/grpc");
 const cp = require("child_process");
 const fs = require("fs");
 const fse = require("fs-extra");
+const Kopia = require("../../utils/kopia").Kopia;
+const os = require("os");
 const path = require("path");
+const registry = require("../../utils/registry");
+const Restic = require("../../utils/restic").Restic;
 const semver = require("semver");
 
+const __REGISTRY_NS__ = "ControllerClientCommonDriver";
+
+// https://forum.restic.net/t/how-to-prevent-two-restic-tasks-concurrently/6859/5
+const SNAPSHOTS_CUT_IN_FLIGHT = new Set();
+const SNAPSHOTS_RESTORE_IN_FLIGHT = new Set();
+const DEFAULT_SNAPSHOT_DRIVER = "filecopy";
+
 /**
  * Crude nfs-client driver which simply creates directories to be mounted
  * and uses rsync for cloning/snapshots
@@ -102,6 +113,21 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
         //options.service.node.capabilities.rpc.push("VOLUME_MOUNT_GROUP");
       }
     }
+
+    if (this.ctx.args.csiMode.includes("controller")) {
+      setInterval(() => {
+        this.ctx.logger.info("snapshots cut in flight", {
+          names: [...SNAPSHOTS_CUT_IN_FLIGHT],
+          count: SNAPSHOTS_CUT_IN_FLIGHT.size,
+        });
+      }, 30 * 1000);
+      setInterval(() => {
+        this.ctx.logger.info("snapshots restore in flight", {
+          names: [...SNAPSHOTS_RESTORE_IN_FLIGHT],
+          count: SNAPSHOTS_RESTORE_IN_FLIGHT.size,
+        });
+      }, 30 * 1000);
+    }
   }
 
   getAccessModes(capability) {
@@ -429,6 +455,111 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
     return p.replaceAll(path.posix.sep, path.win32.sep);
   }
 
+  /**
+   * Build the restic client registered as `${__REGISTRY_NS__}:restic`.
+   *
+   * Reads `snapshots.restic.env` and `snapshots.restic.global_flags` from the
+   * driver config. The host name is the csi name, suffixed with the node id
+   * (or OS hostname) when the driver is `local-hostpath`.
+   *
+   * @returns {Promise<Restic>} presumably cached by the registry - confirm
+   */
+  async getResticClient() {
+    const driver = this;
+
+    return registry.get(`${__REGISTRY_NS__}:restic`, () => {
+      const config_key = driver.getConfigKey();
+
+      const restic_env = _.get(
+        driver.options[config_key],
+        "snapshots.restic.env",
+        {}
+      );
+
+      const restic_global_flags = _.get(
+        driver.options[config_key],
+        "snapshots.restic.global_flags",
+        []
+      );
+      const client = new Restic({
+        env: restic_env,
+        logger: driver.ctx.logger,
+        global_flags: restic_global_flags,
+      });
+
+      let hostname = driver.ctx.args.csiName;
+      if (driver.options.driver == "local-hostpath") {
+        let nodename = process.env.CSI_NODE_ID || os.hostname();
+        hostname = `${hostname}-${nodename}`;
+      }
+      // `--host restic.hostname` is passed by the snapshot code, so the value
+      // computed above must be exposed on the client (mirrors the kopia client)
+      client.hostname = hostname;
+
+      return client;
+    });
+  }
+
+  /**
+   * Build the kopia client registered as `${__REGISTRY_NS__}:kopia`.
+   *
+   * Reads `snapshots.kopia.env`, `snapshots.kopia.global_flags` and
+   * `snapshots.kopia.config_token` from the driver config, connects to the
+   * repository and stores the hostname/username overrides on the client.
+   *
+   * @returns {Promise<Kopia>} presumably cached by the registry - confirm
+   */
+  async getKopiaClient() {
+    const driver = this;
+
+    return registry.getAsync(`${__REGISTRY_NS__}:kopia`, async () => {
+      const config_key = driver.getConfigKey();
+
+      const kopia_env = _.get(
+        driver.options[config_key],
+        "snapshots.kopia.env",
+        {}
+      );
+
+      const kopia_global_flags = _.get(
+        driver.options[config_key],
+        "snapshots.kopia.global_flags",
+        []
+      );
+      const client = new Kopia({
+        env: kopia_env,
+        logger: driver.ctx.logger,
+        global_flags: kopia_global_flags,
+      });
+
+      let hostname = driver.ctx.args.csiName;
+      if (driver.options.driver == "local-hostpath") {
+        let nodename = process.env.CSI_NODE_ID || os.hostname();
+        hostname = `${hostname}-${nodename}`;
+      }
+
+      let username = "democratic-csi";
+
+      await client.repositoryConnect([
+        "--override-hostname",
+        hostname,
+        "--override-username",
+        username,
+        "from-config",
+        "--token",
+        _.get(driver.options[config_key], "snapshots.kopia.config_token", ""),
+      ]);
+
+      //let repositoryStatus = await client.repositoryStatus();
+      //console.log(repositoryStatus);
+
+      client.hostname = hostname;
+      client.username = username;
+
+      return client;
+    });
+  }
+
   /**
    * Create a volume doing in essence the following:
    * 1. create directory
@@ -442,9 +573,10 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
   async CreateVolume(call) {
     const driver = this;
 
-    let config_key = this.getConfigKey();
-    let volume_id = await driver.getVolumeIdFromCall(call);
-    let volume_content_source = call.request.volume_content_source;
+    const config_key = driver.getConfigKey();
+    const volume_id = await driver.getVolumeIdFromCall(call);
+    const volume_content_source = call.request.volume_content_source;
+    const instance_id = driver.options.instance_id;
 
     if (
       call.request.volume_capabilities &&
@@ -518,13 +650,117 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
 
     // create dataset
     if (volume_content_source) {
+      let snapshot_driver;
+      let snapshot_id;
+
+      if (volume_content_source.type == "snapshot") {
+        snapshot_id = volume_content_source.snapshot.snapshot_id;
+
+        // get parsed variant of driver to allow snapshotter to work with all
+        // drivers simultaneously
+        const parsed_snapshot_id = new URLSearchParams(snapshot_id);
+        if (parsed_snapshot_id.get("snapshot_driver")) {
+          snapshot_id = parsed_snapshot_id.get("snapshot_id");
+          snapshot_driver = parsed_snapshot_id.get("snapshot_driver");
+        } else {
+          snapshot_driver = "filecopy";
+        }
+      }
+
       switch (volume_content_source.type) {
         // must be available when adverstising CREATE_DELETE_SNAPSHOT
         // simply clone
         case "snapshot":
-          source_path = driver.getControllerSnapshotPath(
-            volume_content_source.snapshot.snapshot_id
-          );
+          switch (snapshot_driver) {
+            case "filecopy":
+              {
+                source_path = driver.getControllerSnapshotPath(snapshot_id);
+
+                if (!(await driver.directoryExists(source_path))) {
+                  throw new GrpcError(
+                    grpc.status.NOT_FOUND,
+                    `invalid volume_content_source path: ${source_path}`
+                  );
+                }
+
+                driver.ctx.logger.debug(
+                  "controller volume source path: %s",
+                  source_path
+                );
+                await driver.cloneDir(source_path, volume_path);
+              }
+              break;
+            case "restic":
+              {
+                const restic = await driver.getResticClient();
+
+                let options = [];
+                await restic.init();
+
+                // find snapshot
+                options = [snapshot_id];
+                const snapshots = await restic.snapshots(options);
+                if (snapshots.length === 0) {
+                  throw new GrpcError(
+                    grpc.status.NOT_FOUND,
+                    `invalid restic snapshot volume_content_source: ${snapshot_id}`
+                  );
+                }
+                const snapshot = snapshots[snapshots.length - 1];
+
+                // restore snapshot
+                // --verify?
+                options = [
+                  `${snapshot.id}:${snapshot.paths[0]}`,
+                  "--target",
+                  volume_path,
+                  "--sparse",
+                  "--host",
+                  restic.hostname,
+                ];
+
+                // technically same snapshot could be getting restored to multiple volumes simultaneously
+                // ensure we add target path as part of the key
+                SNAPSHOTS_RESTORE_IN_FLIGHT.add(
+                  `${snapshot_id}:${volume_path}`
+                );
+                await restic.restore(options).finally(() => {
+                  SNAPSHOTS_RESTORE_IN_FLIGHT.delete(
+                    `${snapshot_id}:${volume_path}`
+                  );
+                });
+              }
+              break;
+            case "kopia":
+              {
+                const kopia = await driver.getKopiaClient();
+                const snapshot = await kopia.snapshotGet(snapshot_id);
+
+                if (!snapshot) {
+                  throw new GrpcError(
+                    grpc.status.NOT_FOUND,
+                    `invalid kopia snapshot volume_content_source: ${snapshot_id}`
+                  );
+                }
+
+                /**
+                 * --[no-]write-files-atomically
+                 * --[no-]write-sparse-files
+                 */
+                let options = [
+                  "--write-sparse-files",
+                  snapshot_id,
+                  volume_path,
+                ];
+                await kopia.snapshotRestore(options);
+              }
+              break;
+            default:
+              throw new GrpcError(
+                grpc.status.INVALID_ARGUMENT,
+                `unknown snapshot driver: ${snapshot_driver}`
+              );
+          }
           break;
         // must be available when adverstising CLONE_VOLUME
         // create snapshot first, then clone
@@ -532,24 +768,26 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
           source_path = driver.getControllerVolumePath(
             volume_content_source.volume.volume_id
           );
+
+          if (!(await driver.directoryExists(source_path))) {
+            throw new GrpcError(
+              grpc.status.NOT_FOUND,
+              `invalid volume_content_source path: ${source_path}`
+            );
+          }
+
+          driver.ctx.logger.debug(
+            "controller volume source path: %s",
+            source_path
+          );
+          await driver.cloneDir(source_path, volume_path);
           break;
         default:
           throw new GrpcError(
             grpc.status.INVALID_ARGUMENT,
             `invalid volume_content_source type: ${volume_content_source.type}`
           );
-          break;
       }
-
-      if (!(await driver.directoryExists(source_path))) {
-        throw new GrpcError(
-          grpc.status.NOT_FOUND,
-          `invalid volume_content_source path: ${source_path}`
-        );
-      }
-
-      driver.ctx.logger.debug("controller source path: %s", source_path);
-      await driver.cloneDir(source_path, volume_path);
     }
 
     // set mode
@@ -627,7 +865,7 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
   async DeleteVolume(call) {
     const driver = this;
 
-    let volume_id = call.request.volume_id;
+    const volume_id = call.request.volume_id;
 
     if (!volume_id) {
       throw new GrpcError(
@@ -728,14 +966,49 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
   }
 
   /**
+   * Create snapshot is meant to be a synchronous call to 'cut' the snapshot
+   * in the case of rsync/restic/kopia/etc tooling a 'cut' can take a very
+   * long time. It was deemed appropriate to continue to wait vs making the
+   * call async with `ready_to_use` false.
+   *
+   * Restic:
+   * With restic the idea is to keep the tree scoped to each volume. Each
+   * new snapshot for the same volume should have a parent of the most recently
+   * cut snapshot for the same volume. Behind the scenes restic is applying
+   * dedup logic globally in the repo so storage should still be extremely
+   * efficient.
+   *
+   * Kopia:
+   * Existing snapshots are found by tag (volume id, snapshot name) and reused.
+   *
+   * https://github.com/container-storage-interface/spec/blob/master/spec.md#createsnapshot
    *
    * @param {*} call
    */
   async CreateSnapshot(call) {
     const driver = this;
 
+    const config_key = driver.getConfigKey();
+    let snapshot_driver = _.get(
+      driver.options[config_key],
+      "snapshots.default_driver",
+      DEFAULT_SNAPSHOT_DRIVER
+    );
+
+    // randomize driver for testing
+    //if (process.env.CSI_SANITY == "1") {
+    //  call.request.parameters.driver = ["filecopy", "restic", "kopia"].random();
+    //}
+
+    if (call.request.parameters.driver) {
+      snapshot_driver = call.request.parameters.driver;
+    }
+
+    const instance_id = driver.options.instance_id;
+    let response;
+
     // both these are required
-    let source_volume_id = call.request.source_volume_id;
+    const source_volume_id = call.request.source_volume_id;
     let name = call.request.name;
 
     if (!source_volume_id) {
@@ -770,17 +1043,260 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
 
     name = name.replace(/[^a-z0-9_\-:.+]+/gi, "");
     driver.ctx.logger.verbose("cleansed snapshot name: %s", name);
-
-    const snapshot_id = `${source_volume_id}-${name}`;
     const volume_path = driver.getControllerVolumePath(source_volume_id);
-    const snapshot_path = driver.getControllerSnapshotPath(snapshot_id);
 
-    // do NOT overwrite existing snapshot
-    if (!(await driver.directoryExists(snapshot_path))) {
-      await driver.cloneDir(volume_path, snapshot_path);
+    let snapshot_id;
+    let size_bytes = 0;
+    let ready_to_use = true;
+    let snapshot_date = new Date();
+
+    switch (snapshot_driver) {
+      case "filecopy":
+        {
+          snapshot_id = `${source_volume_id}-${name}`;
+          const snapshot_path = driver.getControllerSnapshotPath(snapshot_id);
+          const snapshot_dir_exists = await driver.directoryExists(
+            snapshot_path
+          );
+          // do NOT overwrite existing snapshot
+          if (!snapshot_dir_exists) {
+            SNAPSHOTS_CUT_IN_FLIGHT.add(name);
+            await driver.cloneDir(volume_path, snapshot_path).finally(() => {
+              SNAPSHOTS_CUT_IN_FLIGHT.delete(name);
+            });
+            driver.ctx.logger.info(
+              `filecopy backup finished: snapshot_id=${snapshot_id}, path=${volume_path}`
+            );
+          } else {
+            driver.ctx.logger.debug(
+              `filecopy backup already cut: ${snapshot_id}`
+            );
+          }
+
+          size_bytes = await driver.getDirectoryUsage(snapshot_path);
+        }
+        break;
+      case "restic":
+        {
+          const restic = await driver.getResticClient();
+          const group_by_options = ["--group-by", "host,paths,tags"];
+          let snapshot_exists = false;
+
+          // --tag specified multiple times is OR logic, comma-separated is AND logic
+          let base_tag_option = `source=democratic-csi`;
+          base_tag_option += `,csi_volume_id=${source_volume_id}`;
+          if (instance_id) {
+            base_tag_option += `,csi_instance_id=${instance_id}`;
+          }
+
+          let options = [];
+
+          /**
+           * ensure repo has been initted
+           *
+           * it is expected that at a minimum the following env vars are set
+           * RESTIC_PASSWORD
+           * RESTIC_REPOSITORY
+           */
+          options = [];
+          await restic.init();
+
+          // see if snapshot already exist with matching tags, etc
+          options = [
+            "--path",
+            volume_path.replace(/\/$/, ""),
+            "--host",
+            restic.hostname,
+          ];
+
+          // when searching for existing snapshot include name
+          response = await restic.snapshots(
+            options
+              .concat(group_by_options)
+              .concat(["--tag", base_tag_option + `,csi_snapshot_name=${name}`])
+          );
+
+          if (response.length > 0) {
+            snapshot_exists = true;
+            const snapshot = response[response.length - 1];
+            driver.ctx.logger.debug(
+              `restic backup already cut: ${snapshot.id}`
+            );
+            const stats = await restic.stats([snapshot.id]);
+
+            snapshot_id = snapshot.id;
+            snapshot_date = new Date(snapshot.time);
+            size_bytes = stats.total_size;
+          }
+
+          if (!snapshot_exists) {
+            // --no-scan do not run scanner to estimate size of backup
+            // -x, --one-file-system exclude other file systems, don't cross filesystem boundaries and subvolumes
+            options = [
+              "--host",
+              restic.hostname,
+              "--one-file-system",
+              //"--no-scan",
+            ];
+
+            // backup with minimal tags to ensure a sane parent for the volume (since tags are included in group_by)
+            SNAPSHOTS_CUT_IN_FLIGHT.add(name);
+            response = await restic
+              .backup(
+                volume_path,
+                options
+                  .concat(group_by_options)
+                  .concat(["--tag", base_tag_option])
+              )
+              .finally(() => {
+                SNAPSHOTS_CUT_IN_FLIGHT.delete(name);
+              });
+            response.parsed.reverse();
+            let summary = response.parsed.find((message) => {
+              return message.message_type == "summary";
+            });
+            snapshot_id = summary.snapshot_id;
+            driver.ctx.logger.info(
+              `restic backup finished: snapshot_id=${snapshot_id}, path=${volume_path}, total_duration=${
+                summary.total_duration | 0
+              }s`
+            );
+            const stats = await restic.stats([snapshot_id]);
+            size_bytes = stats.total_size;
+
+            // only apply these tags at creation, do NOT use for search above etc
+            let add_tags = `csi_snapshot_name=${name}`;
+            let config_tags = _.get(
+              driver.options[config_key],
+              "snapshots.restic.tags",
+              []
+            );
+
+            if (config_tags.length > 0) {
+              add_tags += `,${config_tags.join(",")}`;
+            }
+
+            await restic.tag([
+              "--path",
+              volume_path.replace(/\/$/, ""),
+              "--host",
+              restic.hostname,
+              "--add",
+              add_tags,
+              snapshot_id,
+            ]);
+
+            // this is ugly, the tag operation should output the new id, so we
+            // must resort to full query of all snapshots for the volume
+            // find snapshot using `original` id as adding tags creates a new id
+            options = [
+              "--path",
+              volume_path.replace(/\/$/, ""),
+              "--host",
+              restic.hostname,
+            ];
+            response = await restic.snapshots(
+              options
+                .concat(group_by_options)
+                .concat([
+                  "--tag",
+                  `${base_tag_option},csi_snapshot_name=${name}`,
+                ])
+            );
+            let original_snapshot_id = snapshot_id;
+            let snapshot = response.find((snapshot) => {
+              return snapshot.original == original_snapshot_id;
+            });
+            if (!snapshot) {
+              throw new GrpcError(
+                grpc.status.UNKNOWN,
+                `failed to find snapshot post-tag operation: snapshot_id=${original_snapshot_id}`
+              );
+            }
+            snapshot_id = snapshot.id;
+            driver.ctx.logger.info(
+              `restic backup successfully applied additional tags: new_snapshot_id=${snapshot_id}, original_snapshot_id=${original_snapshot_id} path=${volume_path}`
+            );
+          }
+        }
+        break;
+      case "kopia":
+        {
+          const kopia = await driver.getKopiaClient();
+          let options = [];
+
+          let snapshot_exists = false;
+
+          // --tags specified multiple times means snapshot must contain ALL supplied tags
+          let tags = [];
+          tags.push(`source:democratic-csi`);
+          tags.push(`csi_volume_id:${source_volume_id}`);
+          if (instance_id) {
+            tags.push(`csi_instance_id:${instance_id}`);
+          }
+          tags.push(`csi_snapshot_name:${name}`);
+
+          options = ["--no-storage-stats", "--no-delta"];
+          tags.forEach((item) => {
+            options.push("--tags", item);
+          });
+
+          options.push(
+            `${kopia.username}@${kopia.hostname}:${volume_path.replace(
+              /\/$/,
+              ""
+            )}`
+          );
+
+          response = await kopia.snapshotList(options);
+
+          if (response.length > 0) {
+            snapshot_exists = true;
+            const snapshot = response[response.length - 1];
+            driver.ctx.logger.debug(
+              `kopia snapshot already cut: ${snapshot.id}`
+            );
+
+            snapshot_id = snapshot.id;
+            snapshot_date = new Date(snapshot.startTime); // maybe use endTime?
+            size_bytes = snapshot.stats.totalSize;
+          }
+
+          if (!snapshot_exists) {
+            // create snapshot
+            options = [];
+            tags.forEach((item) => {
+              options.push("--tags", item);
+            });
+            options.push(volume_path);
+            SNAPSHOTS_CUT_IN_FLIGHT.add(name);
+            response = await kopia.snapshotCreate(options).finally(() => {
+              SNAPSHOTS_CUT_IN_FLIGHT.delete(name);
+            });
+
+            snapshot_id = response.id;
+            snapshot_date = new Date(response.startTime); // maybe use endTime?
+            let snapshot_end_date = new Date(response.endTime);
+            let total_duration =
+              Math.abs(snapshot_end_date.getTime() - snapshot_date.getTime()) /
+              1000;
+            size_bytes = response.rootEntry.summ.size;
+
+            driver.ctx.logger.info(
+              `kopia backup finished: snapshot_id=${snapshot_id}, path=${volume_path}, total_duration=${
+                total_duration | 0
+              }s`
+            );
+          }
+        }
+        break;
+      default:
+        throw new GrpcError(
+          grpc.status.INVALID_ARGUMENT,
+          `unknown snapshot driver: ${snapshot_driver}`
+        );
     }
 
-    let size_bytes = await driver.getDirectoryUsage(snapshot_path);
     return {
       snapshot: {
         /**
@@ -788,14 +1304,17 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
          * is needed to create a volume from this snapshot.
          */
         size_bytes,
-        snapshot_id,
+        snapshot_id: new URLSearchParams({
+          snapshot_driver,
+          snapshot_id,
+        }).toString(),
         source_volume_id: source_volume_id,
         //https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/timestamp.proto
         creation_time: {
-          seconds: Math.round(new Date().getTime() / 1000),
+          seconds: Math.round(snapshot_date.getTime() / 1000),
           nanos: 0,
         },
-        ready_to_use: true,
+        ready_to_use,
       },
     };
   }
@@ -809,7 +1328,11 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
   async DeleteSnapshot(call) {
     const driver = this;
 
-    const snapshot_id = call.request.snapshot_id;
+    let snapshot_id = call.request.snapshot_id;
+    let snapshot_driver;
+    const config_key = driver.getConfigKey();
+    const instance_id = driver.options.instance_id;
+    let response;
 
     if (!snapshot_id) {
       throw new GrpcError(
@@ -818,8 +1341,70 @@ class ControllerClientCommonDriver extends CsiBaseDriver {
       );
     }
 
-    const snapshot_path = driver.getControllerSnapshotPath(snapshot_id);
-    await driver.deleteDir(snapshot_path);
+    // get parsed variant of driver to allow snapshotter to work with all
+    // drivers simultaneously
+    const parsed_snapshot_id = new URLSearchParams(snapshot_id);
+    if (parsed_snapshot_id.get("snapshot_driver")) {
+      snapshot_id = parsed_snapshot_id.get("snapshot_id");
+      snapshot_driver = parsed_snapshot_id.get("snapshot_driver");
+    } else {
+      snapshot_driver = "filecopy";
+    }
+
+    switch (snapshot_driver) {
+      case "filecopy":
+        {
+          const snapshot_path = driver.getControllerSnapshotPath(snapshot_id);
+          await driver.deleteDir(snapshot_path);
+        }
+        break;
+      case "restic":
+        {
+          let prune = _.get(
+            driver.options[config_key],
+            "snapshots.restic.prune",
+            false
+          );
+
+          if (typeof prune != "boolean") {
+            prune = String(prune);
+            if (["true", "yes", "1"].includes(prune.toLowerCase())) {
+              prune = true;
+            } else {
+              prune = false;
+            }
+          }
+
+          const restic = await driver.getResticClient();
+
+          let options = [];
+          await restic.init();
+
+          // we preempt with this check to prevent locking the repo when snapshot does not exist
+          const snapshot_exists = await restic.snapshot_exists(snapshot_id);
+          if (snapshot_exists) {
+            options = [];
+            if (prune) {
+              options.push("--prune");
+            }
+            options.push(snapshot_id);
+            await restic.forget(options);
+          }
+        }
+        break;
+      case "kopia":
+        {
+          const kopia = await driver.getKopiaClient();
+          let options = [snapshot_id];
+          await kopia.snapshotDelete(options);
+        }
+        break;
+      default:
+        throw new GrpcError(
+          grpc.status.INVALID_ARGUMENT,
+          `unknown snapshot driver: ${snapshot_driver}`
+        );
+    }
 
     return {};
   }