From 166e3e570f87c3850407cbb41ff30efb6b5eee18 Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Wed, 5 Oct 2022 00:39:36 +0400 Subject: [PATCH] Ditch AsyncHTTPClient in favor of URLSession (#260) --- Package.resolved | 70 ++---------- Package.swift | 4 +- Sources/tart/OCI/Registry.swift | 164 +++++++++++++++------------- Sources/tart/VM.swift | 2 +- Sources/tart/VMDirectory+OCI.swift | 11 +- Tests/TartTests/RegistryTests.swift | 8 +- 6 files changed, 107 insertions(+), 152 deletions(-) diff --git a/Package.resolved b/Package.resolved index caf8510..0a143ee 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,14 +1,5 @@ { "pins" : [ - { - "identity" : "async-http-client", - "kind" : "remoteSourceControl", - "location" : "https://github.com/swift-server/async-http-client", - "state" : { - "revision" : "df87a860fdc41a595d5ca67f74cde9adbccc099a", - "version" : "1.11.4" - } - }, { "identity" : "dynamic", "kind" : "remoteSourceControl", @@ -37,12 +28,12 @@ } }, { - "identity" : "swift-atomics", + "identity" : "swift-async-algorithms", "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-atomics.git", + "location" : "https://github.com/apple/swift-async-algorithms", "state" : { - "revision" : "919eb1d83e02121cdb434c7bfc1f0c66ef17febe", - "version" : "1.0.2" + "branch" : "main", + "revision" : "f05e450f0b909c0e80670a47516c4b9700b9e5da" } }, { @@ -55,57 +46,12 @@ } }, { - "identity" : "swift-log", + "identity" : "swift-collections", "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-log.git", + "location" : "https://github.com/apple/swift-collections.git", "state" : { - "revision" : "5d66f7ba25daf4f94100e7022febf3c75e37a6c7", - "version" : "1.4.2" - } - }, - { - "identity" : "swift-nio", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio.git", - "state" : { - "revision" : "124119f0bb12384cef35aa041d7c3a686108722d", - "version" : "2.40.0" - } - }, - { - "identity" : "swift-nio-extras", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio-extras.git", - "state" : { - "revision" : "8eea84ec6144167354387ef9244b0939f5852dc8", - "version" : "1.11.0" - } - }, - { - "identity" : "swift-nio-http2", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio-http2.git", - "state" : { - "revision" : "108ac15087ea9b79abb6f6742699cf31de0e8772", - "version" : "1.22.0" - } - }, - { - "identity" : "swift-nio-ssl", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio-ssl.git", - "state" : { - "revision" : "1750873bce84b4129b5303655cce2c3d35b9ed3a", - "version" : "2.19.0" - } - }, - { - "identity" : "swift-nio-transport-services", - "kind" : "remoteSourceControl", - "location" : "https://github.com/apple/swift-nio-transport-services.git", - "state" : { - "revision" : "1a4692acb88156e3da1b0c6732a8a38b2a744166", - "version" : "1.12.0" + "revision" : "f504716c27d2e5d4144fa4794b12129301d17729", + "version" : "1.0.3" } }, { diff --git a/Package.swift b/Package.swift index e719352..315487b 100644 --- a/Package.swift +++ b/Package.swift @@ -13,15 +13,15 @@ let package = Package( .package(url: "https://github.com/apple/swift-argument-parser", from: "1.1.2"), .package(url: "https://github.com/mhdhejazi/Dynamic", branch: "master"), .package(url: "https://github.com/pointfreeco/swift-parsing", from: "0.9.2"), - .package(url: "https://github.com/swift-server/async-http-client", from: "1.11.4"), .package(url: "https://github.com/apple/swift-algorithms", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-async-algorithms", branch: "main"), .package(url: "https://github.com/malcommac/SwiftDate", from: "6.3.1") ], targets: [ .executableTarget(name: "tart", dependencies: [ .product(name: "Algorithms", package: "swift-algorithms"), + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), .product(name: "ArgumentParser", package: "swift-argument-parser"), - .product(name: "AsyncHTTPClient", package: "async-http-client"), .product(name: "Dynamic", package: "Dynamic"), .product(name: "Parsing", package: "swift-parsing"), .product(name: "SwiftDate", package: "SwiftDate"), diff --git a/Sources/tart/OCI/Registry.swift b/Sources/tart/OCI/Registry.swift index ca359b4..a05c052 100644 --- a/Sources/tart/OCI/Registry.swift +++ b/Sources/tart/OCI/Registry.swift @@ -1,28 +1,44 @@ import Foundation -import NIOCore -import NIOHTTP1 -import AsyncHTTPClient import Algorithms -import NIOPosix +import AsyncAlgorithms + +let chunkSizeBytes = 1 * 1024 * 1024 enum RegistryError: Error { - case UnexpectedHTTPStatusCode(when: String, code: UInt, details: String = "") + case UnexpectedHTTPStatusCode(when: String, code: Int, details: String = "") case MissingLocationHeader case AuthFailed(why: String, details: String = "") case MalformedHeader(why: String) } -extension HTTPClientResponse.Body { - func readTextResponse() async throws -> String? { - let data = try await readResponse() - return String(decoding: data, as: UTF8.self) - } +enum HTTPMethod: String { + case GET = "GET" + case POST = "POST" + case PUT = "PUT" + case PATCH = "PATCH" +} - func readResponse() async throws -> Data { +enum HTTPCode: Int { + case Ok = 200 + case Created = 201 + case Accepted = 202 + case Unauthorized = 401 +} + +extension Data { + func asText() async throws -> String? { + String(decoding: self, as: UTF8.self) + } +} + +extension URLSession.AsyncBytes { + func asData() async throws -> Data { var result = Data() - for try await part in self { - result.append(Data(buffer: part)) + + for try await chunk in chunks(ofCount: chunkSizeBytes) { + result += chunk } + return result } } @@ -78,14 +94,6 @@ struct TokenResponse: Decodable, Authentication { } class Registry { - private let httpClient = HTTPClient( - eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1)) - ) - - deinit { - try! httpClient.syncShutdown() - } - let baseURL: URL let namespace: String let credentialsProviders: [CredentialsProvider] @@ -114,43 +122,43 @@ class Registry { } func ping() async throws { - let response = try await endpointRequest(.GET, "/v2/") - if response.status != .ok { - throw RegistryError.UnexpectedHTTPStatusCode(when: "doing ping", code: response.status.code) + let (_, response) = try await endpointRequest(.GET, "/v2/") + if response.statusCode != HTTPCode.Ok.rawValue { + throw RegistryError.UnexpectedHTTPStatusCode(when: "doing ping", code: response.statusCode) } } func pushManifest(reference: String, manifest: OCIManifest) async throws -> String { let manifestJSON = try manifest.toJSON() - let response = try await endpointRequest(.PUT, "\(namespace)/manifests/\(reference)", + let (bytes, response) = try await endpointRequest(.PUT, "\(namespace)/manifests/\(reference)", headers: ["Content-Type": manifest.mediaType], body: manifestJSON) - if response.status != .created { - throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing manifest", code: response.status.code, - details: try await response.body.readTextResponse() ?? "") + if response.statusCode != HTTPCode.Created.rawValue { + throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing manifest", code: response.statusCode, + details: try await bytes.asData().asText() ?? "") } return Digest.hash(manifestJSON) } public func pullManifest(reference: String) async throws -> (OCIManifest, Data) { - let response = try await endpointRequest(.GET, "\(namespace)/manifests/\(reference)", + let (bytes, response) = try await endpointRequest(.GET, "\(namespace)/manifests/\(reference)", headers: ["Accept": ociManifestMediaType]) - if response.status != .ok { - let body = try await response.body.readTextResponse() - throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling manifest", code: response.status.code, + if response.statusCode != HTTPCode.Ok.rawValue { + let body = try await bytes.asData().asText() + throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling manifest", code: response.statusCode, details: body ?? "") } - let manifestData = try await response.body.readResponse() + let manifestData = try await bytes.asData() let manifest = try OCIManifest(fromJSON: manifestData) return (manifest, manifestData) } - private func uploadLocationFromResponse(_ response: HTTPClientResponse) throws -> URLComponents { - guard let uploadLocationRaw = response.headers.first(name: "Location") else { + private func uploadLocationFromResponse(_ response: HTTPURLResponse) throws -> URLComponents { + guard let uploadLocationRaw = response.value(forHTTPHeaderField: "Location") else { throw RegistryError.MissingLocationHeader } @@ -163,11 +171,11 @@ class Registry { public func pushBlob(fromData: Data, chunkSizeMb: Int = 0) async throws -> String { // Initiate a blob upload - let postResponse = try await endpointRequest(.POST, "\(namespace)/blobs/uploads/", + let (bytes, postResponse) = try await endpointRequest(.POST, "\(namespace)/blobs/uploads/", headers: ["Content-Length": "0"]) - if postResponse.status != .accepted { - let body = try await postResponse.body.readTextResponse() - throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing blob (POST)", code: postResponse.status.code, + if postResponse.statusCode != HTTPCode.Accepted.rawValue { + let body = try await bytes.asData().asText() + throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing blob (POST)", code: postResponse.statusCode, details: body ?? "") } @@ -178,7 +186,7 @@ class Registry { if chunkSizeMb == 0 { // monolithic upload - let response = try await rawRequest( + let (bytes, response) = try await rawRequest( .PUT, uploadLocation, headers: [ @@ -187,10 +195,10 @@ class Registry { parameters: ["digest": digest], body: fromData ) - if response.status != .created { - let body = try await response.body.readTextResponse() + if response.statusCode != HTTPCode.Created.rawValue { + let body = try await bytes.asData().asText() throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing blob (PUT) to \(uploadLocation)", - code: response.status.code, details: body ?? "") + code: response.statusCode, details: body ?? "") } return digest } @@ -200,7 +208,7 @@ class Registry { let chunks = fromData.chunks(ofCount: chunkSizeMb == 0 ? fromData.count : chunkSizeMb * 1_000_000) for (index, chunk) in chunks.enumerated() { let lastChunk = index == (chunks.count - 1) - let response = try await rawRequest( + let (bytes, response) = try await rawRequest( lastChunk ? .PUT : .PATCH, uploadLocation, headers: [ @@ -210,11 +218,11 @@ class Registry { parameters: lastChunk ? ["digest": digest] : [:], body: chunk ) - let expectedStatus: HTTPResponseStatus = lastChunk ? .created : .accepted - if response.status != expectedStatus { - let body = try await response.body.readTextResponse() + let expectedStatus = lastChunk ? HTTPCode.Created.rawValue : HTTPCode.Accepted.rawValue + if response.statusCode != expectedStatus { + let body = try await bytes.asData().asText() throw RegistryError.UnexpectedHTTPStatusCode(when: "streaming blob to \(uploadLocation)", - code: response.status.code, details: body ?? "") + code: response.statusCode, details: body ?? "") } uploadedBytes += chunk.count // Update location for the next chunk @@ -224,18 +232,18 @@ class Registry { return digest } - public func pullBlob(_ digest: String, handler: (ByteBuffer) throws -> Void) async throws { - let response = try await endpointRequest(.GET, "\(namespace)/blobs/\(digest)") - if response.status != .ok { - let body = try await response.body.readTextResponse() - throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.status.code, + public func pullBlob(_ digest: String, handler: (Data) throws -> Void) async throws { + let (bytes, response) = try await endpointRequest(.GET, "\(namespace)/blobs/\(digest)") + if response.statusCode != HTTPCode.Ok.rawValue { + let body = try await bytes.asData().asText() + throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.statusCode, details: body ?? "") } - for try await part in response.body { + for try await part in bytes.chunks(ofCount: chunkSizeBytes) { try Task.checkCancellation() - try handler(part) + try handler(Data(part)) } } @@ -245,7 +253,7 @@ class Registry { headers: Dictionary = Dictionary(), parameters: Dictionary = Dictionary(), body: Data? = nil - ) async throws -> HTTPClientResponse { + ) async throws -> (URLSession.AsyncBytes, HTTPURLResponse) { let url = URL(string: endpoint, relativeTo: baseURL)! let urlComponents = URLComponents(url: url, resolvingAgainstBaseURL: true)! @@ -259,7 +267,7 @@ class Registry { parameters: Dictionary = Dictionary(), body: Data? = nil, doAuth: Bool = true - ) async throws -> HTTPClientResponse { + ) async throws -> (URLSession.AsyncBytes, HTTPURLResponse) { var urlComponents = urlComponents if urlComponents.queryItems == nil && !parameters.isEmpty { @@ -269,14 +277,14 @@ class Registry { URLQueryItem(name: key, value: value) }) - var request = HTTPClientRequest(url: urlComponents.string!) - request.method = method + var request = URLRequest(url: urlComponents.url!) + request.httpMethod = method.rawValue for (key, value) in headers { - request.headers.add(name: key, value: value) + request.addValue(value, forHTTPHeaderField: key) } - if body != nil { - request.headers.add(name: "Content-Length", value: "\(body!.count)") - request.body = HTTPClientRequest.Body.bytes(body!) + if let body = body { + request.addValue("\(body.count)", forHTTPHeaderField: "Content-Length") + request.httpBody = body } // Invalidate token if it has expired @@ -284,19 +292,19 @@ class Registry { currentAuthToken = nil } - var response = try await authAwareRequest(request: request) + var (bytes, response) = try await authAwareRequest(request: request) - if doAuth && response.status == .unauthorized { + if doAuth && response.statusCode == HTTPCode.Unauthorized.rawValue { try await auth(response: response) - response = try await authAwareRequest(request: request) + (bytes, response) = try await authAwareRequest(request: request) } - return response + return (bytes, response) } - private func auth(response: HTTPClientResponse) async throws { + private func auth(response: HTTPURLResponse) async throws { // Process WWW-Authenticate header - guard let wwwAuthenticateRaw = response.headers.first(name: "WWW-Authenticate") else { + guard let wwwAuthenticateRaw = response.value(forHTTPHeaderField: "WWW-Authenticate") else { throw RegistryError.AuthFailed(why: "got HTTP 401, but WWW-Authenticate header is missing") } @@ -345,14 +353,14 @@ class Registry { headers["Authorization"] = "Basic \(encodedCredentials!)" } - let response = try await rawRequest(.GET, authenticateURL, headers: headers, doAuth: false) - if response.status != .ok { - let body = try await response.body.readTextResponse() ?? "" - throw RegistryError.AuthFailed(why: "received unexpected HTTP status code \(response.status.code) " + let (bytes, response) = try await rawRequest(.GET, authenticateURL, headers: headers, doAuth: false) + if response.statusCode != HTTPCode.Ok.rawValue { + let body = try await bytes.asData() .asText() ?? "" + throw RegistryError.AuthFailed(why: "received unexpected HTTP status code \(response.statusCode) " + "while retrieving an authentication token", details: body) } - let bodyData = try await response.body.readResponse() + let bodyData = try await bytes.asData() currentAuthToken = try TokenResponse.parse(fromData: bodyData) } @@ -365,14 +373,16 @@ class Registry { return nil } - private func authAwareRequest(request: HTTPClientRequest) async throws -> HTTPClientResponse { + private func authAwareRequest(request: URLRequest) async throws -> (URLSession.AsyncBytes, HTTPURLResponse) { var request = request if let token = currentAuthToken { let (name, value) = token.header() - request.headers.add(name: name, value: value) + request.addValue(value, forHTTPHeaderField: name) } - return try await httpClient.execute(request, deadline: .distantFuture) + let (bytes, response) = try await URLSession.shared.bytes(for: request) + + return (bytes, response as! HTTPURLResponse) } } diff --git a/Sources/tart/VM.swift b/Sources/tart/VM.swift index 75b054c..1c8d6c3 100644 --- a/Sources/tart/VM.swift +++ b/Sources/tart/VM.swift @@ -179,7 +179,7 @@ class VM: NSObject, VZVirtualMachineDelegate, ObservableObject { // Run automated installation try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - DispatchQueue.main.async { + DispatchQueue.main.async { [ipswURL] in let installer = VZMacOSInstaller(virtualMachine: self.virtualMachine, restoringFromImageAt: ipswURL) defaultLogger.appendNewLine("Installing OS...") diff --git a/Sources/tart/VMDirectory+OCI.swift b/Sources/tart/VMDirectory+OCI.swift index e3a05ca..69042a3 100644 --- a/Sources/tart/VMDirectory+OCI.swift +++ b/Sources/tart/VMDirectory+OCI.swift @@ -35,8 +35,8 @@ extension VMDirectory { throw OCIError.FailedToCreateVmFile } let configFile = try FileHandle(forWritingTo: configURL) - try await registry.pullBlob(configLayers.first!.digest) { buffer in - configFile.write(Data(buffer: buffer)) + try await registry.pullBlob(configLayers.first!.digest) { data in + configFile.write(data) } try configFile.close() @@ -70,8 +70,7 @@ extension VMDirectory { ProgressObserver(progress).log(defaultLogger) for diskLayer in diskLayers { - try await registry.pullBlob(diskLayer.digest) { buffer in - let data = Data(buffer: buffer) + try await registry.pullBlob(diskLayer.digest) { data in try filter.write(data) progress.completedUnitCount += Int64(data.count) } @@ -92,8 +91,8 @@ extension VMDirectory { throw OCIError.FailedToCreateVmFile } let nvram = try FileHandle(forWritingTo: nvramURL) - try await registry.pullBlob(nvramLayers.first!.digest) { buffer in - nvram.write(Data(buffer: buffer)) + try await registry.pullBlob(nvramLayers.first!.digest) { data in + nvram.write(data) } try nvram.close() } diff --git a/Tests/TartTests/RegistryTests.swift b/Tests/TartTests/RegistryTests.swift index 14fd99c..864cf17 100644 --- a/Tests/TartTests/RegistryTests.swift +++ b/Tests/TartTests/RegistryTests.swift @@ -34,8 +34,8 @@ final class RegistryTests: XCTestCase { // Pull it var pulledBlob = Data() - try await registry.pullBlob(pushedBlobDigest) { buffer in - pulledBlob.append(Data(buffer: buffer)) + try await registry.pullBlob(pushedBlobDigest) { data in + pulledBlob.append(data) } // Ensure that both blobs are identical @@ -52,8 +52,8 @@ final class RegistryTests: XCTestCase { // Pull it var pulledLargeBlob = Data() - try await registry.pullBlob(largeBlobDigest) { buffer in - pulledLargeBlob.append(Data(buffer: buffer)) + try await registry.pullBlob(largeBlobDigest) { data in + pulledLargeBlob.append(data) } // Ensure that both blobs are identical