Plug URLSession.bytes() memory leak (#267)

This commit is contained in:
Nikolay Edigaryev 2022-10-10 18:38:57 +04:00 committed by GitHub
parent 89301d114e
commit 62ee42de3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 36 additions and 36 deletions

View File

@ -26,7 +26,7 @@ enum HTTPCode: Int {
}
extension Data {
func asText() async throws -> String? {
func asText() -> String {
String(decoding: self, as: UTF8.self)
}
}
@ -122,7 +122,7 @@ class Registry {
}
func ping() async throws {
let (_, response) = try await endpointRequest(.GET, "/v2/")
let (_, response) = try await dataRequest(.GET, endpointURL("/v2/"))
if response.statusCode != HTTPCode.Ok.rawValue {
throw RegistryError.UnexpectedHTTPStatusCode(when: "doing ping", code: response.statusCode)
}
@ -131,30 +131,28 @@ class Registry {
func pushManifest(reference: String, manifest: OCIManifest) async throws -> String {
let manifestJSON = try manifest.toJSON()
let (bytes, response) = try await endpointRequest(.PUT, "\(namespace)/manifests/\(reference)",
let (data, response) = try await dataRequest(.PUT, endpointURL("\(namespace)/manifests/\(reference)"),
headers: ["Content-Type": manifest.mediaType],
body: manifestJSON)
if response.statusCode != HTTPCode.Created.rawValue {
throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing manifest", code: response.statusCode,
details: try await bytes.asData().asText() ?? "")
details: data.asText())
}
return Digest.hash(manifestJSON)
}
public func pullManifest(reference: String) async throws -> (OCIManifest, Data) {
let (bytes, response) = try await endpointRequest(.GET, "\(namespace)/manifests/\(reference)",
let (data, response) = try await dataRequest(.GET, endpointURL("\(namespace)/manifests/\(reference)"),
headers: ["Accept": ociManifestMediaType])
if response.statusCode != HTTPCode.Ok.rawValue {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling manifest", code: response.statusCode,
details: body ?? "")
details: data.asText())
}
let manifestData = try await bytes.asData()
let manifest = try OCIManifest(fromJSON: manifestData)
let manifest = try OCIManifest(fromJSON: data)
return (manifest, manifestData)
return (manifest, data)
}
private func uploadLocationFromResponse(_ response: HTTPURLResponse) throws -> URLComponents {
@ -171,12 +169,11 @@ class Registry {
public func pushBlob(fromData: Data, chunkSizeMb: Int = 0) async throws -> String {
// Initiate a blob upload
let (bytes, postResponse) = try await endpointRequest(.POST, "\(namespace)/blobs/uploads/",
let (data, postResponse) = try await dataRequest(.POST, endpointURL("\(namespace)/blobs/uploads/"),
headers: ["Content-Length": "0"])
if postResponse.statusCode != HTTPCode.Accepted.rawValue {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing blob (POST)", code: postResponse.statusCode,
details: body ?? "")
details: data.asText())
}
// Figure out where to upload the blob
@ -186,7 +183,7 @@ class Registry {
if chunkSizeMb == 0 {
// monolithic upload
let (bytes, response) = try await rawRequest(
let (data, response) = try await dataRequest(
.PUT,
uploadLocation,
headers: [
@ -196,9 +193,8 @@ class Registry {
body: fromData
)
if response.statusCode != HTTPCode.Created.rawValue {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pushing blob (PUT) to \(uploadLocation)",
code: response.statusCode, details: body ?? "")
code: response.statusCode, details: data.asText())
}
return digest
}
@ -208,7 +204,7 @@ class Registry {
let chunks = fromData.chunks(ofCount: chunkSizeMb == 0 ? fromData.count : chunkSizeMb * 1_000_000)
for (index, chunk) in chunks.enumerated() {
let lastChunk = index == (chunks.count - 1)
let (bytes, response) = try await rawRequest(
let (data, response) = try await dataRequest(
lastChunk ? .PUT : .PATCH,
uploadLocation,
headers: [
@ -220,9 +216,8 @@ class Registry {
)
let expectedStatus = lastChunk ? HTTPCode.Created.rawValue : HTTPCode.Accepted.rawValue
if response.statusCode != expectedStatus {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "streaming blob to \(uploadLocation)",
code: response.statusCode, details: body ?? "")
code: response.statusCode, details: data.asText())
}
uploadedBytes += chunk.count
// Update location for the next chunk
@ -233,11 +228,11 @@ class Registry {
}
public func pullBlob(_ digest: String, handler: (Data) throws -> Void) async throws {
let (bytes, response) = try await endpointRequest(.GET, "\(namespace)/blobs/\(digest)")
let (bytes, response) = try await bytesRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"))
if response.statusCode != HTTPCode.Ok.rawValue {
let body = try await bytes.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.statusCode,
details: body ?? "")
details: body)
}
for try await part in bytes.chunks(ofCount: chunkSizeBytes) {
@ -247,20 +242,27 @@ class Registry {
}
}
private func endpointRequest(
_ method: HTTPMethod,
_ endpoint: String,
headers: Dictionary<String, String> = Dictionary(),
parameters: Dictionary<String, String> = Dictionary(),
body: Data? = nil
) async throws -> (URLSession.AsyncBytes, HTTPURLResponse) {
private func endpointURL(_ endpoint: String) -> URLComponents {
let url = URL(string: endpoint, relativeTo: baseURL)!
let urlComponents = URLComponents(url: url, resolvingAgainstBaseURL: true)!
return try await rawRequest(method, urlComponents, headers: headers, parameters: parameters, body: body)
return URLComponents(url: url, resolvingAgainstBaseURL: true)!
}
private func rawRequest(
private func dataRequest(
_ method: HTTPMethod,
_ urlComponents: URLComponents,
headers: Dictionary<String, String> = Dictionary(),
parameters: Dictionary<String, String> = Dictionary(),
body: Data? = nil,
doAuth: Bool = true
) async throws -> (Data, HTTPURLResponse) {
let (bytes, response) = try await bytesRequest(method, urlComponents,
headers: headers, parameters: parameters, body: body, doAuth: doAuth)
return (try await bytes.asData(), response)
}
private func bytesRequest(
_ method: HTTPMethod,
_ urlComponents: URLComponents,
headers: Dictionary<String, String> = Dictionary(),
@ -353,15 +355,13 @@ class Registry {
headers["Authorization"] = "Basic \(encodedCredentials!)"
}
let (bytes, response) = try await rawRequest(.GET, authenticateURL, headers: headers, doAuth: false)
let (data, response) = try await dataRequest(.GET, authenticateURL, headers: headers, doAuth: false)
if response.statusCode != HTTPCode.Ok.rawValue {
let body = try await bytes.asData() .asText() ?? ""
throw RegistryError.AuthFailed(why: "received unexpected HTTP status code \(response.statusCode) "
+ "while retrieving an authentication token", details: body)
+ "while retrieving an authentication token", details: data.asText())
}
let bodyData = try await bytes.asData()
currentAuthToken = try TokenResponse.parse(fromData: bodyData)
currentAuthToken = try TokenResponse.parse(fromData: data)
}
private func lookupCredentials(host: String) throws -> (String, String)? {