Ditch AsyncHTTPClient in favor of URLSession (#260)

This commit is contained in:
Nikolay Edigaryev 2022-10-05 00:39:36 +04:00 committed by GitHub
parent 7a2c20ba30
commit 166e3e570f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 107 additions and 152 deletions

View File

@ -1,14 +1,5 @@
{
"pins" : [
{
"identity" : "async-http-client",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swift-server/async-http-client",
"state" : {
"revision" : "df87a860fdc41a595d5ca67f74cde9adbccc099a",
"version" : "1.11.4"
}
},
{
"identity" : "dynamic",
"kind" : "remoteSourceControl",
@ -37,12 +28,12 @@
}
},
{
"identity" : "swift-atomics",
"identity" : "swift-async-algorithms",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-atomics.git",
"location" : "https://github.com/apple/swift-async-algorithms",
"state" : {
"revision" : "919eb1d83e02121cdb434c7bfc1f0c66ef17febe",
"version" : "1.0.2"
"branch" : "main",
"revision" : "f05e450f0b909c0e80670a47516c4b9700b9e5da"
}
},
{
@ -55,57 +46,12 @@
}
},
{
"identity" : "swift-log",
"identity" : "swift-collections",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-log.git",
"location" : "https://github.com/apple/swift-collections.git",
"state" : {
"revision" : "5d66f7ba25daf4f94100e7022febf3c75e37a6c7",
"version" : "1.4.2"
}
},
{
"identity" : "swift-nio",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio.git",
"state" : {
"revision" : "124119f0bb12384cef35aa041d7c3a686108722d",
"version" : "2.40.0"
}
},
{
"identity" : "swift-nio-extras",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-extras.git",
"state" : {
"revision" : "8eea84ec6144167354387ef9244b0939f5852dc8",
"version" : "1.11.0"
}
},
{
"identity" : "swift-nio-http2",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-http2.git",
"state" : {
"revision" : "108ac15087ea9b79abb6f6742699cf31de0e8772",
"version" : "1.22.0"
}
},
{
"identity" : "swift-nio-ssl",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-ssl.git",
"state" : {
"revision" : "1750873bce84b4129b5303655cce2c3d35b9ed3a",
"version" : "2.19.0"
}
},
{
"identity" : "swift-nio-transport-services",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-transport-services.git",
"state" : {
"revision" : "1a4692acb88156e3da1b0c6732a8a38b2a744166",
"version" : "1.12.0"
"revision" : "f504716c27d2e5d4144fa4794b12129301d17729",
"version" : "1.0.3"
}
},
{

View File

@ -13,15 +13,15 @@ let package = Package(
.package(url: "https://github.com/apple/swift-argument-parser", from: "1.1.2"),
.package(url: "https://github.com/mhdhejazi/Dynamic", branch: "master"),
.package(url: "https://github.com/pointfreeco/swift-parsing", from: "0.9.2"),
.package(url: "https://github.com/swift-server/async-http-client", from: "1.11.4"),
.package(url: "https://github.com/apple/swift-algorithms", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-async-algorithms", branch: "main"),
.package(url: "https://github.com/malcommac/SwiftDate", from: "6.3.1")
],
targets: [
.executableTarget(name: "tart", dependencies: [
.product(name: "Algorithms", package: "swift-algorithms"),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "ArgumentParser", package: "swift-argument-parser"),
.product(name: "AsyncHTTPClient", package: "async-http-client"),
.product(name: "Dynamic", package: "Dynamic"),
.product(name: "Parsing", package: "swift-parsing"),
.product(name: "SwiftDate", package: "SwiftDate"),

View File

@ -1,28 +1,44 @@
import Foundation
import NIOCore
import NIOHTTP1
import AsyncHTTPClient
import Algorithms
import NIOPosix
import AsyncAlgorithms
let chunkSizeBytes = 1 * 1024 * 1024
enum RegistryError: Error {
case UnexpectedHTTPStatusCode(when: String, code: UInt, details: String = "")
case UnexpectedHTTPStatusCode(when: String, code: Int, details: String = "")
case MissingLocationHeader
case AuthFailed(why: String, details: String = "")
case MalformedHeader(why: String)
}
extension HTTPClientResponse.Body {
func readTextResponse() async throws -> String? {
let data = try await readResponse()
return String(decoding: data, as: UTF8.self)
}
enum HTTPMethod: String {
case GET = "GET"
case POST = "POST"
case PUT = "PUT"
case PATCH = "PATCH"
}
func readResponse() async throws -> Data {
enum HTTPCode: Int {
case Ok = 200
case Created = 201
case Accepted = 202
case Unauthorized = 401
}
extension Data {
func asText() async throws -> String? {
String(decoding: self, as: UTF8.self)
}
}
extension URLSession.AsyncBytes {
func asData() async throws -> Data {
var result = Data()
for try await part in self {
result.append(Data(buffer: part))
for try await chunk in chunks(ofCount: chunkSizeBytes) {
result += chunk
}
return result
}
}
@ -78,14 +94,6 @@ struct TokenResponse: Decodable, Authentication {
}
class Registry {
private let httpClient = HTTPClient(
eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup(numberOfThreads: 1))
)
deinit {
try! httpClient.syncShutdown()
}
let baseURL: URL
let namespace: String
let credentialsProviders: [CredentialsProvider]
@ -114,43 +122,43 @@ class Registry {
}
func ping() async throws {
let response = try await endpointRequest(.GET, "/v2/")
if response.status != .ok {
throw RegistryError.UnexpectedHTTPStatusCode(when: "doing ping", code: response.status.code)
let (_, response) = try await endpointRequest(.GET, "/v2/")
if response.statusCode != HTTPCode.Ok.rawValue {
throw RegistryError.UnexpectedHTTPStatusCode(when: "doing ping", code: response.statusCode)
}
}
func pushManifest(reference: String, manifest: OCIManifest) async throws -> String {
let manifestJSON = try manifest.toJSON()
let response = try await endpointRequest(.PUT, "\(namespace)/manifests/\(reference)",
let (bytes, response) = try await endpointRequest(.PUT, "\(namespace)/manifests/\(reference)",
headers: ["Content-Type": manifest.mediaType],
body: manifestJSON)
if response.status != .created {
throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing manifest", code: response.status.code,
details: try await response.body.readTextResponse() ?? "")
if response.statusCode != HTTPCode.Created.rawValue {
throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing manifest", code: response.statusCode,
details: try await bytes.asData().asText() ?? "")
}
return Digest.hash(manifestJSON)
}
public func pullManifest(reference: String) async throws -> (OCIManifest, Data) {
let response = try await endpointRequest(.GET, "\(namespace)/manifests/\(reference)",
let (bytes, response) = try await endpointRequest(.GET, "\(namespace)/manifests/\(reference)",
headers: ["Accept": ociManifestMediaType])
if response.status != .ok {
let body = try await response.body.readTextResponse()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling manifest", code: response.status.code,
if response.statusCode != HTTPCode.Ok.rawValue {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling manifest", code: response.statusCode,
details: body ?? "")
}
let manifestData = try await response.body.readResponse()
let manifestData = try await bytes.asData()
let manifest = try OCIManifest(fromJSON: manifestData)
return (manifest, manifestData)
}
private func uploadLocationFromResponse(_ response: HTTPClientResponse) throws -> URLComponents {
guard let uploadLocationRaw = response.headers.first(name: "Location") else {
private func uploadLocationFromResponse(_ response: HTTPURLResponse) throws -> URLComponents {
guard let uploadLocationRaw = response.value(forHTTPHeaderField: "Location") else {
throw RegistryError.MissingLocationHeader
}
@ -163,11 +171,11 @@ class Registry {
public func pushBlob(fromData: Data, chunkSizeMb: Int = 0) async throws -> String {
// Initiate a blob upload
let postResponse = try await endpointRequest(.POST, "\(namespace)/blobs/uploads/",
let (bytes, postResponse) = try await endpointRequest(.POST, "\(namespace)/blobs/uploads/",
headers: ["Content-Length": "0"])
if postResponse.status != .accepted {
let body = try await postResponse.body.readTextResponse()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing blob (POST)", code: postResponse.status.code,
if postResponse.statusCode != HTTPCode.Accepted.rawValue {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing blob (POST)", code: postResponse.statusCode,
details: body ?? "")
}
@ -178,7 +186,7 @@ class Registry {
if chunkSizeMb == 0 {
// monolithic upload
let response = try await rawRequest(
let (bytes, response) = try await rawRequest(
.PUT,
uploadLocation,
headers: [
@ -187,10 +195,10 @@ class Registry {
parameters: ["digest": digest],
body: fromData
)
if response.status != .created {
let body = try await response.body.readTextResponse()
if response.statusCode != HTTPCode.Created.rawValue {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing blob (PUT) to \(uploadLocation)",
code: response.status.code, details: body ?? "")
code: response.statusCode, details: body ?? "")
}
return digest
}
@ -200,7 +208,7 @@ class Registry {
let chunks = fromData.chunks(ofCount: chunkSizeMb == 0 ? fromData.count : chunkSizeMb * 1_000_000)
for (index, chunk) in chunks.enumerated() {
let lastChunk = index == (chunks.count - 1)
let response = try await rawRequest(
let (bytes, response) = try await rawRequest(
lastChunk ? .PUT : .PATCH,
uploadLocation,
headers: [
@ -210,11 +218,11 @@ class Registry {
parameters: lastChunk ? ["digest": digest] : [:],
body: chunk
)
let expectedStatus: HTTPResponseStatus = lastChunk ? .created : .accepted
if response.status != expectedStatus {
let body = try await response.body.readTextResponse()
let expectedStatus = lastChunk ? HTTPCode.Created.rawValue : HTTPCode.Accepted.rawValue
if response.statusCode != expectedStatus {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "streaming blob to \(uploadLocation)",
code: response.status.code, details: body ?? "")
code: response.statusCode, details: body ?? "")
}
uploadedBytes += chunk.count
// Update location for the next chunk
@ -224,18 +232,18 @@ class Registry {
return digest
}
public func pullBlob(_ digest: String, handler: (ByteBuffer) throws -> Void) async throws {
let response = try await endpointRequest(.GET, "\(namespace)/blobs/\(digest)")
if response.status != .ok {
let body = try await response.body.readTextResponse()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.status.code,
public func pullBlob(_ digest: String, handler: (Data) throws -> Void) async throws {
let (bytes, response) = try await endpointRequest(.GET, "\(namespace)/blobs/\(digest)")
if response.statusCode != HTTPCode.Ok.rawValue {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.statusCode,
details: body ?? "")
}
for try await part in response.body {
for try await part in bytes.chunks(ofCount: chunkSizeBytes) {
try Task.checkCancellation()
try handler(part)
try handler(Data(part))
}
}
@ -245,7 +253,7 @@ class Registry {
headers: Dictionary<String, String> = Dictionary(),
parameters: Dictionary<String, String> = Dictionary(),
body: Data? = nil
) async throws -> HTTPClientResponse {
) async throws -> (URLSession.AsyncBytes, HTTPURLResponse) {
let url = URL(string: endpoint, relativeTo: baseURL)!
let urlComponents = URLComponents(url: url, resolvingAgainstBaseURL: true)!
@ -259,7 +267,7 @@ class Registry {
parameters: Dictionary<String, String> = Dictionary(),
body: Data? = nil,
doAuth: Bool = true
) async throws -> HTTPClientResponse {
) async throws -> (URLSession.AsyncBytes, HTTPURLResponse) {
var urlComponents = urlComponents
if urlComponents.queryItems == nil && !parameters.isEmpty {
@ -269,14 +277,14 @@ class Registry {
URLQueryItem(name: key, value: value)
})
var request = HTTPClientRequest(url: urlComponents.string!)
request.method = method
var request = URLRequest(url: urlComponents.url!)
request.httpMethod = method.rawValue
for (key, value) in headers {
request.headers.add(name: key, value: value)
request.addValue(value, forHTTPHeaderField: key)
}
if body != nil {
request.headers.add(name: "Content-Length", value: "\(body!.count)")
request.body = HTTPClientRequest.Body.bytes(body!)
if let body = body {
request.addValue("\(body.count)", forHTTPHeaderField: "Content-Length")
request.httpBody = body
}
// Invalidate token if it has expired
@ -284,19 +292,19 @@ class Registry {
currentAuthToken = nil
}
var response = try await authAwareRequest(request: request)
var (bytes, response) = try await authAwareRequest(request: request)
if doAuth && response.status == .unauthorized {
if doAuth && response.statusCode == HTTPCode.Unauthorized.rawValue {
try await auth(response: response)
response = try await authAwareRequest(request: request)
(bytes, response) = try await authAwareRequest(request: request)
}
return response
return (bytes, response)
}
private func auth(response: HTTPClientResponse) async throws {
private func auth(response: HTTPURLResponse) async throws {
// Process WWW-Authenticate header
guard let wwwAuthenticateRaw = response.headers.first(name: "WWW-Authenticate") else {
guard let wwwAuthenticateRaw = response.value(forHTTPHeaderField: "WWW-Authenticate") else {
throw RegistryError.AuthFailed(why: "got HTTP 401, but WWW-Authenticate header is missing")
}
@ -345,14 +353,14 @@ class Registry {
headers["Authorization"] = "Basic \(encodedCredentials!)"
}
let response = try await rawRequest(.GET, authenticateURL, headers: headers, doAuth: false)
if response.status != .ok {
let body = try await response.body.readTextResponse() ?? ""
throw RegistryError.AuthFailed(why: "received unexpected HTTP status code \(response.status.code) "
let (bytes, response) = try await rawRequest(.GET, authenticateURL, headers: headers, doAuth: false)
if response.statusCode != HTTPCode.Ok.rawValue {
let body = try await bytes.asData() .asText() ?? ""
throw RegistryError.AuthFailed(why: "received unexpected HTTP status code \(response.statusCode) "
+ "while retrieving an authentication token", details: body)
}
let bodyData = try await response.body.readResponse()
let bodyData = try await bytes.asData()
currentAuthToken = try TokenResponse.parse(fromData: bodyData)
}
@ -365,14 +373,16 @@ class Registry {
return nil
}
private func authAwareRequest(request: HTTPClientRequest) async throws -> HTTPClientResponse {
private func authAwareRequest(request: URLRequest) async throws -> (URLSession.AsyncBytes, HTTPURLResponse) {
var request = request
if let token = currentAuthToken {
let (name, value) = token.header()
request.headers.add(name: name, value: value)
request.addValue(value, forHTTPHeaderField: name)
}
return try await httpClient.execute(request, deadline: .distantFuture)
let (bytes, response) = try await URLSession.shared.bytes(for: request)
return (bytes, response as! HTTPURLResponse)
}
}

View File

@ -179,7 +179,7 @@ class VM: NSObject, VZVirtualMachineDelegate, ObservableObject {
// Run automated installation
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
DispatchQueue.main.async {
DispatchQueue.main.async { [ipswURL] in
let installer = VZMacOSInstaller(virtualMachine: self.virtualMachine, restoringFromImageAt: ipswURL)
defaultLogger.appendNewLine("Installing OS...")

View File

@ -35,8 +35,8 @@ extension VMDirectory {
throw OCIError.FailedToCreateVmFile
}
let configFile = try FileHandle(forWritingTo: configURL)
try await registry.pullBlob(configLayers.first!.digest) { buffer in
configFile.write(Data(buffer: buffer))
try await registry.pullBlob(configLayers.first!.digest) { data in
configFile.write(data)
}
try configFile.close()
@ -70,8 +70,7 @@ extension VMDirectory {
ProgressObserver(progress).log(defaultLogger)
for diskLayer in diskLayers {
try await registry.pullBlob(diskLayer.digest) { buffer in
let data = Data(buffer: buffer)
try await registry.pullBlob(diskLayer.digest) { data in
try filter.write(data)
progress.completedUnitCount += Int64(data.count)
}
@ -92,8 +91,8 @@ extension VMDirectory {
throw OCIError.FailedToCreateVmFile
}
let nvram = try FileHandle(forWritingTo: nvramURL)
try await registry.pullBlob(nvramLayers.first!.digest) { buffer in
nvram.write(Data(buffer: buffer))
try await registry.pullBlob(nvramLayers.first!.digest) { data in
nvram.write(data)
}
try nvram.close()
}

View File

@ -34,8 +34,8 @@ final class RegistryTests: XCTestCase {
// Pull it
var pulledBlob = Data()
try await registry.pullBlob(pushedBlobDigest) { buffer in
pulledBlob.append(Data(buffer: buffer))
try await registry.pullBlob(pushedBlobDigest) { data in
pulledBlob.append(data)
}
// Ensure that both blobs are identical
@ -52,8 +52,8 @@ final class RegistryTests: XCTestCase {
// Pull it
var pulledLargeBlob = Data()
try await registry.pullBlob(largeBlobDigest) { buffer in
pulledLargeBlob.append(Data(buffer: buffer))
try await registry.pullBlob(largeBlobDigest) { data in
pulledLargeBlob.append(data)
}
// Ensure that both blobs are identical