tart/Sources/tart/VMStorageOCI.swift

359 lines
12 KiB
Swift

import Foundation
import OpenTelemetryApi
import Retry
class VMStorageOCI: PrunableStorage {
let baseURL: URL
init() throws {
baseURL = try Config().tartCacheDir.appendingPathComponent("OCIs", isDirectory: true)
}
private func vmURL(_ name: RemoteName) -> URL {
baseURL.appendingRemoteName(name)
}
private func hostDirectoryURL(_ name: RemoteName) -> URL {
baseURL.appendingHost(name)
}
func exists(_ name: RemoteName) -> Bool {
VMDirectory(baseURL: vmURL(name)).initialized
}
func digest(_ name: RemoteName) throws -> String {
let digest = vmURL(name).resolvingSymlinksInPath().lastPathComponent
if !digest.starts(with: "sha256:") {
throw RuntimeError.OCIStorageError("\(name) is not a digest and doesn't point to a digest")
}
return digest
}
func open(_ name: RemoteName, _ accessDate: Date = Date()) throws -> VMDirectory {
let vmDir = VMDirectory(baseURL: vmURL(name))
try vmDir.validate(userFriendlyName: name.description)
try vmDir.baseURL.updateAccessDate(accessDate)
return vmDir
}
func create(_ name: RemoteName, overwrite: Bool = false) throws -> VMDirectory {
let vmDir = VMDirectory(baseURL: vmURL(name))
try vmDir.initialize(overwrite: overwrite)
return vmDir
}
func move(_ name: RemoteName, from: VMDirectory) throws{
let targetURL = vmURL(name)
// Pre-create intermediate directories (e.g. creates ~/.tart/cache/OCIs/github.com/org/repo/
// for github.com/org/repo:latest)
try FileManager.default.createDirectory(at: targetURL.deletingLastPathComponent(),
withIntermediateDirectories: true)
_ = try FileManager.default.replaceItemAt(targetURL, withItemAt: from.baseURL)
}
func delete(_ name: RemoteName) throws {
try FileManager.default.removeItem(at: vmURL(name))
try gc()
}
func gc() throws {
var refCounts = Dictionary<URL, UInt>()
guard let enumerator = FileManager.default.enumerator(at: baseURL,
includingPropertiesForKeys: [.isSymbolicLinkKey]) else {
return
}
for case let foundURL as URL in enumerator {
let isSymlink = try foundURL.resourceValues(forKeys: [.isSymbolicLinkKey]).isSymbolicLink!
// Perform garbage collection for tag-based images
// with broken outgoing references
if isSymlink && foundURL == foundURL.resolvingSymlinksInPath() {
try FileManager.default.removeItem(at: foundURL)
continue
}
let vmDir = VMDirectory(baseURL: foundURL.resolvingSymlinksInPath())
if !vmDir.initialized {
continue
}
refCounts[vmDir.baseURL] = (refCounts[vmDir.baseURL] ?? 0) + (isSymlink ? 1 : 0)
}
// Perform garbage collection for digest-based images
// with no incoming references
for (baseURL, incRefCount) in refCounts {
let vmDir = VMDirectory(baseURL: baseURL)
if !vmDir.isExplicitlyPulled() && incRefCount == 0 {
try FileManager.default.removeItem(at: baseURL)
}
}
}
func list() throws -> [(String, VMDirectory, Bool)] {
var result: [(String, VMDirectory, Bool)] = Array()
guard let enumerator = FileManager.default.enumerator(at: baseURL,
includingPropertiesForKeys: [.isSymbolicLinkKey], options: [.producesRelativePathURLs]) else {
return []
}
for case let foundURL as URL in enumerator {
let vmDir = VMDirectory(baseURL: foundURL)
if !vmDir.initialized {
continue
}
// Split the relative VM's path at the last component
// and figure out which character should be used
// to join them together, either ":" for tags or
// "@" for hashes
let parts = [foundURL.deletingLastPathComponent().relativePath, foundURL.lastPathComponent]
var name: String
let isSymlink = try foundURL.resourceValues(forKeys: [.isSymbolicLinkKey]).isSymbolicLink!
if isSymlink {
name = parts.joined(separator: ":")
} else {
name = parts.joined(separator: "@")
}
// Remove the percent-encoding, if any
name = percentDecode(name)
result.append((name, vmDir, isSymlink))
}
return result
}
func prunables() throws -> [Prunable] {
try list().filter { (_, _, isSymlink) in !isSymlink }.map { (_, vmDir, _) in vmDir }
}
func pull(_ name: RemoteName, registry: Registry, concurrency: UInt, deduplicate: Bool) async throws {
OpenTelemetry.instance.contextProvider.activeSpan?.setAttribute(
key: "oci.image-name",
value: .string(name.description)
)
defaultLogger.appendNewLine("pulling manifest...")
let (manifest, manifestData) = try await registry.pullManifest(reference: name.reference.value)
let digestName = RemoteName(host: name.host, namespace: name.namespace,
reference: Reference(digest: Digest.hash(manifestData)))
if exists(name) && exists(digestName) && linked(from: name, to: digestName) {
// optimistically check if we need to do anything at all before locking
defaultLogger.appendNewLine("\(digestName) image is already cached and linked!")
return
}
// Ensure that host directory for given RemoteName exists in OCI storage
let hostDirectoryURL = hostDirectoryURL(digestName)
try FileManager.default.createDirectory(at: hostDirectoryURL, withIntermediateDirectories: true)
// Acquire a lock on it to prevent concurrent pulls for a single host
let lock = try FileLock(lockURL: hostDirectoryURL)
let sucessfullyLocked = try lock.trylock()
if !sucessfullyLocked {
print("waiting for lock...")
try lock.lock()
}
defer { try! lock.unlock() }
if Task.isCancelled {
throw CancellationError()
}
if !exists(digestName) {
let span = OTel.shared.tracer.spanBuilder(spanName: "pull").setActive(true).startSpan()
defer { span.end() }
let tmpVMDir = try VMDirectory.temporaryDeterministic(key: name.description)
// Open an existing VM directory corresponding to this name, if any,
// marking it as outdated to speed up the garbage collection process
_ = try? open(name, Date(timeIntervalSince1970: 0))
// Lock the temporary VM directory to prevent it's garbage collection
let tmpVMDirLock = try FileLock(lockURL: tmpVMDir.baseURL)
try tmpVMDirLock.lock()
// Try to reclaim some cache space if we know the VM size in advance
if let uncompressedDiskSize = manifest.uncompressedDiskSize() {
OpenTelemetry.instance.contextProvider.activeSpan?.setAttribute(
key: "oci.image-uncompressed-disk-size-bytes",
value: .int(Int(uncompressedDiskSize))
)
let otherVMFilesSize: UInt64 = 128 * 1024 * 1024
try Prune.reclaimIfNeeded(uncompressedDiskSize + otherVMFilesSize)
}
try await withTaskCancellationHandler(operation: {
try await retry(maxAttempts: 5) {
// Choose the best base image which has the most deduplication ratio
let localLayerCache = try await chooseLocalLayerCache(name, manifest, registry)
if let llc = localLayerCache {
let deduplicatedHuman = ByteCountFormatter.string(fromByteCount: Int64(llc.deduplicatedBytes), countStyle: .file)
if deduplicate {
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
} else {
defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to avoid fetching \(deduplicatedHuman), will try use it...")
}
}
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache, deduplicate: deduplicate)
} recoverFromFailure: { error in
if error is URLError {
print("Error pulling image: \"\(error.localizedDescription)\", attempting to re-try...")
return .retry
}
return .throw
}
try move(digestName, from: tmpVMDir)
}, onCancel: {
try? FileManager.default.removeItem(at: tmpVMDir.baseURL)
})
} else {
defaultLogger.appendNewLine("\(digestName) image is already cached! creating a symlink...")
}
if name != digestName {
// Create new or overwrite the old symbolic link
try link(from: name, to: digestName)
} else {
// Ensure that images pulled by content digest
// are excluded from garbage collection
VMDirectory(baseURL: vmURL(name)).markExplicitlyPulled()
}
// to explicitly set the image as being accessed so it won't get pruned immediately
_ = try VMStorageOCI().open(name)
}
func linked(from: RemoteName, to: RemoteName) -> Bool {
do {
let resolvedFrom = try FileManager.default.destinationOfSymbolicLink(atPath: vmURL(from).path)
return resolvedFrom == vmURL(to).path
} catch {
return false
}
}
func link(from: RemoteName, to: RemoteName) throws {
try? FileManager.default.removeItem(at: vmURL(from))
try FileManager.default.createSymbolicLink(at: vmURL(from), withDestinationURL: vmURL(to))
try gc()
}
func chooseLocalLayerCache(_ name: RemoteName, _ manifest: OCIManifest, _ registry: Registry) async throws -> LocalLayerCache? {
// Establish a closure that will calculate how much bytes
// we'll deduplicate if we re-use the given manifest
let target = Swift.Set(manifest.layers)
let calculateDeduplicatedBytes = { (manifest: OCIManifest) -> UInt64 in
target.intersection(manifest.layers).map({ UInt64($0.size) }).reduce(0, +)
}
// Load OCI VM images and their manifests (if present)
var candidates: [(name: String, vmDir: VMDirectory, manifest: OCIManifest, deduplicatedBytes: UInt64)] = []
for (name, vmDir, isSymlink) in try list() {
if isSymlink {
continue
}
guard let manifestJSON = try? Data(contentsOf: vmDir.manifestURL) else {
continue
}
guard let manifest = try? OCIManifest(fromJSON: manifestJSON) else {
continue
}
candidates.append((name, vmDir, manifest, calculateDeduplicatedBytes(manifest)))
}
// Previously we haven't stored the OCI VM image manifests, but still fetched the VM image manifest if
// what the user was trying to pull was a tagged image, and we already had that image in the OCI VM cache
//
// Keep supporting this behavior for backwards comaptibility, but only communicate
// with the registry if we haven't already retrieved the manifest for that OCI VM image.
if name.reference.type == .Tag,
let vmDir = try? open(name),
let digest = try? digest(name),
try !candidates.contains(where: {try $0.manifest.digest() == digest}),
let (manifest, _) = try? await registry.pullManifest(reference: digest) {
candidates.append((name.description, vmDir, manifest, calculateDeduplicatedBytes(manifest)))
}
// Now, find the best match based on how many bytes we'll deduplicate
let choosen = candidates.filter {
$0.deduplicatedBytes > 1024 * 1024 * 1024 // save at least 1GB
}.max { left, right in
return left.deduplicatedBytes < right.deduplicatedBytes
}
return try choosen.flatMap({ choosen in
try LocalLayerCache(choosen.name, choosen.deduplicatedBytes, choosen.vmDir.diskURL, choosen.manifest)
})
}
}
extension URL {
func appendingRemoteName(_ name: RemoteName) -> URL {
var result: URL = self
for pathComponent in (percentEncode(name.host) + "/" + name.namespace + "/" + name.reference.value).split(separator: "/") {
result = result.appendingPathComponent(String(pathComponent))
}
return result
}
func appendingHost(_ name: RemoteName) -> URL {
self.appendingPathComponent(percentEncode(name.host), isDirectory: true)
}
}
// Work around a pretty inane Swift's URL behavior where calling
// appendingPathComponent() or deletingLastPathComponent() on a
// URL like URL(filePath: "example.com:8080") (note the "filePath")
// will flip its isFileURL from "true" to "false" and discard its
// absolute path infromation (if any).
//
// The same kind of operations won't do anything to a URL like
// URL(filePath: "127.0.0.1:8080"), which makes things even more
// ridiculous.
private func percentEncode(_ s: String) -> String {
return s.addingPercentEncoding(withAllowedCharacters: CharacterSet(charactersIn: ":").inverted)!
}
private func percentDecode(_ s: String) -> String {
s.removingPercentEncoding!
}