tart pull: re-try disk layer downloads by specifying "Range" header (#980)

This commit is contained in:
Nikolay Edigaryev 2024-12-19 21:21:33 +04:00 committed by GitHub
parent eaec015edf
commit b96ea087f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 38 additions and 11 deletions

View File

@ -44,7 +44,7 @@ class DiskV2: Disk {
let compressedData = try (data as NSData).compressed(using: .lz4) as Data
let compressedDataDigest = Digest.hash(compressedData)
try await retry(maxAttempts: 5, backoff: .exponentialWithFullJitter(baseDelay: .seconds(5), maxDelay: .seconds(60))) {
try await retry(maxAttempts: 5) {
if try await !registry.blobExists(compressedDataDigest) {
_ = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb, digest: compressedDataDigest)
}
@ -208,11 +208,26 @@ class DiskV2: Disk {
diskWritingOffset = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
}
try await registry.pullBlob(diskLayer.digest) { data in
try filter.write(data)
var rangeStart: Int64 = 0
// Update the progress
progress.completedUnitCount += Int64(data.count)
try await retry(maxAttempts: 5) {
try await registry.pullBlob(diskLayer.digest, rangeStart: rangeStart) { data in
try filter.write(data)
// Update the progress
progress.completedUnitCount += Int64(data.count)
// Update the current range start
rangeStart += Int64(data.count)
}
} recoverFromFailure: { error in
if error is URLError {
print("Error pulling disk layer \(index + 1): \"\(error.localizedDescription)\", attempting to re-try...")
return .retry
}
return .throw
}
try filter.finalize()

View File

@ -20,6 +20,7 @@ enum HTTPCode: Int {
case Ok = 200
case Created = 201
case Accepted = 202
case PartialContent = 206
case Unauthorized = 401
case NotFound = 404
}
@ -263,9 +264,21 @@ class Registry {
}
}
public func pullBlob(_ digest: String, handler: (Data) async throws -> Void) async throws {
let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), viaFile: true)
if response.statusCode != HTTPCode.Ok.rawValue {
public func pullBlob(_ digest: String, rangeStart: Int64 = 0, handler: (Data) async throws -> Void) async throws {
var expectedStatusCode = HTTPCode.Ok
var headers: [String: String] = [:]
// Send Range header and expect HTTP 206 in return
//
// However, do not send Range header at all when rangeStart is 0,
// because it makes no sense and we might get HTTP 200 in return
if rangeStart != 0 {
expectedStatusCode = HTTPCode.PartialContent
headers["Range"] = "bytes=\(rangeStart)-"
}
let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), headers: headers, viaFile: true)
if response.statusCode != expectedStatusCode.rawValue {
let body = try await channel.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.statusCode,
details: body)

View File

@ -196,7 +196,7 @@ class VMStorageOCI: PrunableStorage {
}
try await withTaskCancellationHandler(operation: {
try await retry(maxAttempts: 5, backoff: .exponentialWithFullJitter(baseDelay: .seconds(5), maxDelay: .seconds(60))) {
try await retry(maxAttempts: 5) {
// Choose the best base image which has the most deduplication ratio
let localLayerCache = try await chooseLocalLayerCache(name, manifest, registry)
@ -213,8 +213,7 @@ class VMStorageOCI: PrunableStorage {
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache, deduplicate: deduplicate)
} recoverFromFailure: { error in
if error is URLError {
print("Error: \(error.localizedDescription)")
print("Attempting to re-try...")
print("Error pulling image: \"\(error.localizedDescription)\", attempting to re-try...")
return .retry
}