OCI: pull blobs via file (#306)

* OCI: pull blobs via file

* Explain why we delete the downloaded file after opening a handle to it

* Further abstract away ways to fetch a URLRequest

* No need to cast HTTPURLResponse to HTTPURLResponse

* Fetcher: no need to be a delegate anymore

* Fetcher.fetch() can be made static
This commit is contained in:
Nikolay Edigaryev 2022-11-09 19:36:41 +04:00 committed by GitHub
parent 8961c5189a
commit ee0fbdd83d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 43 deletions

View File

@ -1,46 +1,49 @@
import Foundation
import AsyncAlgorithms
class Fetcher: NSObject, URLSessionTaskDelegate, URLSessionDelegate, URLSessionDataDelegate {
let responseCh = AsyncThrowingChannel<URLResponse, Error>()
class Fetcher {
static func fetch(_ request: URLRequest, viaFile: Bool = false) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
if viaFile {
return try await fetchViaFile(request)
}
return try await fetchViaMemory(request)
}
private static func fetchViaMemory(_ request: URLRequest) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
let dataCh = AsyncThrowingChannel<Data, Error>()
func fetch(_ request: URLRequest) async throws -> (AsyncThrowingChannel<Data, Error>, URLResponse) {
let task = URLSession.shared.dataTask(with: request)
task.delegate = self
task.resume()
let (data, response) = try await URLSession.shared.data(for: request)
// Wait for the response and only then return
var iter = responseCh.makeAsyncIterator()
let response = try await iter.next()!
Task {
await dataCh.send(data)
return (dataCh, response)
dataCh.finish()
}
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse) async -> URLSession.ResponseDisposition {
await responseCh.send(response)
return (dataCh, response as! HTTPURLResponse)
}
return .allow
private static func fetchViaFile(_ request: URLRequest) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
let dataCh = AsyncThrowingChannel<Data, Error>()
let (fileURL, response) = try await URLSession.shared.download(for: request)
// Acquire a handle to the downloaded file and then remove it.
//
// This keeps a working reference to that file, yet we don't
// have to deal with the cleanup any more.
let fh = try FileHandle(forReadingFrom: fileURL)
try FileManager.default.removeItem(at: fileURL)
Task {
while let data = try fh.read(upToCount: 64 * 1024 * 1024) {
await dataCh.send(data)
}
dataCh.finish()
}
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
let sema = DispatchSemaphore(value: 0)
Task {
await dataCh.send(data)
sema.signal()
}
sema.wait()
}
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
if let error = error {
// Premature termination
responseCh.fail(error)
dataCh.fail(error)
} else {
dataCh.finish()
}
}
return (dataCh, response as! HTTPURLResponse)
}
}

View File

@ -226,7 +226,7 @@ class Registry {
}
public func pullBlob(_ digest: String, handler: (Data) throws -> Void) async throws {
let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"))
let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), viaFile: true)
if response.statusCode != HTTPCode.Ok.rawValue {
let body = try await channel.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.statusCode,
@ -266,7 +266,8 @@ class Registry {
headers: Dictionary<String, String> = Dictionary(),
parameters: Dictionary<String, String> = Dictionary(),
body: Data? = nil,
doAuth: Bool = true
doAuth: Bool = true,
viaFile: Bool = false
) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
var urlComponents = urlComponents
@ -292,12 +293,12 @@ class Registry {
currentAuthToken = nil
}
var (channel, response) = try await authAwareRequest(request: request)
var (channel, response) = try await authAwareRequest(request: request, viaFile: viaFile)
if doAuth && response.statusCode == HTTPCode.Unauthorized.rawValue {
_ = try await channel.asData()
try await auth(response: response)
(channel, response) = try await authAwareRequest(request: request)
(channel, response) = try await authAwareRequest(request: request, viaFile: viaFile)
}
return (channel, response)
@ -372,7 +373,7 @@ class Registry {
return nil
}
private func authAwareRequest(request: URLRequest) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
private func authAwareRequest(request: URLRequest, viaFile: Bool = false) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
var request = request
if let token = currentAuthToken {
@ -380,8 +381,6 @@ class Registry {
request.addValue(value, forHTTPHeaderField: name)
}
let (channel, response) = try await Fetcher().fetch(request)
return (channel, response as! HTTPURLResponse)
return try await Fetcher.fetch(request, viaFile: viaFile)
}
}

View File

@ -63,9 +63,9 @@ class VM: NSObject, VZVirtualMachineDelegate, ObservableObject {
static func retrieveIPSW(remoteURL: URL) async throws -> URL {
// Check if we already have this IPSW in cache
let (channel, response) = try await Fetcher().fetch(URLRequest(url: remoteURL))
let (channel, response) = try await Fetcher.fetch(URLRequest(url: remoteURL))
if let hash = (response as! HTTPURLResponse).value(forHTTPHeaderField: "x-amz-meta-digest-sha256") {
if let hash = response.value(forHTTPHeaderField: "x-amz-meta-digest-sha256") {
let ipswLocation = try IPSWCache().locationFor(fileName: "sha256:\(hash).ipsw")
if FileManager.default.fileExists(atPath: ipswLocation.path) {