tart {clone,pull}: make deduplication opt-in (#924)

This commit is contained in:
Nikolay Edigaryev 2024-10-25 15:56:38 +02:00 committed by GitHub
parent 3bf0bb22f3
commit b52a857698
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 41 additions and 21 deletions

View File

@ -28,6 +28,9 @@ struct Clone: AsyncParsableCommand {
@Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry")
var concurrency: UInt = 4
@Flag(help: .hidden)
var deduplicate: Bool = false
func validate() throws {
if newName.contains("/") {
throw ValidationError("<new-name> should be a local name")
@ -45,7 +48,7 @@ struct Clone: AsyncParsableCommand {
if let remoteName = try? RemoteName(sourceName), !ociStorage.exists(remoteName) {
// Pull the VM in case it's OCI-based and doesn't exist locally yet
let registry = try Registry(host: remoteName.host, namespace: remoteName.namespace, insecure: insecure)
try await ociStorage.pull(remoteName, registry: registry, concurrency: concurrency)
try await ociStorage.pull(remoteName, registry: registry, concurrency: concurrency, deduplicate: deduplicate)
}
let sourceVM = try VMStorageHelper.open(sourceName)

View File

@ -23,6 +23,9 @@ struct Pull: AsyncParsableCommand {
@Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry")
var concurrency: UInt = 4
@Flag(help: .hidden)
var deduplicate: Bool = false
func validate() throws {
if concurrency < 1 {
throw ValidationError("network concurrency cannot be less than 1")
@ -43,6 +46,6 @@ struct Pull: AsyncParsableCommand {
defaultLogger.appendNewLine("pulling \(remoteName)...")
try await VMStorageOCI().pull(remoteName, registry: registry, concurrency: concurrency)
try await VMStorageOCI().pull(remoteName, registry: registry, concurrency: concurrency, deduplicate: deduplicate)
}
}

View File

@ -2,5 +2,5 @@ import Foundation
/// Requirements for a disk format implementation that can push a disk file to a
/// registry and pull it back from a list of manifest layers. `DiskV1` and
/// `DiskV2` both declare conformance to this protocol.
///
/// NOTE(review): this listing is a flattened commit diff, so two `pull`
/// requirements appear below; presumably the first is the pre-change line and
/// the second (with `deduplicate`) is the one that remains — confirm against
/// the actual file.
protocol Disk {
/// Pushes the disk at `diskURL` through `registry` and returns the resulting
/// manifest layers.
/// `chunkSizeMb` is presumably the chunk size in megabytes and `progress` the
/// reporter updated during the transfer — TODO confirm against implementations.
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer]
// Variant of `pull` without the `deduplicate` parameter.
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?) async throws
/// Pulls `diskLayers` through `registry` into the file at `diskURL`.
/// `localLayerCache` is optional. In `DiskV2`, when `deduplicate` is true and a
/// cache is supplied, a fresh (non-resumed) pull starts from a copy of the
/// cache's disk instead of an empty file.
/// The protocol requirement carries no default values; the visible conformers
/// declare `localLayerCache` defaulting to `nil` and `deduplicate` to `false`.
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?, deduplicate: Bool) async throws
}

View File

@ -45,7 +45,7 @@ class DiskV1: Disk {
return pushedLayers
}
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil, deduplicate: Bool = false) async throws {
if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
throw OCIError.FailedToCreateVmFile
}

View File

@ -69,12 +69,12 @@ class DiskV2: Disk {
}
}
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil, deduplicate: Bool = false) async throws {
// Support resumable pulls
let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)
if !pullResumed {
if let localLayerCache = localLayerCache {
if deduplicate, let localLayerCache = localLayerCache {
// Clone the local layer cache's disk and use it as a base, potentially
// reducing the space usage since some blocks won't be written at all
try FileManager.default.copyItem(at: localLayerCache.diskURL, to: diskURL)
@ -151,26 +151,31 @@ class DiskV2: Disk {
// Also open the disk file for reading and verifying
// its contents in case the local layer cache is used
let rdisk: FileHandle? = if localLayerCache != nil {
let rdisk: FileHandle? = if deduplicate && localLayerCache != nil {
try FileHandle(forReadingFrom: diskURL)
} else {
nil
}
// Check if we already have this layer contents in the local layer cache
if let localLayerCache = localLayerCache, let localLayerInfo = localLayerCache.findInfo(digest: diskLayer.digest, offsetHint: diskWritingOffset) {
// indicates that the locally cloned disk image has the same content at the given offset
let localHit = localLayerInfo.uncompressedContentDigest == uncompressedLayerContentDigest
&& localLayerInfo.range.lowerBound == diskWritingOffset
// doesn't seem that localHit can ever be false if the localLayerCache is not nil
// but let's just add extra safety here and check it
if !localHit {
// Check if we already have this layer contents in the local layer cache,
// or perhaps even on the cloned disk (when the deduplication is enabled)
if let localLayerCache = localLayerCache,
let localLayerInfo = localLayerCache.findInfo(digest: diskLayer.digest, offsetHint: diskWritingOffset),
localLayerInfo.uncompressedContentDigest == uncompressedLayerContentDigest {
if deduplicate && localLayerInfo.range.lowerBound == diskWritingOffset {
// Do nothing, because the data is already on the disk that we've inherited from
} else {
// Fulfil the layer contents from the local blob cache
let data = localLayerCache.subdata(localLayerInfo.range)
_ = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
}
try disk.close()
if let rdisk = rdisk {
try rdisk.close()
}
// Update the progress
progress.completedUnitCount += Int64(diskLayer.size)
@ -198,6 +203,10 @@ class DiskV2: Disk {
try filter.finalize()
try disk.close()
if let rdisk = rdisk {
try rdisk.close()
}
}
globalDiskWritingOffset += uncompressedLayerSize

View File

@ -11,7 +11,7 @@ enum OCIError: Error {
}
extension VMDirectory {
func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?) async throws {
func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?, deduplicate: Bool) async throws {
// Pull VM's config file layer and re-serialize it into a config file
let configLayers = manifest.layers.filter {
$0.mediaType == configMediaType
@ -54,12 +54,13 @@ extension VMDirectory {
do {
try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL,
concurrency: concurrency, progress: progress,
localLayerCache: localLayerCache)
localLayerCache: localLayerCache,
deduplicate: deduplicate)
} catch let error where error is FilterError {
throw RuntimeError.PullFailed("failed to decompress disk: \(error.localizedDescription)")
}
if let llc = localLayerCache {
if deduplicate, let llc = localLayerCache {
// set custom attribute to remember deduplicated bytes
diskURL.setDeduplicatedBytes(llc.deduplicatedBytes)
}

View File

@ -140,7 +140,7 @@ class VMStorageOCI: PrunableStorage {
try list().filter { (_, _, isSymlink) in !isSymlink }.map { (_, vmDir, _) in vmDir }
}
func pull(_ name: RemoteName, registry: Registry, concurrency: UInt) async throws {
func pull(_ name: RemoteName, registry: Registry, concurrency: UInt, deduplicate: Bool) async throws {
SentrySDK.configureScope { scope in
scope.setContext(value: ["imageName": name.description], key: "OCI")
}
@ -203,10 +203,14 @@ class VMStorageOCI: PrunableStorage {
if let llc = localLayerCache {
let deduplicatedHuman = ByteCountFormatter.string(fromByteCount: Int64(llc.deduplicatedBytes), countStyle: .file)
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
if deduplicate {
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
} else {
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to avoid fetching \(deduplicatedHuman), will try use it...")
}
}
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache)
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache, deduplicate: deduplicate)
} recoverFromFailure: { error in
if error is Retryable {
print("Error: \(error.localizedDescription)")