mirror of https://github.com/cirruslabs/tart.git
tart pull: try to re-use APFS blocks by cloning the base image (#864)
* tart pull: try to re-use APFS blocks by cloning the base image
* Punch a hole when a zero chunk is detected
* Properly retrieve errno when the hole punching operation fails
* tart pull: do not retry on RuntimeError
* Ensure that the holes we're about to punch are FS block size-aligned
* VMDirectory: remove unused static variables
* tart pull: log if we've found an image to deduplicate against
* Do not prematurely read contents from disk
* Only consider candidates with deduplicatedBytes greater than 0
* APFS reuse UX/DX improvements (#870)
  * Show how much deduplication is happening

    An improvement to the APFS deduplication logic: check whether a disk image file `mayShareFileContent` with some other file, and then set a custom attribute to track the deduplication, since there is no way to get this information from APFS itself. It's not 100% accurate, but given that the OCI cache is immutable, the actual disk usage can only be lower than that.

  * Use a string attribute
  * Update Sources/tart/URL+Prunable.swift
  * Added a SizeOnDisk column

Co-authored-by: Nikolay Edigaryev <edigaryev@gmail.com>
Co-authored-by: Fedor Korotkov <fedor.korotkov@gmail.com>
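The mechanics behind the first few bullets — clone the base image so APFS shares its blocks, then punch holes over zero chunks aligned to file system block boundaries — can be illustrated in isolation. Below is a minimal sketch, assuming an APFS volume; the paths, the function name, and the fixed 64 KiB hole are illustrative only, and error handling is reduced to thrown `Errno` values:

```swift
import Foundation
import System

// Sketch: clone a base disk image, then punch a hole over a region
// known to be all zeroes instead of writing it out.
func cloneAndPunchHole(base: URL, disk: URL) throws {
  // On APFS, copyItem(at:to:) clones the file, so the new disk
  // initially shares all of its blocks with the base image.
  try FileManager.default.copyItem(at: base, to: disk)

  // F_PUNCHHOLE requires holes to be aligned to file system block
  // boundaries, so query the block size via stat(2) first.
  var st = stat()
  guard stat(disk.path, &st) == 0 else {
    throw Errno(rawValue: errno)
  }
  let fsBlockSize = UInt64(st.st_blksize)

  // Punch a hole over one aligned 64 KiB chunk, deallocating its
  // blocks rather than materializing zeroes on disk.
  let offset: UInt64 = 0
  let length: UInt64 = 64 * 1024
  precondition(offset % fsBlockSize == 0 && length % fsBlockSize == 0)

  let handle = try FileHandle(forWritingTo: disk)
  defer { try? handle.close() }

  var arg = fpunchhole_t(fp_flags: 0, reserved: 0,
                         fp_offset: off_t(offset), fp_length: off_t(length))
  guard fcntl(handle.fileDescriptor, F_PUNCHHOLE, &arg) != -1 else {
    throw Errno(rawValue: errno)
  }
}
```

The actual implementation in the diff below goes further: for non-zero chunks it compares the freshly decompressed data against what the cloned base already contains and rewrites only the chunks that differ.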
This commit is contained in:
parent 4ed73bc775
commit 1b81b12760
diff --git a/Package.resolved b/Package.resolved
@@ -1,5 +1,5 @@
 {
-  "originHash" : "6d48639bc0ea02002de0b4f38fe3fce0ddc9d174f2e56180c2ffcbedb7391ef8",
+  "originHash" : "2c514a4a1d7e106713db744bee89edb40d75da63e6611990ec2f4b0da53c0455",
   "pins" : [
     {
       "identity" : "antlr4",
@@ -118,6 +118,15 @@
         "version" : "1.8.0"
       }
     },
+    {
+      "identity" : "swift-xattr",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/jozefizso/swift-xattr",
+      "state" : {
+        "revision" : "f8605af7b3290dbb235fb182ec6e9035d0c8c3ac",
+        "version" : "3.0.0"
+      }
+    },
     {
       "identity" : "swiftdate",
       "kind" : "remoteSourceControl",
diff --git a/Package.swift b/Package.swift
@@ -24,6 +24,7 @@ let package = Package(
     .package(url: "https://github.com/orchetect/SwiftRadix", from: "1.3.1"),
     .package(url: "https://github.com/groue/Semaphore", from: "0.0.8"),
     .package(url: "https://github.com/fumoboy007/swift-retry", from: "0.2.3"),
+    .package(url: "https://github.com/jozefizso/swift-xattr", from: "3.0.0"),
   ],
   targets: [
     .executableTarget(name: "tart", dependencies: [
@@ -40,6 +41,7 @@ let package = Package(
       .product(name: "SwiftRadix", package: "SwiftRadix"),
       .product(name: "Semaphore", package: "Semaphore"),
       .product(name: "DMRetry", package: "swift-retry"),
+      .product(name: "XAttr", package: "swift-xattr"),
     ], exclude: [
       "OCI/Reference/Makefile",
       "OCI/Reference/Reference.g4",
diff --git a/Sources/tart/Commands/List.swift b/Sources/tart/Commands/List.swift
@@ -7,6 +7,7 @@ fileprivate struct VMInfo: Encodable {
   let Name: String
   let Disk: Int
   let Size: Int
+  let SizeOnDisk: Int
   let Running: Bool
   let State: String
 }
@@ -38,13 +39,13 @@ struct List: AsyncParsableCommand {
 
     if source == nil || source == "local" {
       infos += sortedInfos(try VMStorageLocal().list().map { (name, vmDir) in
-        try VMInfo(Source: "local", Name: name, Disk: vmDir.sizeGB(), Size: vmDir.allocatedSizeGB(), Running: vmDir.running(), State: vmDir.state().rawValue)
+        try VMInfo(Source: "local", Name: name, Disk: vmDir.sizeGB(), Size: vmDir.allocatedSizeGB(), SizeOnDisk: vmDir.allocatedSizeGB() - vmDir.deduplicatedSizeGB(), Running: vmDir.running(), State: vmDir.state().rawValue)
       })
     }
 
     if source == nil || source == "oci" {
      infos += sortedInfos(try VMStorageOCI().list().map { (name, vmDir, _) in
-        try VMInfo(Source: "OCI", Name: name, Disk: vmDir.sizeGB(), Size: vmDir.allocatedSizeGB(), Running: vmDir.running(), State: vmDir.state().rawValue)
+        try VMInfo(Source: "OCI", Name: name, Disk: vmDir.sizeGB(), Size: vmDir.allocatedSizeGB(), SizeOnDisk: vmDir.allocatedSizeGB() - vmDir.deduplicatedSizeGB(), Running: vmDir.running(), State: vmDir.state().rawValue)
       })
     }
 
diff --git a/Sources/tart/LocalLayerCache.swift b/Sources/tart/LocalLayerCache.swift
@@ -1,10 +1,18 @@
 import Foundation
 
 struct LocalLayerCache {
+  let name: String
+  let deduplicatedBytes: UInt64
+  let diskURL: URL
+
   private let mappedDisk: Data
   private var digestToRange: [String : Range<Data.Index>] = [:]
 
-  init?(_ diskURL: URL, _ manifest: OCIManifest) throws {
+  init?(_ name: String, _ deduplicatedBytes: UInt64, _ diskURL: URL, _ manifest: OCIManifest) throws {
+    self.name = name
+    self.deduplicatedBytes = deduplicatedBytes
+    self.diskURL = diskURL
+
     // mmap(2) the disk that contains the layers from the manifest
     self.mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])
 
diff --git a/Sources/tart/OCI/Layerizer/DiskV2.swift b/Sources/tart/OCI/Layerizer/DiskV2.swift
@@ -1,5 +1,6 @@
 import Foundation
 import Compression
+import System
 
 class DiskV2: Disk {
   private static let bufferSizeBytes = 4 * 1024 * 1024
@@ -57,8 +58,17 @@
     // Support resumable pulls
     let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)
 
-    if !pullResumed && !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
-      throw OCIError.FailedToCreateVmFile
+    if !pullResumed {
+      if let localLayerCache = localLayerCache {
+        // Clone the local layer cache's disk and use it as a base, potentially
+        // reducing the space usage since some blocks won't be written at all
+        try FileManager.default.copyItem(at: localLayerCache.diskURL, to: diskURL)
+      } else {
+        // Otherwise create an empty disk
+        if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
+          throw OCIError.FailedToCreateVmFile
+        }
+      }
     }
 
     // Calculate the uncompressed disk size
@@ -78,6 +88,15 @@
     try disk.truncate(atOffset: uncompressedDiskSize)
     try disk.close()
 
+    // Determine the file system block size
+    var st = stat()
+    if stat(diskURL.path, &st) == -1 {
+      let details = Errno(rawValue: errno)
+
+      throw RuntimeError.PullFailed("failed to stat(2) disk \(diskURL.path): \(details)")
+    }
+    let fsBlockSize = UInt64(st.st_blksize)
+
     // Concurrently fetch and decompress layers
     try await withThrowingTaskGroup(of: Void.self) { group in
       var globalDiskWritingOffset: UInt64 = 0
@@ -109,13 +128,21 @@
           return
         }
 
-        // Open the disk file
+        // Open the disk file for writing
        let disk = try FileHandle(forWritingTo: diskURL)
 
+        // Also open the disk file for reading and verifying
+        // its contents in case the local layer cache is used
+        let rdisk: FileHandle? = if localLayerCache != nil {
+          try FileHandle(forReadingFrom: diskURL)
+        } else {
+          nil
+        }
+
        // Check if we already have this layer contents in the local layer cache
        if let localLayerCache = localLayerCache, let data = localLayerCache.find(diskLayer.digest), Digest.hash(data) == uncompressedLayerContentDigest {
          // Fulfil the layer contents from the local blob cache
-          _ = try zeroSkippingWrite(disk, diskWritingOffset, data)
+          _ = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
          try disk.close()
 
          // Update the progress
@@ -132,7 +159,7 @@
            return
          }
 
-          diskWritingOffset = try zeroSkippingWrite(disk, diskWritingOffset, data)
+          diskWritingOffset = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
        }
 
        try await registry.pullBlob(diskLayer.digest) { data in
@@ -152,7 +179,7 @@
      }
    }
 
-  private static func zeroSkippingWrite(_ disk: FileHandle, _ offset: UInt64, _ data: Data) throws -> UInt64 {
+  private static func zeroSkippingWrite(_ disk: FileHandle, _ rdisk: FileHandle?, _ fsBlockSize: UInt64, _ offset: UInt64, _ data: Data) throws -> UInt64 {
     let holeGranularityBytes = 64 * 1024
 
     // A zero chunk for faster than byte-by-byte comparisons
@@ -172,7 +199,38 @@
     var offset = offset
 
     for chunk in data.chunks(ofCount: holeGranularityBytes) {
-      // Only write chunks that are not zero
+      // If the local layer cache is used, only write chunks that differ
+      // since the base disk can contain anything at any position
+      if let rdisk = rdisk {
+        // F_PUNCHHOLE requires the holes to be aligned to file system block boundaries
+        let isHoleAligned = (offset % fsBlockSize) == 0 && (UInt64(chunk.count) % fsBlockSize) == 0
+
+        if isHoleAligned && chunk == zeroChunk {
+          var arg = fpunchhole_t(fp_flags: 0, reserved: 0, fp_offset: off_t(offset), fp_length: off_t(chunk.count))
+
+          if fcntl(disk.fileDescriptor, F_PUNCHHOLE, &arg) == -1 {
+            let details = Errno(rawValue: errno)
+
+            throw RuntimeError.PullFailed("failed to punch hole: \(details)")
+          }
+        } else {
+          try rdisk.seek(toOffset: offset)
+          let actualContentsOnDisk = try rdisk.read(upToCount: chunk.count)
+
+          if chunk != actualContentsOnDisk {
+            try disk.seek(toOffset: offset)
+            disk.write(chunk)
+          }
+        }
+
+        offset += UInt64(chunk.count)
+
+        continue
+      }
+
+      // Otherwise, only write chunks that are not zero
+      // since the base disk is created from scratch and
+      // is zeroed via truncate(2)
       if chunk != zeroChunk {
         try disk.seek(toOffset: offset)
         disk.write(chunk)
diff --git a/Sources/tart/URL+Prunable.swift b/Sources/tart/URL+Prunable.swift
@@ -1,4 +1,5 @@
 import Foundation
+import XAttr
 
 extension URL: Prunable {
   var url: URL {
@@ -13,7 +14,32 @@ extension URL: Prunable {
     try resourceValues(forKeys: [.totalFileAllocatedSizeKey]).totalFileAllocatedSize!
   }
 
+  func deduplicatedSizeBytes() throws -> Int {
+    let values = try resourceValues(forKeys: [.totalFileAllocatedSizeKey, .mayShareFileContentKey])
+    // make sure the file's origin file is there and duplication works
+    var dedublicatedSize = 0
+    if values.mayShareFileContent == true {
+      return Int(deduplicatedBytes())
+    }
+    return 0
+  }
+
   func sizeBytes() throws -> Int {
     try resourceValues(forKeys: [.totalFileSizeKey]).totalFileSize!
   }
+
+  func setDeduplicatedBytes(_ size: UInt64) {
+    let data = "\(size)".data(using: .utf8)!
+    try! self.setExtendedAttribute(name: "run.tart.deduplicated-bytes", value: data)
+  }
+
+  func deduplicatedBytes() -> UInt64 {
+    guard let data = try? self.extendedAttributeValue(forName: "run.tart.deduplicated-bytes") else {
+      return 0
+    }
+    if let strValue = String(data: data, encoding: .utf8) {
+      return UInt64(strValue) ?? 0
+    }
+    return 0
+  }
 }
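The hunk above stores the deduplicated byte count in a custom extended attribute via the swift-xattr package. For readers without that dependency, here is a minimal standalone sketch of the same mechanism using the raw getxattr(2)/setxattr(2) syscalls — the attribute name matches the diff, while the function names and signatures are illustrative, not tart's API:

```swift
import Foundation
import System

let attrName = "run.tart.deduplicated-bytes"

// Store the deduplicated byte count as a decimal string attribute,
// mirroring the string attribute used in the diff above.
func setDeduplicatedBytes(on url: URL, _ size: UInt64) throws {
  let bytes = Array("\(size)".utf8)
  guard setxattr(url.path, attrName, bytes, bytes.count, 0, 0) == 0 else {
    throw Errno(rawValue: errno)
  }
}

// Read the attribute back, defaulting to 0 when absent or malformed.
func deduplicatedBytes(on url: URL) -> UInt64 {
  // First call queries the attribute's size without reading it.
  let size = getxattr(url.path, attrName, nil, 0, 0, 0)
  guard size > 0 else { return 0 }

  var buffer = [UInt8](repeating: 0, count: size)
  guard getxattr(url.path, attrName, &buffer, size, 0, 0) == size else { return 0 }

  return UInt64(String(decoding: buffer, as: UTF8.self)) ?? 0
}
```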
diff --git a/Sources/tart/VMDirectory+OCI.swift b/Sources/tart/VMDirectory+OCI.swift
@@ -11,9 +11,6 @@ enum OCIError: Error {
 }
 
 extension VMDirectory {
-  private static let bufferSizeBytes = 64 * 1024 * 1024
-  private static let layerLimitBytes = 500 * 1000 * 1000
-
   func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?) async throws {
     // Pull VM's config file layer and re-serialize it into a config file
     let configLayers = manifest.layers.filter {
@@ -62,6 +59,11 @@ extension VMDirectory {
       throw RuntimeError.PullFailed("failed to decompress disk: \(error.localizedDescription)")
     }
 
+    if let llc = localLayerCache {
+      // set custom attribute to remember deduplicated bytes
+      diskURL.setDeduplicatedBytes(llc.deduplicatedBytes)
+    }
+
     // Pull VM's NVRAM file layer and store it in an NVRAM file
     defaultLogger.appendNewLine("pulling NVRAM...")
 
@@ -80,7 +82,7 @@ extension VMDirectory {
     }
     try nvram.close()
 
-    // Serialize VM's manifest to enable better de-duplication on subsequent "tart pull"'s
+    // Serialize VM's manifest to enable better deduplication on subsequent "tart pull"'s
     try manifest.toJSON().write(to: manifestURL)
   }
 
diff --git a/Sources/tart/VMDirectory.swift b/Sources/tart/VMDirectory.swift
@@ -182,6 +182,14 @@ struct VMDirectory: Prunable {
     try allocatedSizeBytes() / 1000 / 1000 / 1000
   }
 
+  func deduplicatedSizeBytes() throws -> Int {
+    try configURL.deduplicatedSizeBytes() + diskURL.deduplicatedSizeBytes() + nvramURL.deduplicatedSizeBytes()
+  }
+
+  func deduplicatedSizeGB() throws -> Int {
+    try deduplicatedSizeBytes() / 1000 / 1000 / 1000
+  }
+
   func sizeBytes() throws -> Int {
     try configURL.sizeBytes() + diskURL.sizeBytes() + nvramURL.sizeBytes()
   }
diff --git a/Sources/tart/VMStorageOCI.swift b/Sources/tart/VMStorageOCI.swift
@@ -200,8 +200,18 @@ class VMStorageOCI: PrunableStorage {
       // Choose the best base image which has the most deduplication ratio
       let localLayerCache = try await chooseLocalLayerCache(name, manifest, registry)
 
+      if let llc = localLayerCache {
+        let deduplicatedHuman = ByteCountFormatter.string(fromByteCount: Int64(llc.deduplicatedBytes), countStyle: .file)
+
+        defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
+      }
+
       try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache)
     } recoverFromFailure: { error in
+      if error is RuntimeError {
+        return .throw
+      }
+
       print("Error: \(error.localizedDescription)")
       print("Attempting to re-try...")
 
@@ -246,15 +256,15 @@ class VMStorageOCI: PrunableStorage {
 
   func chooseLocalLayerCache(_ name: RemoteName, _ manifest: OCIManifest, _ registry: Registry) async throws -> LocalLayerCache? {
     // Establish a closure that will calculate how much bytes
-    // we'll de-duplicate if we re-use the given manifest
+    // we'll deduplicate if we re-use the given manifest
     let target = Swift.Set(manifest.layers)
 
-    let calculateDeduplicatedBytes = { (manifest: OCIManifest) -> Int in
-      target.intersection(manifest.layers).map({ $0.size }).reduce(0, +)
+    let calculateDeduplicatedBytes = { (manifest: OCIManifest) -> UInt64 in
+      target.intersection(manifest.layers).map({ UInt64($0.size) }).reduce(0, +)
     }
 
     // Load OCI VM images and their manifests (if present)
-    var candidates: [(name: String, vmDir: VMDirectory, manifest: OCIManifest, deduplicatedBytes: Int)] = []
+    var candidates: [(name: String, vmDir: VMDirectory, manifest: OCIManifest, deduplicatedBytes: UInt64)] = []
 
     for (name, vmDir, isSymlink) in try list() {
       if isSymlink {
@@ -285,13 +295,15 @@ class VMStorageOCI: PrunableStorage {
       candidates.append((name.description, vmDir, manifest, calculateDeduplicatedBytes(manifest)))
     }
 
-    // Now, find the best match based on how many bytes we'll de-duplicate
-    let choosen = candidates.max { left, right in
+    // Now, find the best match based on how many bytes we'll deduplicate
+    let choosen = candidates.filter {
+      $0.deduplicatedBytes > 0
+    }.max { left, right in
       return left.deduplicatedBytes < right.deduplicatedBytes
     }
 
     return try choosen.flatMap({ choosen in
-      try LocalLayerCache(choosen.vmDir.diskURL, choosen.manifest)
+      try LocalLayerCache(choosen.name, choosen.deduplicatedBytes, choosen.vmDir.diskURL, choosen.manifest)
     })
   }
 }