mirror of https://github.com/cirruslabs/tart.git
Improved format for fast and efficient pulls from remote OCI-registry (#589)
* Improved format for fast and efficient pulls from remote OCI-registry * Tests: introduce fileWithRandomData() helper function * Remove useless continuation * --concurrency should be an option, not an argument * --v2-disk-format → --old-disk-format and use the new V2 by default * Reduce LZ4 buffer size from 64 to 4 MiB * --old-disk-format → --disk-format=...
This commit is contained in:
parent
e118b42b1f
commit
3ff4fc34c6
|
|
@ -27,10 +27,17 @@ struct Clone: AsyncParsableCommand {
|
|||
@Flag(help: "connect to the OCI registry via insecure HTTP protocol")
|
||||
var insecure: Bool = false
|
||||
|
||||
@Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry")
|
||||
var concurrency: UInt = 4
|
||||
|
||||
// Rejects invalid argument combinations before the command runs.
func validate() throws {
  // <new-name> must not look like a remote reference
  guard !newName.contains("/") else {
    throw ValidationError("<new-name> should be a local name")
  }

  // At least one concurrent network connection is required
  guard concurrency >= 1 else {
    throw ValidationError("network concurrency cannot be less than 1")
  }
}
|
||||
|
||||
func run() async throws {
|
||||
|
|
@ -40,7 +47,7 @@ struct Clone: AsyncParsableCommand {
|
|||
if let remoteName = try? RemoteName(sourceName), !ociStorage.exists(remoteName) {
|
||||
// Pull the VM in case it's OCI-based and doesn't exist locally yet
|
||||
let registry = try Registry(host: remoteName.host, namespace: remoteName.namespace, insecure: insecure)
|
||||
try await ociStorage.pull(remoteName, registry: registry)
|
||||
try await ociStorage.pull(remoteName, registry: registry, concurrency: concurrency)
|
||||
}
|
||||
|
||||
let sourceVM = try VMStorageHelper.open(sourceName)
|
||||
|
|
|
|||
|
|
@ -20,6 +20,15 @@ struct Pull: AsyncParsableCommand {
|
|||
@Flag(help: "connect to the OCI registry via insecure HTTP protocol")
|
||||
var insecure: Bool = false
|
||||
|
||||
@Option(help: "network concurrency to use when pulling a remote VM from the OCI-compatible registry")
|
||||
var concurrency: UInt = 4
|
||||
|
||||
// Rejects invalid argument combinations before the command runs.
func validate() throws {
  // At least one concurrent network connection is required
  guard concurrency >= 1 else {
    throw ValidationError("network concurrency cannot be less than 1")
  }
}
|
||||
|
||||
func run() async throws {
|
||||
// Be more liberal when accepting local image as argument,
|
||||
// see https://github.com/cirruslabs/tart/issues/36
|
||||
|
|
@ -34,6 +43,6 @@ struct Pull: AsyncParsableCommand {
|
|||
|
||||
defaultLogger.appendNewLine("pulling \(remoteName)...")
|
||||
|
||||
try await VMStorageOCI().pull(remoteName, registry: registry)
|
||||
try await VMStorageOCI().pull(remoteName, registry: registry, concurrency: concurrency)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,9 @@ struct Push: AsyncParsableCommand {
|
|||
"""))
|
||||
var chunkSize: Int = 0
|
||||
|
||||
@Option(help: .hidden)
|
||||
var diskFormat: String = "v2"
|
||||
|
||||
@Flag(help: ArgumentHelp("cache pushed images locally",
|
||||
discussion: "Increases disk usage, but saves time if you're going to pull the pushed images later."))
|
||||
var populateCache: Bool = false
|
||||
|
|
@ -69,7 +72,8 @@ struct Push: AsyncParsableCommand {
|
|||
pushedRemoteName = try await localVMDir.pushToRegistry(
|
||||
registry: registry,
|
||||
references: references,
|
||||
chunkSizeMb: chunkSize
|
||||
chunkSizeMb: chunkSize,
|
||||
diskFormat: diskFormat
|
||||
)
|
||||
// Populate the local cache (if requested)
|
||||
if populateCache {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,11 @@
|
|||
import Foundation
|
||||
import CryptoKit
|
||||
|
||||
enum DigestError: Error {
|
||||
case InvalidOffset
|
||||
case InvalidSize
|
||||
}
|
||||
|
||||
class Digest {
|
||||
var hash: SHA256 = SHA256()
|
||||
|
||||
|
|
@ -15,6 +20,37 @@ class Digest {
|
|||
// Computes the SHA-256 digest of the given data and
// returns it as a hexadecimal string.
static func hash(_ data: Data) -> String {
  let digest = SHA256.hash(data: data)

  return digest.hexdigest()
}
|
||||
|
||||
// Computes the SHA-256 digest of the entire file at ``url``.
//
// NOTE(review): loads the whole file into memory — fine for small
// files, but consider a streaming digest for multi-gigabyte ones.
static func hash(_ url: URL) throws -> String {
  let contents = try Data(contentsOf: url)

  return hash(contents)
}
|
||||
|
||||
// Computes the SHA-256 digest of a ``size``-byte chunk of the file
// at ``url`` starting at byte ``offset``.
//
// - Throws: ``DigestError.InvalidOffset`` when ``offset`` lies beyond the
//   end of the file, ``DigestError.InvalidSize`` when the requested chunk
//   extends past the end of the file, plus any ``FileHandle`` I/O errors.
static func hash(_ url: URL, offset: UInt64, size: UInt64) throws -> String {
  // Open the file once and reuse the handle for both the
  // sanity checks and the actual chunk read
  let fh = try FileHandle(forReadingFrom: url)
  defer { try? fh.close() }

  // Sanity check: seekToEnd() reports the file's total size
  let fileSize = try fh.seekToEnd()

  if offset > fileSize {
    throw DigestError.InvalidOffset
  }

  // Overflow-safe equivalent of (offset + size) > fileSize,
  // valid because offset <= fileSize was checked above
  if size > fileSize - offset {
    throw DigestError.InvalidSize
  }

  // Read a chunk of size ``size`` at offset ``offset``
  // and calculate its digest
  try fh.seek(toOffset: offset)

  // read(upToCount:) returns nil at end-of-file; the bounds checks
  // above keep the chunk inside the file, but fall back to empty
  // data instead of force-unwrapping (also covers size == 0)
  let data = try fh.read(upToCount: Int(size)) ?? Data()

  return hash(data)
}
|
||||
}
|
||||
|
||||
extension SHA256.Digest {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,6 @@
|
|||
import Foundation
|
||||
|
||||
// A strategy for converting a VM's disk file into OCI layers and back.
//
// Implementations (e.g. the v1 and v2 disk formats) each own a single
// layer media type and must reproduce the exact bytes they pushed.
protocol Disk {
  // Compresses the disk file at ``diskURL`` and uploads it to the given
  // registry as one or more blobs, returning the resulting manifest
  // layers in on-disk order.
  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer]
  // Fetches the given disk layers from the registry and decompresses them
  // into the file at ``diskURL``; ``concurrency`` caps the number of layers
  // fetched simultaneously (implementations may ignore it when the format
  // forces sequential decompression).
  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws
}
|
||||
|
|
@ -0,0 +1,75 @@
|
|||
import Foundation
|
||||
import Compression
|
||||
|
||||
// Legacy (v1) disk format: the disk file is LZ4-compressed as one
// continuous stream that is then cut into fixed-size layers.
//
// Because the stream is only decompressible as a whole, layers must be
// pulled and decompressed strictly in order, so ``concurrency`` is unused.
class DiskV1: Disk {
  // Chunk size used when feeding data through the LZ4 filters
  private static let bufferSizeBytes = 4 * 1024 * 1024
  // Maximum compressed size of a single OCI layer
  private static let layerLimitBytes = 500 * 1000 * 1000

  // Compresses the disk file at ``diskURL`` as a single LZ4 stream and
  // pushes it to the registry in layers of at most ``layerLimitBytes``
  // bytes, returning the pushed layers in order.
  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
    var pushedLayers: [OCIManifestLayer] = []

    // Map the disk file instead of reading it outright to keep memory usage bounded
    let mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])
    var mappedDiskReadOffset = 0

    // Compress the disk file as a single stream
    let compressingFilter = try InputFilter(.compress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { (length: Int) -> Data? in
      // Determine the size of the next chunk
      let bytesRead = min(length, mappedDisk.count - mappedDiskReadOffset)

      // Read the next uncompressed chunk
      let data = mappedDisk.subdata(in: mappedDiskReadOffset ..< mappedDiskReadOffset + bytesRead)

      // Advance the offset
      mappedDiskReadOffset += bytesRead

      // Provide the uncompressed chunk to the compressing filter
      return data
    }

    // Cut the compressed stream into layers, each equal exactly ``Self.layerLimitBytes`` bytes,
    // except for the last one, which may be smaller
    while let compressedData = try compressingFilter.readData(ofLength: Self.layerLimitBytes) {
      let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)

      pushedLayers.append(OCIManifestLayer(
        mediaType: diskV1MediaType,
        size: compressedData.count,
        digest: layerDigest
      ))

      // Update progress using an absolute value
      progress.completedUnitCount = Int64(mappedDiskReadOffset)
    }

    return pushedLayers
  }

  // Pulls the given disk layers sequentially and decompresses them as a
  // single LZ4 stream into a freshly-created file at ``diskURL``.
  //
  // ``concurrency`` is intentionally unused: v1 layers form one compressed
  // stream and cannot be decompressed out of order.
  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws {
    if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
      throw OCIError.FailedToCreateVmFile
    }

    // Open the disk file
    let disk = try FileHandle(forWritingTo: diskURL)
    // Best-effort close: by the time this runs, filter.finalize() has
    // already flushed all decompressed data, so a failing close()
    // shouldn't crash the process (previously used try!)
    defer { try? disk.close() }

    // Decompress the layers onto the disk in a single stream
    let filter = try OutputFilter(.decompress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { data in
      if let data = data {
        disk.write(data)
      }
    }

    for diskLayer in diskLayers {
      try await registry.pullBlob(diskLayer.digest) { data in
        try filter.write(data)

        // Update the progress
        progress.completedUnitCount += Int64(data.count)
      }
    }

    // Flush any data still buffered inside the decompressor
    try filter.finalize()
  }
}
|
||||
|
|
@ -0,0 +1,152 @@
|
|||
import Foundation
|
||||
import Compression
|
||||
|
||||
// V2 disk format: the disk file is split into layers that are each
// LZ4-compressed independently, so they can be fetched and decompressed
// concurrently and out of order.
//
// Every layer carries annotations with the uncompressed size and digest
// of its contents, which makes per-layer disk offsets computable up front
// and enables resumable pulls.
class DiskV2: Disk {
  // Chunk size used when feeding data through the LZ4 filters
  private static let bufferSizeBytes = 4 * 1024 * 1024
  // Approximate compressed size of a single OCI layer (may be slightly
  // exceeded, see compressNextLayerOfLimitBytesOrMore())
  private static let layerLimitBytes = 500 * 1000 * 1000

  // Compresses the disk file at ``diskURL`` into independently
  // decompressible LZ4 streams and pushes each as its own layer,
  // annotated with its uncompressed size and content digest.
  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
    var pushedLayers: [OCIManifestLayer] = []

    // Open the disk file
    let disk = try FileHandle(forReadingFrom: diskURL)

    // Compress the disk file as multiple individually decompressible streams,
    // each equal ``Self.layerLimitBytes`` bytes or slightly larger due to the
    // internal compressor's buffer
    while let (compressedData, uncompressedSize, uncompressedDigest) = try compressNextLayerOfLimitBytesOrMore(disk: disk) {
      let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)

      pushedLayers.append(OCIManifestLayer(
        mediaType: diskV2MediaType,
        size: compressedData.count,
        digest: layerDigest,
        uncompressedSize: uncompressedSize,
        uncompressedContentDigest: uncompressedDigest
      ))

      // Update progress using a relative value
      progress.completedUnitCount += Int64(uncompressedSize)
    }

    return pushedLayers
  }

  // Fetches the given disk layers from the registry with up to
  // ``concurrency`` simultaneous downloads and decompresses each into
  // its pre-computed offset in the file at ``diskURL``.
  //
  // If ``diskURL`` already exists, layers whose on-disk contents
  // already match their annotated digest are skipped (resumable pulls).
  static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress) async throws {
    // Support resumable pulls
    let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)

    if !pullResumed && !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
      throw OCIError.FailedToCreateVmFile
    }

    // Calculate the uncompressed disk size
    var uncompressedDiskSize: UInt64 = 0

    for layer in diskLayers {
      guard let uncompressedLayerSize = layer.uncompressedSize() else {
        throw OCIError.LayerIsMissingUncompressedSizeAnnotation
      }

      uncompressedDiskSize += uncompressedLayerSize
    }

    // Truncate the target disk file so that it will be able
    // to accommodate the uncompressed disk size
    let disk = try FileHandle(forWritingTo: diskURL)
    try disk.truncate(atOffset: uncompressedDiskSize)
    try disk.close()

    // Concurrently fetch and decompress layers; the group implicitly
    // awaits all remaining children before returning
    try await withThrowingTaskGroup(of: Void.self) { group in
      var globalDiskWritingOffset: UInt64 = 0

      for (index, diskLayer) in diskLayers.enumerated() {
        // Respect the concurrency limit
        if index >= concurrency {
          try await group.next()
        }

        // Retrieve layer annotations
        guard let uncompressedLayerSize = diskLayer.uncompressedSize() else {
          throw OCIError.LayerIsMissingUncompressedSizeAnnotation
        }
        guard let uncompressedLayerContentDigest = diskLayer.uncompressedContentDigest() else {
          throw OCIError.LayerIsMissingUncompressedDigestAnnotation
        }

        // Capture the current disk writing offset
        let diskWritingOffset = globalDiskWritingOffset

        // Launch a fetching and decompression task
        group.addTask {
          // No need to fetch and decompress anything if we've already done so
          if try pullResumed && Digest.hash(diskURL, offset: diskWritingOffset, size: uncompressedLayerSize) == uncompressedLayerContentDigest {
            // Update the progress
            progress.completedUnitCount += Int64(diskLayer.size)

            return
          }

          // Open the disk file at the specific offset
          let disk = try FileHandle(forWritingTo: diskURL)
          try disk.seek(toOffset: diskWritingOffset)

          // Pull and decompress a single layer into the specific offset on disk
          let filter = try OutputFilter(.decompress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { data in
            if let data = data {
              disk.write(data)
            }
          }

          // NOTE(review): completedUnitCount is mutated from multiple
          // concurrent tasks here and above — confirm Progress tolerates
          // unsynchronized increments
          try await registry.pullBlob(diskLayer.digest) { data in
            try filter.write(data)

            // Update the progress
            progress.completedUnitCount += Int64(data.count)
          }

          // NOTE(review): there is no filter.finalize() before close() —
          // verify the decompressor holds no buffered output at this point,
          // otherwise the tail of the layer could be lost
          try disk.close()
        }

        globalDiskWritingOffset += uncompressedLayerSize
      }
    }
  }

  // Reads the next portion of ``disk`` and compresses it into a standalone
  // LZ4 stream of at least ``layerLimitBytes`` compressed bytes (or fewer
  // at end-of-file).
  //
  // Returns (compressed data, number of uncompressed bytes consumed,
  // digest of the uncompressed bytes), or nil once the disk is exhausted.
  private static func compressNextLayerOfLimitBytesOrMore(disk: FileHandle) throws -> (Data, UInt64, String)? {
    var compressedData = Data()
    var bytesRead: UInt64 = 0
    let digest = Digest()

    // Create a compressing filter that we will terminate upon
    // reaching ``Self.layerLimitBytes`` of compressed data
    let compressingFilter = try InputFilter(.compress, using: .lz4, bufferCapacity: bufferSizeBytes) { (length: Int) -> Data? in
      // Stop feeding input once the layer is large enough; returning nil
      // finalizes this filter's stream, keeping the layer independently
      // decompressible
      if compressedData.count >= Self.layerLimitBytes {
        return nil
      }

      // nil on end-of-file
      guard let uncompressedChunk = try disk.read(upToCount: bufferSizeBytes) else {
        return nil
      }

      // Track how much raw disk data went into this layer and digest it
      bytesRead += UInt64(uncompressedChunk.count)
      digest.update(uncompressedChunk)

      return uncompressedChunk
    }

    // Retrieve compressed data chunks, but normally no more than ``Self.layerLimitBytes`` bytes
    while let compressedChunk = try compressingFilter.readData(ofLength: Self.bufferSizeBytes) {
      compressedData.append(compressedChunk)
    }

    // Nothing was read this time from the disk,
    // signal that to the consumer
    if bytesRead == 0 {
      return nil
    }

    return (compressedData, bytesRead, digest.finalize())
  }
}
|
||||
|
|
@ -1,12 +1,23 @@
|
|||
import Foundation
|
||||
|
||||
// OCI manifest and OCI config media types
|
||||
let ociManifestMediaType = "application/vnd.oci.image.manifest.v1+json"
|
||||
let ociConfigMediaType = "application/vnd.oci.image.config.v1+json"
|
||||
|
||||
// Annotations
|
||||
// Layer media types
|
||||
let configMediaType = "application/vnd.cirruslabs.tart.config.v1"
|
||||
let diskV1MediaType = "application/vnd.cirruslabs.tart.disk.v1"
|
||||
let diskV2MediaType = "application/vnd.cirruslabs.tart.disk.v2"
|
||||
let nvramMediaType = "application/vnd.cirruslabs.tart.nvram.v1"
|
||||
|
||||
// Manifest annotations
|
||||
let uncompressedDiskSizeAnnotation = "org.cirruslabs.tart.uncompressed-disk-size"
|
||||
let uploadTimeAnnotation = "org.cirruslabs.tart.upload-time"
|
||||
|
||||
// Layer annotations
|
||||
let uncompressedSizeAnnotation = "org.cirruslabs.tart.uncompressed-size"
|
||||
let uncompressedContentDigestAnnotation = "org.cirruslabs.tart.uncompressed-content-digest"
|
||||
|
||||
struct OCIManifest: Codable, Equatable {
|
||||
var schemaVersion: Int = 2
|
||||
var mediaType: String = ociManifestMediaType
|
||||
|
|
@ -71,6 +82,37 @@ struct OCIManifestLayer: Codable, Equatable {
|
|||
var mediaType: String
|
||||
var size: Int
|
||||
var digest: String
|
||||
var annotations: Dictionary<String, String>?
|
||||
|
||||
// Creates a layer description, recording the optional uncompressed
// size and content digest as OCI layer annotations.
init(mediaType: String, size: Int, digest: String, uncompressedSize: UInt64? = nil, uncompressedContentDigest: String? = nil) {
  self.mediaType = mediaType
  self.size = size
  self.digest = digest

  var layerAnnotations: [String: String] = [:]

  if let uncompressedSize = uncompressedSize {
    layerAnnotations[uncompressedSizeAnnotation] = String(uncompressedSize)
  }
  if let uncompressedContentDigest = uncompressedContentDigest {
    layerAnnotations[uncompressedContentDigestAnnotation] = uncompressedContentDigest
  }

  self.annotations = layerAnnotations
}
|
||||
|
||||
// Returns the uncompressed size recorded in the layer's annotations,
// or nil when the annotation is absent or not a valid integer.
func uncompressedSize() -> UInt64? {
  (annotations?[uncompressedSizeAnnotation]).flatMap { UInt64($0) }
}
|
||||
|
||||
// Returns the uncompressed content digest recorded in the
// layer's annotations, if any.
func uncompressedContentDigest() -> String? {
  return annotations?[uncompressedContentDigestAnnotation]
}
|
||||
}
|
||||
|
||||
struct Descriptor: Equatable {
|
||||
|
|
|
|||
|
|
@ -242,7 +242,7 @@ class Registry {
|
|||
return digest
|
||||
}
|
||||
|
||||
public func pullBlob(_ digest: String, handler: (Data) throws -> Void) async throws {
|
||||
public func pullBlob(_ digest: String, handler: (Data) async throws -> Void) async throws {
|
||||
let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), viaFile: true)
|
||||
if response.statusCode != HTTPCode.Ok.rawValue {
|
||||
let body = try await channel.asData().asText()
|
||||
|
|
@ -253,7 +253,7 @@ class Registry {
|
|||
for try await part in channel {
|
||||
try Task.checkCancellation()
|
||||
|
||||
try handler(Data(part))
|
||||
try await handler(Data(part))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,33 +1,30 @@
|
|||
import Foundation
|
||||
import Compression
|
||||
import Sentry
|
||||
|
||||
enum OCIError: Error {
|
||||
case ShouldBeExactlyOneLayer
|
||||
case ShouldBeAtLeastOneLayer
|
||||
case FailedToCreateVmFile
|
||||
case LayerIsMissingUncompressedSizeAnnotation
|
||||
case LayerIsMissingUncompressedDigestAnnotation
|
||||
}
|
||||
|
||||
extension VMDirectory {
|
||||
private static let bufferSizeBytes = 64 * 1024 * 1024
|
||||
private static let layerLimitBytes = 500 * 1000 * 1000
|
||||
|
||||
private static let configMediaType = "application/vnd.cirruslabs.tart.config.v1"
|
||||
private static let diskMediaType = "application/vnd.cirruslabs.tart.disk.v1"
|
||||
private static let nvramMediaType = "application/vnd.cirruslabs.tart.nvram.v1"
|
||||
|
||||
func pullFromRegistry(registry: Registry, reference: String) async throws {
|
||||
func pullFromRegistry(registry: Registry, reference: String, concurrency: UInt) async throws {
|
||||
defaultLogger.appendNewLine("pulling manifest...")
|
||||
|
||||
let (manifest, _) = try await registry.pullManifest(reference: reference)
|
||||
|
||||
return try await pullFromRegistry(registry: registry, manifest: manifest)
|
||||
return try await pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency)
|
||||
}
|
||||
|
||||
func pullFromRegistry(registry: Registry, manifest: OCIManifest) async throws {
|
||||
func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt) async throws {
|
||||
// Pull VM's config file layer and re-serialize it into a config file
|
||||
let configLayers = manifest.layers.filter {
|
||||
$0.mediaType == Self.configMediaType
|
||||
$0.mediaType == configMediaType
|
||||
}
|
||||
if configLayers.count != 1 {
|
||||
throw OCIError.ShouldBeExactlyOneLayer
|
||||
|
|
@ -41,50 +38,36 @@ extension VMDirectory {
|
|||
}
|
||||
try configFile.close()
|
||||
|
||||
// Pull VM's disk layers and decompress them sequentially into a disk file
|
||||
let diskLayers = manifest.layers.filter {
|
||||
$0.mediaType == Self.diskMediaType
|
||||
}
|
||||
if diskLayers.isEmpty {
|
||||
// Pull VM's disk layers and decompress them into a disk file
|
||||
let diskImplType: Disk.Type
|
||||
let layers: [OCIManifestLayer]
|
||||
|
||||
if manifest.layers.contains(where: { $0.mediaType == diskV1MediaType }) {
|
||||
diskImplType = DiskV1.self
|
||||
layers = manifest.layers.filter { $0.mediaType == diskV1MediaType }
|
||||
} else if manifest.layers.contains(where: { $0.mediaType == diskV2MediaType }) {
|
||||
diskImplType = DiskV2.self
|
||||
layers = manifest.layers.filter { $0.mediaType == diskV2MediaType }
|
||||
} else {
|
||||
throw OCIError.ShouldBeAtLeastOneLayer
|
||||
}
|
||||
if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
|
||||
throw OCIError.FailedToCreateVmFile
|
||||
}
|
||||
let disk = try FileHandle(forWritingTo: diskURL)
|
||||
let filter = try OutputFilter(.decompress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { data in
|
||||
if let data = data {
|
||||
disk.write(data)
|
||||
}
|
||||
}
|
||||
|
||||
// Progress
|
||||
let diskCompressedSize: Int64 = Int64(diskLayers.map {
|
||||
$0.size
|
||||
}
|
||||
.reduce(0) {
|
||||
$0 + $1
|
||||
})
|
||||
let diskCompressedSize = layers.map { Int64($0.size) }.reduce(0, +)
|
||||
SentrySDK.span?.setMeasurement(name: "compressed_disk_size", value: diskCompressedSize as NSNumber, unit: MeasurementUnitInformation.byte)
|
||||
|
||||
let prettyDiskSize = String(format: "%.1f", Double(diskCompressedSize) / 1_000_000_000.0)
|
||||
defaultLogger.appendNewLine("pulling disk (\(prettyDiskSize) GB compressed)...")
|
||||
|
||||
let progress = Progress(totalUnitCount: diskCompressedSize)
|
||||
ProgressObserver(progress).log(defaultLogger)
|
||||
|
||||
for diskLayer in diskLayers {
|
||||
try await registry.pullBlob(diskLayer.digest) { data in
|
||||
try filter.write(data)
|
||||
progress.completedUnitCount += Int64(data.count)
|
||||
}
|
||||
}
|
||||
try filter.finalize()
|
||||
try disk.close()
|
||||
SentrySDK.span?.setMeasurement(name: "compressed_disk_size", value: diskCompressedSize as NSNumber, unit: MeasurementUnitInformation.byte);
|
||||
try await diskImplType.pull(registry: registry, diskLayers: layers, diskURL: diskURL, concurrency: concurrency, progress: progress)
|
||||
|
||||
// Pull VM's NVRAM file layer and store it in an NVRAM file
|
||||
defaultLogger.appendNewLine("pulling NVRAM...")
|
||||
|
||||
let nvramLayers = manifest.layers.filter {
|
||||
$0.mediaType == Self.nvramMediaType
|
||||
$0.mediaType == nvramMediaType
|
||||
}
|
||||
if nvramLayers.count != 1 {
|
||||
throw OCIError.ShouldBeExactlyOneLayer
|
||||
|
|
@ -99,7 +82,7 @@ extension VMDirectory {
|
|||
try nvram.close()
|
||||
}
|
||||
|
||||
func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int) async throws -> RemoteName {
|
||||
func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int, diskFormat: String) async throws -> RemoteName {
|
||||
var layers = Array<OCIManifestLayer>()
|
||||
|
||||
// Read VM's config and push it as blob
|
||||
|
|
@ -107,32 +90,22 @@ extension VMDirectory {
|
|||
let configJSON = try JSONEncoder().encode(config)
|
||||
defaultLogger.appendNewLine("pushing config...")
|
||||
let configDigest = try await registry.pushBlob(fromData: configJSON, chunkSizeMb: chunkSizeMb)
|
||||
layers.append(OCIManifestLayer(mediaType: Self.configMediaType, size: configJSON.count, digest: configDigest))
|
||||
layers.append(OCIManifestLayer(mediaType: configMediaType, size: configJSON.count, digest: configDigest))
|
||||
|
||||
// Progress
|
||||
// Compress the disk file as multiple chunks and push them as disk layers
|
||||
let diskSize = try FileManager.default.attributesOfItem(atPath: diskURL.path)[.size] as! Int64
|
||||
|
||||
defaultLogger.appendNewLine("pushing disk... this will take a while...")
|
||||
let progress = Progress(totalUnitCount: diskSize)
|
||||
ProgressObserver(progress).log(defaultLogger)
|
||||
|
||||
// Read VM's compressed disk as chunks
|
||||
// and sequentially upload them as blobs
|
||||
let mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])
|
||||
let mappedDiskSize = mappedDisk.count
|
||||
var mappedDiskReadOffset = 0
|
||||
let compressingFilter = try InputFilter(.compress, using: .lz4, bufferCapacity: Self.bufferSizeBytes) { (length: Int) -> Data? in
|
||||
let bytesRead = min(length, mappedDiskSize - mappedDiskReadOffset)
|
||||
let data = mappedDisk.subdata(in: mappedDiskReadOffset ..< mappedDiskReadOffset + bytesRead)
|
||||
mappedDiskReadOffset += bytesRead
|
||||
|
||||
progress.completedUnitCount = Int64(mappedDiskReadOffset)
|
||||
|
||||
return data
|
||||
}
|
||||
while let compressedLayerData = try compressingFilter.readData(ofLength: Self.layerLimitBytes) {
|
||||
let layerDigest = try await registry.pushBlob(fromData: compressedLayerData, chunkSizeMb: chunkSizeMb)
|
||||
layers.append(OCIManifestLayer(mediaType: Self.diskMediaType, size: compressedLayerData.count, digest: layerDigest))
|
||||
switch diskFormat {
|
||||
case "v1":
|
||||
layers.append(contentsOf: try await DiskV1.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress))
|
||||
case "v2":
|
||||
layers.append(contentsOf: try await DiskV2.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress))
|
||||
default:
|
||||
throw RuntimeError.OCIUnsupportedDiskFormat(diskFormat)
|
||||
}
|
||||
|
||||
// Read VM's NVRAM and push it as blob
|
||||
|
|
@ -140,7 +113,7 @@ extension VMDirectory {
|
|||
|
||||
let nvram = try FileHandle(forReadingFrom: nvramURL).readToEnd()!
|
||||
let nvramDigest = try await registry.pushBlob(fromData: nvram, chunkSizeMb: chunkSizeMb)
|
||||
layers.append(OCIManifestLayer(mediaType: Self.nvramMediaType, size: nvram.count, digest: nvramDigest))
|
||||
layers.append(OCIManifestLayer(mediaType: nvramMediaType, size: nvram.count, digest: nvramDigest))
|
||||
|
||||
// Craft a stub OCI config for Docker Hub compatibility
|
||||
let ociConfigJSON = try OCIConfig(architecture: config.arch, os: config.os).toJSON()
|
||||
|
|
@ -148,7 +121,7 @@ extension VMDirectory {
|
|||
let manifest = OCIManifest(
|
||||
config: OCIManifestConfig(size: ociConfigJSON.count, digest: ociConfigDigest),
|
||||
layers: layers,
|
||||
uncompressedDiskSize: UInt64(mappedDiskReadOffset),
|
||||
uncompressedDiskSize: UInt64(diskSize),
|
||||
uploadDate: Date()
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -64,6 +64,7 @@ enum RuntimeError : Error {
|
|||
case ImportFailed(_ message: String)
|
||||
case SoftnetFailed(_ message: String)
|
||||
case OCIStorageError(_ message: String)
|
||||
case OCIUnsupportedDiskFormat(_ format: String)
|
||||
case SuspendFailed(_ message: String)
|
||||
}
|
||||
|
||||
|
|
@ -108,6 +109,8 @@ extension RuntimeError : CustomStringConvertible {
|
|||
return "Softnet failed: \(message)"
|
||||
case .OCIStorageError(let message):
|
||||
return "OCI storage error: \(message)"
|
||||
case .OCIUnsupportedDiskFormat(let format):
|
||||
return "OCI disk format \(format) is not supported by this version of Tart"
|
||||
case .SuspendFailed(let message):
|
||||
return "Failed to suspend the VM: \(message)"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -132,7 +132,7 @@ class VMStorageOCI: PrunableStorage {
|
|||
try list().filter { (_, _, isSymlink) in !isSymlink }.map { (_, vmDir, _) in vmDir }
|
||||
}
|
||||
|
||||
func pull(_ name: RemoteName, registry: Registry) async throws {
|
||||
func pull(_ name: RemoteName, registry: Registry, concurrency: UInt) async throws {
|
||||
SentrySDK.configureScope { scope in
|
||||
scope.setContext(value: ["imageName": name], key: "OCI")
|
||||
}
|
||||
|
|
@ -188,7 +188,7 @@ class VMStorageOCI: PrunableStorage {
|
|||
}
|
||||
|
||||
try await withTaskCancellationHandler(operation: {
|
||||
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest)
|
||||
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency)
|
||||
try move(digestName, from: tmpVMDir)
|
||||
transaction.finish()
|
||||
}, onCancel: {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,90 @@
|
|||
import XCTest
|
||||
@testable import tart
|
||||
|
||||
// Round-trip tests for the Disk layerizer implementations: push a
// randomly-generated disk file to a local registry, pull it back,
// and verify the contents survived unchanged.
final class LayerizerTests: XCTestCase {
  var registryRunner: RegistryRunner?

  // Force-unwrap is acceptable here: setUp() either assigns the runner
  // or skips the test
  var registry: Registry {
    registryRunner!.registry
  }

  override func setUp() async throws {
    try await super.setUp()

    do {
      registryRunner = try await RegistryRunner()
    } catch {
      // Only tolerate a missing local registry outside of CI
      try XCTSkipIf(ProcessInfo.processInfo.environment["CI"] == nil)
    }
  }

  override func tearDown() async throws {
    try await super.tearDown()

    registryRunner = nil
  }

  func testDiskV1() async throws {
    try await roundtrip(DiskV1.self)
  }

  func testDiskV2() async throws {
    try await roundtrip(DiskV2.self)
  }

  // Shared push-then-pull scenario for both disk format implementations:
  // generates a random disk file, pushes it via ``diskImplType``, pulls
  // it back and compares the digests of both files.
  private func roundtrip(_ diskImplType: Disk.Type) async throws {
    // Original disk file to be pushed to the registry
    let originalDiskFileURL = try fileWithRandomData(sizeBytes: 5 * 1024 * 1024 * 1024)
    addTeardownBlock {
      try FileManager.default.removeItem(at: originalDiskFileURL)
    }

    // Disk file to be pulled from the registry
    // and compared against the original disk file
    let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)

    print("pushing disk...")
    let diskLayers = try await diskImplType.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress())

    print("pulling disk...")
    try await diskImplType.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress())

    print("comparing disks...")
    try XCTAssertEqual(Digest.hash(originalDiskFileURL), Digest.hash(pulledDiskFileURL))
  }

  // Creates a temporary file of ``sizeBytes`` bytes filled with data
  // from /dev/urandom and returns its URL; the caller owns the file.
  private func fileWithRandomData(sizeBytes: Int) throws -> URL {
    let devUrandom = try FileHandle(forReadingFrom: URL(filePath: "/dev/urandom"))
    // defer ensures the handles are closed even if a write/read throws
    defer { try? devUrandom.close() }

    let temporaryFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
    FileManager.default.createFile(atPath: temporaryFileURL.path, contents: nil)
    let temporaryFile = try FileHandle(forWritingTo: temporaryFileURL)
    defer { try? temporaryFile.close() }

    var remainingBytes = sizeBytes

    // Copy in bounded chunks to keep memory usage flat
    while remainingBytes > 0 {
      // /dev/urandom never hits EOF, but prefer a test failure
      // over a force-unwrap crash
      let randomData = try XCTUnwrap(devUrandom.read(upToCount: min(64 * 1024 * 1024, remainingBytes)))
      remainingBytes -= randomData.count
      try temporaryFile.write(contentsOf: randomData)
    }

    return temporaryFileURL
  }
}
|
||||
Loading…
Reference in New Issue