tart push: support --concurrency command-line argument (#868)

* tart push: support --concurrency command-line argument

* LayerizerTests: specify "concurrency" argument
This commit is contained in:
Nikolay Edigaryev 2024-07-18 17:52:55 +04:00 committed by GitHub
parent 1e74e268a5
commit 2dc25ce478
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 47 additions and 23 deletions

View File

@ -15,6 +15,9 @@ struct Push: AsyncParsableCommand {
@Flag(help: "connect to the OCI registry via insecure HTTP protocol")
var insecure: Bool = false
@Option(help: "network concurrency to use when pushing a local VM to the OCI-compatible registry")
var concurrency: UInt = 4
@Option(help: ArgumentHelp("chunk size in MB if registry supports chunked uploads",
discussion: """
By default monolithic method is used for uploading blobs to the registry but some registries support a more efficient chunked method.
@ -77,7 +80,8 @@ struct Push: AsyncParsableCommand {
registry: registry,
references: references,
chunkSizeMb: chunkSize,
diskFormat: diskFormat
diskFormat: diskFormat,
concurrency: concurrency
)
// Populate the local cache (if requested)
if populateCache {

View File

@ -1,6 +1,6 @@
import Foundation
protocol Disk {
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer]
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer]
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?) async throws
}

View File

@ -5,7 +5,7 @@ class DiskV1: Disk {
private static let bufferSizeBytes = 4 * 1024 * 1024
private static let layerLimitBytes = 500 * 1000 * 1000
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer] {
var pushedLayers: [OCIManifestLayer] = []
// Open the disk file

View File

@ -5,32 +5,52 @@ class DiskV2: Disk {
private static let bufferSizeBytes = 4 * 1024 * 1024
private static let layerLimitBytes = 512 * 1024 * 1024
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
var pushedLayers: [OCIManifestLayer] = []
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer] {
var pushedLayers: [(index: Int, pushedLayer: OCIManifestLayer)] = []
// Open the disk file
let mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])
// Compress the disk file as multiple individually decompressible streams,
// each equal ``Self.layerLimitBytes`` bytes or less due to LZ4 compression
for data in mappedDisk.chunks(ofCount: layerLimitBytes) {
let compressedData = try (data as NSData).compressed(using: .lz4) as Data
try await withThrowingTaskGroup(of: (Int, OCIManifestLayer).self) { group in
for (index, data) in mappedDisk.chunks(ofCount: layerLimitBytes).enumerated() {
// Respect the concurrency limit
if index >= concurrency {
if let (index, pushedLayer) = try await group.next() {
pushedLayers.append((index, pushedLayer))
}
}
let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)
// Launch a disk layer pushing task
group.addTask {
let compressedData = try (data as NSData).compressed(using: .lz4) as Data
pushedLayers.append(OCIManifestLayer(
mediaType: diskV2MediaType,
size: compressedData.count,
digest: layerDigest,
uncompressedSize: UInt64(data.count),
uncompressedContentDigest: Digest.hash(data)
))
let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)
// Update progress using a relative value
progress.completedUnitCount += Int64(data.count)
// Update progress using a relative value
progress.completedUnitCount += Int64(data.count)
return (index, OCIManifestLayer(
mediaType: diskV2MediaType,
size: compressedData.count,
digest: layerDigest,
uncompressedSize: UInt64(data.count),
uncompressedContentDigest: Digest.hash(data)
))
}
}
for try await pushedLayer in group {
pushedLayers.append(pushedLayer)
}
}
return pushedLayers
return pushedLayers.sorted {
$0.index < $1.index
}.map {
$0.pushedLayer
}
}
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {

View File

@ -84,7 +84,7 @@ extension VMDirectory {
try manifest.toJSON().write(to: manifestURL)
}
func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int, diskFormat: String) async throws -> RemoteName {
func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int, diskFormat: String, concurrency: UInt) async throws -> RemoteName {
var layers = Array<OCIManifestLayer>()
// Read VM's config and push it as blob
@ -103,9 +103,9 @@ extension VMDirectory {
switch diskFormat {
case "v1":
layers.append(contentsOf: try await DiskV1.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress))
layers.append(contentsOf: try await DiskV1.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, concurrency: concurrency, progress: progress))
case "v2":
layers.append(contentsOf: try await DiskV2.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress))
layers.append(contentsOf: try await DiskV2.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, concurrency: concurrency, progress: progress))
default:
throw RuntimeError.OCIUnsupportedDiskFormat(diskFormat)
}

View File

@ -36,7 +36,7 @@ final class LayerizerTests: XCTestCase {
let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
print("pushing disk...")
let diskLayers = try await DiskV1.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress())
let diskLayers = try await DiskV1.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, concurrency: 4, progress: Progress())
print("pulling disk...")
try await DiskV1.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress())
@ -57,7 +57,7 @@ final class LayerizerTests: XCTestCase {
let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
print("pushing disk...")
let diskLayers = try await DiskV2.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress())
let diskLayers = try await DiskV2.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, concurrency: 4, progress: Progress())
print("pulling disk...")
try await DiskV2.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress())