From 3ff4fc34c6dc1bebacee641b541ea3cf33bed23b Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Wed, 20 Sep 2023 18:14:05 +0400 Subject: [PATCH] Improved format for fast and efficient pulls from remote OCI-registry (#589) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Improved format for fast and efficient pulls from remote OCI-registry * Tests: introduce fileWithRandomData() helper function * Remove useless continuation * --concurrency should be an option, not an argument * --v2-disk-format → --old-disk-format and use the new V2 by default * Reduce LZ4 buffer size from 64 to 4 MiB * --old-disk-format → --disk-format=... --- Sources/tart/Commands/Clone.swift | 9 +- Sources/tart/Commands/Pull.swift | 11 +- Sources/tart/Commands/Push.swift | 6 +- Sources/tart/OCI/Digest.swift | 36 ++++++ Sources/tart/OCI/Layerizer/Disk.swift | 6 + Sources/tart/OCI/Layerizer/DiskV1.swift | 75 ++++++++++++ Sources/tart/OCI/Layerizer/DiskV2.swift | 152 ++++++++++++++++++++++++ Sources/tart/OCI/Manifest.swift | 44 ++++++- Sources/tart/OCI/Registry.swift | 4 +- Sources/tart/VMDirectory+OCI.swift | 97 ++++++--------- Sources/tart/VMStorageHelper.swift | 3 + Sources/tart/VMStorageOCI.swift | 4 +- Tests/TartTests/LayerizerTests.swift | 90 ++++++++++++++ 13 files changed, 467 insertions(+), 70 deletions(-) create mode 100644 Sources/tart/OCI/Layerizer/Disk.swift create mode 100644 Sources/tart/OCI/Layerizer/DiskV1.swift create mode 100644 Sources/tart/OCI/Layerizer/DiskV2.swift create mode 100644 Tests/TartTests/LayerizerTests.swift diff --git a/Sources/tart/Commands/Clone.swift b/Sources/tart/Commands/Clone.swift index 4013921..e7aa948 100644 --- a/Sources/tart/Commands/Clone.swift +++ b/Sources/tart/Commands/Clone.swift @@ -27,10 +27,17 @@ struct Clone: AsyncParsableCommand { @Flag(help: "connect to the OCI registry via insecure HTTP protocol") var insecure: Bool = false + @Option(help: "network concurrency to use when pulling a remote VM 
from the OCI-compatible registry") + var concurrency: UInt = 4 + func validate() throws { if newName.contains("/") { throw ValidationError(" should be a local name") } + + if concurrency < 1 { + throw ValidationError("network concurrency cannot be less than 1") + } } func run() async throws { @@ -40,7 +47,7 @@ struct Clone: AsyncParsableCommand { if let remoteName = try? RemoteName(sourceName), !ociStorage.exists(remoteName) { // Pull the VM in case it's OCI-based and doesn't exist locally yet let registry = try Registry(host: remoteName.host, namespace: remoteName.namespace, insecure: insecure) - try await ociStorage.pull(remoteName, registry: registry) + try await ociStorage.pull(remoteName, registry: registry, concurrency: concurrency) } let sourceVM = try VMStorageHelper.open(sourceName) diff --git a/Sources/tart/Commands/Pull.swift b/Sources/tart/Commands/Pull.swift index c728689..489439f 100644 --- a/Sources/tart/Commands/Pull.swift +++ b/Sources/tart/Commands/Pull.swift @@ -20,6 +20,15 @@ struct Pull: AsyncParsableCommand { @Flag(help: "connect to the OCI registry via insecure HTTP protocol") var insecure: Bool = false + @Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry") + var concurrency: UInt = 4 + + func validate() throws { + if concurrency < 1 { + throw ValidationError("network concurrency cannot be less than 1") + } + } + func run() async throws { // Be more liberal when accepting local image as argument, // see https://github.com/cirruslabs/tart/issues/36 @@ -34,6 +43,6 @@ struct Pull: AsyncParsableCommand { defaultLogger.appendNewLine("pulling \(remoteName)...") - try await VMStorageOCI().pull(remoteName, registry: registry) + try await VMStorageOCI().pull(remoteName, registry: registry, concurrency: concurrency) } } diff --git a/Sources/tart/Commands/Push.swift b/Sources/tart/Commands/Push.swift index af833e7..eb41836 100644 --- a/Sources/tart/Commands/Push.swift +++ 
b/Sources/tart/Commands/Push.swift @@ -23,6 +23,9 @@ struct Push: AsyncParsableCommand { """)) var chunkSize: Int = 0 + @Option(help: .hidden) + var diskFormat: String = "v2" + @Flag(help: ArgumentHelp("cache pushed images locally", discussion: "Increases disk usage, but saves time if you're going to pull the pushed images later.")) var populateCache: Bool = false @@ -69,7 +72,8 @@ struct Push: AsyncParsableCommand { pushedRemoteName = try await localVMDir.pushToRegistry( registry: registry, references: references, - chunkSizeMb: chunkSize + chunkSizeMb: chunkSize, + diskFormat: diskFormat ) // Populate the local cache (if requested) if populateCache { diff --git a/Sources/tart/OCI/Digest.swift b/Sources/tart/OCI/Digest.swift index 3f2c864..51c4201 100644 --- a/Sources/tart/OCI/Digest.swift +++ b/Sources/tart/OCI/Digest.swift @@ -1,6 +1,11 @@ import Foundation import CryptoKit +enum DigestError: Error { + case InvalidOffset + case InvalidSize +} + class Digest { var hash: SHA256 = SHA256() @@ -15,6 +20,37 @@ class Digest { static func hash(_ data: Data) -> String { SHA256.hash(data: data).hexdigest() } + + static func hash(_ url: URL) throws -> String { + hash(try Data(contentsOf: url)) + } + + static func hash(_ url: URL, offset: UInt64, size: UInt64) throws -> String { + // Sanity check + let fhSanity = try FileHandle(forReadingFrom: url) + try fhSanity.seekToEnd() + let fileSize = try fhSanity.offset() + try fhSanity.close() + + if offset > fileSize { + throw DigestError.InvalidOffset + } + + if (offset + size) > fileSize { + throw DigestError.InvalidSize + } + + // Read a chunk of size ``size`` at offset ``offset`` + // and calculate its digest + let fh = try FileHandle(forReadingFrom: url) + defer { try! fh.close() } + + try fh.seek(toOffset: offset) + + let data = try fh.read(upToCount: Int(size))! 
+ + return hash(data) + } } extension SHA256.Digest { diff --git a/Sources/tart/OCI/Layerizer/Disk.swift b/Sources/tart/OCI/Layerizer/Disk.swift new file mode 100644 index 0000000..7ebaa27 --- /dev/null +++ b/Sources/tart/OCI/Layerizer/Disk.swift @@ -0,0 +1,6 @@ +import Foundation + +protocol Disk { + static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] + static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws +} diff --git a/Sources/tart/OCI/Layerizer/DiskV1.swift b/Sources/tart/OCI/Layerizer/DiskV1.swift new file mode 100644 index 0000000..637f8c5 --- /dev/null +++ b/Sources/tart/OCI/Layerizer/DiskV1.swift @@ -0,0 +1,75 @@ +import Foundation +import Compression + +class DiskV1: Disk { + private static let bufferSizeBytes = 4 * 1024 * 1024 + private static let layerLimitBytes = 500 * 1000 * 1000 + + static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] { + var pushedLayers: [OCIManifestLayer] = [] + + // Open the disk file + let mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped]) + var mappedDiskReadOffset = 0 + + // Compress the disk file as a single stream + let compressingFilter = try InputFilter(.compress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { (length: Int) -> Data? 
in + // Determine the size of the next chunk + let bytesRead = min(length, mappedDisk.count - mappedDiskReadOffset) + + // Read the next uncompressed chunk + let data = mappedDisk.subdata(in: mappedDiskReadOffset ..< mappedDiskReadOffset + bytesRead) + + // Advance the offset + mappedDiskReadOffset += bytesRead + + // Provide the uncompressed chunk to the compressing filter + return data + } + + // Cut the compressed stream into layers, each equal exactly ``Self.layerLimitBytes`` bytes, + // except for the last one, which may be smaller + while let compressedData = try compressingFilter.readData(ofLength: Self.layerLimitBytes) { + let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb) + + pushedLayers.append(OCIManifestLayer( + mediaType: diskV1MediaType, + size: compressedData.count, + digest: layerDigest + )) + + // Update progress using an absolute value + progress.completedUnitCount = Int64(mappedDiskReadOffset) + } + + return pushedLayers + } + + static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws { + if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) { + throw OCIError.FailedToCreateVmFile + } + + // Open the disk file + let disk = try FileHandle(forWritingTo: diskURL) + defer { try! 
disk.close() } + + // Decompress the layers onto the disk in a single stream + let filter = try OutputFilter(.decompress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { data in + if let data = data { + disk.write(data) + } + } + + for diskLayer in diskLayers { + try await registry.pullBlob(diskLayer.digest) { data in + try filter.write(data) + + // Update the progress + progress.completedUnitCount += Int64(data.count) + } + } + + try filter.finalize() + } +} diff --git a/Sources/tart/OCI/Layerizer/DiskV2.swift b/Sources/tart/OCI/Layerizer/DiskV2.swift new file mode 100644 index 0000000..b49af3e --- /dev/null +++ b/Sources/tart/OCI/Layerizer/DiskV2.swift @@ -0,0 +1,152 @@ +import Foundation +import Compression + +class DiskV2: Disk { + private static let bufferSizeBytes = 4 * 1024 * 1024 + private static let layerLimitBytes = 500 * 1000 * 1000 + + static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] { + var pushedLayers: [OCIManifestLayer] = [] + + // Open the disk file + let disk = try FileHandle(forReadingFrom: diskURL) + + // Compress the disk file as multiple individually decompressible streams, + // each equal ``Self.layerLimitBytes`` bytes or slightly larger due to the + // internal compressor's buffer + while let (compressedData, uncompressedSize, uncompressedDigest) = try compressNextLayerOfLimitBytesOrMore(disk: disk) { + let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb) + + pushedLayers.append(OCIManifestLayer( + mediaType: diskV2MediaType, + size: compressedData.count, + digest: layerDigest, + uncompressedSize: uncompressedSize, + uncompressedContentDigest: uncompressedDigest + )) + + // Update progress using a relative value + progress.completedUnitCount += Int64(uncompressedSize) + } + + return pushedLayers + } + + static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: 
Progress) async throws { + // Support resumable pulls + let pullResumed = FileManager.default.fileExists(atPath: diskURL.path) + + if !pullResumed && !FileManager.default.createFile(atPath: diskURL.path, contents: nil) { + throw OCIError.FailedToCreateVmFile + } + + // Calculate the uncompressed disk size + var uncompressedDiskSize: UInt64 = 0 + + for layer in diskLayers { + guard let uncompressedLayerSize = layer.uncompressedSize() else { + throw OCIError.LayerIsMissingUncompressedSizeAnnotation + } + + uncompressedDiskSize += uncompressedLayerSize + } + + // Truncate the target disk file so that it will be able + // to accommodate the uncompressed disk size + let disk = try FileHandle(forWritingTo: diskURL) + try disk.truncate(atOffset: uncompressedDiskSize) + try disk.close() + + // Concurrently fetch and decompress layers + try await withThrowingTaskGroup(of: Void.self) { group in + var globalDiskWritingOffset: UInt64 = 0 + + for (index, diskLayer) in diskLayers.enumerated() { + // Respect the concurrency limit + if index >= concurrency { + try await group.next() + } + + // Retrieve layer annotations + guard let uncompressedLayerSize = diskLayer.uncompressedSize() else { + throw OCIError.LayerIsMissingUncompressedSizeAnnotation + } + guard let uncompressedLayerContentDigest = diskLayer.uncompressedContentDigest() else { + throw OCIError.LayerIsMissingUncompressedDigestAnnotation + } + + // Capture the current disk writing offset + let diskWritingOffset = globalDiskWritingOffset + + // Launch a fetching and decompression task + group.addTask { + // No need to fetch and decompress anything if we've already done so + if try pullResumed && Digest.hash(diskURL, offset: diskWritingOffset, size: uncompressedLayerSize) == uncompressedLayerContentDigest { + // Update the progress + progress.completedUnitCount += Int64(diskLayer.size) + + return + } + + // Open the disk file at the specific offset + let disk = try FileHandle(forWritingTo: diskURL) + try 
disk.seek(toOffset: diskWritingOffset) + + // Pull and decompress a single layer into the specific offset on disk + let filter = try OutputFilter(.decompress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { data in + if let data = data { + disk.write(data) + } + } + + try await registry.pullBlob(diskLayer.digest) { data in + try filter.write(data) + + // Update the progress + progress.completedUnitCount += Int64(data.count) + } + + try disk.close() + } + + globalDiskWritingOffset += uncompressedLayerSize + } + } + } + + private static func compressNextLayerOfLimitBytesOrMore(disk: FileHandle) throws -> (Data, UInt64, String)? { + var compressedData = Data() + var bytesRead: UInt64 = 0 + let digest = Digest() + + // Create a compressing filter that we will terminate upon + // reaching ``Self.layerLimitBytes`` of compressed data + let compressingFilter = try InputFilter(.compress, using: .lz4, bufferCapacity: bufferSizeBytes) { (length: Int) -> Data? in + if compressedData.count >= Self.layerLimitBytes { + return nil + } + + guard let uncompressedChunk = try disk.read(upToCount: bufferSizeBytes) else { + return nil + } + + bytesRead += UInt64(uncompressedChunk.count) + digest.update(uncompressedChunk) + + return uncompressedChunk + } + + // Retrieve compressed data chunks, but normally no more than ``Self.layerLimitBytes`` bytes + while let compressedChunk = try compressingFilter.readData(ofLength: Self.bufferSizeBytes) { + compressedData.append(compressedChunk) + } + + // Nothing was read this time from the disk, + // signal that to the consumer + if bytesRead == 0 { + return nil + } + + return (compressedData, bytesRead, digest.finalize()) + } +} diff --git a/Sources/tart/OCI/Manifest.swift b/Sources/tart/OCI/Manifest.swift index 283d160..955a016 100644 --- a/Sources/tart/OCI/Manifest.swift +++ b/Sources/tart/OCI/Manifest.swift @@ -1,12 +1,23 @@ import Foundation +// OCI manifest and OCI config media types let ociManifestMediaType = 
"application/vnd.oci.image.manifest.v1+json" let ociConfigMediaType = "application/vnd.oci.image.config.v1+json" -// Annotations +// Layer media types +let configMediaType = "application/vnd.cirruslabs.tart.config.v1" +let diskV1MediaType = "application/vnd.cirruslabs.tart.disk.v1" +let diskV2MediaType = "application/vnd.cirruslabs.tart.disk.v2" +let nvramMediaType = "application/vnd.cirruslabs.tart.nvram.v1" + +// Manifest annotations let uncompressedDiskSizeAnnotation = "org.cirruslabs.tart.uncompressed-disk-size" let uploadTimeAnnotation = "org.cirruslabs.tart.upload-time" +// Layer annotations +let uncompressedSizeAnnotation = "org.cirruslabs.tart.uncompressed-size" +let uncompressedContentDigestAnnotation = "org.cirruslabs.tart.uncompressed-content-digest" + struct OCIManifest: Codable, Equatable { var schemaVersion: Int = 2 var mediaType: String = ociManifestMediaType @@ -71,6 +82,37 @@ struct OCIManifestLayer: Codable, Equatable { var mediaType: String var size: Int var digest: String + var annotations: Dictionary? + + init(mediaType: String, size: Int, digest: String, uncompressedSize: UInt64? = nil, uncompressedContentDigest: String? = nil) { + self.mediaType = mediaType + self.size = size + self.digest = digest + + var annotations: [String: String] = [:] + + if let uncompressedSize = uncompressedSize { + annotations[uncompressedSizeAnnotation] = String(uncompressedSize) + } + + if let uncompressedContentDigest = uncompressedContentDigest { + annotations[uncompressedContentDigestAnnotation] = uncompressedContentDigest + } + + self.annotations = annotations + } + + func uncompressedSize() -> UInt64? { + guard let value = annotations?[uncompressedSizeAnnotation] else { + return nil + } + + return UInt64(value) + } + + func uncompressedContentDigest() -> String? 
{ + annotations?[uncompressedContentDigestAnnotation] + } } struct Descriptor: Equatable { diff --git a/Sources/tart/OCI/Registry.swift b/Sources/tart/OCI/Registry.swift index 2f899e4..2c9b8e4 100644 --- a/Sources/tart/OCI/Registry.swift +++ b/Sources/tart/OCI/Registry.swift @@ -242,7 +242,7 @@ class Registry { return digest } - public func pullBlob(_ digest: String, handler: (Data) throws -> Void) async throws { + public func pullBlob(_ digest: String, handler: (Data) async throws -> Void) async throws { let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), viaFile: true) if response.statusCode != HTTPCode.Ok.rawValue { let body = try await channel.asData().asText() @@ -253,7 +253,7 @@ class Registry { for try await part in channel { try Task.checkCancellation() - try handler(Data(part)) + try await handler(Data(part)) } } diff --git a/Sources/tart/VMDirectory+OCI.swift b/Sources/tart/VMDirectory+OCI.swift index fb60625..26850cf 100644 --- a/Sources/tart/VMDirectory+OCI.swift +++ b/Sources/tart/VMDirectory+OCI.swift @@ -1,33 +1,30 @@ import Foundation -import Compression import Sentry enum OCIError: Error { case ShouldBeExactlyOneLayer case ShouldBeAtLeastOneLayer case FailedToCreateVmFile + case LayerIsMissingUncompressedSizeAnnotation + case LayerIsMissingUncompressedDigestAnnotation } extension VMDirectory { private static let bufferSizeBytes = 64 * 1024 * 1024 private static let layerLimitBytes = 500 * 1000 * 1000 - private static let configMediaType = "application/vnd.cirruslabs.tart.config.v1" - private static let diskMediaType = "application/vnd.cirruslabs.tart.disk.v1" - private static let nvramMediaType = "application/vnd.cirruslabs.tart.nvram.v1" - - func pullFromRegistry(registry: Registry, reference: String) async throws { + func pullFromRegistry(registry: Registry, reference: String, concurrency: UInt) async throws { defaultLogger.appendNewLine("pulling manifest...") let (manifest, _) = try await 
registry.pullManifest(reference: reference) - return try await pullFromRegistry(registry: registry, manifest: manifest) + return try await pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency) } - func pullFromRegistry(registry: Registry, manifest: OCIManifest) async throws { + func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt) async throws { // Pull VM's config file layer and re-serialize it into a config file let configLayers = manifest.layers.filter { - $0.mediaType == Self.configMediaType + $0.mediaType == configMediaType } if configLayers.count != 1 { throw OCIError.ShouldBeExactlyOneLayer @@ -41,50 +38,36 @@ extension VMDirectory { } try configFile.close() - // Pull VM's disk layers and decompress them sequentially into a disk file - let diskLayers = manifest.layers.filter { - $0.mediaType == Self.diskMediaType - } - if diskLayers.isEmpty { + // Pull VM's disk layers and decompress them into a disk file + let diskImplType: Disk.Type + let layers: [OCIManifestLayer] + + if manifest.layers.contains(where: { $0.mediaType == diskV1MediaType }) { + diskImplType = DiskV1.self + layers = manifest.layers.filter { $0.mediaType == diskV1MediaType } + } else if manifest.layers.contains(where: { $0.mediaType == diskV2MediaType }) { + diskImplType = DiskV2.self + layers = manifest.layers.filter { $0.mediaType == diskV2MediaType } + } else { throw OCIError.ShouldBeAtLeastOneLayer } - if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) { - throw OCIError.FailedToCreateVmFile - } - let disk = try FileHandle(forWritingTo: diskURL) - let filter = try OutputFilter(.decompress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { data in - if let data = data { - disk.write(data) - } - } - // Progress - let diskCompressedSize: Int64 = Int64(diskLayers.map { - $0.size - } - .reduce(0) { - $0 + $1 - }) + let diskCompressedSize = layers.map { Int64($0.size) }.reduce(0, +) + 
SentrySDK.span?.setMeasurement(name: "compressed_disk_size", value: diskCompressedSize as NSNumber, unit: MeasurementUnitInformation.byte) + let prettyDiskSize = String(format: "%.1f", Double(diskCompressedSize) / 1_000_000_000.0) defaultLogger.appendNewLine("pulling disk (\(prettyDiskSize) GB compressed)...") + let progress = Progress(totalUnitCount: diskCompressedSize) ProgressObserver(progress).log(defaultLogger) - for diskLayer in diskLayers { - try await registry.pullBlob(diskLayer.digest) { data in - try filter.write(data) - progress.completedUnitCount += Int64(data.count) - } - } - try filter.finalize() - try disk.close() - SentrySDK.span?.setMeasurement(name: "compressed_disk_size", value: diskCompressedSize as NSNumber, unit: MeasurementUnitInformation.byte); + try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL, concurrency: concurrency, progress: progress) // Pull VM's NVRAM file layer and store it in an NVRAM file defaultLogger.appendNewLine("pulling NVRAM...") let nvramLayers = manifest.layers.filter { - $0.mediaType == Self.nvramMediaType + $0.mediaType == nvramMediaType } if nvramLayers.count != 1 { throw OCIError.ShouldBeExactlyOneLayer @@ -99,7 +82,7 @@ extension VMDirectory { try nvram.close() } - func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int) async throws -> RemoteName { + func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int, diskFormat: String) async throws -> RemoteName { var layers = Array() // Read VM's config and push it as blob @@ -107,32 +90,22 @@ extension VMDirectory { let configJSON = try JSONEncoder().encode(config) defaultLogger.appendNewLine("pushing config...") let configDigest = try await registry.pushBlob(fromData: configJSON, chunkSizeMb: chunkSizeMb) - layers.append(OCIManifestLayer(mediaType: Self.configMediaType, size: configJSON.count, digest: configDigest)) + layers.append(OCIManifestLayer(mediaType: configMediaType, size: 
configJSON.count, digest: configDigest)) - // Progress + // Compress the disk file as multiple chunks and push them as disk layers let diskSize = try FileManager.default.attributesOfItem(atPath: diskURL.path)[.size] as! Int64 defaultLogger.appendNewLine("pushing disk... this will take a while...") let progress = Progress(totalUnitCount: diskSize) ProgressObserver(progress).log(defaultLogger) - // Read VM's compressed disk as chunks - // and sequentially upload them as blobs - let mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped]) - let mappedDiskSize = mappedDisk.count - var mappedDiskReadOffset = 0 - let compressingFilter = try InputFilter(.compress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { (length: Int) -> Data? in - let bytesRead = min(length, mappedDiskSize - mappedDiskReadOffset) - let data = mappedDisk.subdata(in: mappedDiskReadOffset ..< mappedDiskReadOffset + bytesRead) - mappedDiskReadOffset += bytesRead - - progress.completedUnitCount = Int64(mappedDiskReadOffset) - - return data - } - while let compressedLayerData = try compressingFilter.readData(ofLength: Self.layerLimitBytes) { - let layerDigest = try await registry.pushBlob(fromData: compressedLayerData, chunkSizeMb: chunkSizeMb) - layers.append(OCIManifestLayer(mediaType: Self.diskMediaType, size: compressedLayerData.count, digest: layerDigest)) + switch diskFormat { + case "v1": + layers.append(contentsOf: try await DiskV1.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress)) + case "v2": + layers.append(contentsOf: try await DiskV2.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress)) + default: + throw RuntimeError.OCIUnsupportedDiskFormat(diskFormat) } // Read VM's NVRAM and push it as blob @@ -140,7 +113,7 @@ extension VMDirectory { let nvram = try FileHandle(forReadingFrom: nvramURL).readToEnd()! 
let nvramDigest = try await registry.pushBlob(fromData: nvram, chunkSizeMb: chunkSizeMb) - layers.append(OCIManifestLayer(mediaType: Self.nvramMediaType, size: nvram.count, digest: nvramDigest)) + layers.append(OCIManifestLayer(mediaType: nvramMediaType, size: nvram.count, digest: nvramDigest)) // Craft a stub OCI config for Docker Hub compatibility let ociConfigJSON = try OCIConfig(architecture: config.arch, os: config.os).toJSON() @@ -148,7 +121,7 @@ extension VMDirectory { let manifest = OCIManifest( config: OCIManifestConfig(size: ociConfigJSON.count, digest: ociConfigDigest), layers: layers, - uncompressedDiskSize: UInt64(mappedDiskReadOffset), + uncompressedDiskSize: UInt64(diskSize), uploadDate: Date() ) diff --git a/Sources/tart/VMStorageHelper.swift b/Sources/tart/VMStorageHelper.swift index 9be9e84..91ff29d 100644 --- a/Sources/tart/VMStorageHelper.swift +++ b/Sources/tart/VMStorageHelper.swift @@ -64,6 +64,7 @@ enum RuntimeError : Error { case ImportFailed(_ message: String) case SoftnetFailed(_ message: String) case OCIStorageError(_ message: String) + case OCIUnsupportedDiskFormat(_ format: String) case SuspendFailed(_ message: String) } @@ -108,6 +109,8 @@ extension RuntimeError : CustomStringConvertible { return "Softnet failed: \(message)" case .OCIStorageError(let message): return "OCI storage error: \(message)" + case .OCIUnsupportedDiskFormat(let format): + return "OCI disk format \(format) is not supported by this version of Tart" case .SuspendFailed(let message): return "Failed to suspend the VM: \(message)" } diff --git a/Sources/tart/VMStorageOCI.swift b/Sources/tart/VMStorageOCI.swift index d76ece7..ec20a07 100644 --- a/Sources/tart/VMStorageOCI.swift +++ b/Sources/tart/VMStorageOCI.swift @@ -132,7 +132,7 @@ class VMStorageOCI: PrunableStorage { try list().filter { (_, _, isSymlink) in !isSymlink }.map { (_, vmDir, _) in vmDir } } - func pull(_ name: RemoteName, registry: Registry) async throws { + func pull(_ name: RemoteName, registry: 
Registry, concurrency: UInt) async throws { SentrySDK.configureScope { scope in scope.setContext(value: ["imageName": name], key: "OCI") } @@ -188,7 +188,7 @@ class VMStorageOCI: PrunableStorage { } try await withTaskCancellationHandler(operation: { - try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest) + try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency) try move(digestName, from: tmpVMDir) transaction.finish() }, onCancel: { diff --git a/Tests/TartTests/LayerizerTests.swift b/Tests/TartTests/LayerizerTests.swift new file mode 100644 index 0000000..d0f8dc7 --- /dev/null +++ b/Tests/TartTests/LayerizerTests.swift @@ -0,0 +1,90 @@ +import XCTest +@testable import tart + +final class LayerizerTests: XCTestCase { + var registryRunner: RegistryRunner? + + var registry: Registry { + registryRunner!.registry + } + + override func setUp() async throws { + try await super.setUp() + + do { + registryRunner = try await RegistryRunner() + } catch { + try XCTSkipIf(ProcessInfo.processInfo.environment["CI"] == nil) + } + } + + override func tearDown() async throws { + try await super.tearDown() + + registryRunner = nil + } + + func testDiskV1() async throws { + // Original disk file to be pushed to the registry + let originalDiskFileURL = try fileWithRandomData(sizeBytes: 5 * 1024 * 1024 * 1024) + addTeardownBlock { + try FileManager.default.removeItem(at: originalDiskFileURL) + } + + // Disk file to be pulled from the registry + // and compared against the original disk file + let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString) + + print("pushing disk...") + let diskLayers = try await DiskV1.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress()) + + print("pulling disk...") + try await DiskV1.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress()) + + 
print("comparing disks...") + try XCTAssertEqual(Digest.hash(originalDiskFileURL), Digest.hash(pulledDiskFileURL)) + } + + func testDiskV2() async throws { + // Original disk file to be pushed to the registry + let originalDiskFileURL = try fileWithRandomData(sizeBytes: 5 * 1024 * 1024 * 1024) + addTeardownBlock { + try FileManager.default.removeItem(at: originalDiskFileURL) + } + + // Disk file to be pulled from the registry + // and compared against the original disk file + let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString) + + print("pushing disk...") + let diskLayers = try await DiskV2.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress()) + + print("pulling disk...") + try await DiskV2.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress()) + + print("comparing disks...") + try XCTAssertEqual(Digest.hash(originalDiskFileURL), Digest.hash(pulledDiskFileURL)) + } + + private func fileWithRandomData(sizeBytes: Int) throws -> URL { + let devUrandom = try FileHandle(forReadingFrom: URL(filePath: "/dev/urandom")) + + let temporaryFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString) + FileManager.default.createFile(atPath: temporaryFileURL.path, contents: nil) + let temporaryFile = try FileHandle(forWritingTo: temporaryFileURL) + + var remainingBytes = sizeBytes + + while remainingBytes > 0 { + let randomData = try devUrandom.read(upToCount: min(64 * 1024 * 1024, remainingBytes))! + remainingBytes -= randomData.count + try temporaryFile.write(contentsOf: randomData) + } + + try devUrandom.close() + + try temporaryFile.close() + + return temporaryFileURL + } +}